source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/ipc/TestRPC.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 11.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.ipc;
20
21import java.io.IOException;
22import java.net.ConnectException;
23import java.net.InetSocketAddress;
24import java.lang.reflect.Method;
25
26import junit.framework.TestCase;
27
28import java.util.Arrays;
29
30import org.apache.commons.logging.*;
31
32import org.apache.hadoop.conf.Configuration;
33import org.apache.hadoop.io.UTF8;
34import org.apache.hadoop.io.Writable;
35
36import org.apache.hadoop.net.NetUtils;
37import org.apache.hadoop.security.SecurityUtil;
38import org.apache.hadoop.security.authorize.AuthorizationException;
39import org.apache.hadoop.security.authorize.ConfiguredPolicy;
40import org.apache.hadoop.security.authorize.PolicyProvider;
41import org.apache.hadoop.security.authorize.Service;
42import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
43
44/** Unit tests for RPC. */
45public class TestRPC extends TestCase {
46  private static final String ADDRESS = "0.0.0.0";
47
48  public static final Log LOG =
49    LogFactory.getLog(TestRPC.class);
50 
51  private static Configuration conf = new Configuration();
52
53  int datasize = 1024*100;
54  int numThreads = 50;
55
56  public TestRPC(String name) { super(name); }
57       
58  public interface TestProtocol extends VersionedProtocol {
59    public static final long versionID = 1L;
60   
61    void ping() throws IOException;
62    void slowPing(boolean shouldSlow) throws IOException;
63    String echo(String value) throws IOException;
64    String[] echo(String[] value) throws IOException;
65    Writable echo(Writable value) throws IOException;
66    int add(int v1, int v2) throws IOException;
67    int add(int[] values) throws IOException;
68    int error() throws IOException;
69    void testServerGet() throws IOException;
70    int[] exchange(int[] values) throws IOException;
71  }
72
73  public class TestImpl implements TestProtocol {
74    int fastPingCounter = 0;
75   
76    public long getProtocolVersion(String protocol, long clientVersion) {
77      return TestProtocol.versionID;
78    }
79   
80    public void ping() {}
81
82    public synchronized void slowPing(boolean shouldSlow) {
83      if (shouldSlow) {
84        while (fastPingCounter < 2) {
85          try {
86          wait();  // slow response until two fast pings happened
87          } catch (InterruptedException ignored) {}
88        }
89        fastPingCounter -= 2;
90      } else {
91        fastPingCounter++;
92        notify();
93      }
94    }
95   
96    public String echo(String value) throws IOException { return value; }
97
98    public String[] echo(String[] values) throws IOException { return values; }
99
100    public Writable echo(Writable writable) {
101      return writable;
102    }
103    public int add(int v1, int v2) {
104      return v1 + v2;
105    }
106
107    public int add(int[] values) {
108      int sum = 0;
109      for (int i = 0; i < values.length; i++) {
110        sum += values[i];
111      }
112      return sum;
113    }
114
115    public int error() throws IOException {
116      throw new IOException("bobo");
117    }
118
119    public void testServerGet() throws IOException {
120      if (!(Server.get() instanceof RPC.Server)) {
121        throw new IOException("Server.get() failed");
122      }
123    }
124
125    public int[] exchange(int[] values) {
126      for (int i = 0; i < values.length; i++) {
127        values[i] = i;
128      }
129      return values;
130    }
131  }
132
133  //
134  // an object that does a bunch of transactions
135  //
136  static class Transactions implements Runnable {
137    int datasize;
138    TestProtocol proxy;
139
140    Transactions(TestProtocol proxy, int datasize) {
141      this.proxy = proxy;
142      this.datasize = datasize;
143    }
144
145    // do two RPC that transfers data.
146    public void run() {
147      int[] indata = new int[datasize];
148      int[] outdata = null;
149      int val = 0;
150      try {
151        outdata = proxy.exchange(indata);
152        val = proxy.add(1,2);
153      } catch (IOException e) {
154        assertTrue("Exception from RPC exchange() "  + e, false);
155      }
156      assertEquals(indata.length, outdata.length);
157      assertEquals(val, 3);
158      for (int i = 0; i < outdata.length; i++) {
159        assertEquals(outdata[i], i);
160      }
161    }
162  }
163
164  //
165  // A class that does an RPC but does not read its response.
166  //
167  static class SlowRPC implements Runnable {
168    private TestProtocol proxy;
169    private volatile boolean done;
170   
171    SlowRPC(TestProtocol proxy) {
172      this.proxy = proxy;
173      done = false;
174    }
175
176    boolean isDone() {
177      return done;
178    }
179
180    public void run() {
181      try {
182        proxy.slowPing(true);   // this would hang until two fast pings happened
183        done = true;
184      } catch (IOException e) {
185        assertTrue("SlowRPC ping exception " + e, false);
186      }
187    }
188  }
189
190  public void testSlowRpc() throws Exception {
191    System.out.println("Testing Slow RPC");
192    // create a server with two handlers
193    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
194    TestProtocol proxy = null;
195   
196    try {
197    server.start();
198
199    InetSocketAddress addr = NetUtils.getConnectAddress(server);
200
201    // create a client
202    proxy = (TestProtocol)RPC.getProxy(
203        TestProtocol.class, TestProtocol.versionID, addr, conf);
204
205    SlowRPC slowrpc = new SlowRPC(proxy);
206    Thread thread = new Thread(slowrpc, "SlowRPC");
207    thread.start(); // send a slow RPC, which won't return until two fast pings
208    assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());
209
210    proxy.slowPing(false); // first fast ping
211   
212    // verify that the first RPC is still stuck
213    assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());
214
215    proxy.slowPing(false); // second fast ping
216   
217    // Now the slow ping should be able to be executed
218    while (!slowrpc.isDone()) {
219      System.out.println("Waiting for slow RPC to get done.");
220      try {
221        Thread.sleep(1000);
222      } catch (InterruptedException e) {}
223    }
224    } finally {
225      server.stop();
226      if (proxy != null) {
227        RPC.stopProxy(proxy);
228      }
229      System.out.println("Down slow rpc testing");
230    }
231  }
232
233
234  public void testCalls() throws Exception {
235    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
236    TestProtocol proxy = null;
237    try {
238    server.start();
239
240    InetSocketAddress addr = NetUtils.getConnectAddress(server);
241    proxy = (TestProtocol)RPC.getProxy(
242        TestProtocol.class, TestProtocol.versionID, addr, conf);
243     
244    proxy.ping();
245
246    String stringResult = proxy.echo("foo");
247    assertEquals(stringResult, "foo");
248
249    stringResult = proxy.echo((String)null);
250    assertEquals(stringResult, null);
251
252    String[] stringResults = proxy.echo(new String[]{"foo","bar"});
253    assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));
254
255    stringResults = proxy.echo((String[])null);
256    assertTrue(Arrays.equals(stringResults, null));
257
258    UTF8 utf8Result = (UTF8)proxy.echo(new UTF8("hello world"));
259    assertEquals(utf8Result, new UTF8("hello world"));
260
261    utf8Result = (UTF8)proxy.echo((UTF8)null);
262    assertEquals(utf8Result, null);
263
264    int intResult = proxy.add(1, 2);
265    assertEquals(intResult, 3);
266
267    intResult = proxy.add(new int[] {1, 2});
268    assertEquals(intResult, 3);
269
270    boolean caught = false;
271    try {
272      proxy.error();
273    } catch (IOException e) {
274      LOG.debug("Caught " + e);
275      caught = true;
276    }
277    assertTrue(caught);
278
279    proxy.testServerGet();
280
281    // create multiple threads and make them do large data transfers
282    System.out.println("Starting multi-threaded RPC test...");
283    server.setSocketSendBufSize(1024);
284    Thread threadId[] = new Thread[numThreads];
285    for (int i = 0; i < numThreads; i++) {
286      Transactions trans = new Transactions(proxy, datasize);
287      threadId[i] = new Thread(trans, "TransactionThread-" + i);
288      threadId[i].start();
289    }
290
291    // wait for all transactions to get over
292    System.out.println("Waiting for all threads to finish RPCs...");
293    for (int i = 0; i < numThreads; i++) {
294      try {
295        threadId[i].join();
296      } catch (InterruptedException e) {
297        i--;      // retry
298      }
299    }
300
301    // try some multi-calls
302    Method echo =
303      TestProtocol.class.getMethod("echo", new Class[] { String.class });
304    String[] strings = (String[])RPC.call(echo, new String[][]{{"a"},{"b"}},
305                                          new InetSocketAddress[] {addr, addr}, conf);
306    assertTrue(Arrays.equals(strings, new String[]{"a","b"}));
307
308    Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
309    Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}},
310                                        new InetSocketAddress[] {addr, addr}, conf);
311    assertEquals(voids, null);
312    } finally {
313      server.stop();
314      if(proxy!=null) RPC.stopProxy(proxy);
315    }
316  }
317 
318  public void testStandaloneClient() throws IOException {
319    try {
320      RPC.waitForProxy(TestProtocol.class,
321        TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
322      fail("We should not have reached here");
323    } catch (ConnectException ioe) {
324      //this is what we expected
325    }
326  }
327 
328  private static final String ACL_CONFIG = "test.protocol.acl";
329 
330  private static class TestPolicyProvider extends PolicyProvider {
331
332    @Override
333    public Service[] getServices() {
334      return new Service[] { new Service(ACL_CONFIG, TestProtocol.class) };
335    }
336   
337  }
338 
339  private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
340    SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new TestPolicyProvider()));
341   
342    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf);
343
344    TestProtocol proxy = null;
345
346    server.start();
347
348    InetSocketAddress addr = NetUtils.getConnectAddress(server);
349   
350    try {
351      proxy = (TestProtocol)RPC.getProxy(
352          TestProtocol.class, TestProtocol.versionID, addr, conf);
353      proxy.ping();
354
355      if (expectFailure) {
356        fail("Expect RPC.getProxy to fail with AuthorizationException!");
357      }
358    } catch (RemoteException e) {
359      if (expectFailure) {
360        assertTrue(e.unwrapRemoteException() instanceof AuthorizationException);
361      } else {
362        throw e;
363      }
364    } finally {
365      server.stop();
366      if (proxy != null) {
367        RPC.stopProxy(proxy);
368      }
369    }
370  }
371 
372  public void testAuthorization() throws Exception {
373    Configuration conf = new Configuration();
374    conf.setBoolean(
375        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);
376   
377    // Expect to succeed
378    conf.set(ACL_CONFIG, "*");
379    doRPCs(conf, false);
380   
381    // Reset authorization to expect failure
382    conf.set(ACL_CONFIG, "invalid invalid");
383    doRPCs(conf, true);
384  }
385 
386  public static void main(String[] args) throws Exception {
387
388    new TestRPC("test").testCalls();
389
390  }
391}
Note: See TracBrowser for help on using the repository browser.