[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.ipc; |
---|
| 20 | |
---|
| 21 | import java.net.Socket; |
---|
| 22 | import java.net.InetSocketAddress; |
---|
| 23 | import java.net.SocketTimeoutException; |
---|
| 24 | import java.net.UnknownHostException; |
---|
| 25 | import java.net.ConnectException; |
---|
| 26 | |
---|
| 27 | import java.io.IOException; |
---|
| 28 | import java.io.DataInputStream; |
---|
| 29 | import java.io.DataOutputStream; |
---|
| 30 | import java.io.BufferedInputStream; |
---|
| 31 | import java.io.BufferedOutputStream; |
---|
| 32 | import java.io.FilterInputStream; |
---|
| 33 | import java.io.InputStream; |
---|
| 34 | |
---|
| 35 | import java.util.Hashtable; |
---|
| 36 | import java.util.Iterator; |
---|
| 37 | import java.util.Map.Entry; |
---|
| 38 | import java.util.concurrent.atomic.AtomicBoolean; |
---|
| 39 | import java.util.concurrent.atomic.AtomicLong; |
---|
| 40 | |
---|
| 41 | import javax.net.SocketFactory; |
---|
| 42 | |
---|
| 43 | import org.apache.commons.logging.*; |
---|
| 44 | |
---|
| 45 | import org.apache.hadoop.conf.Configuration; |
---|
| 46 | import org.apache.hadoop.io.IOUtils; |
---|
| 47 | import org.apache.hadoop.io.Writable; |
---|
| 48 | import org.apache.hadoop.io.WritableUtils; |
---|
| 49 | import org.apache.hadoop.io.DataOutputBuffer; |
---|
| 50 | import org.apache.hadoop.net.NetUtils; |
---|
| 51 | import org.apache.hadoop.security.UserGroupInformation; |
---|
| 52 | import org.apache.hadoop.util.ReflectionUtils; |
---|
| 53 | |
---|
| 54 | /** A client for an IPC service. IPC calls take a single {@link Writable} as a |
---|
| 55 | * parameter, and return a {@link Writable} as their value. A service runs on |
---|
| 56 | * a port and is defined by a parameter class and a value class. |
---|
| 57 | * |
---|
| 58 | * @see Server |
---|
| 59 | */ |
---|
| 60 | public class Client { |
---|
| 61 | |
---|
| 62 | public static final Log LOG = |
---|
| 63 | LogFactory.getLog(Client.class); |
---|
| 64 | private Hashtable<ConnectionId, Connection> connections = |
---|
| 65 | new Hashtable<ConnectionId, Connection>(); |
---|
| 66 | |
---|
| 67 | private Class<? extends Writable> valueClass; // class instantiated to hold each call's return value |
---|
| 68 | private int counter; // next call id; incremented while holding the Client lock |
---|
| 69 | private AtomicBoolean running = new AtomicBoolean(true); // true until stop() is called |
---|
| 70 | final private Configuration conf; |
---|
| 71 | final private int maxIdleTime; // a connection with no pending calls is closed once it has been idle for |
---|
| 72 | // maxIdleTime msecs |
---|
| 73 | final private int maxRetries; // max number of connect retries after an IOException |
---|
| 74 | private boolean tcpNoDelay; // if true, disable Nagle's algorithm on the socket |
---|
| 75 | private int pingInterval; // socket read timeout and minimum time between pings, in msecs |
---|
| 76 | |
---|
| 77 | private SocketFactory socketFactory; // creates the sockets used by connections |
---|
| 78 | private int refCount = 1; |
---|
| 79 | |
---|
| 80 | final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; |
---|
| 81 | final static int DEFAULT_PING_INTERVAL = 60000; // 1 min, in msecs |
---|
| 82 | final static int PING_CALL_ID = -1; |
---|
| 83 | |
---|
| 84 | /** |
---|
| 85 | * set the ping interval value in configuration |
---|
| 86 | * |
---|
| 87 | * @param conf Configuration |
---|
| 88 | * @param pingInterval the ping interval |
---|
| 89 | */ |
---|
| 90 | final public static void setPingInterval(Configuration conf, int pingInterval) { |
---|
| 91 | conf.setInt(PING_INTERVAL_NAME, pingInterval); |
---|
| 92 | } |
---|
| 93 | |
---|
| 94 | /** |
---|
| 95 | * Get the ping interval from configuration; |
---|
| 96 | * If not set in the configuration, return the default value. |
---|
| 97 | * |
---|
| 98 | * @param conf Configuration |
---|
| 99 | * @return the ping interval |
---|
| 100 | */ |
---|
| 101 | final static int getPingInterval(Configuration conf) { |
---|
| 102 | return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL); |
---|
| 103 | } |
---|
| 104 | |
---|
| 105 | /** |
---|
| 106 | * Increment this client's reference count |
---|
| 107 | * |
---|
| 108 | */ |
---|
| 109 | synchronized void incCount() { |
---|
| 110 | refCount++; |
---|
| 111 | } |
---|
| 112 | |
---|
| 113 | /** |
---|
| 114 | * Decrement this client's reference count |
---|
| 115 | * |
---|
| 116 | */ |
---|
| 117 | synchronized void decCount() { |
---|
| 118 | refCount--; |
---|
| 119 | } |
---|
| 120 | |
---|
| 121 | /** |
---|
| 122 | * Return if this client has no reference |
---|
| 123 | * |
---|
| 124 | * @return true if this client has no reference; false otherwise |
---|
| 125 | */ |
---|
| 126 | synchronized boolean isZeroReference() { |
---|
| 127 | return refCount==0; |
---|
| 128 | } |
---|
| 129 | |
---|
| 130 | /** A call waiting for a value. */ |
---|
| 131 | private class Call { |
---|
| 132 | int id; // call id |
---|
| 133 | Writable param; // parameter |
---|
| 134 | Writable value; // value, null if error |
---|
| 135 | IOException error; // exception, null if value |
---|
| 136 | boolean done; // true when call is done |
---|
| 137 | |
---|
| 138 | protected Call(Writable param) { |
---|
| 139 | this.param = param; |
---|
| 140 | synchronized (Client.this) { |
---|
| 141 | this.id = counter++; |
---|
| 142 | } |
---|
| 143 | } |
---|
| 144 | |
---|
| 145 | /** Indicate when the call is complete and the |
---|
| 146 | * value or error are available. Notifies by default. */ |
---|
| 147 | protected synchronized void callComplete() { |
---|
| 148 | this.done = true; |
---|
| 149 | notify(); // notify caller |
---|
| 150 | } |
---|
| 151 | |
---|
| 152 | /** Set the exception when there is an error. |
---|
| 153 | * Notify the caller the call is done. |
---|
| 154 | * |
---|
| 155 | * @param error exception thrown by the call; either local or remote |
---|
| 156 | */ |
---|
| 157 | public synchronized void setException(IOException error) { |
---|
| 158 | this.error = error; |
---|
| 159 | callComplete(); |
---|
| 160 | } |
---|
| 161 | |
---|
| 162 | /** Set the return value when there is no error. |
---|
| 163 | * Notify the caller the call is done. |
---|
| 164 | * |
---|
| 165 | * @param value return value of the call. |
---|
| 166 | */ |
---|
| 167 | public synchronized void setValue(Writable value) { |
---|
| 168 | this.value = value; |
---|
| 169 | callComplete(); |
---|
| 170 | } |
---|
| 171 | } |
---|
| 172 | |
---|
| 173 | /** Thread that reads responses and notifies callers. Each connection owns a |
---|
| 174 | * socket connected to a remote address. Calls are multiplexed through this |
---|
| 175 | * socket: responses may be delivered out of order. */ |
---|
| 176 | private class Connection extends Thread { |
---|
| 177 | private InetSocketAddress server; // server ip:port |
---|
| 178 | private ConnectionHeader header; // connection header |
---|
| 179 | private ConnectionId remoteId; // connection id |
---|
| 180 | |
---|
| 181 | private Socket socket = null; // connected socket |
---|
| 182 | private DataInputStream in; |
---|
| 183 | private DataOutputStream out; |
---|
| 184 | |
---|
| 185 | // currently active calls |
---|
| 186 | private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); |
---|
| 187 | private AtomicLong lastActivity = new AtomicLong();// last I/O activity time |
---|
| 188 | private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed |
---|
| 189 | private IOException closeException; // close reason |
---|
| 190 | |
---|
| 191 | public Connection(ConnectionId remoteId) throws IOException { |
---|
| 192 | this.remoteId = remoteId; |
---|
| 193 | this.server = remoteId.getAddress(); |
---|
| 194 | if (server.isUnresolved()) { |
---|
| 195 | throw new UnknownHostException("unknown host: " + |
---|
| 196 | remoteId.getAddress().getHostName()); |
---|
| 197 | } |
---|
| 198 | |
---|
| 199 | UserGroupInformation ticket = remoteId.getTicket(); |
---|
| 200 | Class<?> protocol = remoteId.getProtocol(); |
---|
| 201 | header = |
---|
| 202 | new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket); |
---|
| 203 | |
---|
| 204 | this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + |
---|
| 205 | remoteId.getAddress().toString() + |
---|
| 206 | " from " + ((ticket==null)?"an unknown user":ticket.getUserName())); |
---|
| 207 | this.setDaemon(true); |
---|
| 208 | } |
---|
| 209 | |
---|
| 210 | /** Update lastActivity with the current time. */ |
---|
| 211 | private void touch() { |
---|
| 212 | lastActivity.set(System.currentTimeMillis()); |
---|
| 213 | } |
---|
| 214 | |
---|
| 215 | /** |
---|
| 216 | * Add a call to this connection's call queue and notify |
---|
| 217 | * a listener; synchronized. |
---|
| 218 | * Returns false if called during shutdown. |
---|
| 219 | * @param call to add |
---|
| 220 | * @return true if the call was added. |
---|
| 221 | */ |
---|
| 222 | private synchronized boolean addCall(Call call) { |
---|
| 223 | if (shouldCloseConnection.get()) |
---|
| 224 | return false; |
---|
| 225 | calls.put(call.id, call); |
---|
| 226 | notify(); |
---|
| 227 | return true; |
---|
| 228 | } |
---|
| 229 | |
---|
| 230 | /** This class sends a ping to the remote side when timeout on |
---|
| 231 | * reading. If no failure is detected, it retries until at least |
---|
| 232 | * a byte is read. |
---|
| 233 | */ |
---|
| 234 | private class PingInputStream extends FilterInputStream { |
---|
| 235 | /* constructor */ |
---|
| 236 | protected PingInputStream(InputStream in) { |
---|
| 237 | super(in); |
---|
| 238 | } |
---|
| 239 | |
---|
| 240 | /* Process timeout exception |
---|
| 241 | * if the connection is not going to be closed, send a ping. |
---|
| 242 | * otherwise, throw the timeout exception. |
---|
| 243 | */ |
---|
| 244 | private void handleTimeout(SocketTimeoutException e) throws IOException { |
---|
| 245 | if (shouldCloseConnection.get() || !running.get()) { |
---|
| 246 | throw e; |
---|
| 247 | } else { |
---|
| 248 | sendPing(); |
---|
| 249 | } |
---|
| 250 | } |
---|
| 251 | |
---|
| 252 | /** Read a byte from the stream. |
---|
| 253 | * Send a ping if timeout on read. Retries if no failure is detected |
---|
| 254 | * until a byte is read. |
---|
| 255 | * @throws IOException for any IO problem other than socket timeout |
---|
| 256 | */ |
---|
| 257 | public int read() throws IOException { |
---|
| 258 | do { |
---|
| 259 | try { |
---|
| 260 | return super.read(); |
---|
| 261 | } catch (SocketTimeoutException e) { |
---|
| 262 | handleTimeout(e); |
---|
| 263 | } |
---|
| 264 | } while (true); |
---|
| 265 | } |
---|
| 266 | |
---|
| 267 | /** Read bytes into a buffer starting from offset <code>off</code> |
---|
| 268 | * Send a ping if timeout on read. Retries if no failure is detected |
---|
| 269 | * until a byte is read. |
---|
| 270 | * |
---|
| 271 | * @return the total number of bytes read; -1 if the connection is closed. |
---|
| 272 | */ |
---|
| 273 | public int read(byte[] buf, int off, int len) throws IOException { |
---|
| 274 | do { |
---|
| 275 | try { |
---|
| 276 | return super.read(buf, off, len); |
---|
| 277 | } catch (SocketTimeoutException e) { |
---|
| 278 | handleTimeout(e); |
---|
| 279 | } |
---|
| 280 | } while (true); |
---|
| 281 | } |
---|
| 282 | } |
---|
| 283 | |
---|
| 284 | /** Connect to the server and set up the I/O streams. It then sends |
---|
| 285 | * a header to the server and starts |
---|
| 286 | * the connection thread that waits for responses. |
---|
| 287 | */ |
---|
| 288 | private synchronized void setupIOstreams() { |
---|
| 289 | if (socket != null || shouldCloseConnection.get()) { |
---|
| 290 | return; |
---|
| 291 | } |
---|
| 292 | |
---|
| 293 | short ioFailures = 0; |
---|
| 294 | short timeoutFailures = 0; |
---|
| 295 | try { |
---|
| 296 | if (LOG.isDebugEnabled()) { |
---|
| 297 | LOG.debug("Connecting to "+server); |
---|
| 298 | } |
---|
| 299 | while (true) { |
---|
| 300 | try { |
---|
| 301 | this.socket = socketFactory.createSocket(); |
---|
| 302 | this.socket.setTcpNoDelay(tcpNoDelay); |
---|
| 303 | // connection time out is 20s |
---|
| 304 | NetUtils.connect(this.socket, remoteId.getAddress(), 20000); |
---|
| 305 | this.socket.setSoTimeout(pingInterval); |
---|
| 306 | break; |
---|
| 307 | } catch (SocketTimeoutException toe) { |
---|
| 308 | /* The max number of retries is 45, |
---|
| 309 | * which amounts to 20s*45 = 15 minutes retries. |
---|
| 310 | */ |
---|
| 311 | handleConnectionFailure(timeoutFailures++, 45, toe); |
---|
| 312 | } catch (IOException ie) { |
---|
| 313 | handleConnectionFailure(ioFailures++, maxRetries, ie); |
---|
| 314 | } |
---|
| 315 | } |
---|
| 316 | this.in = new DataInputStream(new BufferedInputStream |
---|
| 317 | (new PingInputStream(NetUtils.getInputStream(socket)))); |
---|
| 318 | this.out = new DataOutputStream |
---|
| 319 | (new BufferedOutputStream(NetUtils.getOutputStream(socket))); |
---|
| 320 | writeHeader(); |
---|
| 321 | |
---|
| 322 | // update last activity time |
---|
| 323 | touch(); |
---|
| 324 | |
---|
| 325 | // start the receiver thread after the socket connection has been set up |
---|
| 326 | start(); |
---|
| 327 | } catch (IOException e) { |
---|
| 328 | markClosed(e); |
---|
| 329 | close(); |
---|
| 330 | } |
---|
| 331 | } |
---|
| 332 | |
---|
| 333 | /* Handle connection failures |
---|
| 334 | * |
---|
| 335 | * If the current number of retries is equal to the max number of retries, |
---|
| 336 | * stop retrying and throw the exception; Otherwise backoff 1 second and |
---|
| 337 | * try connecting again. |
---|
| 338 | * |
---|
| 339 | * This Method is only called from inside setupIOstreams(), which is |
---|
| 340 | * synchronized. Hence the sleep is synchronized; the locks will be retained. |
---|
| 341 | * |
---|
| 342 | * @param curRetries current number of retries |
---|
| 343 | * @param maxRetries max number of retries allowed |
---|
| 344 | * @param ioe failure reason |
---|
| 345 | * @throws IOException if max number of retries is reached |
---|
| 346 | */ |
---|
| 347 | private void handleConnectionFailure( |
---|
| 348 | int curRetries, int maxRetries, IOException ioe) throws IOException { |
---|
| 349 | // close the current connection |
---|
| 350 | try { |
---|
| 351 | socket.close(); |
---|
| 352 | } catch (IOException e) { |
---|
| 353 | LOG.warn("Not able to close a socket", e); |
---|
| 354 | } |
---|
| 355 | // set socket to null so that the next call to setupIOstreams |
---|
| 356 | // can start the process of connect all over again. |
---|
| 357 | socket = null; |
---|
| 358 | |
---|
| 359 | // throw the exception if the maximum number of retries is reached |
---|
| 360 | if (curRetries >= maxRetries) { |
---|
| 361 | throw ioe; |
---|
| 362 | } |
---|
| 363 | |
---|
| 364 | // otherwise back off and retry |
---|
| 365 | try { |
---|
| 366 | Thread.sleep(1000); |
---|
| 367 | } catch (InterruptedException ignored) {} |
---|
| 368 | |
---|
| 369 | LOG.info("Retrying connect to server: " + server + |
---|
| 370 | ". Already tried " + curRetries + " time(s)."); |
---|
| 371 | } |
---|
| 372 | |
---|
| 373 | /* Write the header for each connection |
---|
| 374 | * Out is not synchronized because only the first thread does this. |
---|
| 375 | */ |
---|
| 376 | private void writeHeader() throws IOException { |
---|
| 377 | // Write out the header and version |
---|
| 378 | out.write(Server.HEADER.array()); |
---|
| 379 | out.write(Server.CURRENT_VERSION); |
---|
| 380 | |
---|
| 381 | // Write out the ConnectionHeader |
---|
| 382 | DataOutputBuffer buf = new DataOutputBuffer(); |
---|
| 383 | header.write(buf); |
---|
| 384 | |
---|
| 385 | // Write out the payload length |
---|
| 386 | int bufLen = buf.getLength(); |
---|
| 387 | out.writeInt(bufLen); |
---|
| 388 | out.write(buf.getData(), 0, bufLen); |
---|
| 389 | } |
---|
| 390 | |
---|
| 391 | /* wait till someone signals us to start reading RPC response or |
---|
| 392 | * it is idle too long, it is marked as to be closed, |
---|
| 393 | * or the client is marked as not running. |
---|
| 394 | * |
---|
| 395 | * Return true if it is time to read a response; false otherwise. |
---|
| 396 | */ |
---|
| 397 | private synchronized boolean waitForWork() { |
---|
| 398 | if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { |
---|
| 399 | long timeout = maxIdleTime- |
---|
| 400 | (System.currentTimeMillis()-lastActivity.get()); |
---|
| 401 | if (timeout>0) { |
---|
| 402 | try { |
---|
| 403 | wait(timeout); |
---|
| 404 | } catch (InterruptedException e) {} |
---|
| 405 | } |
---|
| 406 | } |
---|
| 407 | |
---|
| 408 | if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { |
---|
| 409 | return true; |
---|
| 410 | } else if (shouldCloseConnection.get()) { |
---|
| 411 | return false; |
---|
| 412 | } else if (calls.isEmpty()) { // idle connection closed or stopped |
---|
| 413 | markClosed(null); |
---|
| 414 | return false; |
---|
| 415 | } else { // get stopped but there are still pending requests |
---|
| 416 | markClosed((IOException)new IOException().initCause( |
---|
| 417 | new InterruptedException())); |
---|
| 418 | return false; |
---|
| 419 | } |
---|
| 420 | } |
---|
| 421 | |
---|
| 422 | public InetSocketAddress getRemoteAddress() { |
---|
| 423 | return server; |
---|
| 424 | } |
---|
| 425 | |
---|
| 426 | /* Send a ping to the server if the time elapsed |
---|
| 427 | * since last I/O activity is equal to or greater than the ping interval |
---|
| 428 | */ |
---|
| 429 | private synchronized void sendPing() throws IOException { |
---|
| 430 | long curTime = System.currentTimeMillis(); |
---|
| 431 | if ( curTime - lastActivity.get() >= pingInterval) { |
---|
| 432 | lastActivity.set(curTime); |
---|
| 433 | synchronized (out) { |
---|
| 434 | out.writeInt(PING_CALL_ID); |
---|
| 435 | out.flush(); |
---|
| 436 | } |
---|
| 437 | } |
---|
| 438 | } |
---|
| 439 | |
---|
| 440 | public void run() { |
---|
| 441 | if (LOG.isDebugEnabled()) |
---|
| 442 | LOG.debug(getName() + ": starting, having connections " |
---|
| 443 | + connections.size()); |
---|
| 444 | |
---|
| 445 | while (waitForWork()) {//wait here for work - read or close connection |
---|
| 446 | receiveResponse(); |
---|
| 447 | } |
---|
| 448 | |
---|
| 449 | close(); |
---|
| 450 | |
---|
| 451 | if (LOG.isDebugEnabled()) |
---|
| 452 | LOG.debug(getName() + ": stopped, remaining connections " |
---|
| 453 | + connections.size()); |
---|
| 454 | } |
---|
| 455 | |
---|
| 456 | /** Initiates a call by sending the parameter to the remote server. |
---|
| 457 | * Note: this is not called from the Connection thread, but by other |
---|
| 458 | * threads. |
---|
| 459 | */ |
---|
| 460 | public void sendParam(Call call) { |
---|
| 461 | if (shouldCloseConnection.get()) { |
---|
| 462 | return; |
---|
| 463 | } |
---|
| 464 | |
---|
| 465 | DataOutputBuffer d=null; |
---|
| 466 | try { |
---|
| 467 | synchronized (this.out) { |
---|
| 468 | if (LOG.isDebugEnabled()) |
---|
| 469 | LOG.debug(getName() + " sending #" + call.id); |
---|
| 470 | |
---|
| 471 | //for serializing the |
---|
| 472 | //data to be written |
---|
| 473 | d = new DataOutputBuffer(); |
---|
| 474 | d.writeInt(call.id); |
---|
| 475 | call.param.write(d); |
---|
| 476 | byte[] data = d.getData(); |
---|
| 477 | int dataLength = d.getLength(); |
---|
| 478 | out.writeInt(dataLength); //first put the data length |
---|
| 479 | out.write(data, 0, dataLength);//write the data |
---|
| 480 | out.flush(); |
---|
| 481 | } |
---|
| 482 | } catch(IOException e) { |
---|
| 483 | markClosed(e); |
---|
| 484 | } finally { |
---|
| 485 | //the buffer is just an in-memory buffer, but it is still polite to |
---|
| 486 | // close early |
---|
| 487 | IOUtils.closeStream(d); |
---|
| 488 | } |
---|
| 489 | } |
---|
| 490 | |
---|
| 491 | /* Receive a response. |
---|
| 492 | * Because only one receiver, so no synchronization on in. |
---|
| 493 | */ |
---|
| 494 | private void receiveResponse() { |
---|
| 495 | if (shouldCloseConnection.get()) { |
---|
| 496 | return; |
---|
| 497 | } |
---|
| 498 | touch(); |
---|
| 499 | |
---|
| 500 | try { |
---|
| 501 | int id = in.readInt(); // try to read an id |
---|
| 502 | |
---|
| 503 | if (LOG.isDebugEnabled()) |
---|
| 504 | LOG.debug(getName() + " got value #" + id); |
---|
| 505 | |
---|
| 506 | Call call = calls.remove(id); |
---|
| 507 | |
---|
| 508 | int state = in.readInt(); // read call status |
---|
| 509 | if (state == Status.SUCCESS.state) { |
---|
| 510 | Writable value = ReflectionUtils.newInstance(valueClass, conf); |
---|
| 511 | value.readFields(in); // read value |
---|
| 512 | call.setValue(value); |
---|
| 513 | } else if (state == Status.ERROR.state) { |
---|
| 514 | call.setException(new RemoteException(WritableUtils.readString(in), |
---|
| 515 | WritableUtils.readString(in))); |
---|
| 516 | } else if (state == Status.FATAL.state) { |
---|
| 517 | // Close the connection |
---|
| 518 | markClosed(new RemoteException(WritableUtils.readString(in), |
---|
| 519 | WritableUtils.readString(in))); |
---|
| 520 | } |
---|
| 521 | } catch (IOException e) { |
---|
| 522 | markClosed(e); |
---|
| 523 | } |
---|
| 524 | } |
---|
| 525 | |
---|
| 526 | private synchronized void markClosed(IOException e) { |
---|
| 527 | if (shouldCloseConnection.compareAndSet(false, true)) { |
---|
| 528 | closeException = e; |
---|
| 529 | notifyAll(); |
---|
| 530 | } |
---|
| 531 | } |
---|
| 532 | |
---|
| 533 | /** Close the connection. */ |
---|
| 534 | private synchronized void close() { |
---|
| 535 | if (!shouldCloseConnection.get()) { |
---|
| 536 | LOG.error("The connection is not in the closed state"); |
---|
| 537 | return; |
---|
| 538 | } |
---|
| 539 | |
---|
| 540 | // release the resources |
---|
| 541 | // first thing to do;take the connection out of the connection list |
---|
| 542 | synchronized (connections) { |
---|
| 543 | if (connections.get(remoteId) == this) { |
---|
| 544 | connections.remove(remoteId); |
---|
| 545 | } |
---|
| 546 | } |
---|
| 547 | |
---|
| 548 | // close the streams and therefore the socket |
---|
| 549 | IOUtils.closeStream(out); |
---|
| 550 | IOUtils.closeStream(in); |
---|
| 551 | |
---|
| 552 | // clean up all calls |
---|
| 553 | if (closeException == null) { |
---|
| 554 | if (!calls.isEmpty()) { |
---|
| 555 | LOG.warn( |
---|
| 556 | "A connection is closed for no cause and calls are not empty"); |
---|
| 557 | |
---|
| 558 | // clean up calls anyway |
---|
| 559 | closeException = new IOException("Unexpected closed connection"); |
---|
| 560 | cleanupCalls(); |
---|
| 561 | } |
---|
| 562 | } else { |
---|
| 563 | // log the info |
---|
| 564 | if (LOG.isDebugEnabled()) { |
---|
| 565 | LOG.debug("closing ipc connection to " + server + ": " + |
---|
| 566 | closeException.getMessage(),closeException); |
---|
| 567 | } |
---|
| 568 | |
---|
| 569 | // cleanup calls |
---|
| 570 | cleanupCalls(); |
---|
| 571 | } |
---|
| 572 | if (LOG.isDebugEnabled()) |
---|
| 573 | LOG.debug(getName() + ": closed"); |
---|
| 574 | } |
---|
| 575 | |
---|
| 576 | /* Cleanup all calls and mark them as done */ |
---|
| 577 | private void cleanupCalls() { |
---|
| 578 | Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ; |
---|
| 579 | while (itor.hasNext()) { |
---|
| 580 | Call c = itor.next().getValue(); |
---|
| 581 | c.setException(closeException); // local exception |
---|
| 582 | itor.remove(); |
---|
| 583 | } |
---|
| 584 | } |
---|
| 585 | } |
---|
| 586 | |
---|
| 587 | /** Call implementation used for parallel calls. */ |
---|
| 588 | private class ParallelCall extends Call { |
---|
| 589 | private ParallelResults results; |
---|
| 590 | private int index; |
---|
| 591 | |
---|
| 592 | public ParallelCall(Writable param, ParallelResults results, int index) { |
---|
| 593 | super(param); |
---|
| 594 | this.results = results; |
---|
| 595 | this.index = index; |
---|
| 596 | } |
---|
| 597 | |
---|
| 598 | /** Deliver result to result collector. */ |
---|
| 599 | protected void callComplete() { |
---|
| 600 | results.callComplete(this); |
---|
| 601 | } |
---|
| 602 | } |
---|
| 603 | |
---|
| 604 | /** Result collector for parallel calls. */ |
---|
| 605 | private static class ParallelResults { |
---|
| 606 | private Writable[] values; |
---|
| 607 | private int size; |
---|
| 608 | private int count; |
---|
| 609 | |
---|
| 610 | public ParallelResults(int size) { |
---|
| 611 | this.values = new Writable[size]; |
---|
| 612 | this.size = size; |
---|
| 613 | } |
---|
| 614 | |
---|
| 615 | /** Collect a result. */ |
---|
| 616 | public synchronized void callComplete(ParallelCall call) { |
---|
| 617 | values[call.index] = call.value; // store the value |
---|
| 618 | count++; // count it |
---|
| 619 | if (count == size) // if all values are in |
---|
| 620 | notify(); // then notify waiting caller |
---|
| 621 | } |
---|
| 622 | } |
---|
| 623 | |
---|
/**
 * Construct an IPC client whose values are of the given {@link Writable}
 * class.
 *
 * The following settings are read from <code>conf</code>:
 * <code>ipc.client.connection.maxidletime</code> (default 10000),
 * <code>ipc.client.connect.max.retries</code> (default 10),
 * <code>ipc.client.tcpnodelay</code> (default false) and the ping interval
 * (see {@link #getPingInterval(Configuration)}).
 *
 * @param valueClass class of the values returned by calls
 * @param conf configuration supplying the settings above
 * @param factory socket factory used to create connection sockets
 */
