source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 4.6 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.ipc;
20
21import java.io.IOException;
22import java.net.InetSocketAddress;
23import java.util.Random;
24
25import junit.framework.TestCase;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.io.BytesWritable;
31import org.apache.hadoop.io.Writable;
32import org.apache.hadoop.net.NetUtils;
33
34/**
35 * This test provokes partial writes in the server, which is
36 * serving multiple clients.
37 */
38public class TestIPCServerResponder extends TestCase {
39
40  public static final Log LOG = 
41            LogFactory.getLog(TestIPCServerResponder.class);
42
43  private static Configuration conf = new Configuration();
44
45  public TestIPCServerResponder(final String name) {
46    super(name);
47  }
48
49  private static final Random RANDOM = new Random();
50
51  private static final String ADDRESS = "0.0.0.0";
52
53  private static final int BYTE_COUNT = 1024;
54  private static final byte[] BYTES = new byte[BYTE_COUNT];
55  static {
56    for (int i = 0; i < BYTE_COUNT; i++)
57      BYTES[i] = (byte) ('a' + (i % 26));
58  }
59
60  private static class TestServer extends Server {
61
62    private boolean sleep;
63
64    public TestServer(final int handlerCount, final boolean sleep) 
65                                              throws IOException {
66      super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
67      // Set the buffer size to half of the maximum parameter/result size
68      // to force the socket to block
69      this.setSocketSendBufSize(BYTE_COUNT / 2);
70      this.sleep = sleep;
71    }
72
73    @Override
74    public Writable call(Class<?> protocol, Writable param, long receiveTime)
75        throws IOException {
76      if (sleep) {
77        try {
78          Thread.sleep(RANDOM.nextInt(20)); // sleep a bit
79        } catch (InterruptedException e) {}
80      }
81      return param;
82    }
83  }
84
85  private static class Caller extends Thread {
86
87    private Client client;
88    private int count;
89    private InetSocketAddress address;
90    private boolean failed;
91
92    public Caller(final Client client, final InetSocketAddress address, 
93                                       final int count) {
94      this.client = client;
95      this.address = address;
96      this.count = count;
97    }
98
99    @Override
100    public void run() {
101      for (int i = 0; i < count; i++) {
102        try {
103          int byteSize = RANDOM.nextInt(BYTE_COUNT);
104          byte[] bytes = new byte[byteSize];
105          System.arraycopy(BYTES, 0, bytes, 0, byteSize);
106          Writable param = new BytesWritable(bytes);
107          Writable value = client.call(param, address);
108          Thread.sleep(RANDOM.nextInt(20));
109        } catch (Exception e) {
110          LOG.fatal("Caught: " + e);
111          failed = true;
112        }
113      }
114    }
115  }
116
117  public void testServerResponder() throws Exception {
118    testServerResponder(10, true, 1, 10, 200);
119  }
120
121  public void testServerResponder(final int handlerCount, 
122                                  final boolean handlerSleep, 
123                                  final int clientCount,
124                                  final int callerCount,
125                                  final int callCount) throws Exception {
126    Server server = new TestServer(handlerCount, handlerSleep);
127    server.start();
128
129    InetSocketAddress address = NetUtils.getConnectAddress(server);
130    Client[] clients = new Client[clientCount];
131    for (int i = 0; i < clientCount; i++) {
132      clients[i] = new Client(BytesWritable.class, conf);
133    }
134
135    Caller[] callers = new Caller[callerCount];
136    for (int i = 0; i < callerCount; i++) {
137      callers[i] = new Caller(clients[i % clientCount], address, callCount);
138      callers[i].start();
139    }
140    for (int i = 0; i < callerCount; i++) {
141      callers[i].join();
142      assertFalse(callers[i].failed);
143    }
144    for (int i = 0; i < clientCount; i++) {
145      clients[i].stop();
146    }
147    server.stop();
148  }
149
150}
Note: See TracBrowser for help on using the repository browser.