source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred.lib;
20
21import org.apache.hadoop.fs.FileSystem;
22import org.apache.hadoop.fs.Path;
23import org.apache.hadoop.io.LongWritable;
24import org.apache.hadoop.io.Text;
25import org.apache.hadoop.mapred.*;
26
27import java.io.DataOutputStream;
28import java.io.IOException;
29import java.util.Iterator;
30
31public class TestMultithreadedMapRunner extends HadoopTestCase {
32
33  public TestMultithreadedMapRunner() throws IOException {
34    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
35  }
36
37  public void testOKRun() throws Exception {
38    run(false, false);
39  }
40
41  public void testIOExRun() throws Exception {
42    run(true, false);
43  }
44  public void testRuntimeExRun() throws Exception {
45    run(false, true);
46  }
47
48  private void run(boolean ioEx, boolean rtEx) throws Exception {
49    Path inDir = new Path("testing/mt/input");
50    Path outDir = new Path("testing/mt/output");
51
52    // Hack for local FS that does not have the concept of a 'mounting point'
53    if (isLocalFS()) {
54      String localPathRoot = System.getProperty("test.build.data", "/tmp")
55              .replace(' ', '+');
56      inDir = new Path(localPathRoot, inDir);
57      outDir = new Path(localPathRoot, outDir);
58    }
59
60
61    JobConf conf = createJobConf();
62    FileSystem fs = FileSystem.get(conf);
63
64    fs.delete(outDir, true);
65    if (!fs.mkdirs(inDir)) {
66      throw new IOException("Mkdirs failed to create " + inDir.toString());
67    }
68    {
69      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
70      file.writeBytes("a\nb\n\nc\nd\ne");
71      file.close();
72    }
73
74    conf.setJobName("mt");
75    conf.setInputFormat(TextInputFormat.class);
76
77    conf.setOutputKeyClass(LongWritable.class);
78    conf.setOutputValueClass(Text.class);
79
80    conf.setMapOutputKeyClass(LongWritable.class);
81    conf.setMapOutputValueClass(Text.class);
82
83    conf.setOutputFormat(TextOutputFormat.class);
84    conf.setOutputKeyClass(LongWritable.class);
85    conf.setOutputValueClass(Text.class);
86
87    conf.setMapperClass(IDMap.class);
88    conf.setReducerClass(IDReduce.class);
89
90    FileInputFormat.setInputPaths(conf, inDir);
91    FileOutputFormat.setOutputPath(conf, outDir);
92
93    conf.setMapRunnerClass(MultithreadedMapRunner.class);
94   
95    conf.setInt("mapred.map.multithreadedrunner.threads", 2);
96
97    if (ioEx) {
98      conf.setBoolean("multithreaded.ioException", true);
99    }
100    if (rtEx) {
101      conf.setBoolean("multithreaded.runtimeException", true);
102    }
103    JobClient jc = new JobClient(conf);
104    RunningJob job =jc.submitJob(conf);
105    while (!job.isComplete()) {
106      Thread.sleep(100);
107    }
108
109    if (job.isSuccessful()) {
110      assertFalse(ioEx || rtEx);
111    }
112    else {
113      assertTrue(ioEx || rtEx);
114    }
115
116  }
117
118  public static class IDMap implements Mapper<LongWritable, Text,
119                                              LongWritable, Text> {
120    private boolean ioEx = false;
121    private boolean rtEx = false;
122
123    public void configure(JobConf job) {
124      ioEx = job.getBoolean("multithreaded.ioException", false);
125      rtEx = job.getBoolean("multithreaded.runtimeException", false);
126    }
127
128    public void map(LongWritable key, Text value,
129                    OutputCollector<LongWritable, Text> output,
130                    Reporter reporter)
131            throws IOException {
132      if (ioEx) {
133        throw new IOException();
134      }
135      if (rtEx) {
136        throw new RuntimeException();
137      }
138      output.collect(key, value);
139      try {
140        Thread.sleep(100);
141      } catch (InterruptedException ex) {
142        throw new RuntimeException(ex);
143      }
144    }
145
146
147    public void close() throws IOException {
148    }
149  }
150
151  public static class IDReduce implements Reducer<LongWritable, Text,
152                                                  LongWritable, Text> {
153
154    public void configure(JobConf job) {
155    }
156
157    public void reduce(LongWritable key, Iterator<Text> values,
158                       OutputCollector<LongWritable, Text> output,
159                       Reporter reporter)
160            throws IOException {
161      while (values.hasNext()) {
162        output.collect(key, values.next());
163      }
164    }
165
166    public void close() throws IOException {
167    }
168  }
169}
170
Note: See TracBrowser for help on using the repository browser.