public Client(Class<? extends Writable> valueClass, Configuration conf,
    SocketFactory factory) {
  this.valueClass = valueClass;
  this.maxIdleTime =
    conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
  this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
  this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
  this.pingInterval = getPingInterval(conf);
  if (LOG.isDebugEnabled()) {
    // spaces around the value keep the message readable
    LOG.debug("The ping interval is " + this.pingInterval + " ms.");
  }
  this.conf = conf;
  this.socketFactory = factory;
}
---|
| 640 | |
---|
| 641 | /** |
---|
| 642 | * Construct an IPC client whose socket factory is the one returned by |
---|
| | * {@link NetUtils#getDefaultSocketFactory(Configuration)} for <code>conf</code>. |
---|
| | * |
---|
| 643 | * @param valueClass class of the values returned by calls |
---|
| 644 | * @param conf configuration passed on to the three-argument constructor |
---|
| 645 | */ |
---|
| 646 | public Client(Class<? extends Writable> valueClass, Configuration conf) { |
---|
| 647 | this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); |
---|
| 648 | } |
---|
| 649 | |
---|
| 650 | /** Return the socket factory of this client |
---|
| 651 | * |
---|
| 652 | * @return this client's socket factory |
---|
| 653 | */ |
---|
| 654 | SocketFactory getSocketFactory() { |
---|
| 655 | return socketFactory; |
---|
| 656 | } |
---|
| 657 | |
---|
| 658 | /** Stop all threads related to this client. No further calls may be made |
---|
| 659 | * using this client. */ |
---|
| 660 | public void stop() { |
---|
| 661 | if (LOG.isDebugEnabled()) { |
---|
| 662 | LOG.debug("Stopping client"); |
---|
| 663 | } |
---|
| 664 | |
---|
| 665 | if (!running.compareAndSet(true, false)) { |
---|
| 666 | return; |
---|
| 667 | } |
---|
| 668 | |
---|
| 669 | // wake up all connections |
---|
| 670 | synchronized (connections) { |
---|
| 671 | for (Connection conn : connections.values()) { |
---|
| 672 | conn.interrupt(); |
---|
| 673 | } |
---|
| 674 | } |
---|
| 675 | |
---|
| 676 | // wait until all connections are closed |
---|
| 677 | while (!connections.isEmpty()) { |
---|
| 678 | try { |
---|
| 679 | Thread.sleep(100); |
---|
| 680 | } catch (InterruptedException e) { |
---|
| 681 | } |
---|
| 682 | } |
---|
| 683 | } |
---|
| 684 | |
---|
| 685 | /** Make a call, passing <code>param</code>, to the IPC server running at |
---|
| 686 | * <code>address</code>, returning the value. Throws exceptions if there are |
---|
| 687 | * network problems or if the remote code threw an exception. |
---|
| 688 | * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead |
---|
| 689 | */ |
---|
| 690 | @Deprecated |
---|
| 691 | public Writable call(Writable param, InetSocketAddress address) |
---|
| 692 | throws InterruptedException, IOException { |
---|
| 693 | return call(param, address, null); |
---|
| 694 | } |
---|
| 695 | |
---|
| 696 | /** Make a call, passing <code>param</code>, to the IPC server running at |
---|
| 697 | * <code>address</code> with the <code>ticket</code> credentials, returning |
---|
| 698 | * the value. |
---|
| 699 | * Throws exceptions if there are network problems or if the remote code |
---|
| 700 | * threw an exception. |
---|
| 701 | * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead |
---|
| 702 | */ |
---|
| 703 | @Deprecated |
---|
| 704 | public Writable call(Writable param, InetSocketAddress addr, |
---|
| 705 | UserGroupInformation ticket) |
---|
| 706 | throws InterruptedException, IOException { |
---|
| 707 | return call(param, addr, null, ticket); |
---|
| 708 | } |
---|
| 709 | |
---|
| 710 | /** Make a call, passing <code>param</code>, to the IPC server running at |
---|
| 711 | * <code>address</code> which is servicing the <code>protocol</code> protocol, |
---|
| 712 | * with the <code>ticket</code> credentials, returning the value. |
---|
| 713 | * Throws exceptions if there are network problems or if the remote code |
---|
| 714 | * threw an exception. */ |
---|
| 715 | public Writable call(Writable param, InetSocketAddress addr, |
---|
| 716 | Class<?> protocol, UserGroupInformation ticket) |
---|
| 717 | throws InterruptedException, IOException { |
---|
| 718 | Call call = new Call(param); |
---|
| 719 | Connection connection = getConnection(addr, protocol, ticket, call); |
---|
| 720 | connection.sendParam(call); // send the parameter |
---|
| 721 | boolean interrupted = false; |
---|
| 722 | synchronized (call) { |
---|
| 723 | while (!call.done) { |
---|
| 724 | try { |
---|
| 725 | call.wait(); // wait for the result |
---|
| 726 | } catch (InterruptedException ie) { |
---|
| 727 | // save the fact that we were interrupted |
---|
| 728 | interrupted = true; |
---|
| 729 | } |
---|
| 730 | } |
---|
| 731 | |
---|
| 732 | if (interrupted) { |
---|
| 733 | // set the interrupt flag now that we are done waiting |
---|
| 734 | Thread.currentThread().interrupt(); |
---|
| 735 | } |
---|
| 736 | |
---|
| 737 | if (call.error != null) { |
---|
| 738 | if (call.error instanceof RemoteException) { |
---|
| 739 | call.error.fillInStackTrace(); |
---|
| 740 | throw call.error; |
---|
| 741 | } else { // local exception |
---|
| 742 | throw wrapException(addr, call.error); |
---|
| 743 | } |
---|
| 744 | } else { |
---|
| 745 | return call.value; |
---|
| 746 | } |
---|
| 747 | } |
---|
| 748 | } |
---|
| 749 | |
---|
| 750 | /** |
---|
| 751 | * Take an IOException and the address we were trying to connect to |
---|
| 752 | * and return an IOException with the input exception as the cause. |
---|
| 753 | * The new exception provides the stack trace of the place where |
---|
| 754 | * the exception is thrown and some extra diagnostics information. |
---|
| 755 | * If the exception is ConnectException or SocketTimeoutException, |
---|
| 756 | * return a new one of the same type; Otherwise return an IOException. |
---|
| 757 | * |
---|
| 758 | * @param addr target address |
---|
| 759 | * @param exception the relevant exception |
---|
| 760 | * @return an exception to throw |
---|
| 761 | */ |
---|
| 762 | private IOException wrapException(InetSocketAddress addr, |
---|
| 763 | IOException exception) { |
---|
| 764 | if (exception instanceof ConnectException) { |
---|
| 765 | //connection refused; include the host:port in the error |
---|
| 766 | return (ConnectException)new ConnectException( |
---|
| 767 | "Call to " + addr + " failed on connection exception: " + exception) |
---|
| 768 | .initCause(exception); |
---|
| 769 | } else if (exception instanceof SocketTimeoutException) { |
---|
| 770 | return (SocketTimeoutException)new SocketTimeoutException( |
---|
| 771 | "Call to " + addr + " failed on socket timeout exception: " |
---|
| 772 | + exception).initCause(exception); |
---|
| 773 | } else { |
---|
| 774 | return (IOException)new IOException( |
---|
| 775 | "Call to " + addr + " failed on local exception: " + exception) |
---|
| 776 | .initCause(exception); |
---|
| 777 | |
---|
| 778 | } |
---|
| 779 | } |
---|
| 780 | |
---|
| 781 | /** |
---|
| 782 | * Makes a set of calls in parallel. Each parameter is sent to the |
---|
| 783 | * corresponding address. When all values are available, or have timed out |
---|
| 784 | * or errored, the collected results are returned in an array. The array |
---|
| 785 | * contains nulls for calls that timed out or errored. |
---|
| 786 | * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, UserGroupInformation)} instead |
---|
| 787 | */ |
---|
| 788 | @Deprecated |
---|
| 789 | public Writable[] call(Writable[] params, InetSocketAddress[] addresses) |
---|
| 790 | throws IOException { |
---|
| 791 | return call(params, addresses, null, null); |
---|
| 792 | } |
---|
| 793 | |
---|
| 794 | /** Makes a set of calls in parallel. Each parameter is sent to the |
---|
| 795 | * corresponding address. When all values are available, or have timed out |
---|
| 796 | * or errored, the collected results are returned in an array. The array |
---|
| 797 | * contains nulls for calls that timed out or errored. */ |
---|
| 798 | public Writable[] call(Writable[] params, InetSocketAddress[] addresses, |
---|
| 799 | Class<?> protocol, UserGroupInformation ticket) |
---|
| 800 | throws IOException { |
---|
| 801 | if (addresses.length == 0) return new Writable[0]; |
---|
| 802 | |
---|
| 803 | ParallelResults results = new ParallelResults(params.length); |
---|
| 804 | synchronized (results) { |
---|
| 805 | for (int i = 0; i < params.length; i++) { |
---|
| 806 | ParallelCall call = new ParallelCall(params[i], results, i); |
---|
| 807 | try { |
---|
| 808 | Connection connection = |
---|
| 809 | getConnection(addresses[i], protocol, ticket, call); |
---|
| 810 | connection.sendParam(call); // send each parameter |
---|
| 811 | } catch (IOException e) { |
---|
| 812 | // log errors |
---|
| 813 | LOG.info("Calling "+addresses[i]+" caught: " + |
---|
| 814 | e.getMessage(),e); |
---|
| 815 | results.size--; // wait for one fewer result |
---|
| 816 | } |
---|
| 817 | } |
---|
| 818 | while (results.count != results.size) { |
---|
| 819 | try { |
---|
| 820 | results.wait(); // wait for all results |
---|
| 821 | } catch (InterruptedException e) {} |
---|
| 822 | } |
---|
| 823 | |
---|
| 824 | return results.values; |
---|
| 825 | } |
---|
| 826 | } |
---|
| 827 | |
---|
| 828 | /** Get a connection from the pool, or create a new one and add it to the |
---|
| 829 | * pool. Connections to a given host/port are reused. */ |
---|
| 830 | private Connection getConnection(InetSocketAddress addr, |
---|
| 831 | Class<?> protocol, |
---|
| 832 | UserGroupInformation ticket, |
---|
| 833 | Call call) |
---|
| 834 | throws IOException { |
---|
| 835 | if (!running.get()) { |
---|
| 836 | // the client is stopped |
---|
| 837 | throw new IOException("The client is stopped"); |
---|
| 838 | } |
---|
| 839 | Connection connection; |
---|
| 840 | /* we could avoid this allocation for each RPC by having a |
---|
| 841 | * connectionsId object and with set() method. We need to manage the |
---|
| 842 | * refs for keys in HashMap properly. For now its ok. |
---|
| 843 | */ |
---|
| 844 | ConnectionId remoteId = new ConnectionId(addr, protocol, ticket); |
---|
| 845 | do { |
---|
| 846 | synchronized (connections) { |
---|
| 847 | connection = connections.get(remoteId); |
---|
| 848 | if (connection == null) { |
---|
| 849 | connection = new Connection(remoteId); |
---|
| 850 | connections.put(remoteId, connection); |
---|
| 851 | } |
---|
| 852 | } |
---|
| 853 | } while (!connection.addCall(call)); |
---|
| 854 | |
---|
| 855 | //we don't invoke the method below inside "synchronized (connections)" |
---|
| 856 | //block above. The reason for that is if the server happens to be slow, |
---|
| 857 | //it will take longer to establish a connection and that will slow the |
---|
| 858 | //entire system down. |
---|
| 859 | connection.setupIOstreams(); |
---|
| 860 | return connection; |
---|
| 861 | } |
---|
| 862 | |
---|
| 863 | /** |
---|
| 864 | * This class holds the address and the user ticket. The client connections |
---|
| 865 | * to servers are uniquely identified by <remoteAddress, protocol, ticket> |
---|
| 866 | */ |
---|
| 867 | private static class ConnectionId { |
---|
| 868 | InetSocketAddress address; |
---|
| 869 | UserGroupInformation ticket; |
---|
| 870 | Class<?> protocol; |
---|
| 871 | private static final int PRIME = 16777619; |
---|
| 872 | |
---|
| 873 | ConnectionId(InetSocketAddress address, Class<?> protocol, |
---|
| 874 | UserGroupInformation ticket) { |
---|
| 875 | this.protocol = protocol; |
---|
| 876 | this.address = address; |
---|
| 877 | this.ticket = ticket; |
---|
| 878 | } |
---|
| 879 | |
---|
| 880 | InetSocketAddress getAddress() { |
---|
| 881 | return address; |
---|
| 882 | } |
---|
| 883 | |
---|
| 884 | Class<?> getProtocol() { |
---|
| 885 | return protocol; |
---|
| 886 | } |
---|
| 887 | |
---|
| 888 | UserGroupInformation getTicket() { |
---|
| 889 | return ticket; |
---|
| 890 | } |
---|
| 891 | |
---|
| 892 | |
---|
| 893 | @Override |
---|
| 894 | public boolean equals(Object obj) { |
---|
| 895 | if (obj instanceof ConnectionId) { |
---|
| 896 | ConnectionId id = (ConnectionId) obj; |
---|
| 897 | return address.equals(id.address) && protocol == id.protocol && |
---|
| 898 | ticket == id.ticket; |
---|
| 899 | //Note : ticket is a ref comparision. |
---|
| 900 | } |
---|
| 901 | return false; |
---|
| 902 | } |
---|
| 903 | |
---|
| 904 | @Override |
---|
| 905 | public int hashCode() { |
---|
| 906 | return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^ |
---|
| 907 | System.identityHashCode(ticket); |
---|
| 908 | } |
---|
| 909 | } |
---|
| 910 | } |
---|