source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/record/TestRecordMR.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit project

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.record;

import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.conf.*;
import junit.framework.TestCase;
import java.io.*;
import java.util.*;

/**********************************************************
 * MapredLoadTest generates a bunch of work that exercises
 * a Hadoop Map-Reduce system (and DFS, too).  It goes through
 * the following steps:
 *
 * 1) Take inputs 'range' and 'counts'.
 * 2) Generate 'counts' random integers between 0 and range-1.
 * 3) Create a file that lists each integer between 0 and range-1,
 *    and lists the number of times that integer was generated.
 * 4) Emit a (very large) file that contains all the integers
 *    in the order generated.
 * 5) After the file has been generated, read it back and count
 *    how many times each int was generated.
 * 6) Compare this big count-map against the original one.  If
 *    they match, then SUCCESS!  Otherwise, FAILURE!
 *
 * OK, that's how we can think about it.  What are the map-reduce
 * steps that get the job done?
 *
 * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
 * 2) In a non-mapred thread, generate the answer-key and write it to disk.
 * 3) In a mapred job, divide the answer key into K parts.
 * 4) A mapred 'generator' task consists of K map jobs.  Each reads
 *    an individual "sub-key" and generates integers according
 *    to it (though in a random order).
 * 5) The generator's reduce task agglomerates all of those files
 *    into a single one.
 * 6) A mapred 'reader' task consists of M map jobs.  The output
 *    file is cut into M pieces.  Each of the M jobs counts the
 *    individual ints in its chunk and creates a map of all seen ints.
 * 7) A mapred job integrates all the count files into a single one.
 *
 **********************************************************/
public class TestRecordMR extends TestCase {
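  // Illustrative walk-through (editor's gloss; the concrete numbers are
  // hypothetical, chosen only for exposition).  With range=3 and counts=5,
  // the answer key might be {0: 2, 1: 1, 2: 2}.  The generator job then
  // emits the multiset {0, 0, 1, 2, 2} in a scrambled order, the reader
  // job recounts it back to {0: 2, 1: 1, 2: 2}, and the final comparison
  // checks that the recomputed map equals the original answer key.
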
  /**
   * Modified to make it a JUnit test.
   * The RandomGen Job does the actual work of creating
   * a huge file of assorted numbers.  It receives instructions
   * as to how many times each number should be emitted.  Then
   * it emits those numbers in a crazy order.
   *
   * The map() function takes a key/val pair that describes
   * a value-to-be-emitted (the key) and how many times it
   * should be emitted (the value), aka "numtimes".  map() then
   * emits a series of intermediate key/val pairs.  It emits
   * 'numtimes' of these.  The key is a random number and the
   * value is the 'value-to-be-emitted'.
   *
   * The system collates and merges these pairs according to
   * the random number.  The reduce() function takes in a key/value
   * pair that consists of a crazy random number and a series
   * of values that should be emitted.  The random number key
   * is now dropped, and reduce() emits a pair for every intermediate value.
   * The emitted key is an intermediate value.  The emitted value
   * is just a blank string.  Thus, we've created a huge file
   * of numbers in random order, but where each number appears
   * as many times as we were instructed.
   */
  static public class RandomGenMapper implements Mapper<RecInt, RecInt,
                                                        RecInt, RecString> {
    Random r = new Random();
    public void configure(JobConf job) {
    }

    public void map(RecInt key,
                    RecInt val,
                    OutputCollector<RecInt, RecString> out,
                    Reporter reporter) throws IOException {
      int randomVal = key.getData();
      int randomCount = val.getData();

      for (int i = 0; i < randomCount; i++) {
        // Math.abs(r.nextInt()) is still negative for Integer.MIN_VALUE,
        // so bound the draw instead to guarantee a non-negative key.
        out.collect(new RecInt(r.nextInt(Integer.MAX_VALUE)),
                    new RecString(Integer.toString(randomVal)));
      }
    }
    public void close() {
    }
  }
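  // Illustrative trace (hypothetical numbers, for exposition only):
  // map(key=7, val=3) collects three pairs such as
  //   (152873625, "7"), (90431, "7"), (2011744382, "7")
  // -- random non-negative keys, each carrying the value "7".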
  /**
   * Drops the random key and re-emits each collected value as an
   * integer key (with a blank string value), yielding the shuffled
   * number file.
   */
  static public class RandomGenReducer implements Reducer<RecInt, RecString,
                                                          RecInt, RecString> {
    public void configure(JobConf job) {
    }

    public void reduce(RecInt key,
                       Iterator<RecString> it,
                       OutputCollector<RecInt, RecString> out,
                       Reporter reporter) throws IOException {
      while (it.hasNext()) {
        String val = it.next().getData();
        out.collect(new RecInt(Integer.parseInt(val)),
                    new RecString(""));
      }
    }
    public void close() {
    }
  }

  /**
   * The RandomCheck Job does a lot of our work.  It takes
   * in a num/string keyspace, and transforms it into a
   * key/count(int) keyspace.
   *
   * The map() function just emits a num/1 pair for every
   * num/string input pair.
   *
   * The reduce() function sums up all the 1s that were
   * emitted for a single key.  It then emits the key/total
   * pair.
   *
   * This is used to regenerate the random number "answer key".
   * Each key here is a random number, and the count is the
   * number of times the number was emitted.
   */
  static public class RandomCheckMapper implements Mapper<RecInt, RecString,
                                                          RecInt, RecString> {
    public void configure(JobConf job) {
    }

    public void map(RecInt key,
                    RecString val,
                    OutputCollector<RecInt, RecString> out,
                    Reporter reporter) throws IOException {
      int pos = key.getData();
      out.collect(new RecInt(pos), new RecString("1"));
    }
    public void close() {
    }
  }
  /**
   * Counts how many "1" values were collected for each key and emits
   * the key together with that count (as a string).
   */
  static public class RandomCheckReducer implements Reducer<RecInt, RecString,
                                                            RecInt, RecString> {
    public void configure(JobConf job) {
    }

    public void reduce(RecInt key,
                       Iterator<RecString> it,
                       OutputCollector<RecInt, RecString> out,
                       Reporter reporter) throws IOException {
      int keyint = key.getData();
      int count = 0;
      while (it.hasNext()) {
        it.next();
        count++;
      }
      out.collect(new RecInt(keyint), new RecString(Integer.toString(count)));
    }
    public void close() {
    }
  }
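  // Illustrative trace (hypothetical, for exposition only): if the number 5
  // was generated twice, the check phase sees the record (5, "") twice,
  // the mapper emits (5, "1") twice, and the reducer collapses those into
  // a single (5, "2") pair.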

  /**
   * The Merge Job is a really simple one.  It takes in
   * an int/string key-value set (a number and its count-as-a-string),
   * and emits the same set as int/int pairs.
   * But it merges identical keys by adding their values.
   *
   * Thus, the map() function just parses the count into an int,
   * and reduce() just sums.  Nothing to see here!
   */
  static public class MergeMapper implements Mapper<RecInt, RecString,
                                                    RecInt, RecInt> {
    public void configure(JobConf job) {
    }

    public void map(RecInt key,
                    RecString val,
                    OutputCollector<RecInt, RecInt> out,
                    Reporter reporter) throws IOException {
      int keyint = key.getData();
      String valstr = val.getData();
      out.collect(new RecInt(keyint), new RecInt(Integer.parseInt(valstr)));
    }
    public void close() {
    }
  }
  static public class MergeReducer implements Reducer<RecInt, RecInt,
                                                      RecInt, RecInt> {
    public void configure(JobConf job) {
    }

    public void reduce(RecInt key,
                       Iterator<RecInt> it,
                       OutputCollector<RecInt, RecInt> out,
                       Reporter reporter) throws IOException {
      int keyint = key.getData();
      int total = 0;
      while (it.hasNext()) {
        total += it.next().getData();
      }
      out.collect(new RecInt(keyint), new RecInt(total));
    }
    public void close() {
    }
  }
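  // Note on the data flow (editor's gloss, not from the original source):
  // because the check job partitions by key, each number normally lands in
  // exactly one intermediate part file, so this summing reducer usually
  // sees a single value per key, e.g. (5, [2]) -> (5, 2).  Running it with
  // one reduce task is what concatenates the parts into a single file.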

  private static int range = 10;
  private static int counts = 100;
  private static Random r = new Random();
  private static Configuration conf = new Configuration();

  public void testMapred() throws Exception {
    launch();
  }

  /**
   * Runs the whole generate / check / merge pipeline against the
   * configured 'range' and 'counts', and verifies that the recomputed
   * answer key matches the original one.
   */
  public static void launch() throws Exception {
    //
    // Generate distribution of ints.  This is the answer key.
    //
    int countsToGo = counts;
    int dist[] = new int[range];
    for (int i = 0; i < range; i++) {
      double avgInts = (1.0 * countsToGo) / (range - i);
      dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
      countsToGo -= dist[i];
    }
    if (countsToGo > 0) {
      dist[dist.length-1] += countsToGo;
    }
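    // Worked example (hypothetical draws, for exposition only): with
    // range=10 and counts=100, bucket 0 targets an average of 100/10 = 10
    // and might draw, say, 12; bucket 1 then targets 88/9 ~= 9.8, and so
    // on.  Negative draws are clamped to 0, and any count still left over
    // at the end is dumped into the last bucket, so the buckets account
    // for at least 'counts' numbers in total.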

    //
    // Write the answer key to a file.
    //
    FileSystem fs = FileSystem.get(conf);
    Path testdir = new Path("mapred.loadtest");
    if (!fs.mkdirs(testdir)) {
      throw new IOException("Mkdirs failed to create directory " + testdir.toString());
    }

    Path randomIns = new Path(testdir, "genins");
    if (!fs.mkdirs(randomIns)) {
      throw new IOException("Mkdirs failed to create directory " + randomIns.toString());
    }

    Path answerkey = new Path(randomIns, "answer.key");
    SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
                                                        answerkey, RecInt.class, RecInt.class,
                                                        CompressionType.NONE);
    try {
      for (int i = 0; i < range; i++) {
        RecInt k = new RecInt();
        RecInt v = new RecInt();
        k.setData(i);
        v.setData(dist[i]);
        out.append(k, v);
      }
    } finally {
      out.close();
    }
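    // At this point answer.key is a SequenceFile of (RecInt, RecInt) pairs,
    // one per integer in [0, range): (0, dist[0]), (1, dist[1]), and so on,
    // including entries whose count happens to be zero.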

    //
    // Now we need to generate the random numbers according to
    // the above distribution.
    //
    // We create a lot of map tasks, each of which takes at least
    // one "line" of the distribution.  (That is, a certain number
    // X is to be generated Y number of times.)
    //
    // A map task emits Y key/val pairs.  The val is X.  The key
    // is a randomly-generated number.
    //
    // The reduce task gets its input sorted by key.  That is, sorted
    // in random order.  It then emits a single record for each of the
    // given values.  It does not emit the key.
    //
    // Because there's just one reduce task, we emit a single big
    // file of random numbers.
    //
    Path randomOuts = new Path(testdir, "genouts");
    fs.delete(randomOuts, true);

    JobConf genJob = new JobConf(conf, TestRecordMR.class);
    FileInputFormat.setInputPaths(genJob, randomIns);
    genJob.setInputFormat(SequenceFileInputFormat.class);
    genJob.setMapperClass(RandomGenMapper.class);

    FileOutputFormat.setOutputPath(genJob, randomOuts);
    genJob.setOutputKeyClass(RecInt.class);
    genJob.setOutputValueClass(RecString.class);
    genJob.setOutputFormat(SequenceFileOutputFormat.class);
    genJob.setReducerClass(RandomGenReducer.class);
    genJob.setNumReduceTasks(1);

    JobClient.runJob(genJob);
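    // After this job, genouts holds a single SequenceFile part whose
    // records are (RecInt number, RecString "") pairs -- every generated
    // number, in effectively random order.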

    //
    // Next, we read the big file in and regenerate the
    // original map.  It's split into a number of parts.
    // (That number is 'intermediateReduces'.)
    //
    // We have many map tasks, each of which reads at least one
    // of the output numbers.  For each number read in, the
    // map task emits a key/value pair where the key is the
    // number and the value is "1".
    //
    // The reduce tasks receive their input sorted by the key
    // emitted above.  For each key, there will be a certain number
    // of "1" values.  The owning reduce task sums these values to
    // compute how many times the given key was emitted.
    //
    // The reduce task then emits a key/val pair where the key
    // is the number in question, and the value is the number of
    // times the key was emitted.  This is the same format as the
    // original answer key (except that numbers emitted zero times
    // will not appear in the regenerated key).  The answer set
    // is split into a number of pieces.  A final MapReduce job
    // will merge them.
    //
    // There's not really a need to go to 10 reduces here
    // instead of 1.  But we want to test what happens when
    // you have multiple reduces at once.
    //
    int intermediateReduces = 10;
    Path intermediateOuts = new Path(testdir, "intermediateouts");
    fs.delete(intermediateOuts, true);
    JobConf checkJob = new JobConf(conf, TestRecordMR.class);
    FileInputFormat.setInputPaths(checkJob, randomOuts);
    checkJob.setInputFormat(SequenceFileInputFormat.class);
    checkJob.setMapperClass(RandomCheckMapper.class);

    FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
    checkJob.setOutputKeyClass(RecInt.class);
    checkJob.setOutputValueClass(RecString.class);
    checkJob.setOutputFormat(SequenceFileOutputFormat.class);
    checkJob.setReducerClass(RandomCheckReducer.class);
    checkJob.setNumReduceTasks(intermediateReduces);

    JobClient.runJob(checkJob);
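    // intermediateouts now contains 10 part files (some possibly empty);
    // together they hold one (RecInt number, RecString countAsString)
    // record per distinct number that was generated at least once.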

    //
    // OK, now we take the output from the last job and
    // merge it down to a single file.  The map() and reduce()
    // functions don't really do anything except reemit tuples.
    // But by having a single reduce task here, we end up merging
    // all the files.
    //
    Path finalOuts = new Path(testdir, "finalouts");
    fs.delete(finalOuts, true);
    JobConf mergeJob = new JobConf(conf, TestRecordMR.class);
    FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
    mergeJob.setInputFormat(SequenceFileInputFormat.class);
    mergeJob.setMapperClass(MergeMapper.class);

    FileOutputFormat.setOutputPath(mergeJob, finalOuts);
    mergeJob.setOutputKeyClass(RecInt.class);
    mergeJob.setOutputValueClass(RecInt.class);
    mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
    mergeJob.setReducerClass(MergeReducer.class);
    mergeJob.setNumReduceTasks(1);

    JobClient.runJob(mergeJob);

    //
    // Finally, we compare the reconstructed answer key with the
    // original one.  Remember, we need to ignore zero-count items
    // in the original key.
    //
    boolean success = true;
    Path recomputedkey = new Path(finalOuts, "part-00000");
    SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
    int totalseen = 0;
    try {
      RecInt key = new RecInt();
      RecInt val = new RecInt();
      for (int i = 0; i < range; i++) {
        if (dist[i] == 0) {
          continue;
        }
        if (!in.next(key, val)) {
          System.err.println("Cannot read entry " + i);
          success = false;
          break;
        } else {
          if (!((key.getData() == i) && (val.getData() == dist[i]))) {
            System.err.println("Mismatch!  Pos=" + key.getData() + ", i=" + i +
                               ", val=" + val.getData() + ", dist[i]=" + dist[i]);
            success = false;
          }
          totalseen += val.getData();
        }
      }
      if (success) {
        if (in.next(key, val)) {
          System.err.println("Unnecessary lines in recomputed key!");
          success = false;
        }
      }
    } finally {
      in.close();
    }
    int originalTotal = 0;
    for (int i = 0; i < dist.length; i++) {
      originalTotal += dist[i];
    }
    System.out.println("Original sum: " + originalTotal);
    System.out.println("Recomputed sum: " + totalseen);

    //
    // Write to "results" whether the test succeeded or not.
    //
    Path resultFile = new Path(testdir, "results");
    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
    try {
      bw.write("Success=" + success + "\n");
      System.out.println("Success=" + success);
    } finally {
      bw.close();
    }
    fs.delete(testdir, true);
    // Without this assertion the JUnit test would pass even when the
    // recomputed key is wrong; fail loudly instead.
    assertTrue("Recomputed answer key does not match the original", success);
  }

  /**
   * Launches all the tasks in order.
   */
  public static void main(String[] argv) throws Exception {
    if (argv.length < 2) {
      System.err.println("Usage: TestRecordMR <range> <counts>");
      System.err.println();
      System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
      return;
    }

    // Assign the static fields rather than declaring shadowing locals,
    // so the parsed arguments actually take effect in launch().
    int i = 0;
    range = Integer.parseInt(argv[i++]);
    counts = Integer.parseInt(argv[i++]);
    launch();
  }
}