source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/ipc/TestIPC.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 7.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.ipc;
20
21import org.apache.commons.logging.*;
22
23import org.apache.hadoop.io.Writable;
24import org.apache.hadoop.io.LongWritable;
25import org.apache.hadoop.util.StringUtils;
26import org.apache.hadoop.net.NetUtils;
27
28import java.util.Random;
29import java.io.IOException;
30import java.net.InetSocketAddress;
31
32import junit.framework.TestCase;
33
34import org.apache.hadoop.conf.Configuration;
35
36/** Unit tests for IPC. */
37public class TestIPC extends TestCase {
38  public static final Log LOG =
39    LogFactory.getLog(TestIPC.class);
40 
41  final private static Configuration conf = new Configuration();
42  final static private int PING_INTERVAL = 1000;
43 
44  static {
45    Client.setPingInterval(conf, PING_INTERVAL);
46  }
47  public TestIPC(String name) { super(name); }
48
49  private static final Random RANDOM = new Random();
50
51  private static final String ADDRESS = "0.0.0.0";
52
53  private static class TestServer extends Server {
54    private boolean sleep;
55
56    public TestServer(int handlerCount, boolean sleep) 
57      throws IOException {
58      super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
59      this.sleep = sleep;
60    }
61
62    @Override
63    public Writable call(Class<?> protocol, Writable param, long receiveTime)
64        throws IOException {
65      if (sleep) {
66        try {
67          Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL));      // sleep a bit
68        } catch (InterruptedException e) {}
69      }
70      return param;                               // echo param as result
71    }
72  }
73
74  private static class SerialCaller extends Thread {
75    private Client client;
76    private InetSocketAddress server;
77    private int count;
78    private boolean failed;
79
80    public SerialCaller(Client client, InetSocketAddress server, int count) {
81      this.client = client;
82      this.server = server;
83      this.count = count;
84    }
85
86    public void run() {
87      for (int i = 0; i < count; i++) {
88        try {
89          LongWritable param = new LongWritable(RANDOM.nextLong());
90          LongWritable value =
91            (LongWritable)client.call(param, server);
92          if (!param.equals(value)) {
93            LOG.fatal("Call failed!");
94            failed = true;
95            break;
96          }
97        } catch (Exception e) {
98          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
99          failed = true;
100        }
101      }
102    }
103  }
104
105  private static class ParallelCaller extends Thread {
106    private Client client;
107    private int count;
108    private InetSocketAddress[] addresses;
109    private boolean failed;
110   
111    public ParallelCaller(Client client, InetSocketAddress[] addresses,
112                          int count) {
113      this.client = client;
114      this.addresses = addresses;
115      this.count = count;
116    }
117
118    public void run() {
119      for (int i = 0; i < count; i++) {
120        try {
121          Writable[] params = new Writable[addresses.length];
122          for (int j = 0; j < addresses.length; j++)
123            params[j] = new LongWritable(RANDOM.nextLong());
124          Writable[] values = client.call(params, addresses);
125          for (int j = 0; j < addresses.length; j++) {
126            if (!params[j].equals(values[j])) {
127              LOG.fatal("Call failed!");
128              failed = true;
129              break;
130            }
131          }
132        } catch (Exception e) {
133          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
134          failed = true;
135        }
136      }
137    }
138  }
139
140  public void testSerial() throws Exception {
141    testSerial(3, false, 2, 5, 100);
142  }
143
144  public void testSerial(int handlerCount, boolean handlerSleep, 
145                         int clientCount, int callerCount, int callCount)
146    throws Exception {
147    Server server = new TestServer(handlerCount, handlerSleep);
148    InetSocketAddress addr = NetUtils.getConnectAddress(server);
149    server.start();
150
151    Client[] clients = new Client[clientCount];
152    for (int i = 0; i < clientCount; i++) {
153      clients[i] = new Client(LongWritable.class, conf);
154    }
155   
156    SerialCaller[] callers = new SerialCaller[callerCount];
157    for (int i = 0; i < callerCount; i++) {
158      callers[i] = new SerialCaller(clients[i%clientCount], addr, callCount);
159      callers[i].start();
160    }
161    for (int i = 0; i < callerCount; i++) {
162      callers[i].join();
163      assertFalse(callers[i].failed);
164    }
165    for (int i = 0; i < clientCount; i++) {
166      clients[i].stop();
167    }
168    server.stop();
169  }
170       
171  public void testParallel() throws Exception {
172    testParallel(10, false, 2, 4, 2, 4, 100);
173  }
174
175  public void testParallel(int handlerCount, boolean handlerSleep,
176                           int serverCount, int addressCount,
177                           int clientCount, int callerCount, int callCount)
178    throws Exception {
179    Server[] servers = new Server[serverCount];
180    for (int i = 0; i < serverCount; i++) {
181      servers[i] = new TestServer(handlerCount, handlerSleep);
182      servers[i].start();
183    }
184
185    InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
186    for (int i = 0; i < addressCount; i++) {
187      addresses[i] = NetUtils.getConnectAddress(servers[i%serverCount]);
188    }
189
190    Client[] clients = new Client[clientCount];
191    for (int i = 0; i < clientCount; i++) {
192      clients[i] = new Client(LongWritable.class, conf);
193    }
194   
195    ParallelCaller[] callers = new ParallelCaller[callerCount];
196    for (int i = 0; i < callerCount; i++) {
197      callers[i] =
198        new ParallelCaller(clients[i%clientCount], addresses, callCount);
199      callers[i].start();
200    }
201    for (int i = 0; i < callerCount; i++) {
202      callers[i].join();
203      assertFalse(callers[i].failed);
204    }
205    for (int i = 0; i < clientCount; i++) {
206      clients[i].stop();
207    }
208    for (int i = 0; i < serverCount; i++) {
209      servers[i].stop();
210    }
211  }
212       
213  public void testStandAloneClient() throws Exception {
214    testParallel(10, false, 2, 4, 2, 4, 100);
215    Client client = new Client(LongWritable.class, conf);
216    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
217    try {
218      client.call(new LongWritable(RANDOM.nextLong()),
219              address);
220      fail("Expected an exception to have been thrown");
221    } catch (IOException e) {
222      String message = e.getMessage();
223      String addressText = address.toString();
224      assertTrue("Did not find "+addressText+" in "+message,
225              message.contains(addressText));
226      Throwable cause=e.getCause();
227      assertNotNull("No nested exception in "+e,cause);
228      String causeText=cause.getMessage();
229      assertTrue("Did not find " + causeText + " in " + message,
230              message.contains(causeText));
231    }
232  }
233
234
235  public static void main(String[] args) throws Exception {
236
237    //new TestIPC("test").testSerial(5, false, 2, 10, 1000);
238
239    new TestIPC("test").testParallel(10, false, 2, 4, 2, 4, 1000);
240
241  }
242
243}
Note: See TracBrowser for help on using the repository browser.