source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 12.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.DataInput;
22import java.io.DataOutput;
23import java.io.DataOutputStream;
24import java.io.File;
25import java.io.IOException;
26import java.util.Iterator;
27
28import junit.framework.TestCase;
29
30import org.apache.hadoop.conf.Configuration;
31import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
32import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
33import org.apache.hadoop.examples.SecondarySort.IntPair;
34import org.apache.hadoop.examples.SecondarySort;
35import org.apache.hadoop.examples.WordCount;
36import org.apache.hadoop.fs.FileSystem;
37import org.apache.hadoop.fs.Path;
38import org.apache.hadoop.io.IntWritable;
39import org.apache.hadoop.io.Text;
40import org.apache.hadoop.io.Writable;
41import org.apache.hadoop.io.WritableComparable;
42import org.apache.hadoop.io.WritableUtils;
43import org.apache.hadoop.mapred.MRCaching.TestResult;
44import org.apache.hadoop.mapreduce.Job;
45import org.apache.hadoop.mapreduce.TestMapReduceLocal;
46import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
47import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
48import org.apache.hadoop.util.Progressable;
49
50/**
51 * A JUnit test to test min map-reduce cluster with local file system.
52 */
53public class TestMiniMRLocalFS extends TestCase {
54  private static String TEST_ROOT_DIR =
55    new File(System.getProperty("test.build.data","/tmp"))
56    .toURI().toString().replace(' ', '+');
57   
58  public void testWithLocal()
59      throws IOException, InterruptedException, ClassNotFoundException {
60    MiniMRCluster mr = null;
61    try {
62      mr = new MiniMRCluster(2, "file:///", 3);
63      TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
64
65      // run the wordcount example with caching
66      JobConf job = mr.createJobConf();
67      TestResult ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
68                                            TEST_ROOT_DIR + "/wc/output", 
69                                            TEST_ROOT_DIR + "/cachedir",
70                                            job,
71                                            "The quick brown fox\n" 
72                                            + "has many silly\n"
73                                            + "red fox sox\n");
74      // assert the number of lines read during caching
75      assertTrue("Failed test archives not matching", ret.isOutputOk);
76      // test the task report fetchers
77      JobClient client = new JobClient(job);
78      JobID jobid = ret.job.getID();
79      TaskReport[] reports;
80      reports = client.getSetupTaskReports(jobid);
81      assertEquals("number of setups", 2, reports.length);
82      reports = client.getMapTaskReports(jobid);
83      assertEquals("number of maps", 1, reports.length);
84      reports = client.getReduceTaskReports(jobid);
85      assertEquals("number of reduces", 1, reports.length);
86      reports = client.getCleanupTaskReports(jobid);
87      assertEquals("number of cleanups", 2, reports.length);
88      Counters counters = ret.job.getCounters();
89      assertEquals("number of map inputs", 3, 
90                   counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
91      assertEquals("number of reduce outputs", 9, 
92                   counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
93      runCustomFormats(mr);
94      runSecondarySort(mr.createJobConf());
95    } finally {
96      if (mr != null) { mr.shutdown(); }
97    }
98  }
99 
100  private void runCustomFormats(MiniMRCluster mr) throws IOException {
101    JobConf job = mr.createJobConf();
102    FileSystem fileSys = FileSystem.get(job);
103    Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
104    Path outDir = new Path(testDir, "out");
105    System.out.println("testDir= " + testDir);
106    fileSys.delete(testDir, true);
107   
108    job.setInputFormat(MyInputFormat.class);
109    job.setOutputFormat(MyOutputFormat.class);
110    job.setOutputKeyClass(Text.class);
111    job.setOutputValueClass(IntWritable.class);
112   
113    job.setMapperClass(MyMapper.class);       
114    job.setReducerClass(MyReducer.class);
115    job.setNumMapTasks(100);
116    job.setNumReduceTasks(1);
117    // explicitly do not use "normal" job.setOutputPath to make sure
118    // that it is not hardcoded anywhere in the framework.
119    job.set("non.std.out", outDir.toString());
120    try {
121      JobClient.runJob(job);
122      String result = 
123        TestMiniMRWithDFS.readOutput(outDir, job);
124      assertEquals("output", ("aunt annie\t1\n" +
125                              "bumble boat\t4\n" +
126                              "crocodile pants\t0\n" +
127                              "duck-dog\t5\n"+
128                              "eggs\t2\n" + 
129                              "finagle the agent\t3\n"), result);
130    } finally {
131      fileSys.delete(testDir, true);
132    }
133   
134  }
135 
136  private static class MyInputFormat
137    implements InputFormat<IntWritable, Text> {
138   
139    static final String[] data = new String[]{
140      "crocodile pants", 
141      "aunt annie", 
142      "eggs",
143      "finagle the agent",
144      "bumble boat", 
145      "duck-dog",
146    };
147
148    private static class MySplit implements InputSplit {
149      int first;
150      int length;
151
152      public MySplit() { }
153
154      public MySplit(int first, int length) {
155        this.first = first;
156        this.length = length;
157      }
158
159      public String[] getLocations() {
160        return new String[0];
161      }
162
163      public long getLength() {
164        return length;
165      }
166
167      public void write(DataOutput out) throws IOException {
168        WritableUtils.writeVInt(out, first);
169        WritableUtils.writeVInt(out, length);
170      }
171
172      public void readFields(DataInput in) throws IOException {
173        first = WritableUtils.readVInt(in);
174        length = WritableUtils.readVInt(in);
175      }
176    }
177
178    static class MyRecordReader implements RecordReader<IntWritable, Text> {
179      int index;
180      int past;
181      int length;
182     
183      MyRecordReader(int index, int length) {
184        this.index = index;
185        this.past = index + length;
186        this.length = length;
187      }
188
189      public boolean next(IntWritable key, Text value) throws IOException {
190        if (index < past) {
191          key.set(index);
192          value.set(data[index]);
193          index += 1;
194          return true;
195        }
196        return false;
197      }
198     
199      public IntWritable createKey() {
200        return new IntWritable();
201      }
202     
203      public Text createValue() {
204        return new Text();
205      }
206
207      public long getPos() throws IOException {
208        return index;
209      }
210
211      public void close() throws IOException {}
212
213      public float getProgress() throws IOException {
214        return 1.0f - (past-index)/length;
215      }
216    }
217   
218    public InputSplit[] getSplits(JobConf job, 
219                                  int numSplits) throws IOException {
220      return new MySplit[]{new MySplit(0, 1), new MySplit(1, 3),
221                           new MySplit(4, 2)};
222    }
223
224    public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
225                                                           JobConf job, 
226                                                           Reporter reporter)
227                                                           throws IOException {
228      MySplit sp = (MySplit) split;
229      return new MyRecordReader(sp.first, sp.length);
230    }
231   
232  }
233 
234  static class MyMapper extends MapReduceBase
235    implements Mapper<WritableComparable, Writable,
236                      WritableComparable, Writable> {
237   
238    public void map(WritableComparable key, Writable value, 
239                    OutputCollector<WritableComparable, Writable> out,
240                    Reporter reporter) throws IOException {
241      System.out.println("map: " + key + ", " + value);
242      out.collect((WritableComparable) value, key);
243      InputSplit split = reporter.getInputSplit();
244      if (split.getClass() != MyInputFormat.MySplit.class) {
245        throw new IOException("Got wrong split in MyMapper! " + 
246                              split.getClass().getName());
247      }
248    }
249  }
250
251  static class MyReducer extends MapReduceBase
252    implements Reducer<WritableComparable, Writable,
253                      WritableComparable, Writable> {
254    public void reduce(WritableComparable key, Iterator<Writable> values, 
255                       OutputCollector<WritableComparable, Writable> output,
256                       Reporter reporter) throws IOException {
257      try {
258        InputSplit split = reporter.getInputSplit();
259        throw new IOException("Got an input split of " + split);
260      } catch (UnsupportedOperationException e) {
261        // expected result
262      }
263      while (values.hasNext()) {
264        Writable value = values.next();
265        System.out.println("reduce: " + key + ", " + value);
266        output.collect(key, value);
267      }
268    }
269  }
270
271  static class MyOutputFormat implements OutputFormat {
272    static class MyRecordWriter implements RecordWriter<Object, Object> {
273      private DataOutputStream out;
274     
275      public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
276        out = outputFile.getFileSystem(job).create(outputFile);
277      }
278     
279      public void write(Object key, Object value) throws IOException {
280        out.writeBytes(key.toString() + "\t" + value.toString() + "\n");
281      }
282
283      public void close(Reporter reporter) throws IOException { 
284        out.close();
285      }
286    }
287   
288    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
289                                        String name,
290                                        Progressable progress
291                                        ) throws IOException {
292      return new MyRecordWriter(new Path(job.get("non.std.out")), job);
293    }
294
295    public void checkOutputSpecs(FileSystem ignored, 
296                                 JobConf job) throws IOException {
297    }
298  }
299
300  private void runSecondarySort(Configuration conf) throws IOException,
301                                                        InterruptedException,
302                                                        ClassNotFoundException {
303    FileSystem localFs = FileSystem.getLocal(conf);
304    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
305    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
306    TestMapReduceLocal.writeFile
307             ("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
308              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
309    Job job = new Job(conf, "word count");
310    job.setJarByClass(WordCount.class);
311    job.setNumReduceTasks(2);
312    job.setMapperClass(SecondarySort.MapClass.class);
313    job.setReducerClass(SecondarySort.Reduce.class);
314    // group and partition by the first int in the pair
315    job.setPartitionerClass(FirstPartitioner.class);
316    job.setGroupingComparatorClass(FirstGroupingComparator.class);
317
318    // the map output is IntPair, IntWritable
319    job.setMapOutputKeyClass(IntPair.class);
320    job.setMapOutputValueClass(IntWritable.class);
321
322    // the reduce output is Text, IntWritable
323    job.setOutputKeyClass(Text.class);
324    job.setOutputValueClass(IntWritable.class);
325
326    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
327    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
328    assertTrue(job.waitForCompletion(true));
329    String out = TestMapReduceLocal.readFile("out/part-r-00000");
330    assertEquals("------------------------------------------------\n" +
331                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
332                 "------------------------------------------------\n" +
333                 "10\t20\n10\t25\n10\t30\n", out);
334    out = TestMapReduceLocal.readFile("out/part-r-00001");
335    assertEquals("------------------------------------------------\n" +
336                 "-3\t23\n" +
337                 "------------------------------------------------\n" +
338                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
339                 "------------------------------------------------\n" +
340                 "5\t10\n", out);
341  }
342}
Note: See TracBrowser for help on using the repository browser.