source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/SortValidator.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 21.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.*;
22import java.net.URI;
23import java.util.*;
24
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.conf.Configured;
27import org.apache.hadoop.io.BytesWritable;
28import org.apache.hadoop.io.IntWritable;
29import org.apache.hadoop.io.SequenceFile;
30import org.apache.hadoop.io.Text;
31import org.apache.hadoop.io.Writable;
32import org.apache.hadoop.io.WritableComparable;
33import org.apache.hadoop.io.WritableComparator;
34import org.apache.hadoop.io.WritableUtils;
35import org.apache.hadoop.mapred.lib.HashPartitioner;
36import org.apache.hadoop.util.Tool;
37import org.apache.hadoop.util.ToolRunner;
38import org.apache.hadoop.fs.*;
39
40/**
41 * A set of utilities to validate the <b>sort</b> of the map-reduce framework.
42 * This utility program has 2 main parts:
43 * 1. Checking the records' statistics
44 *   a) Validates the no. of bytes and records in sort's input & output.
45 *   b) Validates the xor of the md5's of each key/value pair.
46 *   c) Ensures same key/value is present in both input and output.
47 * 2. Check individual records  to ensure each record is present in both
48 *    the input and the output of the sort (expensive on large data-sets).
49 *   
50 * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
51 *            [-m <i>maps</i>] [-r <i>reduces</i>] [-deep]
52 *            -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i>
53 */
54public class SortValidator extends Configured implements Tool {
55
56  static private final IntWritable sortInput = new IntWritable(1); 
57  static private final IntWritable sortOutput = new IntWritable(2); 
58
59  static void printUsage() {
60    System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
61                       "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
62    System.exit(1);
63  }
64
65  static private IntWritable deduceInputFile(JobConf job) {
66    Path[] inputPaths = FileInputFormat.getInputPaths(job);
67    Path inputFile = new Path(job.get("map.input.file"));
68
69    // value == one for sort-input; value == two for sort-output
70    return (inputFile.getParent().equals(inputPaths[0])) ? 
71        sortInput : sortOutput;
72  }
73 
74  static private byte[] pair(BytesWritable a, BytesWritable b) {
75    byte[] pairData = new byte[a.getLength()+ b.getLength()];
76    System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
77    System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
78    return pairData;
79  }
80
81  private static final PathFilter sortPathsFilter = new PathFilter() {
82    public boolean accept(Path path) {
83      return (path.getName().startsWith("part-"));
84    }
85  };
86 
87  /**
88   * A simple map-reduce job which checks consistency of the
89   * MapReduce framework's sort by checking:
90   * a) Records are sorted correctly
91   * b) Keys are partitioned correctly
92   * c) The input and output have same no. of bytes and records.
93   * d) The input and output have the correct 'checksum' by xor'ing
94   *    the md5 of each record.
95   *   
96   */
97  public static class RecordStatsChecker {
98
99    /**
100     * Generic way to get <b>raw</b> data from a {@link Writable}.
101     */
102    static class Raw {
103      /**
104       * Get raw data bytes from a {@link Writable}
105       * @param writable {@link Writable} object from whom to get the raw data
106       * @return raw data of the writable
107       */
108      public byte[] getRawBytes(Writable writable) {
109        return writable.toString().getBytes(); 
110      } 
111     
112      /**
113       * Get number of raw data bytes of the {@link Writable}
114       * @param writable {@link Writable} object from whom to get the raw data
115       *                 length
116       * @return number of raw data bytes
117       */
118      public int getRawBytesLength(Writable writable) {
119        return writable.toString().getBytes().length; 
120      }
121    }
122
123    /**
124     * Specialization of {@link Raw} for {@link BytesWritable}.
125     */
126    static class RawBytesWritable extends Raw  {
127      public byte[] getRawBytes(Writable bw) {
128        return ((BytesWritable)bw).getBytes();
129      }
130      public int getRawBytesLength(Writable bw) {
131        return ((BytesWritable)bw).getLength(); 
132      }
133    }
134   
135    /**
136     * Specialization of {@link Raw} for {@link Text}.
137     */
138    static class RawText extends Raw  {
139      public byte[] getRawBytes(Writable text) {
140        return ((Text)text).getBytes();
141      }
142      public int getRawBytesLength(Writable text) {
143        return ((Text)text).getLength();
144      }
145    }
146   
147    private static Raw createRaw(Class rawClass) {
148      if (rawClass == Text.class) {
149        return new RawText();
150      } else if (rawClass == BytesWritable.class) {
151        System.err.println("Returning " + RawBytesWritable.class);
152        return new RawBytesWritable();
153      }     
154      return new Raw();
155    }
156
157    public static class RecordStatsWritable implements Writable {
158      private long bytes = 0;
159      private long records = 0;
160      private int checksum = 0;
161     
162      public RecordStatsWritable() {}
163     
164      public RecordStatsWritable(long bytes, long records, int checksum) {
165        this.bytes = bytes;
166        this.records = records;
167        this.checksum = checksum;
168      }
169     
170      public void write(DataOutput out) throws IOException {
171        WritableUtils.writeVLong(out, bytes);
172        WritableUtils.writeVLong(out, records);
173        WritableUtils.writeVInt(out, checksum);
174      }
175
176      public void readFields(DataInput in) throws IOException {
177        bytes = WritableUtils.readVLong(in);
178        records = WritableUtils.readVLong(in);
179        checksum = WritableUtils.readVInt(in);
180      }
181     
182      public long getBytes() { return bytes; }
183      public long getRecords() { return records; }
184      public int getChecksum() { return checksum; }
185    }
186   
187    public static class Map extends MapReduceBase
188      implements Mapper<WritableComparable, Writable,
189                        IntWritable, RecordStatsWritable> {
190     
191      private IntWritable key = null;
192      private WritableComparable prevKey = null;
193      private Class<? extends WritableComparable> keyClass;
194      private Partitioner<WritableComparable, Writable> partitioner = null;
195      private int partition = -1;
196      private int noSortReducers = -1;
197      private long recordId = -1;
198     
199      private Raw rawKey;
200      private Raw rawValue;
201
202      public void configure(JobConf job) {
203        // 'key' == sortInput for sort-input; key == sortOutput for sort-output
204        key = deduceInputFile(job);
205       
206        if (key == sortOutput) {
207          partitioner = new HashPartitioner<WritableComparable, Writable>();
208         
209          // Figure the 'current' partition and no. of reduces of the 'sort'
210          try {
211            URI inputURI = new URI(job.get("map.input.file"));
212            String inputFile = inputURI.getPath();
213            partition = Integer.valueOf(
214                                        inputFile.substring(inputFile.lastIndexOf("part")+5)
215                                        ).intValue();
216            noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
217          } catch (Exception e) {
218            System.err.println("Caught: " + e);
219            System.exit(-1);
220          }
221        }
222      }
223     
224      @SuppressWarnings("unchecked")
225      public void map(WritableComparable key, Writable value,
226                      OutputCollector<IntWritable, RecordStatsWritable> output, 
227                      Reporter reporter) throws IOException {
228        // Set up rawKey and rawValue on the first call to 'map'
229        if (recordId == -1) {
230         rawKey = createRaw(key.getClass());
231         rawValue = createRaw(value.getClass());
232        }
233        ++recordId;
234       
235        if (this.key == sortOutput) {
236          // Check if keys are 'sorted' if this 
237          // record is from sort's output
238          if (prevKey == null) {
239            prevKey = key;
240            keyClass = prevKey.getClass();
241          } else {
242            // Sanity check
243            if (keyClass != key.getClass()) {
244              throw new IOException("Type mismatch in key: expected " +
245                                    keyClass.getName() + ", recieved " +
246                                    key.getClass().getName());
247            }
248           
249            // Check if they were sorted correctly
250            if (prevKey.compareTo(key) > 0) {
251              throw new IOException("The 'map-reduce' framework wrongly" +
252                                    " classifed (" + prevKey + ") > (" + 
253                                    key + ") "+ "for record# " + recordId); 
254            }
255            prevKey = key;
256          }
257
258          // Check if the sorted output is 'partitioned' right
259          int keyPartition = 
260            partitioner.getPartition(key, value, noSortReducers);
261          if (partition != keyPartition) {
262            throw new IOException("Partitions do not match for record# " + 
263                                  recordId + " ! - '" + partition + "' v/s '" + 
264                                  keyPartition + "'");
265          }
266        }
267
268        // Construct the record-stats and output (this.key, record-stats)
269        byte[] keyBytes = rawKey.getRawBytes(key);
270        int keyBytesLen = rawKey.getRawBytesLength(key);
271        byte[] valueBytes = rawValue.getRawBytes(value);
272        int valueBytesLen = rawValue.getRawBytesLength(value);
273       
274        int keyValueChecksum = 
275          (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
276           WritableComparator.hashBytes(valueBytes, valueBytesLen));
277
278        output.collect(this.key, 
279                       new RecordStatsWritable((keyBytesLen+valueBytesLen),
280                       1, keyValueChecksum)
281                      );
282      }
283     
284    }
285   
286    public static class Reduce extends MapReduceBase
287      implements Reducer<IntWritable, RecordStatsWritable,
288                         IntWritable, RecordStatsWritable> {
289     
290      public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
291                         OutputCollector<IntWritable,
292                                         RecordStatsWritable> output, 
293                         Reporter reporter) throws IOException {
294        long bytes = 0;
295        long records = 0;
296        int xor = 0;
297        while (values.hasNext()) {
298          RecordStatsWritable stats = values.next();
299          bytes += stats.getBytes();
300          records += stats.getRecords();
301          xor ^= stats.getChecksum(); 
302        }
303       
304        output.collect(key, new RecordStatsWritable(bytes, records, xor));
305      }
306    }
307   
308    public static class NonSplitableSequenceFileInputFormat 
309      extends SequenceFileInputFormat {
310      protected boolean isSplitable(FileSystem fs, Path filename) {
311        return false;
312      }
313    }
314   
315    static void checkRecords(Configuration defaults, 
316                             Path sortInput, Path sortOutput) throws IOException {
317      FileSystem inputfs = sortInput.getFileSystem(defaults);
318      FileSystem outputfs = sortOutput.getFileSystem(defaults);
319      FileSystem defaultfs = FileSystem.get(defaults);
320      JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
321      jobConf.setJobName("sortvalidate-recordstats-checker");
322
323      int noSortReduceTasks = 
324        outputfs.listStatus(sortOutput, sortPathsFilter).length;
325      jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks);
326      int noSortInputpaths =  inputfs.listStatus(sortInput).length;
327
328      jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
329      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
330     
331      jobConf.setOutputKeyClass(IntWritable.class);
332      jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);
333     
334      jobConf.setMapperClass(Map.class);
335      jobConf.setCombinerClass(Reduce.class);
336      jobConf.setReducerClass(Reduce.class);
337     
338      jobConf.setNumMapTasks(noSortReduceTasks);
339      jobConf.setNumReduceTasks(1);
340
341      FileInputFormat.setInputPaths(jobConf, sortInput);
342      FileInputFormat.addInputPath(jobConf, sortOutput);
343      Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
344      if (defaultfs.exists(outputPath)) {
345        defaultfs.delete(outputPath, true);
346      }
347      FileOutputFormat.setOutputPath(jobConf, outputPath);
348     
349      // Uncomment to run locally in a single process
350      //job_conf.set("mapred.job.tracker", "local");
351      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
352      System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
353                         "from " + inputPaths[0] + " (" + 
354                         noSortInputpaths + " files), " + 
355                         inputPaths[1] + " (" + 
356                         noSortReduceTasks + 
357                         " files) into " + 
358                         FileOutputFormat.getOutputPath(jobConf) + 
359                         " with 1 reducer.");
360      Date startTime = new Date();
361      System.out.println("Job started: " + startTime);
362      JobClient.runJob(jobConf);
363      Date end_time = new Date();
364      System.out.println("Job ended: " + end_time);
365      System.out.println("The job took " + 
366                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
367     
368      // Check to ensure that the statistics of the
369      // framework's sort-input and sort-output match
370      SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
371                                                          new Path(outputPath, "part-00000"), defaults);
372      IntWritable k1 = new IntWritable();
373      IntWritable k2 = new IntWritable();
374      RecordStatsWritable v1 = new RecordStatsWritable();
375      RecordStatsWritable v2 = new RecordStatsWritable();
376      if (!stats.next(k1, v1)) {
377        throw new IOException("Failed to read record #1 from reduce's output");
378      }
379      if (!stats.next(k2, v2)) {
380        throw new IOException("Failed to read record #2 from reduce's output");
381      }
382
383      if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || 
384          v1.getChecksum() != v2.getChecksum()) {
385        throw new IOException("(" + 
386                              v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
387                              v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
388      }
389    }
390
391  }
392 
393  /**
394   * A simple map-reduce task to check if the input and the output
395   * of the framework's sort is consistent by ensuring each record
396   * is present in both the input and the output.
397   *
398   */
399  public static class RecordChecker {
400   
401    public static class Map extends MapReduceBase
402      implements Mapper<BytesWritable, BytesWritable,
403                        BytesWritable, IntWritable> {
404     
405      private IntWritable value = null;
406     
407      public void configure(JobConf job) {
408        // value == one for sort-input; value == two for sort-output
409        value = deduceInputFile(job);
410      }
411     
412      public void map(BytesWritable key, 
413                      BytesWritable value,
414                      OutputCollector<BytesWritable, IntWritable> output, 
415                      Reporter reporter) throws IOException {
416        // newKey = (key, value)
417        BytesWritable keyValue = new BytesWritable(pair(key, value));
418   
419        // output (newKey, value)
420        output.collect(keyValue, this.value);
421      }
422    }
423   
424    public static class Reduce extends MapReduceBase
425      implements Reducer<BytesWritable, IntWritable,
426                        BytesWritable, IntWritable> {
427     
428      public void reduce(BytesWritable key, Iterator<IntWritable> values,
429                         OutputCollector<BytesWritable, IntWritable> output,
430                         Reporter reporter) throws IOException {
431        int ones = 0;
432        int twos = 0;
433        while (values.hasNext()) {
434          IntWritable count = values.next(); 
435          if (count.equals(sortInput)) {
436            ++ones;
437          } else if (count.equals(sortOutput)) {
438            ++twos;
439          } else {
440            throw new IOException("Invalid 'value' of " + count.get() + 
441                                  " for (key,value): " + key.toString());
442          }
443        }
444       
445        // Check to ensure there are equal no. of ones and twos
446        if (ones != twos) {
447          throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
448                                ") for (key, value): " + key.toString());
449        }
450      }
451    }
452   
453    static void checkRecords(Configuration defaults, int noMaps, int noReduces,
454                             Path sortInput, Path sortOutput) throws IOException {
455      JobConf jobConf = new JobConf(defaults, RecordChecker.class);
456      jobConf.setJobName("sortvalidate-record-checker");
457     
458      jobConf.setInputFormat(SequenceFileInputFormat.class);
459      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
460     
461      jobConf.setOutputKeyClass(BytesWritable.class);
462      jobConf.setOutputValueClass(IntWritable.class);
463     
464      jobConf.setMapperClass(Map.class);       
465      jobConf.setReducerClass(Reduce.class);
466     
467      JobClient client = new JobClient(jobConf);
468      ClusterStatus cluster = client.getClusterStatus();
469      if (noMaps == -1) {
470        noMaps = cluster.getTaskTrackers() * 
471          jobConf.getInt("test.sortvalidate.maps_per_host", 10);
472      }
473      if (noReduces == -1) {
474        noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
475        String sortReduces = jobConf.get("test.sortvalidate.reduces_per_host");
476        if (sortReduces != null) {
477           noReduces = cluster.getTaskTrackers() * 
478                           Integer.parseInt(sortReduces);
479        }
480      }
481      jobConf.setNumMapTasks(noMaps);
482      jobConf.setNumReduceTasks(noReduces);
483     
484      FileInputFormat.setInputPaths(jobConf, sortInput);
485      FileInputFormat.addInputPath(jobConf, sortOutput);
486      Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
487      FileSystem fs = FileSystem.get(defaults);
488      if (fs.exists(outputPath)) {
489        fs.delete(outputPath, true);
490      }
491      FileOutputFormat.setOutputPath(jobConf, outputPath);
492     
493      // Uncomment to run locally in a single process
494      //job_conf.set("mapred.job.tracker", "local");
495      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
496      System.out.println("\nSortValidator.RecordChecker: Running on " +
497                         cluster.getTaskTrackers() +
498                        " nodes to validate sort from " + 
499                         inputPaths[0] + ", " + 
500                         inputPaths[1] + " into " + 
501                         FileOutputFormat.getOutputPath(jobConf) + 
502                         " with " + noReduces + " reduces.");
503      Date startTime = new Date();
504      System.out.println("Job started: " + startTime);
505      JobClient.runJob(jobConf);
506      Date end_time = new Date();
507      System.out.println("Job ended: " + end_time);
508      System.out.println("The job took " + 
509                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
510    }
511  }
512
513 
514  /**
515   * The main driver for sort-validator program.
516   * Invoke this method to submit the map/reduce job.
517   * @throws IOException When there is communication problems with the
518   *                     job tracker.
519   */
520  public int run(String[] args) throws Exception {
521    Configuration defaults = getConf();
522   
523    int noMaps = -1, noReduces = -1;
524    Path sortInput = null, sortOutput = null;
525    boolean deepTest = false;
526    for(int i=0; i < args.length; ++i) {
527      try {
528        if ("-m".equals(args[i])) {
529          noMaps = Integer.parseInt(args[++i]);
530        } else if ("-r".equals(args[i])) {
531          noReduces = Integer.parseInt(args[++i]);
532        } else if ("-sortInput".equals(args[i])){
533          sortInput = new Path(args[++i]);
534        } else if ("-sortOutput".equals(args[i])){
535          sortOutput = new Path(args[++i]);
536        } else if ("-deep".equals(args[i])) {
537          deepTest = true;
538        } else {
539          printUsage();
540          return -1;
541        }
542      } catch (NumberFormatException except) {
543        System.err.println("ERROR: Integer expected instead of " + args[i]);
544        printUsage();
545        return -1;
546      } catch (ArrayIndexOutOfBoundsException except) {
547        System.err.println("ERROR: Required parameter missing from " +
548                           args[i-1]);
549        printUsage();
550        return -1;
551      }
552    }
553   
554    // Sanity check
555    if (sortInput == null || sortOutput == null) {
556      printUsage();
557      return -2;
558    }
559
560    // Check if the records are consistent and sorted correctly
561    RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);
562
563    // Check if the same records are present in sort's inputs & outputs
564    if (deepTest) {
565      RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput, 
566                                 sortOutput);
567    }
568   
569    System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" +
570                       " successfully.");
571   
572    return 0;
573  }
574
575  public static void main(String[] args) throws Exception {
576    int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
577    System.exit(res);
578  }
579}
Note: See TracBrowser for help on using the repository browser.