1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.ipc; |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.io.DataInputStream; |
---|
23 | import java.io.DataOutputStream; |
---|
24 | import java.io.ByteArrayInputStream; |
---|
25 | import java.io.ByteArrayOutputStream; |
---|
26 | |
---|
27 | import java.nio.ByteBuffer; |
---|
28 | import java.nio.channels.CancelledKeyException; |
---|
29 | import java.nio.channels.ClosedChannelException; |
---|
30 | import java.nio.channels.ReadableByteChannel; |
---|
31 | import java.nio.channels.SelectionKey; |
---|
32 | import java.nio.channels.Selector; |
---|
33 | import java.nio.channels.ServerSocketChannel; |
---|
34 | import java.nio.channels.SocketChannel; |
---|
35 | import java.nio.channels.WritableByteChannel; |
---|
36 | |
---|
37 | import java.net.BindException; |
---|
38 | import java.net.InetAddress; |
---|
39 | import java.net.InetSocketAddress; |
---|
40 | import java.net.ServerSocket; |
---|
41 | import java.net.Socket; |
---|
42 | import java.net.SocketException; |
---|
43 | import java.net.UnknownHostException; |
---|
44 | |
---|
45 | import java.security.PrivilegedActionException; |
---|
46 | import java.security.PrivilegedExceptionAction; |
---|
47 | import java.util.ArrayList; |
---|
48 | import java.util.Collections; |
---|
49 | import java.util.LinkedList; |
---|
50 | import java.util.List; |
---|
51 | import java.util.Iterator; |
---|
52 | import java.util.Map; |
---|
53 | import java.util.Random; |
---|
54 | import java.util.concurrent.BlockingQueue; |
---|
55 | import java.util.concurrent.ConcurrentHashMap; |
---|
56 | import java.util.concurrent.LinkedBlockingQueue; |
---|
57 | |
---|
58 | import javax.security.auth.Subject; |
---|
59 | |
---|
60 | import org.apache.commons.logging.Log; |
---|
61 | import org.apache.commons.logging.LogFactory; |
---|
62 | import org.apache.hadoop.conf.Configuration; |
---|
63 | import org.apache.hadoop.security.SecurityUtil; |
---|
64 | import org.apache.hadoop.io.Writable; |
---|
65 | import org.apache.hadoop.io.WritableUtils; |
---|
66 | import org.apache.hadoop.util.ReflectionUtils; |
---|
67 | import org.apache.hadoop.util.StringUtils; |
---|
68 | import org.apache.hadoop.ipc.metrics.RpcMetrics; |
---|
69 | import org.apache.hadoop.security.authorize.AuthorizationException; |
---|
70 | |
---|
71 | /** An abstract IPC service. IPC calls take a single {@link Writable} as a |
---|
72 | * parameter, and return a {@link Writable} as their value. A service runs on |
---|
73 | * a port and is defined by a parameter class and a value class. |
---|
74 | * |
---|
75 | * @see Client |
---|
76 | */ |
---|
77 | public abstract class Server { |
---|
78 | |
---|
79 | /** |
---|
80 | * The first four bytes of Hadoop RPC connections |
---|
81 | */ |
---|
82 | public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); |
---|
83 | |
---|
84 | // 1 : Introduce ping and server does not throw away RPCs |
---|
85 | // 3 : Introduce the protocol into the RPC connection header |
---|
86 | public static final byte CURRENT_VERSION = 3; |
---|
87 | |
---|
88 | /** |
---|
89 | * How many calls/handler are allowed in the queue. |
---|
90 | */ |
---|
91 | private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; |
---|
92 | |
---|
93 | public static final Log LOG = LogFactory.getLog(Server.class); |
---|
94 | |
---|
95 | private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>(); |
---|
96 | |
---|
97 | private static final Map<String, Class<?>> PROTOCOL_CACHE = |
---|
98 | new ConcurrentHashMap<String, Class<?>>(); |
---|
99 | |
---|
100 | static Class<?> getProtocolClass(String protocolName, Configuration conf) |
---|
101 | throws ClassNotFoundException { |
---|
102 | Class<?> protocol = PROTOCOL_CACHE.get(protocolName); |
---|
103 | if (protocol == null) { |
---|
104 | protocol = conf.getClassByName(protocolName); |
---|
105 | PROTOCOL_CACHE.put(protocolName, protocol); |
---|
106 | } |
---|
107 | return protocol; |
---|
108 | } |
---|
109 | |
---|
110 | /** Returns the server instance called under or null. May be called under |
---|
111 | * {@link #call(Writable, long)} implementations, and under {@link Writable} |
---|
112 | * methods of paramters and return values. Permits applications to access |
---|
113 | * the server context.*/ |
---|
114 | public static Server get() { |
---|
115 | return SERVER.get(); |
---|
116 | } |
---|
117 | |
---|
118 | /** This is set to Call object before Handler invokes an RPC and reset |
---|
119 | * after the call returns. |
---|
120 | */ |
---|
121 | private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>(); |
---|
122 | |
---|
123 | /** Returns the remote side ip address when invoked inside an RPC |
---|
124 | * Returns null incase of an error. |
---|
125 | */ |
---|
126 | public static InetAddress getRemoteIp() { |
---|
127 | Call call = CurCall.get(); |
---|
128 | if (call != null) { |
---|
129 | return call.connection.socket.getInetAddress(); |
---|
130 | } |
---|
131 | return null; |
---|
132 | } |
---|
133 | /** Returns remote address as a string when invoked inside an RPC. |
---|
134 | * Returns null in case of an error. |
---|
135 | */ |
---|
136 | public static String getRemoteAddress() { |
---|
137 | InetAddress addr = getRemoteIp(); |
---|
138 | return (addr == null) ? null : addr.getHostAddress(); |
---|
139 | } |
---|
140 | |
---|
141 | private String bindAddress; |
---|
142 | private int port; // port we listen on |
---|
143 | private int handlerCount; // number of handler threads |
---|
144 | private Class<? extends Writable> paramClass; // class of call parameters |
---|
145 | private int maxIdleTime; // the maximum idle time after |
---|
146 | // which a client may be disconnected |
---|
147 | private int thresholdIdleConnections; // the number of idle connections |
---|
148 | // after which we will start |
---|
149 | // cleaning up idle |
---|
150 | // connections |
---|
151 | int maxConnectionsToNuke; // the max number of |
---|
152 | // connections to nuke |
---|
153 | //during a cleanup |
---|
154 | |
---|
155 | protected RpcMetrics rpcMetrics; |
---|
156 | |
---|
157 | private Configuration conf; |
---|
158 | |
---|
159 | private int maxQueueSize; |
---|
160 | private int socketSendBufferSize; |
---|
161 | private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm |
---|
162 | |
---|
163 | volatile private boolean running = true; // true while server runs |
---|
164 | private BlockingQueue<Call> callQueue; // queued calls |
---|
165 | |
---|
166 | private List<Connection> connectionList = |
---|
167 | Collections.synchronizedList(new LinkedList<Connection>()); |
---|
168 | //maintain a list |
---|
169 | //of client connections |
---|
170 | private Listener listener = null; |
---|
171 | private Responder responder = null; |
---|
172 | private int numConnections = 0; |
---|
173 | private Handler[] handlers = null; |
---|
174 | |
---|
175 | /** |
---|
176 | * A convenience method to bind to a given address and report |
---|
177 | * better exceptions if the address is not a valid host. |
---|
178 | * @param socket the socket to bind |
---|
179 | * @param address the address to bind to |
---|
180 | * @param backlog the number of connections allowed in the queue |
---|
181 | * @throws BindException if the address can't be bound |
---|
182 | * @throws UnknownHostException if the address isn't a valid host name |
---|
183 | * @throws IOException other random errors from bind |
---|
184 | */ |
---|
185 | public static void bind(ServerSocket socket, InetSocketAddress address, |
---|
186 | int backlog) throws IOException { |
---|
187 | try { |
---|
188 | socket.bind(address, backlog); |
---|
189 | } catch (BindException e) { |
---|
190 | BindException bindException = new BindException("Problem binding to " + address |
---|
191 | + " : " + e.getMessage()); |
---|
192 | bindException.initCause(e); |
---|
193 | throw bindException; |
---|
194 | } catch (SocketException e) { |
---|
195 | // If they try to bind to a different host's address, give a better |
---|
196 | // error message. |
---|
197 | if ("Unresolved address".equals(e.getMessage())) { |
---|
198 | throw new UnknownHostException("Invalid hostname for server: " + |
---|
199 | address.getHostName()); |
---|
200 | } else { |
---|
201 | throw e; |
---|
202 | } |
---|
203 | } |
---|
204 | } |
---|
205 | |
---|
206 | /** A call queued for handling. */ |
---|
207 | private static class Call { |
---|
208 | private int id; // the client's call id |
---|
209 | private Writable param; // the parameter passed |
---|
210 | private Connection connection; // connection to client |
---|
211 | private long timestamp; // the time received when response is null |
---|
212 | // the time served when response is not null |
---|
213 | private ByteBuffer response; // the response for this call |
---|
214 | |
---|
215 | public Call(int id, Writable param, Connection connection) { |
---|
216 | this.id = id; |
---|
217 | this.param = param; |
---|
218 | this.connection = connection; |
---|
219 | this.timestamp = System.currentTimeMillis(); |
---|
220 | this.response = null; |
---|
221 | } |
---|
222 | |
---|
223 | @Override |
---|
224 | public String toString() { |
---|
225 | return param.toString() + " from " + connection.toString(); |
---|
226 | } |
---|
227 | |
---|
228 | public void setResponse(ByteBuffer response) { |
---|
229 | this.response = response; |
---|
230 | } |
---|
231 | } |
---|
232 | |
---|
233 | /** Listens on the socket. Creates jobs for the handler threads*/ |
---|
234 | private class Listener extends Thread { |
---|
235 | |
---|
236 | private ServerSocketChannel acceptChannel = null; //the accept channel |
---|
237 | private Selector selector = null; //the selector that we use for the server |
---|
238 | private InetSocketAddress address; //the address we bind at |
---|
239 | private Random rand = new Random(); |
---|
240 | private long lastCleanupRunTime = 0; //the last time when a cleanup connec- |
---|
241 | //-tion (for idle connections) ran |
---|
242 | private long cleanupInterval = 10000; //the minimum interval between |
---|
243 | //two cleanup runs |
---|
244 | private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); |
---|
245 | |
---|
246 | public Listener() throws IOException { |
---|
247 | address = new InetSocketAddress(bindAddress, port); |
---|
248 | // Create a new server socket and set to non blocking mode |
---|
249 | acceptChannel = ServerSocketChannel.open(); |
---|
250 | acceptChannel.configureBlocking(false); |
---|
251 | |
---|
252 | // Bind the server socket to the local host and port |
---|
253 | bind(acceptChannel.socket(), address, backlogLength); |
---|
254 | port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port |
---|
255 | // create a selector; |
---|
256 | selector= Selector.open(); |
---|
257 | |
---|
258 | // Register accepts on the server socket with the selector. |
---|
259 | acceptChannel.register(selector, SelectionKey.OP_ACCEPT); |
---|
260 | this.setName("IPC Server listener on " + port); |
---|
261 | this.setDaemon(true); |
---|
262 | } |
---|
263 | /** cleanup connections from connectionList. Choose a random range |
---|
264 | * to scan and also have a limit on the number of the connections |
---|
265 | * that will be cleanedup per run. The criteria for cleanup is the time |
---|
266 | * for which the connection was idle. If 'force' is true then all |
---|
267 | * connections will be looked at for the cleanup. |
---|
268 | */ |
---|
269 | private void cleanupConnections(boolean force) { |
---|
270 | if (force || numConnections > thresholdIdleConnections) { |
---|
271 | long currentTime = System.currentTimeMillis(); |
---|
272 | if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { |
---|
273 | return; |
---|
274 | } |
---|
275 | int start = 0; |
---|
276 | int end = numConnections - 1; |
---|
277 | if (!force) { |
---|
278 | start = rand.nextInt() % numConnections; |
---|
279 | end = rand.nextInt() % numConnections; |
---|
280 | int temp; |
---|
281 | if (end < start) { |
---|
282 | temp = start; |
---|
283 | start = end; |
---|
284 | end = temp; |
---|
285 | } |
---|
286 | } |
---|
287 | int i = start; |
---|
288 | int numNuked = 0; |
---|
289 | while (i <= end) { |
---|
290 | Connection c; |
---|
291 | synchronized (connectionList) { |
---|
292 | try { |
---|
293 | c = connectionList.get(i); |
---|
294 | } catch (Exception e) {return;} |
---|
295 | } |
---|
296 | if (c.timedOut(currentTime)) { |
---|
297 | if (LOG.isDebugEnabled()) |
---|
298 | LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); |
---|
299 | closeConnection(c); |
---|
300 | numNuked++; |
---|
301 | end--; |
---|
302 | c = null; |
---|
303 | if (!force && numNuked == maxConnectionsToNuke) break; |
---|
304 | } |
---|
305 | else i++; |
---|
306 | } |
---|
307 | lastCleanupRunTime = System.currentTimeMillis(); |
---|
308 | } |
---|
309 | } |
---|
310 | |
---|
311 | @Override |
---|
312 | public void run() { |
---|
313 | LOG.info(getName() + ": starting"); |
---|
314 | SERVER.set(Server.this); |
---|
315 | while (running) { |
---|
316 | SelectionKey key = null; |
---|
317 | try { |
---|
318 | selector.select(); |
---|
319 | Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); |
---|
320 | while (iter.hasNext()) { |
---|
321 | key = iter.next(); |
---|
322 | iter.remove(); |
---|
323 | try { |
---|
324 | if (key.isValid()) { |
---|
325 | if (key.isAcceptable()) |
---|
326 | doAccept(key); |
---|
327 | else if (key.isReadable()) |
---|
328 | doRead(key); |
---|
329 | } |
---|
330 | } catch (IOException e) { |
---|
331 | } |
---|
332 | key = null; |
---|
333 | } |
---|
334 | } catch (OutOfMemoryError e) { |
---|
335 | // we can run out of memory if we have too many threads |
---|
336 | // log the event and sleep for a minute and give |
---|
337 | // some thread(s) a chance to finish |
---|
338 | LOG.warn("Out of Memory in server select", e); |
---|
339 | closeCurrentConnection(key, e); |
---|
340 | cleanupConnections(true); |
---|
341 | try { Thread.sleep(60000); } catch (Exception ie) {} |
---|
342 | } catch (InterruptedException e) { |
---|
343 | if (running) { // unexpected -- log it |
---|
344 | LOG.info(getName() + " caught: " + |
---|
345 | StringUtils.stringifyException(e)); |
---|
346 | } |
---|
347 | } catch (Exception e) { |
---|
348 | closeCurrentConnection(key, e); |
---|
349 | } |
---|
350 | cleanupConnections(false); |
---|
351 | } |
---|
352 | LOG.info("Stopping " + this.getName()); |
---|
353 | |
---|
354 | synchronized (this) { |
---|
355 | try { |
---|
356 | acceptChannel.close(); |
---|
357 | selector.close(); |
---|
358 | } catch (IOException e) { } |
---|
359 | |
---|
360 | selector= null; |
---|
361 | acceptChannel= null; |
---|
362 | |
---|
363 | // clean up all connections |
---|
364 | while (!connectionList.isEmpty()) { |
---|
365 | closeConnection(connectionList.remove(0)); |
---|
366 | } |
---|
367 | } |
---|
368 | } |
---|
369 | |
---|
370 | private void closeCurrentConnection(SelectionKey key, Throwable e) { |
---|
371 | if (key != null) { |
---|
372 | Connection c = (Connection)key.attachment(); |
---|
373 | if (c != null) { |
---|
374 | if (LOG.isDebugEnabled()) |
---|
375 | LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); |
---|
376 | closeConnection(c); |
---|
377 | c = null; |
---|
378 | } |
---|
379 | } |
---|
380 | } |
---|
381 | |
---|
382 | InetSocketAddress getAddress() { |
---|
383 | return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); |
---|
384 | } |
---|
385 | |
---|
386 | void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { |
---|
387 | Connection c = null; |
---|
388 | ServerSocketChannel server = (ServerSocketChannel) key.channel(); |
---|
389 | // accept up to 10 connections |
---|
390 | for (int i=0; i<10; i++) { |
---|
391 | SocketChannel channel = server.accept(); |
---|
392 | if (channel==null) return; |
---|
393 | |
---|
394 | channel.configureBlocking(false); |
---|
395 | channel.socket().setTcpNoDelay(tcpNoDelay); |
---|
396 | SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ); |
---|
397 | c = new Connection(readKey, channel, System.currentTimeMillis()); |
---|
398 | readKey.attach(c); |
---|
399 | synchronized (connectionList) { |
---|
400 | connectionList.add(numConnections, c); |
---|
401 | numConnections++; |
---|
402 | } |
---|
403 | if (LOG.isDebugEnabled()) |
---|
404 | LOG.debug("Server connection from " + c.toString() + |
---|
405 | "; # active connections: " + numConnections + |
---|
406 | "; # queued calls: " + callQueue.size()); |
---|
407 | } |
---|
408 | } |
---|
409 | |
---|
410 | void doRead(SelectionKey key) throws InterruptedException { |
---|
411 | int count = 0; |
---|
412 | Connection c = (Connection)key.attachment(); |
---|
413 | if (c == null) { |
---|
414 | return; |
---|
415 | } |
---|
416 | c.setLastContact(System.currentTimeMillis()); |
---|
417 | |
---|
418 | try { |
---|
419 | count = c.readAndProcess(); |
---|
420 | } catch (InterruptedException ieo) { |
---|
421 | LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); |
---|
422 | throw ieo; |
---|
423 | } catch (Exception e) { |
---|
424 | LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); |
---|
425 | count = -1; //so that the (count < 0) block is executed |
---|
426 | } |
---|
427 | if (count < 0) { |
---|
428 | if (LOG.isDebugEnabled()) |
---|
429 | LOG.debug(getName() + ": disconnecting client " + |
---|
430 | c.getHostAddress() + ". Number of active connections: "+ |
---|
431 | numConnections); |
---|
432 | closeConnection(c); |
---|
433 | c = null; |
---|
434 | } |
---|
435 | else { |
---|
436 | c.setLastContact(System.currentTimeMillis()); |
---|
437 | } |
---|
438 | } |
---|
439 | |
---|
440 | synchronized void doStop() { |
---|
441 | if (selector != null) { |
---|
442 | selector.wakeup(); |
---|
443 | Thread.yield(); |
---|
444 | } |
---|
445 | if (acceptChannel != null) { |
---|
446 | try { |
---|
447 | acceptChannel.socket().close(); |
---|
448 | } catch (IOException e) { |
---|
449 | LOG.info(getName() + ":Exception in closing listener socket. " + e); |
---|
450 | } |
---|
451 | } |
---|
452 | } |
---|
453 | } |
---|
454 | |
---|
455 | // Sends responses of RPC back to clients. |
---|
456 | private class Responder extends Thread { |
---|
457 | private Selector writeSelector; |
---|
458 | private int pending; // connections waiting to register |
---|
459 | |
---|
460 | final static int PURGE_INTERVAL = 900000; // 15mins |
---|
461 | |
---|
462 | Responder() throws IOException { |
---|
463 | this.setName("IPC Server Responder"); |
---|
464 | this.setDaemon(true); |
---|
465 | writeSelector = Selector.open(); // create a selector |
---|
466 | pending = 0; |
---|
467 | } |
---|
468 | |
---|
469 | @Override |
---|
470 | public void run() { |
---|
471 | LOG.info(getName() + ": starting"); |
---|
472 | SERVER.set(Server.this); |
---|
473 | long lastPurgeTime = 0; // last check for old calls. |
---|
474 | |
---|
475 | while (running) { |
---|
476 | try { |
---|
477 | waitPending(); // If a channel is being registered, wait. |
---|
478 | writeSelector.select(PURGE_INTERVAL); |
---|
479 | Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator(); |
---|
480 | while (iter.hasNext()) { |
---|
481 | SelectionKey key = iter.next(); |
---|
482 | iter.remove(); |
---|
483 | try { |
---|
484 | if (key.isValid() && key.isWritable()) { |
---|
485 | doAsyncWrite(key); |
---|
486 | } |
---|
487 | } catch (IOException e) { |
---|
488 | LOG.info(getName() + ": doAsyncWrite threw exception " + e); |
---|
489 | } |
---|
490 | } |
---|
491 | long now = System.currentTimeMillis(); |
---|
492 | if (now < lastPurgeTime + PURGE_INTERVAL) { |
---|
493 | continue; |
---|
494 | } |
---|
495 | lastPurgeTime = now; |
---|
496 | // |
---|
497 | // If there were some calls that have not been sent out for a |
---|
498 | // long time, discard them. |
---|
499 | // |
---|
500 | LOG.debug("Checking for old call responses."); |
---|
501 | ArrayList<Call> calls; |
---|
502 | |
---|
503 | // get the list of channels from list of keys. |
---|
504 | synchronized (writeSelector.keys()) { |
---|
505 | calls = new ArrayList<Call>(writeSelector.keys().size()); |
---|
506 | iter = writeSelector.keys().iterator(); |
---|
507 | while (iter.hasNext()) { |
---|
508 | SelectionKey key = iter.next(); |
---|
509 | Call call = (Call)key.attachment(); |
---|
510 | if (call != null && key.channel() == call.connection.channel) { |
---|
511 | calls.add(call); |
---|
512 | } |
---|
513 | } |
---|
514 | } |
---|
515 | |
---|
516 | for(Call call : calls) { |
---|
517 | try { |
---|
518 | doPurge(call, now); |
---|
519 | } catch (IOException e) { |
---|
520 | LOG.warn("Error in purging old calls " + e); |
---|
521 | } |
---|
522 | } |
---|
523 | } catch (OutOfMemoryError e) { |
---|
524 | // |
---|
525 | // we can run out of memory if we have too many threads |
---|
526 | // log the event and sleep for a minute and give |
---|
527 | // some thread(s) a chance to finish |
---|
528 | // |
---|
529 | LOG.warn("Out of Memory in server select", e); |
---|
530 | try { Thread.sleep(60000); } catch (Exception ie) {} |
---|
531 | } catch (Exception e) { |
---|
532 | LOG.warn("Exception in Responder " + |
---|
533 | StringUtils.stringifyException(e)); |
---|
534 | } |
---|
535 | } |
---|
536 | LOG.info("Stopping " + this.getName()); |
---|
537 | } |
---|
538 | |
---|
539 | private void doAsyncWrite(SelectionKey key) throws IOException { |
---|
540 | Call call = (Call)key.attachment(); |
---|
541 | if (call == null) { |
---|
542 | return; |
---|
543 | } |
---|
544 | if (key.channel() != call.connection.channel) { |
---|
545 | throw new IOException("doAsyncWrite: bad channel"); |
---|
546 | } |
---|
547 | |
---|
548 | synchronized(call.connection.responseQueue) { |
---|
549 | if (processResponse(call.connection.responseQueue, false)) { |
---|
550 | try { |
---|
551 | key.interestOps(0); |
---|
552 | } catch (CancelledKeyException e) { |
---|
553 | /* The Listener/reader might have closed the socket. |
---|
554 | * We don't explicitly cancel the key, so not sure if this will |
---|
555 | * ever fire. |
---|
556 | * This warning could be removed. |
---|
557 | */ |
---|
558 | LOG.warn("Exception while changing ops : " + e); |
---|
559 | } |
---|
560 | } |
---|
561 | } |
---|
562 | } |
---|
563 | |
---|
564 | // |
---|
565 | // Remove calls that have been pending in the responseQueue |
---|
566 | // for a long time. |
---|
567 | // |
---|
568 | private void doPurge(Call call, long now) throws IOException { |
---|
569 | LinkedList<Call> responseQueue = call.connection.responseQueue; |
---|
570 | synchronized (responseQueue) { |
---|
571 | Iterator<Call> iter = responseQueue.listIterator(0); |
---|
572 | while (iter.hasNext()) { |
---|
573 | call = iter.next(); |
---|
574 | if (now > call.timestamp + PURGE_INTERVAL) { |
---|
575 | closeConnection(call.connection); |
---|
576 | break; |
---|
577 | } |
---|
578 | } |
---|
579 | } |
---|
580 | } |
---|
581 | |
---|
582 | // Processes one response. Returns true if there are no more pending |
---|
583 | // data for this channel. |
---|
584 | // |
---|
585 | private boolean processResponse(LinkedList<Call> responseQueue, |
---|
586 | boolean inHandler) throws IOException { |
---|
587 | boolean error = true; |
---|
588 | boolean done = false; // there is more data for this channel. |
---|
589 | int numElements = 0; |
---|
590 | Call call = null; |
---|
591 | try { |
---|
592 | synchronized (responseQueue) { |
---|
593 | // |
---|
594 | // If there are no items for this channel, then we are done |
---|
595 | // |
---|
596 | numElements = responseQueue.size(); |
---|
597 | if (numElements == 0) { |
---|
598 | error = false; |
---|
599 | return true; // no more data for this channel. |
---|
600 | } |
---|
601 | // |
---|
602 | // Extract the first call |
---|
603 | // |
---|
604 | call = responseQueue.removeFirst(); |
---|
605 | SocketChannel channel = call.connection.channel; |
---|
606 | if (LOG.isDebugEnabled()) { |
---|
607 | LOG.debug(getName() + ": responding to #" + call.id + " from " + |
---|
608 | call.connection); |
---|
609 | } |
---|
610 | // |
---|
611 | // Send as much data as we can in the non-blocking fashion |
---|
612 | // |
---|
613 | int numBytes = channelWrite(channel, call.response); |
---|
614 | if (numBytes < 0) { |
---|
615 | return true; |
---|
616 | } |
---|
617 | if (!call.response.hasRemaining()) { |
---|
618 | call.connection.decRpcCount(); |
---|
619 | if (numElements == 1) { // last call fully processes. |
---|
620 | done = true; // no more data for this channel. |
---|
621 | } else { |
---|
622 | done = false; // more calls pending to be sent. |
---|
623 | } |
---|
624 | if (LOG.isDebugEnabled()) { |
---|
625 | LOG.debug(getName() + ": responding to #" + call.id + " from " + |
---|
626 | call.connection + " Wrote " + numBytes + " bytes."); |
---|
627 | } |
---|
628 | } else { |
---|
629 | // |
---|
630 | // If we were unable to write the entire response out, then |
---|
631 | // insert in Selector queue. |
---|
632 | // |
---|
633 | call.connection.responseQueue.addFirst(call); |
---|
634 | |
---|
635 | if (inHandler) { |
---|
636 | // set the serve time when the response has to be sent later |
---|
637 | call.timestamp = System.currentTimeMillis(); |
---|
638 | |
---|
639 | incPending(); |
---|
640 | try { |
---|
641 | // Wakeup the thread blocked on select, only then can the call |
---|
642 | // to channel.register() complete. |
---|
643 | writeSelector.wakeup(); |
---|
644 | channel.register(writeSelector, SelectionKey.OP_WRITE, call); |
---|
645 | } catch (ClosedChannelException e) { |
---|
646 | //Its ok. channel might be closed else where. |
---|
647 | done = true; |
---|
648 | } finally { |
---|
649 | decPending(); |
---|
650 | } |
---|
651 | } |
---|
652 | if (LOG.isDebugEnabled()) { |
---|
653 | LOG.debug(getName() + ": responding to #" + call.id + " from " + |
---|
654 | call.connection + " Wrote partial " + numBytes + |
---|
655 | " bytes."); |
---|
656 | } |
---|
657 | } |
---|
658 | error = false; // everything went off well |
---|
659 | } |
---|
660 | } finally { |
---|
661 | if (error && call != null) { |
---|
662 | LOG.warn(getName()+", call " + call + ": output error"); |
---|
663 | done = true; // error. no more data for this channel. |
---|
664 | closeConnection(call.connection); |
---|
665 | } |
---|
666 | } |
---|
667 | return done; |
---|
668 | } |
---|
669 | |
---|
670 | // |
---|
671 | // Enqueue a response from the application. |
---|
672 | // |
---|
673 | void doRespond(Call call) throws IOException { |
---|
674 | synchronized (call.connection.responseQueue) { |
---|
675 | call.connection.responseQueue.addLast(call); |
---|
676 | if (call.connection.responseQueue.size() == 1) { |
---|
677 | processResponse(call.connection.responseQueue, true); |
---|
678 | } |
---|
679 | } |
---|
680 | } |
---|
681 | |
---|
682 | private synchronized void incPending() { // call waiting to be enqueued. |
---|
683 | pending++; |
---|
684 | } |
---|
685 | |
---|
686 | private synchronized void decPending() { // call done enqueueing. |
---|
687 | pending--; |
---|
688 | notify(); |
---|
689 | } |
---|
690 | |
---|
691 | private synchronized void waitPending() throws InterruptedException { |
---|
692 | while (pending > 0) { |
---|
693 | wait(); |
---|
694 | } |
---|
695 | } |
---|
696 | } |
---|
697 | |
---|
698 | /** Reads calls from a connection and queues them for handling. */ |
---|
699 | private class Connection { |
---|
700 | private boolean versionRead = false; //if initial signature and |
---|
701 | //version are read |
---|
702 | private boolean headerRead = false; //if the connection header that |
---|
703 | //follows version is read. |
---|
704 | |
---|
705 | private SocketChannel channel; |
---|
706 | private ByteBuffer data; |
---|
707 | private ByteBuffer dataLengthBuffer; |
---|
708 | private LinkedList<Call> responseQueue; |
---|
709 | private volatile int rpcCount = 0; // number of outstanding rpcs |
---|
710 | private long lastContact; |
---|
711 | private int dataLength; |
---|
712 | private Socket socket; |
---|
713 | // Cache the remote host & port info so that even if the socket is |
---|
714 | // disconnected, we can say where it used to connect to. |
---|
715 | private String hostAddress; |
---|
716 | private int remotePort; |
---|
717 | |
---|
718 | ConnectionHeader header = new ConnectionHeader(); |
---|
719 | Class<?> protocol; |
---|
720 | |
---|
721 | Subject user = null; |
---|
722 | |
---|
723 | // Fake 'call' for failed authorization response |
---|
724 | private final int AUTHROIZATION_FAILED_CALLID = -1; |
---|
725 | private final Call authFailedCall = |
---|
726 | new Call(AUTHROIZATION_FAILED_CALLID, null, null); |
---|
727 | private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); |
---|
728 | |
---|
729 | public Connection(SelectionKey key, SocketChannel channel, |
---|
730 | long lastContact) { |
---|
731 | this.channel = channel; |
---|
732 | this.lastContact = lastContact; |
---|
733 | this.data = null; |
---|
734 | this.dataLengthBuffer = ByteBuffer.allocate(4); |
---|
735 | this.socket = channel.socket(); |
---|
736 | InetAddress addr = socket.getInetAddress(); |
---|
737 | if (addr == null) { |
---|
738 | this.hostAddress = "*Unknown*"; |
---|
739 | } else { |
---|
740 | this.hostAddress = addr.getHostAddress(); |
---|
741 | } |
---|
742 | this.remotePort = socket.getPort(); |
---|
743 | this.responseQueue = new LinkedList<Call>(); |
---|
744 | if (socketSendBufferSize != 0) { |
---|
745 | try { |
---|
746 | socket.setSendBufferSize(socketSendBufferSize); |
---|
747 | } catch (IOException e) { |
---|
748 | LOG.warn("Connection: unable to set socket send buffer size to " + |
---|
749 | socketSendBufferSize); |
---|
750 | } |
---|
751 | } |
---|
752 | } |
---|
753 | |
---|
754 | @Override |
---|
755 | public String toString() { |
---|
756 | return getHostAddress() + ":" + remotePort; |
---|
757 | } |
---|
758 | |
---|
759 | public String getHostAddress() { |
---|
760 | return hostAddress; |
---|
761 | } |
---|
762 | |
---|
763 | public void setLastContact(long lastContact) { |
---|
764 | this.lastContact = lastContact; |
---|
765 | } |
---|
766 | |
---|
767 | public long getLastContact() { |
---|
768 | return lastContact; |
---|
769 | } |
---|
770 | |
---|
771 | /* Return true if the connection has no outstanding rpc */ |
---|
772 | private boolean isIdle() { |
---|
773 | return rpcCount == 0; |
---|
774 | } |
---|
775 | |
---|
776 | /* Decrement the outstanding RPC count */ |
---|
777 | private void decRpcCount() { |
---|
778 | rpcCount--; |
---|
779 | } |
---|
780 | |
---|
781 | /* Increment the outstanding RPC count */ |
---|
782 | private void incRpcCount() { |
---|
783 | rpcCount++; |
---|
784 | } |
---|
785 | |
---|
786 | private boolean timedOut(long currentTime) { |
---|
787 | if (isIdle() && currentTime - lastContact > maxIdleTime) |
---|
788 | return true; |
---|
789 | return false; |
---|
790 | } |
---|
791 | |
---|
792 | public int readAndProcess() throws IOException, InterruptedException { |
---|
793 | while (true) { |
---|
794 | /* Read at most one RPC. If the header is not read completely yet |
---|
795 | * then iterate until we read first RPC or until there is no data left. |
---|
796 | */ |
---|
797 | int count = -1; |
---|
798 | if (dataLengthBuffer.remaining() > 0) { |
---|
799 | count = channelRead(channel, dataLengthBuffer); |
---|
800 | if (count < 0 || dataLengthBuffer.remaining() > 0) |
---|
801 | return count; |
---|
802 | } |
---|
803 | |
---|
804 | if (!versionRead) { |
---|
805 | //Every connection is expected to send the header. |
---|
806 | ByteBuffer versionBuffer = ByteBuffer.allocate(1); |
---|
807 | count = channelRead(channel, versionBuffer); |
---|
808 | if (count <= 0) { |
---|
809 | return count; |
---|
810 | } |
---|
811 | int version = versionBuffer.get(0); |
---|
812 | |
---|
813 | dataLengthBuffer.flip(); |
---|
814 | if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { |
---|
815 | //Warning is ok since this is not supposed to happen. |
---|
816 | LOG.warn("Incorrect header or version mismatch from " + |
---|
817 | hostAddress + ":" + remotePort + |
---|
818 | " got version " + version + |
---|
819 | " expected version " + CURRENT_VERSION); |
---|
820 | return -1; |
---|
821 | } |
---|
822 | dataLengthBuffer.clear(); |
---|
823 | versionRead = true; |
---|
824 | continue; |
---|
825 | } |
---|
826 | |
---|
827 | if (data == null) { |
---|
828 | dataLengthBuffer.flip(); |
---|
829 | dataLength = dataLengthBuffer.getInt(); |
---|
830 | |
---|
831 | if (dataLength == Client.PING_CALL_ID) { |
---|
832 | dataLengthBuffer.clear(); |
---|
833 | return 0; //ping message |
---|
834 | } |
---|
835 | data = ByteBuffer.allocate(dataLength); |
---|
836 | incRpcCount(); // Increment the rpc count |
---|
837 | } |
---|
838 | |
---|
839 | count = channelRead(channel, data); |
---|
840 | |
---|
841 | if (data.remaining() == 0) { |
---|
842 | dataLengthBuffer.clear(); |
---|
843 | data.flip(); |
---|
844 | if (headerRead) { |
---|
845 | processData(); |
---|
846 | data = null; |
---|
847 | return count; |
---|
848 | } else { |
---|
849 | processHeader(); |
---|
850 | headerRead = true; |
---|
851 | data = null; |
---|
852 | |
---|
853 | // Authorize the connection |
---|
854 | try { |
---|
855 | authorize(user, header); |
---|
856 | |
---|
857 | if (LOG.isDebugEnabled()) { |
---|
858 | LOG.debug("Successfully authorized " + header); |
---|
859 | } |
---|
860 | } catch (AuthorizationException ae) { |
---|
861 | authFailedCall.connection = this; |
---|
862 | setupResponse(authFailedResponse, authFailedCall, |
---|
863 | Status.FATAL, null, |
---|
864 | ae.getClass().getName(), ae.getMessage()); |
---|
865 | responder.doRespond(authFailedCall); |
---|
866 | |
---|
867 | // Close this connection |
---|
868 | return -1; |
---|
869 | } |
---|
870 | |
---|
871 | continue; |
---|
872 | } |
---|
873 | } |
---|
874 | return count; |
---|
875 | } |
---|
876 | } |
---|
877 | |
---|
878 | /// Reads the connection header following version |
---|
879 | private void processHeader() throws IOException { |
---|
880 | DataInputStream in = |
---|
881 | new DataInputStream(new ByteArrayInputStream(data.array())); |
---|
882 | header.readFields(in); |
---|
883 | try { |
---|
884 | String protocolClassName = header.getProtocol(); |
---|
885 | if (protocolClassName != null) { |
---|
886 | protocol = getProtocolClass(header.getProtocol(), conf); |
---|
887 | } |
---|
888 | } catch (ClassNotFoundException cnfe) { |
---|
889 | throw new IOException("Unknown protocol: " + header.getProtocol()); |
---|
890 | } |
---|
891 | |
---|
892 | // TODO: Get the user name from the GSS API for Kerberbos-based security |
---|
893 | // Create the user subject |
---|
894 | user = SecurityUtil.getSubject(header.getUgi()); |
---|
895 | } |
---|
896 | |
---|
897 | private void processData() throws IOException, InterruptedException { |
---|
898 | DataInputStream dis = |
---|
899 | new DataInputStream(new ByteArrayInputStream(data.array())); |
---|
900 | int id = dis.readInt(); // try to read an id |
---|
901 | |
---|
902 | if (LOG.isDebugEnabled()) |
---|
903 | LOG.debug(" got #" + id); |
---|
904 | |
---|
905 | Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param |
---|
906 | param.readFields(dis); |
---|
907 | |
---|
908 | Call call = new Call(id, param, this); |
---|
909 | callQueue.put(call); // queue the call; maybe blocked here |
---|
910 | } |
---|
911 | |
---|
912 | private synchronized void close() throws IOException { |
---|
913 | data = null; |
---|
914 | dataLengthBuffer = null; |
---|
915 | if (!channel.isOpen()) |
---|
916 | return; |
---|
917 | try {socket.shutdownOutput();} catch(Exception e) {} |
---|
918 | if (channel.isOpen()) { |
---|
919 | try {channel.close();} catch(Exception e) {} |
---|
920 | } |
---|
921 | try {socket.close();} catch(Exception e) {} |
---|
922 | } |
---|
923 | } |
---|
924 | |
---|
925 | /** Handles queued calls . */ |
---|
926 | private class Handler extends Thread { |
---|
927 | public Handler(int instanceNumber) { |
---|
928 | this.setDaemon(true); |
---|
929 | this.setName("IPC Server handler "+ instanceNumber + " on " + port); |
---|
930 | } |
---|
931 | |
---|
932 | @Override |
---|
933 | public void run() { |
---|
934 | LOG.info(getName() + ": starting"); |
---|
935 | SERVER.set(Server.this); |
---|
936 | ByteArrayOutputStream buf = new ByteArrayOutputStream(10240); |
---|
937 | while (running) { |
---|
938 | try { |
---|
939 | final Call call = callQueue.take(); // pop the queue; maybe blocked here |
---|
940 | |
---|
941 | if (LOG.isDebugEnabled()) |
---|
942 | LOG.debug(getName() + ": has #" + call.id + " from " + |
---|
943 | call.connection); |
---|
944 | |
---|
945 | String errorClass = null; |
---|
946 | String error = null; |
---|
947 | Writable value = null; |
---|
948 | |
---|
949 | CurCall.set(call); |
---|
950 | try { |
---|
951 | // Make the call as the user via Subject.doAs, thus associating |
---|
952 | // the call with the Subject |
---|
953 | value = |
---|
954 | Subject.doAs(call.connection.user, |
---|
955 | new PrivilegedExceptionAction<Writable>() { |
---|
956 | @Override |
---|
957 | public Writable run() throws Exception { |
---|
958 | // make the call |
---|
959 | return call(call.connection.protocol, |
---|
960 | call.param, call.timestamp); |
---|
961 | |
---|
962 | } |
---|
963 | } |
---|
964 | ); |
---|
965 | |
---|
966 | } catch (PrivilegedActionException pae) { |
---|
967 | Exception e = pae.getException(); |
---|
968 | LOG.info(getName()+", call "+call+": error: " + e, e); |
---|
969 | errorClass = e.getClass().getName(); |
---|
970 | error = StringUtils.stringifyException(e); |
---|
971 | } catch (Throwable e) { |
---|
972 | LOG.info(getName()+", call "+call+": error: " + e, e); |
---|
973 | errorClass = e.getClass().getName(); |
---|
974 | error = StringUtils.stringifyException(e); |
---|
975 | } |
---|
976 | CurCall.set(null); |
---|
977 | |
---|
978 | setupResponse(buf, call, |
---|
979 | (error == null) ? Status.SUCCESS : Status.ERROR, |
---|
980 | value, errorClass, error); |
---|
981 | responder.doRespond(call); |
---|
982 | } catch (InterruptedException e) { |
---|
983 | if (running) { // unexpected -- log it |
---|
984 | LOG.info(getName() + " caught: " + |
---|
985 | StringUtils.stringifyException(e)); |
---|
986 | } |
---|
987 | } catch (Exception e) { |
---|
988 | LOG.info(getName() + " caught: " + |
---|
989 | StringUtils.stringifyException(e)); |
---|
990 | } |
---|
991 | } |
---|
992 | LOG.info(getName() + ": exiting"); |
---|
993 | } |
---|
994 | |
---|
995 | } |
---|
996 | |
---|
997 | protected Server(String bindAddress, int port, |
---|
998 | Class<? extends Writable> paramClass, int handlerCount, |
---|
999 | Configuration conf) |
---|
1000 | throws IOException |
---|
1001 | { |
---|
1002 | this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port)); |
---|
1003 | } |
---|
1004 | /** Constructs a server listening on the named port and address. Parameters passed must |
---|
1005 | * be of the named class. The <code>handlerCount</handlerCount> determines |
---|
1006 | * the number of handler threads that will be used to process calls. |
---|
1007 | * |
---|
1008 | */ |
---|
1009 | protected Server(String bindAddress, int port, |
---|
1010 | Class<? extends Writable> paramClass, int handlerCount, |
---|
1011 | Configuration conf, String serverName) |
---|
1012 | throws IOException { |
---|
1013 | this.bindAddress = bindAddress; |
---|
1014 | this.conf = conf; |
---|
1015 | this.port = port; |
---|
1016 | this.paramClass = paramClass; |
---|
1017 | this.handlerCount = handlerCount; |
---|
1018 | this.socketSendBufferSize = 0; |
---|
1019 | this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; |
---|
1020 | this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize); |
---|
1021 | this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); |
---|
1022 | this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); |
---|
1023 | this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); |
---|
1024 | |
---|
1025 | // Start the listener here and let it bind to the port |
---|
1026 | listener = new Listener(); |
---|
1027 | this.port = listener.getAddress().getPort(); |
---|
1028 | this.rpcMetrics = new RpcMetrics(serverName, |
---|
1029 | Integer.toString(this.port), this); |
---|
1030 | this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); |
---|
1031 | |
---|
1032 | |
---|
1033 | // Create the responder here |
---|
1034 | responder = new Responder(); |
---|
1035 | } |
---|
1036 | |
---|
1037 | private void closeConnection(Connection connection) { |
---|
1038 | synchronized (connectionList) { |
---|
1039 | if (connectionList.remove(connection)) |
---|
1040 | numConnections--; |
---|
1041 | } |
---|
1042 | try { |
---|
1043 | connection.close(); |
---|
1044 | } catch (IOException e) { |
---|
1045 | } |
---|
1046 | } |
---|
1047 | |
---|
1048 | /** |
---|
1049 | * Setup response for the IPC Call. |
---|
1050 | * |
---|
1051 | * @param response buffer to serialize the response into |
---|
1052 | * @param call {@link Call} to which we are setting up the response |
---|
1053 | * @param status {@link Status} of the IPC call |
---|
1054 | * @param rv return value for the IPC Call, if the call was successful |
---|
1055 | * @param errorClass error class, if the the call failed |
---|
1056 | * @param error error message, if the call failed |
---|
1057 | * @throws IOException |
---|
1058 | */ |
---|
1059 | private void setupResponse(ByteArrayOutputStream response, |
---|
1060 | Call call, Status status, |
---|
1061 | Writable rv, String errorClass, String error) |
---|
1062 | throws IOException { |
---|
1063 | response.reset(); |
---|
1064 | DataOutputStream out = new DataOutputStream(response); |
---|
1065 | out.writeInt(call.id); // write call id |
---|
1066 | out.writeInt(status.state); // write status |
---|
1067 | |
---|
1068 | if (status == Status.SUCCESS) { |
---|
1069 | rv.write(out); |
---|
1070 | } else { |
---|
1071 | WritableUtils.writeString(out, errorClass); |
---|
1072 | WritableUtils.writeString(out, error); |
---|
1073 | } |
---|
1074 | call.setResponse(ByteBuffer.wrap(response.toByteArray())); |
---|
1075 | } |
---|
1076 | |
---|
1077 | Configuration getConf() { |
---|
1078 | return conf; |
---|
1079 | } |
---|
1080 | |
---|
1081 | /** Sets the socket buffer size used for responding to RPCs */ |
---|
1082 | public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } |
---|
1083 | |
---|
1084 | /** Starts the service. Must be called before any calls will be handled. */ |
---|
1085 | public synchronized void start() throws IOException { |
---|
1086 | responder.start(); |
---|
1087 | listener.start(); |
---|
1088 | handlers = new Handler[handlerCount]; |
---|
1089 | |
---|
1090 | for (int i = 0; i < handlerCount; i++) { |
---|
1091 | handlers[i] = new Handler(i); |
---|
1092 | handlers[i].start(); |
---|
1093 | } |
---|
1094 | } |
---|
1095 | |
---|
1096 | /** Stops the service. No new calls will be handled after this is called. */ |
---|
1097 | public synchronized void stop() { |
---|
1098 | LOG.info("Stopping server on " + port); |
---|
1099 | running = false; |
---|
1100 | if (handlers != null) { |
---|
1101 | for (int i = 0; i < handlerCount; i++) { |
---|
1102 | if (handlers[i] != null) { |
---|
1103 | handlers[i].interrupt(); |
---|
1104 | } |
---|
1105 | } |
---|
1106 | } |
---|
1107 | listener.interrupt(); |
---|
1108 | listener.doStop(); |
---|
1109 | responder.interrupt(); |
---|
1110 | notifyAll(); |
---|
1111 | if (this.rpcMetrics != null) { |
---|
1112 | this.rpcMetrics.shutdown(); |
---|
1113 | } |
---|
1114 | } |
---|
1115 | |
---|
1116 | /** Wait for the server to be stopped. |
---|
1117 | * Does not wait for all subthreads to finish. |
---|
1118 | * See {@link #stop()}. |
---|
1119 | */ |
---|
1120 | public synchronized void join() throws InterruptedException { |
---|
1121 | while (running) { |
---|
1122 | wait(); |
---|
1123 | } |
---|
1124 | } |
---|
1125 | |
---|
1126 | /** |
---|
1127 | * Return the socket (ip+port) on which the RPC server is listening to. |
---|
1128 | * @return the socket (ip+port) on which the RPC server is listening to. |
---|
1129 | */ |
---|
1130 | public synchronized InetSocketAddress getListenerAddress() { |
---|
1131 | return listener.getAddress(); |
---|
1132 | } |
---|
1133 | |
---|
1134 | /** |
---|
1135 | * Called for each call. |
---|
1136 | * @deprecated Use {@link #call(Class, Writable, long)} instead |
---|
1137 | */ |
---|
1138 | @Deprecated |
---|
1139 | public Writable call(Writable param, long receiveTime) throws IOException { |
---|
1140 | return call(null, param, receiveTime); |
---|
1141 | } |
---|
1142 | |
---|
1143 | /** Called for each call. */ |
---|
1144 | public abstract Writable call(Class<?> protocol, |
---|
1145 | Writable param, long receiveTime) |
---|
1146 | throws IOException; |
---|
1147 | |
---|
1148 | /** |
---|
1149 | * Authorize the incoming client connection. |
---|
1150 | * |
---|
1151 | * @param user client user |
---|
1152 | * @param connection incoming connection |
---|
1153 | * @throws AuthorizationException when the client isn't authorized to talk the protocol |
---|
1154 | */ |
---|
1155 | public void authorize(Subject user, ConnectionHeader connection) |
---|
1156 | throws AuthorizationException {} |
---|
1157 | |
---|
1158 | /** |
---|
1159 | * The number of open RPC conections |
---|
1160 | * @return the number of open rpc connections |
---|
1161 | */ |
---|
1162 | public int getNumOpenConnections() { |
---|
1163 | return numConnections; |
---|
1164 | } |
---|
1165 | |
---|
1166 | /** |
---|
1167 | * The number of rpc calls in the queue. |
---|
1168 | * @return The number of rpc calls in the queue. |
---|
1169 | */ |
---|
1170 | public int getCallQueueLen() { |
---|
1171 | return callQueue.size(); |
---|
1172 | } |
---|
1173 | |
---|
1174 | |
---|
1175 | /** |
---|
1176 | * When the read or write buffer size is larger than this limit, i/o will be |
---|
1177 | * done in chunks of this size. Most RPC requests and responses would be |
---|
1178 | * be smaller. |
---|
1179 | */ |
---|
1180 | private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB. |
---|
1181 | |
---|
1182 | /** |
---|
1183 | * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}. |
---|
1184 | * If the amount of data is large, it writes to channel in smaller chunks. |
---|
1185 | * This is to avoid jdk from creating many direct buffers as the size of |
---|
1186 | * buffer increases. This also minimizes extra copies in NIO layer |
---|
1187 | * as a result of multiple write operations required to write a large |
---|
1188 | * buffer. |
---|
1189 | * |
---|
1190 | * @see WritableByteChannel#write(ByteBuffer) |
---|
1191 | */ |
---|
1192 | private static int channelWrite(WritableByteChannel channel, |
---|
1193 | ByteBuffer buffer) throws IOException { |
---|
1194 | |
---|
1195 | return (buffer.remaining() <= NIO_BUFFER_LIMIT) ? |
---|
1196 | channel.write(buffer) : channelIO(null, channel, buffer); |
---|
1197 | } |
---|
1198 | |
---|
1199 | |
---|
1200 | /** |
---|
1201 | * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}. |
---|
1202 | * If the amount of data is large, it writes to channel in smaller chunks. |
---|
1203 | * This is to avoid jdk from creating many direct buffers as the size of |
---|
1204 | * ByteBuffer increases. There should not be any performance degredation. |
---|
1205 | * |
---|
1206 | * @see ReadableByteChannel#read(ByteBuffer) |
---|
1207 | */ |
---|
1208 | private static int channelRead(ReadableByteChannel channel, |
---|
1209 | ByteBuffer buffer) throws IOException { |
---|
1210 | |
---|
1211 | return (buffer.remaining() <= NIO_BUFFER_LIMIT) ? |
---|
1212 | channel.read(buffer) : channelIO(channel, null, buffer); |
---|
1213 | } |
---|
1214 | |
---|
1215 | /** |
---|
1216 | * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)} |
---|
1217 | * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only |
---|
1218 | * one of readCh or writeCh should be non-null. |
---|
1219 | * |
---|
1220 | * @see #channelRead(ReadableByteChannel, ByteBuffer) |
---|
1221 | * @see #channelWrite(WritableByteChannel, ByteBuffer) |
---|
1222 | */ |
---|
1223 | private static int channelIO(ReadableByteChannel readCh, |
---|
1224 | WritableByteChannel writeCh, |
---|
1225 | ByteBuffer buf) throws IOException { |
---|
1226 | |
---|
1227 | int originalLimit = buf.limit(); |
---|
1228 | int initialRemaining = buf.remaining(); |
---|
1229 | int ret = 0; |
---|
1230 | |
---|
1231 | while (buf.remaining() > 0) { |
---|
1232 | try { |
---|
1233 | int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); |
---|
1234 | buf.limit(buf.position() + ioSize); |
---|
1235 | |
---|
1236 | ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); |
---|
1237 | |
---|
1238 | if (ret < ioSize) { |
---|
1239 | break; |
---|
1240 | } |
---|
1241 | |
---|
1242 | } finally { |
---|
1243 | buf.limit(originalLimit); |
---|
1244 | } |
---|
1245 | } |
---|
1246 | |
---|
1247 | int nBytes = initialRemaining - buf.remaining(); |
---|
1248 | return (nBytes > 0) ? nBytes : ret; |
---|
1249 | } |
---|
1250 | } |
---|