source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/ipc/Server.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 42.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.ipc;
20
21import java.io.IOException;
22import java.io.DataInputStream;
23import java.io.DataOutputStream;
24import java.io.ByteArrayInputStream;
25import java.io.ByteArrayOutputStream;
26
27import java.nio.ByteBuffer;
28import java.nio.channels.CancelledKeyException;
29import java.nio.channels.ClosedChannelException;
30import java.nio.channels.ReadableByteChannel;
31import java.nio.channels.SelectionKey;
32import java.nio.channels.Selector;
33import java.nio.channels.ServerSocketChannel;
34import java.nio.channels.SocketChannel;
35import java.nio.channels.WritableByteChannel;
36
37import java.net.BindException;
38import java.net.InetAddress;
39import java.net.InetSocketAddress;
40import java.net.ServerSocket;
41import java.net.Socket;
42import java.net.SocketException;
43import java.net.UnknownHostException;
44
45import java.security.PrivilegedActionException;
46import java.security.PrivilegedExceptionAction;
47import java.util.ArrayList;
48import java.util.Collections;
49import java.util.LinkedList;
50import java.util.List;
51import java.util.Iterator;
52import java.util.Map;
53import java.util.Random;
54import java.util.concurrent.BlockingQueue;
55import java.util.concurrent.ConcurrentHashMap;
56import java.util.concurrent.LinkedBlockingQueue;
57
58import javax.security.auth.Subject;
59
60import org.apache.commons.logging.Log;
61import org.apache.commons.logging.LogFactory;
62import org.apache.hadoop.conf.Configuration;
63import org.apache.hadoop.security.SecurityUtil;
64import org.apache.hadoop.io.Writable;
65import org.apache.hadoop.io.WritableUtils;
66import org.apache.hadoop.util.ReflectionUtils;
67import org.apache.hadoop.util.StringUtils;
68import org.apache.hadoop.ipc.metrics.RpcMetrics;
69import org.apache.hadoop.security.authorize.AuthorizationException;
70
71/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
72 * parameter, and return a {@link Writable} as their value.  A service runs on
73 * a port and is defined by a parameter class and a value class.
74 *
75 * @see Client
76 */
77public abstract class Server {
78 
79  /**
80   * The first four bytes of Hadoop RPC connections
81   */
82  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
83 
84  // 1 : Introduce ping and server does not throw away RPCs
85  // 3 : Introduce the protocol into the RPC connection header
86  public static final byte CURRENT_VERSION = 3;
87 
88  /**
89   * How many calls/handler are allowed in the queue.
90   */
91  private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
92 
93  public static final Log LOG = LogFactory.getLog(Server.class);
94
95  private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
96
97  private static final Map<String, Class<?>> PROTOCOL_CACHE = 
98    new ConcurrentHashMap<String, Class<?>>();
99 
100  static Class<?> getProtocolClass(String protocolName, Configuration conf) 
101  throws ClassNotFoundException {
102    Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
103    if (protocol == null) {
104      protocol = conf.getClassByName(protocolName);
105      PROTOCOL_CACHE.put(protocolName, protocol);
106    }
107    return protocol;
108  }
109 
110  /** Returns the server instance called under or null.  May be called under
111   * {@link #call(Writable, long)} implementations, and under {@link Writable}
112   * methods of paramters and return values.  Permits applications to access
113   * the server context.*/
114  public static Server get() {
115    return SERVER.get();
116  }
117 
118  /** This is set to Call object before Handler invokes an RPC and reset
119   * after the call returns.
120   */
121  private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
122 
123  /** Returns the remote side ip address when invoked inside an RPC
124   *  Returns null incase of an error.
125   */
126  public static InetAddress getRemoteIp() {
127    Call call = CurCall.get();
128    if (call != null) {
129      return call.connection.socket.getInetAddress();
130    }
131    return null;
132  }
133  /** Returns remote address as a string when invoked inside an RPC.
134   *  Returns null in case of an error.
135   */
136  public static String getRemoteAddress() {
137    InetAddress addr = getRemoteIp();
138    return (addr == null) ? null : addr.getHostAddress();
139  }
140
141  private String bindAddress; 
142  private int port;                               // port we listen on
143  private int handlerCount;                       // number of handler threads
144  private Class<? extends Writable> paramClass;   // class of call parameters
145  private int maxIdleTime;                        // the maximum idle time after
146                                                  // which a client may be disconnected
147  private int thresholdIdleConnections;           // the number of idle connections
148                                                  // after which we will start
149                                                  // cleaning up idle
150                                                  // connections
151  int maxConnectionsToNuke;                       // the max number of
152                                                  // connections to nuke
153                                                  //during a cleanup
154 
155  protected RpcMetrics  rpcMetrics;
156 
157  private Configuration conf;
158
159  private int maxQueueSize;
160  private int socketSendBufferSize;
161  private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
162
163  volatile private boolean running = true;         // true while server runs
164  private BlockingQueue<Call> callQueue; // queued calls
165
166  private List<Connection> connectionList = 
167    Collections.synchronizedList(new LinkedList<Connection>());
168  //maintain a list
169  //of client connections
170  private Listener listener = null;
171  private Responder responder = null;
172  private int numConnections = 0;
173  private Handler[] handlers = null;
174
175  /**
176   * A convenience method to bind to a given address and report
177   * better exceptions if the address is not a valid host.
178   * @param socket the socket to bind
179   * @param address the address to bind to
180   * @param backlog the number of connections allowed in the queue
181   * @throws BindException if the address can't be bound
182   * @throws UnknownHostException if the address isn't a valid host name
183   * @throws IOException other random errors from bind
184   */
185  public static void bind(ServerSocket socket, InetSocketAddress address, 
186                          int backlog) throws IOException {
187    try {
188      socket.bind(address, backlog);
189    } catch (BindException e) {
190      BindException bindException = new BindException("Problem binding to " + address
191                                                      + " : " + e.getMessage());
192      bindException.initCause(e);
193      throw bindException;
194    } catch (SocketException e) {
195      // If they try to bind to a different host's address, give a better
196      // error message.
197      if ("Unresolved address".equals(e.getMessage())) {
198        throw new UnknownHostException("Invalid hostname for server: " + 
199                                       address.getHostName());
200      } else {
201        throw e;
202      }
203    }
204  }
205
206  /** A call queued for handling. */
207  private static class Call {
208    private int id;                               // the client's call id
209    private Writable param;                       // the parameter passed
210    private Connection connection;                // connection to client
211    private long timestamp;     // the time received when response is null
212                                   // the time served when response is not null
213    private ByteBuffer response;                      // the response for this call
214
215    public Call(int id, Writable param, Connection connection) { 
216      this.id = id;
217      this.param = param;
218      this.connection = connection;
219      this.timestamp = System.currentTimeMillis();
220      this.response = null;
221    }
222   
223    @Override
224    public String toString() {
225      return param.toString() + " from " + connection.toString();
226    }
227
228    public void setResponse(ByteBuffer response) {
229      this.response = response;
230    }
231  }
232
233  /** Listens on the socket. Creates jobs for the handler threads*/
234  private class Listener extends Thread {
235   
236    private ServerSocketChannel acceptChannel = null; //the accept channel
237    private Selector selector = null; //the selector that we use for the server
238    private InetSocketAddress address; //the address we bind at
239    private Random rand = new Random();
240    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
241                                         //-tion (for idle connections) ran
242    private long cleanupInterval = 10000; //the minimum interval between
243                                          //two cleanup runs
244    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
245   
246    public Listener() throws IOException {
247      address = new InetSocketAddress(bindAddress, port);
248      // Create a new server socket and set to non blocking mode
249      acceptChannel = ServerSocketChannel.open();
250      acceptChannel.configureBlocking(false);
251
252      // Bind the server socket to the local host and port
253      bind(acceptChannel.socket(), address, backlogLength);
254      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
255      // create a selector;
256      selector= Selector.open();
257
258      // Register accepts on the server socket with the selector.
259      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
260      this.setName("IPC Server listener on " + port);
261      this.setDaemon(true);
262    }
263    /** cleanup connections from connectionList. Choose a random range
264     * to scan and also have a limit on the number of the connections
265     * that will be cleanedup per run. The criteria for cleanup is the time
266     * for which the connection was idle. If 'force' is true then all
267     * connections will be looked at for the cleanup.
268     */
269    private void cleanupConnections(boolean force) {
270      if (force || numConnections > thresholdIdleConnections) {
271        long currentTime = System.currentTimeMillis();
272        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
273          return;
274        }
275        int start = 0;
276        int end = numConnections - 1;
277        if (!force) {
278          start = rand.nextInt() % numConnections;
279          end = rand.nextInt() % numConnections;
280          int temp;
281          if (end < start) {
282            temp = start;
283            start = end;
284            end = temp;
285          }
286        }
287        int i = start;
288        int numNuked = 0;
289        while (i <= end) {
290          Connection c;
291          synchronized (connectionList) {
292            try {
293              c = connectionList.get(i);
294            } catch (Exception e) {return;}
295          }
296          if (c.timedOut(currentTime)) {
297            if (LOG.isDebugEnabled())
298              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
299            closeConnection(c);
300            numNuked++;
301            end--;
302            c = null;
303            if (!force && numNuked == maxConnectionsToNuke) break;
304          }
305          else i++;
306        }
307        lastCleanupRunTime = System.currentTimeMillis();
308      }
309    }
310
311    @Override
312    public void run() {
313      LOG.info(getName() + ": starting");
314      SERVER.set(Server.this);
315      while (running) {
316        SelectionKey key = null;
317        try {
318          selector.select();
319          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
320          while (iter.hasNext()) {
321            key = iter.next();
322            iter.remove();
323            try {
324              if (key.isValid()) {
325                if (key.isAcceptable())
326                  doAccept(key);
327                else if (key.isReadable())
328                  doRead(key);
329              }
330            } catch (IOException e) {
331            }
332            key = null;
333          }
334        } catch (OutOfMemoryError e) {
335          // we can run out of memory if we have too many threads
336          // log the event and sleep for a minute and give
337          // some thread(s) a chance to finish
338          LOG.warn("Out of Memory in server select", e);
339          closeCurrentConnection(key, e);
340          cleanupConnections(true);
341          try { Thread.sleep(60000); } catch (Exception ie) {}
342        } catch (InterruptedException e) {
343          if (running) {                          // unexpected -- log it
344            LOG.info(getName() + " caught: " +
345                     StringUtils.stringifyException(e));
346          }
347        } catch (Exception e) {
348          closeCurrentConnection(key, e);
349        }
350        cleanupConnections(false);
351      }
352      LOG.info("Stopping " + this.getName());
353
354      synchronized (this) {
355        try {
356          acceptChannel.close();
357          selector.close();
358        } catch (IOException e) { }
359
360        selector= null;
361        acceptChannel= null;
362       
363        // clean up all connections
364        while (!connectionList.isEmpty()) {
365          closeConnection(connectionList.remove(0));
366        }
367      }
368    }
369
370    private void closeCurrentConnection(SelectionKey key, Throwable e) {
371      if (key != null) {
372        Connection c = (Connection)key.attachment();
373        if (c != null) {
374          if (LOG.isDebugEnabled())
375            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
376          closeConnection(c);
377          c = null;
378        }
379      }
380    }
381
382    InetSocketAddress getAddress() {
383      return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
384    }
385   
386    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
387      Connection c = null;
388      ServerSocketChannel server = (ServerSocketChannel) key.channel();
389      // accept up to 10 connections
390      for (int i=0; i<10; i++) {
391        SocketChannel channel = server.accept();
392        if (channel==null) return;
393
394        channel.configureBlocking(false);
395        channel.socket().setTcpNoDelay(tcpNoDelay);
396        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
397        c = new Connection(readKey, channel, System.currentTimeMillis());
398        readKey.attach(c);
399        synchronized (connectionList) {
400          connectionList.add(numConnections, c);
401          numConnections++;
402        }
403        if (LOG.isDebugEnabled())
404          LOG.debug("Server connection from " + c.toString() +
405              "; # active connections: " + numConnections +
406              "; # queued calls: " + callQueue.size());
407      }
408    }
409
410    void doRead(SelectionKey key) throws InterruptedException {
411      int count = 0;
412      Connection c = (Connection)key.attachment();
413      if (c == null) {
414        return; 
415      }
416      c.setLastContact(System.currentTimeMillis());
417     
418      try {
419        count = c.readAndProcess();
420      } catch (InterruptedException ieo) {
421        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
422        throw ieo;
423      } catch (Exception e) {
424        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
425        count = -1; //so that the (count < 0) block is executed
426      }
427      if (count < 0) {
428        if (LOG.isDebugEnabled())
429          LOG.debug(getName() + ": disconnecting client " + 
430                    c.getHostAddress() + ". Number of active connections: "+
431                    numConnections);
432        closeConnection(c);
433        c = null;
434      }
435      else {
436        c.setLastContact(System.currentTimeMillis());
437      }
438    }   
439
440    synchronized void doStop() {
441      if (selector != null) {
442        selector.wakeup();
443        Thread.yield();
444      }
445      if (acceptChannel != null) {
446        try {
447          acceptChannel.socket().close();
448        } catch (IOException e) {
449          LOG.info(getName() + ":Exception in closing listener socket. " + e);
450        }
451      }
452    }
453  }
454
455  // Sends responses of RPC back to clients.
456  private class Responder extends Thread {
457    private Selector writeSelector;
458    private int pending;         // connections waiting to register
459   
460    final static int PURGE_INTERVAL = 900000; // 15mins
461
462    Responder() throws IOException {
463      this.setName("IPC Server Responder");
464      this.setDaemon(true);
465      writeSelector = Selector.open(); // create a selector
466      pending = 0;
467    }
468
469    @Override
470    public void run() {
471      LOG.info(getName() + ": starting");
472      SERVER.set(Server.this);
473      long lastPurgeTime = 0;   // last check for old calls.
474
475      while (running) {
476        try {
477          waitPending();     // If a channel is being registered, wait.
478          writeSelector.select(PURGE_INTERVAL);
479          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
480          while (iter.hasNext()) {
481            SelectionKey key = iter.next();
482            iter.remove();
483            try {
484              if (key.isValid() && key.isWritable()) {
485                  doAsyncWrite(key);
486              }
487            } catch (IOException e) {
488              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
489            }
490          }
491          long now = System.currentTimeMillis();
492          if (now < lastPurgeTime + PURGE_INTERVAL) {
493            continue;
494          }
495          lastPurgeTime = now;
496          //
497          // If there were some calls that have not been sent out for a
498          // long time, discard them.
499          //
500          LOG.debug("Checking for old call responses.");
501          ArrayList<Call> calls;
502         
503          // get the list of channels from list of keys.
504          synchronized (writeSelector.keys()) {
505            calls = new ArrayList<Call>(writeSelector.keys().size());
506            iter = writeSelector.keys().iterator();
507            while (iter.hasNext()) {
508              SelectionKey key = iter.next();
509              Call call = (Call)key.attachment();
510              if (call != null && key.channel() == call.connection.channel) { 
511                calls.add(call);
512              }
513            }
514          }
515         
516          for(Call call : calls) {
517            try {
518              doPurge(call, now);
519            } catch (IOException e) {
520              LOG.warn("Error in purging old calls " + e);
521            }
522          }
523        } catch (OutOfMemoryError e) {
524          //
525          // we can run out of memory if we have too many threads
526          // log the event and sleep for a minute and give
527          // some thread(s) a chance to finish
528          //
529          LOG.warn("Out of Memory in server select", e);
530          try { Thread.sleep(60000); } catch (Exception ie) {}
531        } catch (Exception e) {
532          LOG.warn("Exception in Responder " + 
533                   StringUtils.stringifyException(e));
534        }
535      }
536      LOG.info("Stopping " + this.getName());
537    }
538
539    private void doAsyncWrite(SelectionKey key) throws IOException {
540      Call call = (Call)key.attachment();
541      if (call == null) {
542        return;
543      }
544      if (key.channel() != call.connection.channel) {
545        throw new IOException("doAsyncWrite: bad channel");
546      }
547
548      synchronized(call.connection.responseQueue) {
549        if (processResponse(call.connection.responseQueue, false)) {
550          try {
551            key.interestOps(0);
552          } catch (CancelledKeyException e) {
553            /* The Listener/reader might have closed the socket.
554             * We don't explicitly cancel the key, so not sure if this will
555             * ever fire.
556             * This warning could be removed.
557             */
558            LOG.warn("Exception while changing ops : " + e);
559          }
560        }
561      }
562    }
563
564    //
565    // Remove calls that have been pending in the responseQueue
566    // for a long time.
567    //
568    private void doPurge(Call call, long now) throws IOException {
569      LinkedList<Call> responseQueue = call.connection.responseQueue;
570      synchronized (responseQueue) {
571        Iterator<Call> iter = responseQueue.listIterator(0);
572        while (iter.hasNext()) {
573          call = iter.next();
574          if (now > call.timestamp + PURGE_INTERVAL) {
575            closeConnection(call.connection);
576            break;
577          }
578        }
579      }
580    }
581
582    // Processes one response. Returns true if there are no more pending
583    // data for this channel.
584    //
585    private boolean processResponse(LinkedList<Call> responseQueue,
586                                    boolean inHandler) throws IOException {
587      boolean error = true;
588      boolean done = false;       // there is more data for this channel.
589      int numElements = 0;
590      Call call = null;
591      try {
592        synchronized (responseQueue) {
593          //
594          // If there are no items for this channel, then we are done
595          //
596          numElements = responseQueue.size();
597          if (numElements == 0) {
598            error = false;
599            return true;              // no more data for this channel.
600          }
601          //
602          // Extract the first call
603          //
604          call = responseQueue.removeFirst();
605          SocketChannel channel = call.connection.channel;
606          if (LOG.isDebugEnabled()) {
607            LOG.debug(getName() + ": responding to #" + call.id + " from " +
608                      call.connection);
609          }
610          //
611          // Send as much data as we can in the non-blocking fashion
612          //
613          int numBytes = channelWrite(channel, call.response);
614          if (numBytes < 0) {
615            return true;
616          }
617          if (!call.response.hasRemaining()) {
618            call.connection.decRpcCount();
619            if (numElements == 1) {    // last call fully processes.
620              done = true;             // no more data for this channel.
621            } else {
622              done = false;            // more calls pending to be sent.
623            }
624            if (LOG.isDebugEnabled()) {
625              LOG.debug(getName() + ": responding to #" + call.id + " from " +
626                        call.connection + " Wrote " + numBytes + " bytes.");
627            }
628          } else {
629            //
630            // If we were unable to write the entire response out, then
631            // insert in Selector queue.
632            //
633            call.connection.responseQueue.addFirst(call);
634           
635            if (inHandler) {
636              // set the serve time when the response has to be sent later
637              call.timestamp = System.currentTimeMillis();
638             
639              incPending();
640              try {
641                // Wakeup the thread blocked on select, only then can the call
642                // to channel.register() complete.
643                writeSelector.wakeup();
644                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
645              } catch (ClosedChannelException e) {
646                //Its ok. channel might be closed else where.
647                done = true;
648              } finally {
649                decPending();
650              }
651            }
652            if (LOG.isDebugEnabled()) {
653              LOG.debug(getName() + ": responding to #" + call.id + " from " +
654                        call.connection + " Wrote partial " + numBytes + 
655                        " bytes.");
656            }
657          }
658          error = false;              // everything went off well
659        }
660      } finally {
661        if (error && call != null) {
662          LOG.warn(getName()+", call " + call + ": output error");
663          done = true;               // error. no more data for this channel.
664          closeConnection(call.connection);
665        }
666      }
667      return done;
668    }
669
670    //
671    // Enqueue a response from the application.
672    //
673    void doRespond(Call call) throws IOException {
674      synchronized (call.connection.responseQueue) {
675        call.connection.responseQueue.addLast(call);
676        if (call.connection.responseQueue.size() == 1) {
677          processResponse(call.connection.responseQueue, true);
678        }
679      }
680    }
681
682    private synchronized void incPending() {   // call waiting to be enqueued.
683      pending++;
684    }
685
686    private synchronized void decPending() { // call done enqueueing.
687      pending--;
688      notify();
689    }
690
691    private synchronized void waitPending() throws InterruptedException {
692      while (pending > 0) {
693        wait();
694      }
695    }
696  }
697
698  /** Reads calls from a connection and queues them for handling. */
699  private class Connection {
700    private boolean versionRead = false; //if initial signature and
701                                         //version are read
702    private boolean headerRead = false;  //if the connection header that
703                                         //follows version is read.
704
705    private SocketChannel channel;
706    private ByteBuffer data;
707    private ByteBuffer dataLengthBuffer;
708    private LinkedList<Call> responseQueue;
709    private volatile int rpcCount = 0; // number of outstanding rpcs
710    private long lastContact;
711    private int dataLength;
712    private Socket socket;
713    // Cache the remote host & port info so that even if the socket is
714    // disconnected, we can say where it used to connect to.
715    private String hostAddress;
716    private int remotePort;
717   
718    ConnectionHeader header = new ConnectionHeader();
719    Class<?> protocol;
720   
721    Subject user = null;
722
723    // Fake 'call' for failed authorization response
724    private final int AUTHROIZATION_FAILED_CALLID = -1;
725    private final Call authFailedCall = 
726      new Call(AUTHROIZATION_FAILED_CALLID, null, null);
727    private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
728   
729    public Connection(SelectionKey key, SocketChannel channel, 
730                      long lastContact) {
731      this.channel = channel;
732      this.lastContact = lastContact;
733      this.data = null;
734      this.dataLengthBuffer = ByteBuffer.allocate(4);
735      this.socket = channel.socket();
736      InetAddress addr = socket.getInetAddress();
737      if (addr == null) {
738        this.hostAddress = "*Unknown*";
739      } else {
740        this.hostAddress = addr.getHostAddress();
741      }
742      this.remotePort = socket.getPort();
743      this.responseQueue = new LinkedList<Call>();
744      if (socketSendBufferSize != 0) {
745        try {
746          socket.setSendBufferSize(socketSendBufferSize);
747        } catch (IOException e) {
748          LOG.warn("Connection: unable to set socket send buffer size to " +
749                   socketSendBufferSize);
750        }
751      }
752    }   
753
754    @Override
755    public String toString() {
756      return getHostAddress() + ":" + remotePort; 
757    }
758   
759    public String getHostAddress() {
760      return hostAddress;
761    }
762
763    public void setLastContact(long lastContact) {
764      this.lastContact = lastContact;
765    }
766
767    public long getLastContact() {
768      return lastContact;
769    }
770
771    /* Return true if the connection has no outstanding rpc */
772    private boolean isIdle() {
773      return rpcCount == 0;
774    }
775   
776    /* Decrement the outstanding RPC count */
777    private void decRpcCount() {
778      rpcCount--;
779    }
780   
781    /* Increment the outstanding RPC count */
782    private void incRpcCount() {
783      rpcCount++;
784    }
785   
786    private boolean timedOut(long currentTime) {
787      if (isIdle() && currentTime -  lastContact > maxIdleTime)
788        return true;
789      return false;
790    }
791
792    public int readAndProcess() throws IOException, InterruptedException {
793      while (true) {
794        /* Read at most one RPC. If the header is not read completely yet
795         * then iterate until we read first RPC or until there is no data left.
796         */   
797        int count = -1;
798        if (dataLengthBuffer.remaining() > 0) {
799          count = channelRead(channel, dataLengthBuffer);       
800          if (count < 0 || dataLengthBuffer.remaining() > 0) 
801            return count;
802        }
803     
804        if (!versionRead) {
805          //Every connection is expected to send the header.
806          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
807          count = channelRead(channel, versionBuffer);
808          if (count <= 0) {
809            return count;
810          }
811          int version = versionBuffer.get(0);
812         
813          dataLengthBuffer.flip();         
814          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
815            //Warning is ok since this is not supposed to happen.
816            LOG.warn("Incorrect header or version mismatch from " + 
817                     hostAddress + ":" + remotePort +
818                     " got version " + version + 
819                     " expected version " + CURRENT_VERSION);
820            return -1;
821          }
822          dataLengthBuffer.clear();
823          versionRead = true;
824          continue;
825        }
826       
827        if (data == null) {
828          dataLengthBuffer.flip();
829          dataLength = dataLengthBuffer.getInt();
830       
831          if (dataLength == Client.PING_CALL_ID) {
832            dataLengthBuffer.clear();
833            return 0;  //ping message
834          }
835          data = ByteBuffer.allocate(dataLength);
836          incRpcCount();  // Increment the rpc count
837        }
838       
839        count = channelRead(channel, data);
840       
841        if (data.remaining() == 0) {
842          dataLengthBuffer.clear();
843          data.flip();
844          if (headerRead) {
845            processData();
846            data = null;
847            return count;
848          } else {
849            processHeader();
850            headerRead = true;
851            data = null;
852           
853            // Authorize the connection
854            try {
855              authorize(user, header);
856             
857              if (LOG.isDebugEnabled()) {
858                LOG.debug("Successfully authorized " + header);
859              }
860            } catch (AuthorizationException ae) {
861              authFailedCall.connection = this;
862              setupResponse(authFailedResponse, authFailedCall, 
863                            Status.FATAL, null, 
864                            ae.getClass().getName(), ae.getMessage());
865              responder.doRespond(authFailedCall);
866             
867              // Close this connection
868              return -1;
869            }
870
871            continue;
872          }
873        } 
874        return count;
875      }
876    }
877
878    /// Reads the connection header following version
879    private void processHeader() throws IOException {
880      DataInputStream in =
881        new DataInputStream(new ByteArrayInputStream(data.array()));
882      header.readFields(in);
883      try {
884        String protocolClassName = header.getProtocol();
885        if (protocolClassName != null) {
886          protocol = getProtocolClass(header.getProtocol(), conf);
887        }
888      } catch (ClassNotFoundException cnfe) {
889        throw new IOException("Unknown protocol: " + header.getProtocol());
890      }
891     
892      // TODO: Get the user name from the GSS API for Kerberbos-based security
893      // Create the user subject
894      user = SecurityUtil.getSubject(header.getUgi());
895    }
896   
897    private void processData() throws  IOException, InterruptedException {
898      DataInputStream dis =
899        new DataInputStream(new ByteArrayInputStream(data.array()));
900      int id = dis.readInt();                    // try to read an id
901       
902      if (LOG.isDebugEnabled())
903        LOG.debug(" got #" + id);
904
905      Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
906      param.readFields(dis);       
907       
908      Call call = new Call(id, param, this);
909      callQueue.put(call);              // queue the call; maybe blocked here
910    }
911
912    private synchronized void close() throws IOException {
913      data = null;
914      dataLengthBuffer = null;
915      if (!channel.isOpen())
916        return;
917      try {socket.shutdownOutput();} catch(Exception e) {}
918      if (channel.isOpen()) {
919        try {channel.close();} catch(Exception e) {}
920      }
921      try {socket.close();} catch(Exception e) {}
922    }
923  }
924
925  /** Handles queued calls . */
926  private class Handler extends Thread {
927    public Handler(int instanceNumber) {
928      this.setDaemon(true);
929      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
930    }
931
932    @Override
933    public void run() {
934      LOG.info(getName() + ": starting");
935      SERVER.set(Server.this);
936      ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
937      while (running) {
938        try {
939          final Call call = callQueue.take(); // pop the queue; maybe blocked here
940
941          if (LOG.isDebugEnabled())
942            LOG.debug(getName() + ": has #" + call.id + " from " +
943                      call.connection);
944         
945          String errorClass = null;
946          String error = null;
947          Writable value = null;
948
949          CurCall.set(call);
950          try {
951            // Make the call as the user via Subject.doAs, thus associating
952            // the call with the Subject
953            value = 
954              Subject.doAs(call.connection.user, 
955                           new PrivilegedExceptionAction<Writable>() {
956                              @Override
957                              public Writable run() throws Exception {
958                                // make the call
959                                return call(call.connection.protocol, 
960                                            call.param, call.timestamp);
961
962                              }
963                           }
964                          );
965             
966          } catch (PrivilegedActionException pae) {
967            Exception e = pae.getException();
968            LOG.info(getName()+", call "+call+": error: " + e, e);
969            errorClass = e.getClass().getName();
970            error = StringUtils.stringifyException(e);
971          } catch (Throwable e) {
972            LOG.info(getName()+", call "+call+": error: " + e, e);
973            errorClass = e.getClass().getName();
974            error = StringUtils.stringifyException(e);
975          }
976          CurCall.set(null);
977
978          setupResponse(buf, call, 
979                        (error == null) ? Status.SUCCESS : Status.ERROR, 
980                        value, errorClass, error);
981          responder.doRespond(call);
982        } catch (InterruptedException e) {
983          if (running) {                          // unexpected -- log it
984            LOG.info(getName() + " caught: " +
985                     StringUtils.stringifyException(e));
986          }
987        } catch (Exception e) {
988          LOG.info(getName() + " caught: " +
989                   StringUtils.stringifyException(e));
990        }
991      }
992      LOG.info(getName() + ": exiting");
993    }
994
995  }
996 
997  protected Server(String bindAddress, int port,
998                  Class<? extends Writable> paramClass, int handlerCount, 
999                  Configuration conf)
1000    throws IOException
1001  {
1002    this(bindAddress, port, paramClass, handlerCount,  conf, Integer.toString(port));
1003  }
1004  /** Constructs a server listening on the named port and address.  Parameters passed must
1005   * be of the named class.  The <code>handlerCount</handlerCount> determines
1006   * the number of handler threads that will be used to process calls.
1007   *
1008   */
1009  protected Server(String bindAddress, int port, 
1010                  Class<? extends Writable> paramClass, int handlerCount, 
1011                  Configuration conf, String serverName) 
1012    throws IOException {
1013    this.bindAddress = bindAddress;
1014    this.conf = conf;
1015    this.port = port;
1016    this.paramClass = paramClass;
1017    this.handlerCount = handlerCount;
1018    this.socketSendBufferSize = 0;
1019    this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
1020    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
1021    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1022    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1023    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1024   
1025    // Start the listener here and let it bind to the port
1026    listener = new Listener();
1027    this.port = listener.getAddress().getPort();   
1028    this.rpcMetrics = new RpcMetrics(serverName,
1029                          Integer.toString(this.port), this);
1030    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
1031
1032
1033    // Create the responder here
1034    responder = new Responder();
1035  }
1036
1037  private void closeConnection(Connection connection) {
1038    synchronized (connectionList) {
1039      if (connectionList.remove(connection))
1040        numConnections--;
1041    }
1042    try {
1043      connection.close();
1044    } catch (IOException e) {
1045    }
1046  }
1047 
1048  /**
1049   * Setup response for the IPC Call.
1050   *
1051   * @param response buffer to serialize the response into
1052   * @param call {@link Call} to which we are setting up the response
1053   * @param status {@link Status} of the IPC call
1054   * @param rv return value for the IPC Call, if the call was successful
1055   * @param errorClass error class, if the the call failed
1056   * @param error error message, if the call failed
1057   * @throws IOException
1058   */
1059  private void setupResponse(ByteArrayOutputStream response, 
1060                             Call call, Status status, 
1061                             Writable rv, String errorClass, String error) 
1062  throws IOException {
1063    response.reset();
1064    DataOutputStream out = new DataOutputStream(response);
1065    out.writeInt(call.id);                // write call id
1066    out.writeInt(status.state);           // write status
1067
1068    if (status == Status.SUCCESS) {
1069      rv.write(out);
1070    } else {
1071      WritableUtils.writeString(out, errorClass);
1072      WritableUtils.writeString(out, error);
1073    }
1074    call.setResponse(ByteBuffer.wrap(response.toByteArray()));
1075  }
1076 
1077  Configuration getConf() {
1078    return conf;
1079  }
1080 
1081  /** Sets the socket buffer size used for responding to RPCs */
1082  public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1083
1084  /** Starts the service.  Must be called before any calls will be handled. */
1085  public synchronized void start() throws IOException {
1086    responder.start();
1087    listener.start();
1088    handlers = new Handler[handlerCount];
1089   
1090    for (int i = 0; i < handlerCount; i++) {
1091      handlers[i] = new Handler(i);
1092      handlers[i].start();
1093    }
1094  }
1095
1096  /** Stops the service.  No new calls will be handled after this is called. */
1097  public synchronized void stop() {
1098    LOG.info("Stopping server on " + port);
1099    running = false;
1100    if (handlers != null) {
1101      for (int i = 0; i < handlerCount; i++) {
1102        if (handlers[i] != null) {
1103          handlers[i].interrupt();
1104        }
1105      }
1106    }
1107    listener.interrupt();
1108    listener.doStop();
1109    responder.interrupt();
1110    notifyAll();
1111    if (this.rpcMetrics != null) {
1112      this.rpcMetrics.shutdown();
1113    }
1114  }
1115
1116  /** Wait for the server to be stopped.
1117   * Does not wait for all subthreads to finish.
1118   *  See {@link #stop()}.
1119   */
1120  public synchronized void join() throws InterruptedException {
1121    while (running) {
1122      wait();
1123    }
1124  }
1125
1126  /**
1127   * Return the socket (ip+port) on which the RPC server is listening to.
1128   * @return the socket (ip+port) on which the RPC server is listening to.
1129   */
1130  public synchronized InetSocketAddress getListenerAddress() {
1131    return listener.getAddress();
1132  }
1133 
1134  /**
1135   * Called for each call.
1136   * @deprecated Use {@link #call(Class, Writable, long)} instead
1137   */
1138  @Deprecated
1139  public Writable call(Writable param, long receiveTime) throws IOException {
1140    return call(null, param, receiveTime);
1141  }
1142 
1143  /** Called for each call. */
1144  public abstract Writable call(Class<?> protocol,
1145                               Writable param, long receiveTime)
1146  throws IOException;
1147 
1148  /**
1149   * Authorize the incoming client connection.
1150   *
1151   * @param user client user
1152   * @param connection incoming connection
1153   * @throws AuthorizationException when the client isn't authorized to talk the protocol
1154   */
1155  public void authorize(Subject user, ConnectionHeader connection) 
1156  throws AuthorizationException {}
1157 
1158  /**
1159   * The number of open RPC conections
1160   * @return the number of open rpc connections
1161   */
1162  public int getNumOpenConnections() {
1163    return numConnections;
1164  }
1165 
1166  /**
1167   * The number of rpc calls in the queue.
1168   * @return The number of rpc calls in the queue.
1169   */
1170  public int getCallQueueLen() {
1171    return callQueue.size();
1172  }
1173 
1174 
1175  /**
1176   * When the read or write buffer size is larger than this limit, i/o will be
1177   * done in chunks of this size. Most RPC requests and responses would be
1178   * be smaller.
1179   */
1180  private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
1181 
1182  /**
1183   * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
1184   * If the amount of data is large, it writes to channel in smaller chunks.
1185   * This is to avoid jdk from creating many direct buffers as the size of
1186   * buffer increases. This also minimizes extra copies in NIO layer
1187   * as a result of multiple write operations required to write a large
1188   * buffer. 
1189   *
1190   * @see WritableByteChannel#write(ByteBuffer)
1191   */
1192  private static int channelWrite(WritableByteChannel channel, 
1193                                  ByteBuffer buffer) throws IOException {
1194   
1195    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1196           channel.write(buffer) : channelIO(null, channel, buffer);
1197  }
1198 
1199 
1200  /**
1201   * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
1202   * If the amount of data is large, it writes to channel in smaller chunks.
1203   * This is to avoid jdk from creating many direct buffers as the size of
1204   * ByteBuffer increases. There should not be any performance degredation.
1205   *
1206   * @see ReadableByteChannel#read(ByteBuffer)
1207   */
1208  private static int channelRead(ReadableByteChannel channel, 
1209                                 ByteBuffer buffer) throws IOException {
1210   
1211    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1212           channel.read(buffer) : channelIO(channel, null, buffer);
1213  }
1214 
1215  /**
1216   * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
1217   * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
1218   * one of readCh or writeCh should be non-null.
1219   *
1220   * @see #channelRead(ReadableByteChannel, ByteBuffer)
1221   * @see #channelWrite(WritableByteChannel, ByteBuffer)
1222   */
1223  private static int channelIO(ReadableByteChannel readCh, 
1224                               WritableByteChannel writeCh,
1225                               ByteBuffer buf) throws IOException {
1226   
1227    int originalLimit = buf.limit();
1228    int initialRemaining = buf.remaining();
1229    int ret = 0;
1230   
1231    while (buf.remaining() > 0) {
1232      try {
1233        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
1234        buf.limit(buf.position() + ioSize);
1235       
1236        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 
1237       
1238        if (ret < ioSize) {
1239          break;
1240        }
1241
1242      } finally {
1243        buf.limit(originalLimit);       
1244      }
1245    }
1246
1247    int nBytes = initialRemaining - buf.remaining(); 
1248    return (nBytes > 0) ? nBytes : ret;
1249  }     
1250}
Note: See TracBrowser for help on using the repository browser.