source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 4.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.net;
19
20import java.io.IOException;
21import java.io.InputStream;
22import java.io.OutputStream;
23import java.net.SocketTimeoutException;
24import java.nio.channels.Pipe;
25import java.util.Arrays;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29
30import junit.framework.TestCase;
31
32/**
33 * This tests timout out from SocketInputStream and
34 * SocketOutputStream using pipes.
35 *
36 * Normal read and write using these streams are tested by pretty much
37 * every DFS unit test.
38 */
39public class TestSocketIOWithTimeout extends TestCase {
40
41  static Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
42 
43  private static int TIMEOUT = 1*1000; 
44  private static String TEST_STRING = "1234567890";
45 
46  private void doIO(InputStream in, OutputStream out) throws IOException {
47    /* Keep on writing or reading until we get SocketTimeoutException.
48     * It expects this exception to occur within 100 millis of TIMEOUT.
49     */
50    byte buf[] = new byte[4192];
51   
52    while (true) {
53      long start = System.currentTimeMillis();
54      try {
55        if (in != null) {
56          in.read(buf);
57        } else {
58          out.write(buf);
59        }
60      } catch (SocketTimeoutException e) {
61        long diff = System.currentTimeMillis() - start;
62        LOG.info("Got SocketTimeoutException as expected after " + 
63                 diff + " millis : " + e.getMessage());
64        assertTrue(Math.abs(TIMEOUT - diff) <= 200);
65        break;
66      }
67    }
68  }
69 
70  /**
71   * Just reads one byte from the input stream.
72   */
73  static class ReadRunnable implements Runnable {
74    private InputStream in;
75
76    public ReadRunnable(InputStream in) {
77      this.in = in;
78    }
79    public void run() {
80      try {
81        in.read();
82      } catch (IOException e) {
83        LOG.info("Got expection while reading as expected : " + 
84                 e.getMessage());
85        return;
86      }
87      assertTrue(false);
88    }
89  }
90 
91  public void testSocketIOWithTimeout() throws IOException {
92   
93    // first open pipe:
94    Pipe pipe = Pipe.open();
95    Pipe.SourceChannel source = pipe.source();
96    Pipe.SinkChannel sink = pipe.sink();
97   
98    try {
99      InputStream in = new SocketInputStream(source, TIMEOUT);
100      OutputStream out = new SocketOutputStream(sink, TIMEOUT);
101     
102      byte[] writeBytes = TEST_STRING.getBytes();
103      byte[] readBytes = new byte[writeBytes.length];
104     
105      out.write(writeBytes);
106      doIO(null, out);
107     
108      in.read(readBytes);
109      assertTrue(Arrays.equals(writeBytes, readBytes));
110      doIO(in, null);
111     
112      /*
113       * Verify that it handles interrupted threads properly.
114       * Use a large timeout and expect the thread to return quickly.
115       */
116      in = new SocketInputStream(source, 0);
117      Thread thread = new Thread(new ReadRunnable(in));
118      thread.start();
119     
120      try {
121        Thread.sleep(1000);
122      } catch (InterruptedException ignored) {}
123     
124      thread.interrupt();
125     
126      try {
127        thread.join();
128      } catch (InterruptedException e) {
129        throw new IOException("Unexpected InterruptedException : " + e);
130      }
131     
132      //make sure the channels are still open
133      assertTrue(source.isOpen());
134      assertTrue(sink.isOpen());
135
136      out.close();
137      assertFalse(sink.isOpen());
138     
139      // close sink and expect -1 from source.read()
140      assertEquals(-1, in.read());
141     
142      // make sure close() closes the underlying channel.
143      in.close();
144      assertFalse(source.isOpen());
145     
146    } finally {
147      if (source != null) {
148        source.close();
149      }
150      if (sink != null) {
151        sink.close();
152      }
153    }
154  }
155}
Note: See TracBrowser for help on using the repository browser.