source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestComparators.java @ 120

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.BooleanWritable.Comparator;
import junit.framework.TestCase;
import java.io.*;
import java.util.*;

/**
 * Two different types of comparators can be used in MapReduce. One is used
 * during the Map and Reduce phases, to sort/merge key-value pairs. Another
 * is used to group values for a particular key, when calling the user's
 * reducer. A user can override both of these.
 * This class has tests for making sure we use the right comparators at the
 * right places. See Hadoop issues 485 and 1535. Our tests:
 * 1. Test that the same comparator is used for all sort/merge operations
 * during the Map and Reduce phases.
 * 2. Test the common use case where values are grouped by keys but values
 * within each key are grouped by a secondary key (a timestamp, for example).
 */
public class TestComparators extends TestCase
{
  JobConf conf = new JobConf(TestMapOutputType.class);
  JobClient jc;
  static Random rng = new Random();
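  // The two hooks exercised below: JobConf.setOutputKeyComparatorClass() controls how
  // map output keys are sorted/merged, while JobConf.setOutputValueGroupingComparator()
  // controls which keys are treated as equal when grouping values for a reduce() call.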

  /**
   * RandomGen is a mapper that generates 5 random values for each key
   * in the input. The values are in the range [0-4]. The mapper also
   * generates a composite key. If the input key is x and the generated
   * value is y, the composite key is x0y (x-zero-y). Therefore, the
   * intermediate key-value pairs are ordered by {input key, value}.
   * Think of the random value as a timestamp associated with the record.
   */
  static class RandomGenMapper
    implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {

    public void configure(JobConf job) {
    }

    public void map(IntWritable key, Writable value,
                    OutputCollector<IntWritable, IntWritable> out,
                    Reporter reporter) throws IOException {
      int num_values = 5;
      for(int i = 0; i < num_values; ++i) {
        int val = rng.nextInt(num_values);
        int compositeKey = key.get() * 100 + val;
        out.collect(new IntWritable(compositeKey), new IntWritable(val));
      }
    }

    public void close() {
    }
  }
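  // Worked example of the composite key scheme: input key 23 with generated value 4
  // becomes 23 * 100 + 4 = 2304, so intermediate pairs sort first by the original key
  // and then by the random "timestamp" value within it.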

  /**
   * Your basic identity mapper.
   */
  static class IdentityMapper
    implements Mapper<WritableComparable, Writable,
                      WritableComparable, Writable> {

    public void configure(JobConf job) {
    }

    public void map(WritableComparable key, Writable value,
                    OutputCollector<WritableComparable, Writable> out,
                    Reporter reporter) throws IOException {
      out.collect(key, value);
    }

    public void close() {
    }
  }

  /**
   * Checks whether keys are in ascending order.
   */
  static class AscendingKeysReducer
    implements Reducer<IntWritable, Writable, IntWritable, Text> {

    public void configure(JobConf job) {}

    // keep track of the last key we've seen
    private int lastKey = Integer.MIN_VALUE;
    public void reduce(IntWritable key, Iterator<Writable> values,
                       OutputCollector<IntWritable, Text> out,
                       Reporter reporter) throws IOException {
      int currentKey = key.get();
      // keys should be in ascending order
      if (currentKey < lastKey) {
        fail("Keys not in sorted ascending order");
      }
      lastKey = currentKey;
      out.collect(key, new Text("success"));
    }

    public void close() {}
  }

  /**
   * Checks whether keys are in descending order.
   */
  static class DescendingKeysReducer
    implements Reducer<IntWritable, Writable, IntWritable, Text> {
    public void configure(JobConf job) {}

    // keep track of the last key we've seen
    private int lastKey = Integer.MAX_VALUE;
    public void reduce(IntWritable key, Iterator<Writable> values,
                       OutputCollector<IntWritable, Text> out,
                       Reporter reporter) throws IOException {
      int currentKey = key.get();
      // keys should be in descending order
      if (currentKey > lastKey) {
        fail("Keys not in sorted descending order");
      }
      lastKey = currentKey;
      out.collect(key, new Text("success"));
    }

    public void close() {}
  }

  /** The reducer checks whether the input values are in ascending order and
   * whether they are correctly grouped by key (i.e. each call to reduce
   * should have 5 values if the grouping is correct). It also checks whether
   * the keys themselves are in ascending order.
   */
  static class AscendingGroupReducer
    implements Reducer<IntWritable, IntWritable, IntWritable, Text> {

    public void configure(JobConf job) {
    }

    // keep track of the last key we've seen
    private int lastKey = Integer.MIN_VALUE;
    public void reduce(IntWritable key,
                       Iterator<IntWritable> values,
                       OutputCollector<IntWritable, Text> out,
                       Reporter reporter) throws IOException {
      // check key order
      int currentKey = key.get();
      if (currentKey < lastKey) {
        fail("Keys not in sorted ascending order");
      }
      lastKey = currentKey;
      // check order of values
      IntWritable previous = new IntWritable(Integer.MIN_VALUE);
      int valueCount = 0;
      while (values.hasNext()) {
        IntWritable current = values.next();

        // Check that the values are sorted
        if (current.compareTo(previous) < 0)
          fail("Values generated by Mapper not in order");
        previous = current;
        ++valueCount;
      }
      if (valueCount != 5) {
        fail("Values not grouped by primary key");
      }
      out.collect(key, new Text("success"));
    }

    public void close() {
    }
  }

  /** The reducer checks whether the input values are in descending order and
   * whether they are correctly grouped by key (i.e. each call to reduce
   * should have 5 values if the grouping is correct).
   */
  static class DescendingGroupReducer
    implements Reducer<IntWritable, IntWritable, IntWritable, Text> {

    public void configure(JobConf job) {
    }

    // keep track of the last key we've seen
    private int lastKey = Integer.MAX_VALUE;
    public void reduce(IntWritable key,
                       Iterator<IntWritable> values,
                       OutputCollector<IntWritable, Text> out,
                       Reporter reporter) throws IOException {
      // check key order
      int currentKey = key.get();
      if (currentKey > lastKey) {
        fail("Keys not in sorted descending order");
      }
      lastKey = currentKey;
      // check order of values
      IntWritable previous = new IntWritable(Integer.MAX_VALUE);
      int valueCount = 0;
      while (values.hasNext()) {
        IntWritable current = values.next();

        // Check that the values are sorted
        if (current.compareTo(previous) > 0)
          fail("Values generated by Mapper not in order");
        previous = current;
        ++valueCount;
      }
      if (valueCount != 5) {
        fail("Values not grouped by primary key");
      }
      out.collect(key, new Text("success"));
    }

    public void close() {
    }
  }

  /**
   * A decreasing Comparator for IntWritable
   */
  public static class DecreasingIntComparator extends IntWritable.Comparator {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return -super.compare(b1, s1, l1, b2, s2, l2);
    }
    static {                    // register this comparator
      WritableComparator.define(DecreasingIntComparator.class, new Comparator());
    }
  }
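  // Note: the static block above registers BooleanWritable.Comparator (imported at the top
  // of this file), not an instance of DecreasingIntComparator. The tests below do not rely
  // on that registration; they install this comparator explicitly through
  // JobConf.setOutputKeyComparatorClass(DecreasingIntComparator.class).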

  /** Grouping function for values based on the composite key. This
   * comparator strips off the secondary key part from the x0y composite
   * and only compares the primary key value (x).
   */
  public static class CompositeIntGroupFn extends WritableComparator {
    public CompositeIntGroupFn() {
      super(IntWritable.class);
    }
    public int compare (WritableComparable v1, WritableComparable v2) {
      int val1 = ((IntWritable)(v1)).get() / 100;
      int val2 = ((IntWritable)(v2)).get() / 100;
      if (val1 < val2)
        return 1;
      else if (val1 > val2)
        return -1;
      else
        return 0;
    }

    public boolean equals (IntWritable v1, IntWritable v2) {
      int val1 = v1.get();
      int val2 = v2.get();

      return (val1/100) == (val2/100);
    }

    static {
      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
    }
  }
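  // Grouping example: composite keys 2301 and 2304 both map to primary key 23
  // (2301 / 100 == 2304 / 100), so compare() returns 0 and the framework delivers
  // their values to a single reduce() call.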

  /** Reverse grouping function for values based on the composite key.
   */
  public static class CompositeIntReverseGroupFn extends CompositeIntGroupFn {
    public int compare (WritableComparable v1, WritableComparable v2) {
      return -super.compare(v1, v2);
    }

    public boolean equals (IntWritable v1, IntWritable v2) {
      return !(super.equals(v1, v2));
    }

    static {
      WritableComparator.define(CompositeIntReverseGroupFn.class, new Comparator());
    }
  }


  public void configure() throws Exception {
    Path testdir = new Path("build/test/test.mapred.spill");
    Path inDir = new Path(testdir, "in");
    Path outDir = new Path(testdir, "out");
    FileSystem fs = FileSystem.get(conf);
    fs.delete(testdir, true);
    conf.setInputFormat(SequenceFileInputFormat.class);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);
    // set up two map jobs, so we can test merge phase in Reduce also
    conf.setNumMapTasks(2);

    conf.setOutputFormat(SequenceFileOutputFormat.class);
    if (!fs.mkdirs(testdir)) {
      throw new IOException("Mkdirs failed to create " + testdir.toString());
    }
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    // set up input data in 2 files
    Path inFile = new Path(inDir, "part0");
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
        IntWritable.class, IntWritable.class);
    writer.append(new IntWritable(11), new IntWritable(999));
    writer.append(new IntWritable(23), new IntWritable(456));
    writer.append(new IntWritable(10), new IntWritable(780));
    writer.close();
    inFile = new Path(inDir, "part1");
    writer = SequenceFile.createWriter(fs, conf, inFile,
        IntWritable.class, IntWritable.class);
    writer.append(new IntWritable(45), new IntWritable(100));
    writer.append(new IntWritable(18), new IntWritable(200));
    writer.append(new IntWritable(27), new IntWritable(300));
    writer.close();

    jc = new JobClient(conf);
  }
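  // The input above contains six distinct keys (10, 11, 18, 23, 27, 45) split across two
  // files, and therefore two map tasks. With the default IntWritable ordering the reducer
  // sees them in ascending order; with DecreasingIntComparator, in descending order.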

  /**
   * Test the default comparator for Map/Reduce.
   * Use the identity mapper and see if the keys are sorted at the end
   * @throws Exception
   */
  public void testDefaultMRComparator() throws Exception {
    configure();
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(AscendingKeysReducer.class);

    RunningJob r_job = jc.submitJob(conf);
    while (!r_job.isComplete()) {
      Thread.sleep(1000);
    }

    if (!r_job.isSuccessful()) {
      fail("Oops! The job broke due to an unexpected error");
    }
  }

  /**
   * Test user-defined comparator for Map/Reduce.
   * We provide our own comparator that is the reverse of the default int
   * comparator. Keys should be sorted in reverse order in the reducer.
   * @throws Exception
   */
  public void testUserMRComparator() throws Exception {
    configure();
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(DescendingKeysReducer.class);
    conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);

    RunningJob r_job = jc.submitJob(conf);
    while (!r_job.isComplete()) {
      Thread.sleep(1000);
    }

    if (!r_job.isSuccessful()) {
      fail("Oops! The job broke due to an unexpected error");
    }
  }

  /**
   * Test user-defined grouping comparator for grouping values in Reduce.
   * We generate composite keys that contain a random number, which acts
   * as a timestamp associated with the record. In our Reduce function,
   * values for a key should be sorted by the 'timestamp'.
   * @throws Exception
   */
  public void testUserValueGroupingComparator() throws Exception {
    configure();
    conf.setMapperClass(RandomGenMapper.class);
    conf.setReducerClass(AscendingGroupReducer.class);
    conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);

    RunningJob r_job = jc.submitJob(conf);
    while (!r_job.isComplete()) {
      Thread.sleep(1000);
    }

    if (!r_job.isSuccessful()) {
      fail("Oops! The job broke due to an unexpected error");
    }
  }

  /**
   * Test all user comparators. Super-test of all tests here.
   * We generate composite keys that contain a random number, which acts
   * as a timestamp associated with the record. In our Reduce function,
   * values for a key should be sorted by the 'timestamp'.
   * We also provide our own comparators that reverse the default sorting
   * order. This lets us make sure that the right comparators are used.
   * @throws Exception
   */
  public void testAllUserComparators() throws Exception {
    configure();
    conf.setMapperClass(RandomGenMapper.class);
    // use a decreasing comparator so keys are sorted in reverse order
    conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
    conf.setReducerClass(DescendingGroupReducer.class);
    conf.setOutputValueGroupingComparator(CompositeIntReverseGroupFn.class);
    RunningJob r_job = jc.submitJob(conf);
    while (!r_job.isComplete()) {
      Thread.sleep(1000);
    }

    if (!r_job.isSuccessful()) {
      fail("Oops! The job broke due to an unexpected error");
    }
  }

  /**
   * Test a user comparator that relies on deserializing both arguments
   * for each compare.
   */
  public void testBakedUserComparator() throws Exception {
    MyWritable a = new MyWritable(8, 8);
    MyWritable b = new MyWritable(7, 9);
    assertTrue(a.compareTo(b) > 0);
    assertTrue(WritableComparator.get(MyWritable.class).compare(a, b) < 0);
  }
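  // The two assertions check different orderings: MyWritable.compareTo() orders by field i
  // (8 vs 7, hence positive), while the comparator registered for MyWritable, MyCmp,
  // orders by field j (8 vs 9, hence negative).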

  public static class MyWritable implements WritableComparable<MyWritable> {
    int i, j;
    public MyWritable() { }
    public MyWritable(int i, int j) {
      this.i = i;
      this.j = j;
    }
    public void readFields(DataInput in) throws IOException {
      i = in.readInt();
      j = in.readInt();
    }
    public void write(DataOutput out) throws IOException {
      out.writeInt(i);
      out.writeInt(j);
    }
    public int compareTo(MyWritable b) {
      return this.i - b.i;
    }
    static {
      WritableComparator.define(MyWritable.class, new MyCmp());
    }
  }

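  // MyCmp is constructed with createInstances=true, so WritableComparator's raw-byte
  // compare(byte[], ...) path deserializes both arguments into MyWritable instances and
  // dispatches to the object-level compare() below, which orders by j.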
  public static class MyCmp extends WritableComparator {
    public MyCmp() { super(MyWritable.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
      MyWritable aa = (MyWritable)a;
      MyWritable bb = (MyWritable)b;
      return aa.j - bb.j;
    }
  }

}