source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/net/SocketInputStream.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 5.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.net;
20
21import java.io.IOException;
22import java.io.InputStream;
23import java.net.Socket;
24import java.net.SocketTimeoutException;
25import java.nio.ByteBuffer;
26import java.nio.channels.FileChannel;
27import java.nio.channels.ReadableByteChannel;
28import java.nio.channels.SelectableChannel;
29import java.nio.channels.SelectionKey;
30
31
32/**
33 * This implements an input stream that can have a timeout while reading.
34 * This sets non-blocking flag on the socket channel.
35 * So after create this object, read() on
36 * {@link Socket#getInputStream()} and write() on
37 * {@link Socket#getOutputStream()} for the associated socket will throw
38 * IllegalBlockingModeException.
39 * Please use {@link SocketOutputStream} for writing.
40 */
41public class SocketInputStream extends InputStream
42                               implements ReadableByteChannel {
43
44  private Reader reader;
45
46  private static class Reader extends SocketIOWithTimeout {
47    ReadableByteChannel channel;
48   
49    Reader(ReadableByteChannel channel, long timeout) throws IOException {
50      super((SelectableChannel)channel, timeout);
51      this.channel = channel;
52    }
53   
54    int performIO(ByteBuffer buf) throws IOException {
55      return channel.read(buf);
56    }
57  }
58 
59  /**
60   * Create a new input stream with the given timeout. If the timeout
61   * is zero, it will be treated as infinite timeout. The socket's
62   * channel will be configured to be non-blocking.
63   *
64   * @param channel
65   *        Channel for reading, should also be a {@link SelectableChannel}.
66   *        The channel will be configured to be non-blocking.
67   * @param timeout timeout in milliseconds. must not be negative.
68   * @throws IOException
69   */
70  public SocketInputStream(ReadableByteChannel channel, long timeout)
71                                                        throws IOException {
72    SocketIOWithTimeout.checkChannelValidity(channel);
73    reader = new Reader(channel, timeout);
74  }
75
76  /**
77   * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
78   *
79   * Create a new input stream with the given timeout. If the timeout
80   * is zero, it will be treated as infinite timeout. The socket's
81   * channel will be configured to be non-blocking.
82   *
83   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
84   * 
85   * @param socket should have a channel associated with it.
86   * @param timeout timeout timeout in milliseconds. must not be negative.
87   * @throws IOException
88   */
89  public SocketInputStream(Socket socket, long timeout) 
90                                         throws IOException {
91    this(socket.getChannel(), timeout);
92  }
93 
94  /**
95   * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
96   * :<br><br>
97   *
98   * Create a new input stream with the given timeout. If the timeout
99   * is zero, it will be treated as infinite timeout. The socket's
100   * channel will be configured to be non-blocking.
101   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
102   * 
103   * @param socket should have a channel associated with it.
104   * @throws IOException
105   */
106  public SocketInputStream(Socket socket) throws IOException {
107    this(socket.getChannel(), socket.getSoTimeout());
108  }
109 
110  @Override
111  public int read() throws IOException {
112    /* Allocation can be removed if required.
113     * probably no need to optimize or encourage single byte read.
114     */
115    byte[] buf = new byte[1];
116    int ret = read(buf, 0, 1);
117    if (ret > 0) {
118      return (byte)buf[0];
119    }
120    if (ret != -1) {
121      // unexpected
122      throw new IOException("Could not read from stream");
123    }
124    return ret;
125  }
126
127  public int read(byte[] b, int off, int len) throws IOException {
128    return read(ByteBuffer.wrap(b, off, len));
129  }
130
131  public synchronized void close() throws IOException {
132    /* close the channel since Socket.getInputStream().close()
133     * closes the socket.
134     */
135    reader.channel.close();
136    reader.close();
137  }
138
139  /**
140   * Returns underlying channel used by inputstream.
141   * This is useful in certain cases like channel for
142   * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
143   */
144  public ReadableByteChannel getChannel() {
145    return reader.channel; 
146  }
147 
148  //ReadableByteChannel interface
149   
150  public boolean isOpen() {
151    return reader.isOpen();
152  }
153   
154  public int read(ByteBuffer dst) throws IOException {
155    return reader.doIO(dst, SelectionKey.OP_READ);
156  }
157 
158  /**
159   * waits for the underlying channel to be ready for reading.
160   * The timeout specified for this stream applies to this wait.
161   *
162   * @throws SocketTimeoutException
163   *         if select on the channel times out.
164   * @throws IOException
165   *         if any other I/O error occurs.
166   */
167  public void waitForReadable() throws IOException {
168    reader.waitForIO(SelectionKey.OP_READ);
169  }
170}
Note: See TracBrowser for help on using the repository browser.