source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMapRed.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 28.4 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Random;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/**********************************************************
 * MapredLoadTest generates a bunch of work that exercises
 * a Hadoop Map-Reduce system (and DFS, too).  It goes through
 * the following steps:
 *
 * 1) Take inputs 'range' and 'counts'.
 * 2) Generate 'counts' random integers between 0 and range-1.
 * 3) Create a file that lists each integer between 0 and range-1,
 *    and lists the number of times that integer was generated.
 * 4) Emit a (very large) file that contains all the integers
 *    in the order generated.
 * 5) After the file has been generated, read it back and count
 *    how many times each int was generated.
 * 6) Compare this big count-map against the original one.  If
 *    they match, then SUCCESS!  Otherwise, FAILURE!
 *
 * OK, that's how we can think about it.  What are the map-reduce
 * steps that get the job done?
 *
 * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
 * 2) In a non-mapred thread, generate the answer-key and write it to disk.
 * 3) In a mapred job, divide the answer key into K parts.
 * 4) A mapred 'generator' task consists of K map jobs.  Each reads
 *    an individual "sub-key", and generates integers according
 *    to it (though with a random ordering).
 * 5) The generator's reduce task agglomerates all of those files
 *    into a single one.
 * 6) A mapred 'reader' task consists of M map jobs.  The output
 *    file is cut into M pieces. Each of the M jobs counts the
 *    individual ints in its chunk and creates a map of all seen ints.
 * 7) A mapred job integrates all the count files into a single one.
 *
 **********************************************************/
public class TestMapRed extends TestCase {
  /**
   * Modified to make it a junit test.
   * The RandomGen Job does the actual work of creating
   * a huge file of assorted numbers.  It receives instructions
   * as to how many times each number should be counted.  Then
   * it emits those numbers in a crazy order.
   *
   * The map() function takes a key/val pair that describes
   * a value-to-be-emitted (the key) and how many times it
   * should be emitted (the value), aka "numtimes".  map() then
   * emits a series of intermediate key/val pairs.  It emits
   * 'numtimes' of these.  The key is a random number and the
   * value is the 'value-to-be-emitted'.
   *
   * The system collates and merges these pairs according to
   * the random number.  reduce() function takes in a key/value
   * pair that consists of a crazy random number and a series
   * of values that should be emitted.  The random number key
   * is now dropped, and reduce() emits a pair for every intermediate value.
   * The emitted key is an intermediate value.  The emitted value
   * is just a blank string.  Thus, we've created a huge file
   * of numbers in random order, but where each number appears
   * as many times as we were instructed.
   */
  static class RandomGenMapper
    implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

    public void configure(JobConf job) {
    }

    public void map(IntWritable key, IntWritable val,
                    OutputCollector<IntWritable, IntWritable> out,
                    Reporter reporter) throws IOException {
      int randomVal = key.get();
      int randomCount = val.get();

      for (int i = 0; i < randomCount; i++) {
        out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
      }
    }
    public void close() {
    }
  }
  /**
   * Emits each incoming value once, dropping the random key that was
   * used only to shuffle the values into a random order.
   */
  static class RandomGenReducer
    implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    public void configure(JobConf job) {
    }

    public void reduce(IntWritable key, Iterator<IntWritable> it,
                       OutputCollector<IntWritable, IntWritable> out,
                       Reporter reporter) throws IOException {
      while (it.hasNext()) {
        out.collect(it.next(), null);
      }
    }
    public void close() {
    }
  }

  /**
   * The RandomCheck Job does a lot of our work.  It takes
   * in a num/string keyspace, and transforms it into a
   * key/count(int) keyspace.
   *
   * The map() function just emits a num/1 pair for every
   * num/string input pair.
   *
   * The reduce() function sums up all the 1s that were
   * emitted for a single key.  It then emits the key/total
   * pair.
   *
   * This is used to regenerate the random number "answer key".
   * Each key here is a random number, and the count is the
   * number of times the number was emitted.
   */
  static class RandomCheckMapper
    implements Mapper<WritableComparable, Text, IntWritable, IntWritable> {

    public void configure(JobConf job) {
    }

    public void map(WritableComparable key, Text val,
                    OutputCollector<IntWritable, IntWritable> out,
                    Reporter reporter) throws IOException {
      out.collect(new IntWritable(Integer.parseInt(val.toString().trim())), new IntWritable(1));
    }
    public void close() {
    }
  }
  /**
   * Counts how many values were received for each key and emits the
   * key together with that count.
   */
  static class RandomCheckReducer
      implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    public void configure(JobConf job) {
    }

    public void reduce(IntWritable key, Iterator<IntWritable> it,
                       OutputCollector<IntWritable, IntWritable> out,
                       Reporter reporter) throws IOException {
      int keyint = key.get();
      int count = 0;
      while (it.hasNext()) {
        it.next();
        count++;
      }
      out.collect(new IntWritable(keyint), new IntWritable(count));
    }
    public void close() {
    }
  }

  /**
   * The Merge Job is a really simple one.  It takes in
   * an int/int key-value set, and emits the same set.
   * But it merges identical keys by adding their values.
   *
   * Thus, the map() function is just the identity function
   * and reduce() just sums.  Nothing to see here!
   */
  static class MergeMapper
    implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

    public void configure(JobConf job) {
    }

    public void map(IntWritable key, IntWritable val,
                    OutputCollector<IntWritable, IntWritable> out,
                    Reporter reporter) throws IOException {
      int keyint = key.get();
      int valint = val.get();

      out.collect(new IntWritable(keyint), new IntWritable(valint));
    }
    public void close() {
    }
  }
  static class MergeReducer
    implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    public void configure(JobConf job) {
    }

    public void reduce(IntWritable key, Iterator<IntWritable> it,
                       OutputCollector<IntWritable, IntWritable> out,
                       Reporter reporter) throws IOException {
      int keyint = key.get();
      int total = 0;
      while (it.hasNext()) {
        total += it.next().get();
      }
      out.collect(new IntWritable(keyint), new IntWritable(total));
    }
    public void close() {
    }
  }

  private static int range = 10;
  private static int counts = 100;
  private static Random r = new Random();

  /**
     public TestMapRed(int range, int counts, Configuration conf) throws IOException {
     this.range = range;
     this.counts = counts;
     this.conf = conf;
     }
  **/

  public void testMapred() throws Exception {
    launch();
  }

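  /**
   * Mapper used by the compression tests below: emits the lower-cased
   * line text as the key and the original line as the value.
   */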
  private static class MyMap
    implements Mapper<WritableComparable, Text, Text, Text> {

    public void configure(JobConf conf) {
    }

    public void map(WritableComparable key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
      String str = value.toString().toLowerCase();
      output.collect(new Text(str), value);
    }

    public void close() throws IOException {
    }
  }

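  /**
   * Reducer used by the compression tests below: on its first invocation it
   * opens the task's first map-output input file and asserts that the file
   * is compressed exactly when map-output compression was enabled.
   */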
  private static class MyReduce extends IdentityReducer {
    private JobConf conf;
    private boolean compressInput;
    private TaskAttemptID taskId;
    private boolean first = true;

    @Override
    public void configure(JobConf conf) {
      this.conf = conf;
      compressInput = conf.getCompressMapOutput();
      taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
    }

    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter
                       ) throws IOException {
      if (first) {
        first = false;
        MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
        mapOutputFile.setConf(conf);
        Path input = mapOutputFile.getInputFile(0, taskId);
        FileSystem fs = FileSystem.get(conf);
        assertTrue("reduce input exists " + input, fs.exists(input));
        SequenceFile.Reader rdr =
          new SequenceFile.Reader(fs, input, conf);
        assertEquals("is reduce input compressed " + input,
                     compressInput,
                     rdr.isCompressed());
        rdr.close();
      }
    }

  }

  private static class BadPartitioner
      implements Partitioner<LongWritable,Text> {
    boolean low;
    public void configure(JobConf conf) {
      low = conf.getBoolean("test.testmapred.badpartition", true);
    }
    public int getPartition(LongWritable k, Text v, int numPartitions) {
      return low ? -1 : numPartitions;
    }
  }

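  /**
   * Runs the job twice with BadPartitioner, once returning a partition
   * below 0 and once returning one equal to the number of partitions,
   * and expects both runs to fail.
   */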
  public void testPartitioner() throws Exception {
    JobConf conf = new JobConf(TestMapRed.class);
    conf.setPartitionerClass(BadPartitioner.class);
    FileSystem fs = FileSystem.getLocal(conf);
    Path testdir = new Path(
        System.getProperty("test.build.data","/tmp")).makeQualified(fs);
    Path inFile = new Path(testdir, "blah/blah");
    DataOutputStream f = fs.create(inFile);
    f.writeBytes("blah blah blah\n");
    f.close();
    FileInputFormat.setInputPaths(conf, inFile);
    FileOutputFormat.setOutputPath(conf, new Path(testdir, "out"));
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // partition too low
    conf.setBoolean("test.testmapred.badpartition", true);
    boolean pass = true;
    try {
      JobClient.runJob(conf);
    } catch (IOException e) {
      pass = false;
    }
    assertFalse("should fail for partition < 0", pass);

    // partition too high
    conf.setBoolean("test.testmapred.badpartition", false);
    pass = true;
    try {
      JobClient.runJob(conf);
    } catch (IOException e) {
      pass = false;
    }
    assertFalse("should fail for partition >= numPartitions", pass);
  }

  public static class NullMapper
      implements Mapper<NullWritable,Text,NullWritable,Text> {
    public void map(NullWritable key, Text val,
        OutputCollector<NullWritable,Text> output, Reporter reporter)
        throws IOException {
      output.collect(NullWritable.get(), val);
    }
    public void configure(JobConf conf) { }
    public void close() { }
  }

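  /**
   * Writes a SequenceFile whose every key is NullWritable, runs it through
   * a trivial map/reduce job, and verifies that all of the values come back
   * intact under the null key.
   */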
  public void testNullKeys() throws Exception {
    JobConf conf = new JobConf(TestMapRed.class);
    FileSystem fs = FileSystem.getLocal(conf);
    Path testdir = new Path(
        System.getProperty("test.build.data","/tmp")).makeQualified(fs);
    fs.delete(testdir, true);
    Path inFile = new Path(testdir, "nullin/blah");
    SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
        NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
    Text t = new Text();
    t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
    t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
    t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
    t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
    t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
    t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
    t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
    t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
    w.close();
    FileInputFormat.setInputPaths(conf, inFile);
    FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
    conf.setMapperClass(NullMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setOutputKeyClass(NullWritable.class);
    conf.setOutputValueClass(Text.class);
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    conf.setNumReduceTasks(1);

    JobClient.runJob(conf);

    SequenceFile.Reader r = new SequenceFile.Reader(fs,
        new Path(testdir, "nullout/part-00000"), conf);
    String m = "AAAAAAAAAAAAAA";
    for (int i = 1; r.next(NullWritable.get(), t); ++i) {
      assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
      m = m.replace((char)('A' + i - 1), (char)('A' + i));
    }
  }

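  /**
   * Runs a small job with the given map-output compression flag and
   * reduce-output compression type (optionally adding a combiner), then
   * opens the reduce output and asserts that its compression matches the
   * requested type.
   */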
  private void checkCompression(boolean compressMapOutputs,
                                CompressionType redCompression,
                                boolean includeCombine
                                ) throws Exception {
    JobConf conf = new JobConf(TestMapRed.class);
    Path testdir = new Path("build/test/test.mapred.compress");
    Path inDir = new Path(testdir, "in");
    Path outDir = new Path(testdir, "out");
    FileSystem fs = FileSystem.get(conf);
    fs.delete(testdir, true);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setMapperClass(MyMap.class);
    conf.setReducerClass(MyReduce.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    if (includeCombine) {
      conf.setCombinerClass(IdentityReducer.class);
    }
    conf.setCompressMapOutput(compressMapOutputs);
    SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression);
    try {
      if (!fs.mkdirs(testdir)) {
        throw new IOException("Mkdirs failed to create " + testdir.toString());
      }
      if (!fs.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }
      Path inFile = new Path(inDir, "part0");
      DataOutputStream f = fs.create(inFile);
      f.writeBytes("Owen was here\n");
      f.writeBytes("Hadoop is fun\n");
      f.writeBytes("Is this done, yet?\n");
      f.close();
      RunningJob rj = JobClient.runJob(conf);
      assertTrue("job was complete", rj.isComplete());
      assertTrue("job was successful", rj.isSuccessful());
      Path output = new Path(outDir,
                             Task.getOutputName(0));
      assertTrue("reduce output exists " + output, fs.exists(output));
      SequenceFile.Reader rdr =
        new SequenceFile.Reader(fs, output, conf);
      assertEquals("is reduce output compressed " + output,
                   redCompression != CompressionType.NONE,
                   rdr.isCompressed());
      rdr.close();
    } finally {
      fs.delete(testdir, true);
    }
  }

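  /**
   * Exercises checkCompression() for every SequenceFile compression type,
   * with and without a combiner, and with map-output compression both off
   * and on.
   */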
  public void testCompression() throws Exception {
    EnumSet<SequenceFile.CompressionType> seq =
      EnumSet.allOf(SequenceFile.CompressionType.class);
    for (CompressionType redCompression : seq) {
      for(int combine=0; combine < 2; ++combine) {
        checkCompression(false, redCompression, combine == 1);
        checkCompression(true, redCompression, combine == 1);
      }
    }
  }

  /**
   * Runs the full generate / check / merge pipeline described in the
   * class comment and asserts that the recomputed distribution matches
   * the original answer key.
   */
  public static void launch() throws Exception {
    //
    // Generate distribution of ints.  This is the answer key.
    //
    JobConf conf = new JobConf(TestMapRed.class);
    int countsToGo = counts;
    int dist[] = new int[range];
    for (int i = 0; i < range; i++) {
      double avgInts = (1.0 * countsToGo) / (range - i);
      dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
      countsToGo -= dist[i];
    }
    if (countsToGo > 0) {
      dist[dist.length-1] += countsToGo;
    }

    //
    // Write the answer key to a file.
    //
    FileSystem fs = FileSystem.get(conf);
    Path testdir = new Path("mapred.loadtest");
    if (!fs.mkdirs(testdir)) {
      throw new IOException("Mkdirs failed to create " + testdir.toString());
    }

    Path randomIns = new Path(testdir, "genins");
    if (!fs.mkdirs(randomIns)) {
      throw new IOException("Mkdirs failed to create " + randomIns.toString());
    }

    Path answerkey = new Path(randomIns, "answer.key");
    SequenceFile.Writer out =
      SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
                                IntWritable.class,
                                SequenceFile.CompressionType.NONE);
    try {
      for (int i = 0; i < range; i++) {
        out.append(new IntWritable(i), new IntWritable(dist[i]));
      }
    } finally {
      out.close();
    }
    //printFiles(randomIns, conf);

    //
    // Now we need to generate the random numbers according to
    // the above distribution.
    //
    // We create a lot of map tasks, each of which takes at least
    // one "line" of the distribution.  (That is, a certain number
    // X is to be generated Y number of times.)
    //
    // A map task emits Y key/val pairs.  The val is X.  The key
    // is a randomly-generated number.
    //
    // The reduce task gets its input sorted by key.  That is, sorted
    // in random order.  It then emits a single line of text for each
    // of the given values.  It does not emit the key.
    //
    // Because there's just one reduce task, we emit a single big
    // file of random numbers.
    //
    Path randomOuts = new Path(testdir, "genouts");
    fs.delete(randomOuts, true);


    JobConf genJob = new JobConf(conf, TestMapRed.class);
    FileInputFormat.setInputPaths(genJob, randomIns);
    genJob.setInputFormat(SequenceFileInputFormat.class);
    genJob.setMapperClass(RandomGenMapper.class);

    FileOutputFormat.setOutputPath(genJob, randomOuts);
    genJob.setOutputKeyClass(IntWritable.class);
    genJob.setOutputValueClass(IntWritable.class);
    genJob.setOutputFormat(TextOutputFormat.class);
    genJob.setReducerClass(RandomGenReducer.class);
    genJob.setNumReduceTasks(1);

    JobClient.runJob(genJob);
    //printFiles(randomOuts, conf);

    //
    // Next, we read the big file in and regenerate the
    // original map.  It's split into a number of parts.
    // (That number is 'intermediateReduces'.)
    //
    // We have many map tasks, each of which reads at least one
    // of the output numbers.  For each number read in, the
    // map task emits a key/value pair where the key is the
    // number and the value is "1".
    //
    // We have a single reduce task, which receives its input
    // sorted by the key emitted above.  For each key, there will
    // be a certain number of "1" values.  The reduce task sums
    // these values to compute how many times the given key was
    // emitted.
    //
    // The reduce task then emits a key/val pair where the key
    // is the number in question, and the value is the number of
    // times the key was emitted.  This is the same format as the
    // original answer key (except that numbers emitted zero times
    // will not appear in the regenerated key.)  The answer set
    // is split into a number of pieces.  A final MapReduce job
    // will merge them.
    //
    // There's not really a need to go to 10 reduces here
    // instead of 1.  But we want to test what happens when
    // you have multiple reduces at once.
    //
    int intermediateReduces = 10;
    Path intermediateOuts = new Path(testdir, "intermediateouts");
    fs.delete(intermediateOuts, true);
    JobConf checkJob = new JobConf(conf, TestMapRed.class);
    FileInputFormat.setInputPaths(checkJob, randomOuts);
    checkJob.setInputFormat(TextInputFormat.class);
    checkJob.setMapperClass(RandomCheckMapper.class);

    FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
    checkJob.setOutputKeyClass(IntWritable.class);
    checkJob.setOutputValueClass(IntWritable.class);
    checkJob.setOutputFormat(MapFileOutputFormat.class);
    checkJob.setReducerClass(RandomCheckReducer.class);
    checkJob.setNumReduceTasks(intermediateReduces);

    JobClient.runJob(checkJob);
    //printFiles(intermediateOuts, conf);

    //
    // OK, now we take the output from the last job and
    // merge it down to a single file.  The map() and reduce()
    // functions don't really do anything except reemit tuples.
    // But by having a single reduce task here, we end up merging
    // all the files.
    //
    Path finalOuts = new Path(testdir, "finalouts");
    fs.delete(finalOuts, true);
    JobConf mergeJob = new JobConf(conf, TestMapRed.class);
    FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
    mergeJob.setInputFormat(SequenceFileInputFormat.class);
    mergeJob.setMapperClass(MergeMapper.class);

    FileOutputFormat.setOutputPath(mergeJob, finalOuts);
    mergeJob.setOutputKeyClass(IntWritable.class);
    mergeJob.setOutputValueClass(IntWritable.class);
    mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
    mergeJob.setReducerClass(MergeReducer.class);
    mergeJob.setNumReduceTasks(1);

    JobClient.runJob(mergeJob);
    //printFiles(finalOuts, conf);

    //
    // Finally, we compare the reconstructed answer key with the
    // original one.  Remember, we need to ignore zero-count items
    // in the original key.
    //
    boolean success = true;
    Path recomputedkey = new Path(finalOuts, "part-00000");
    SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
    int totalseen = 0;
    try {
      IntWritable key = new IntWritable();
      IntWritable val = new IntWritable();
      for (int i = 0; i < range; i++) {
        if (dist[i] == 0) {
          continue;
        }
        if (!in.next(key, val)) {
          System.err.println("Cannot read entry " + i);
          success = false;
          break;
        } else {
          if (!((key.get() == i) && (val.get() == dist[i]))) {
            System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i +
                               ", val=" + val.get() + ", dist[i]=" + dist[i]);
            success = false;
          }
          totalseen += val.get();
        }
      }
      if (success) {
        if (in.next(key, val)) {
          System.err.println("Unnecessary lines in recomputed key!");
          success = false;
        }
      }
    } finally {
      in.close();
    }
    int originalTotal = 0;
    for (int i = 0; i < dist.length; i++) {
      originalTotal += dist[i];
    }
    System.out.println("Original sum: " + originalTotal);
    System.out.println("Recomputed sum: " + totalseen);

    //
    // Write to "results" whether the test succeeded or not.
    //
    Path resultFile = new Path(testdir, "results");
    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
    try {
      bw.write("Success=" + success + "\n");
      System.out.println("Success=" + success);
    } finally {
      bw.close();
    }
    assertTrue("testMapRed failed", success);
    fs.delete(testdir, true);
  }

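  /**
   * The helpers below are only used by the commented-out printFiles()
   * calls in launch(); they dump text, sequence, and map files to stdout
   * for debugging.
   */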
  private static void printTextFile(FileSystem fs, Path p) throws IOException {
    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p)));
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println("  Row: " + line);
    }
    in.close();
  }

  private static void printSequenceFile(FileSystem fs, Path p,
                                        Configuration conf) throws IOException {
    SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
    Object key = null;
    Object value = null;
    while ((key = r.next(key)) != null) {
      value = r.getCurrentValue(value);
      System.out.println("  Row: " + key + ", " + value);
    }
    r.close();
  }

  private static boolean isSequenceFile(FileSystem fs,
                                        Path f) throws IOException {
    DataInputStream in = fs.open(f);
    try {
      byte[] seq = "SEQ".getBytes();
      for(int i=0; i < seq.length; ++i) {
        if (seq[i] != in.read()) {
          return false;
        }
      }
    } finally {
      // close the stream on every exit path, not just on success
      in.close();
    }
    return true;
  }

  private static void printFiles(Path dir,
                                 Configuration conf) throws IOException {
    FileSystem fs = dir.getFileSystem(conf);
    for(FileStatus f: fs.listStatus(dir)) {
      System.out.println("Reading " + f.getPath() + ": ");
      if (f.isDir()) {
        System.out.println("  it is a map file.");
        printSequenceFile(fs, new Path(f.getPath(), "data"), conf);
      } else if (isSequenceFile(fs, f.getPath())) {
        System.out.println("  it is a sequence file.");
        printSequenceFile(fs, f.getPath(), conf);
      } else {
        System.out.println("  it is a text file.");
        printTextFile(fs, f.getPath());
      }
    }
  }

  /**
   * Launches all the tasks in order.
   */
  public static void main(String[] argv) throws Exception {
    if (argv.length < 2) {
      System.err.println("Usage: TestMapRed <range> <counts>");
      System.err.println();
      System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
      return;
    }

    int i = 0;
    range = Integer.parseInt(argv[i++]);
    counts = Integer.parseInt(argv[i++]);
    launch();
  }

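  /**
   * testSmallInput() and testBiggerInput() exercise map-side spilling by
   * running an identity job over generated records with io.sort.mb forced
   * down to 1.
   */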
  public void testSmallInput(){
    runJob(100);
  }

  public void testBiggerInput(){
    runJob(1000);
  }

  public void runJob(int items) {
    try {
      JobConf conf = new JobConf(TestMapRed.class);
      Path testdir = new Path("build/test/test.mapred.spill");
      Path inDir = new Path(testdir, "in");
      Path outDir = new Path(testdir, "out");
      FileSystem fs = FileSystem.get(conf);
      fs.delete(testdir, true);
      conf.setInt("io.sort.mb", 1);
      conf.setInputFormat(SequenceFileInputFormat.class);
      FileInputFormat.setInputPaths(conf, inDir);
      FileOutputFormat.setOutputPath(conf, outDir);
      conf.setMapperClass(IdentityMapper.class);
      conf.setReducerClass(IdentityReducer.class);
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(Text.class);
      conf.setOutputFormat(SequenceFileOutputFormat.class);
      if (!fs.mkdirs(testdir)) {
        throw new IOException("Mkdirs failed to create " + testdir.toString());
      }
      if (!fs.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }
      Path inFile = new Path(inDir, "part0");
      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
                                                             Text.class, Text.class);

      StringBuffer content = new StringBuffer();

      for (int i = 0; i < 1000; i++) {
        content.append(i).append(": This is one more line of content\n");
      }

      Text text = new Text(content.toString());

      for (int i = 0; i < items; i++) {
        writer.append(new Text("rec:" + i), text);
      }
      writer.close();

      JobClient.runJob(conf);
    } catch (Exception e) {
      fail("Threw exception:" + e);
    }
  }
}