source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/UtilsForTests.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 19.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.text.DecimalFormat;
22import java.io.*;
23import java.util.Arrays;
24import java.util.Iterator;
25
26import org.apache.hadoop.conf.Configuration;
27import org.apache.hadoop.examples.RandomWriter;
28import org.apache.hadoop.fs.Path;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.hdfs.DFSTestUtil;
31import org.apache.hadoop.hdfs.MiniDFSCluster;
32import org.apache.hadoop.hdfs.server.namenode.NameNode;
33import org.apache.hadoop.io.BytesWritable;
34import org.apache.hadoop.io.SequenceFile;
35import org.apache.hadoop.io.Text;
36import org.apache.hadoop.io.Writable;
37import org.apache.hadoop.io.IntWritable;
38import org.apache.hadoop.io.LongWritable;
39import org.apache.hadoop.io.WritableComparable;
40import org.apache.hadoop.io.SequenceFile.CompressionType;
41import org.apache.hadoop.mapred.JobConf;
42import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
43import org.apache.hadoop.mapred.lib.IdentityMapper;
44import org.apache.hadoop.mapred.lib.IdentityReducer;
45
46/**
47 * Utilities used in unit test.
48 * 
49 */
50public class UtilsForTests {
51
52  final static long KB = 1024L * 1;
53  final static long MB = 1024L * KB;
54  final static long GB = 1024L * MB;
55  final static long TB = 1024L * GB;
56  final static long PB = 1024L * TB;
57  final static Object waitLock = new Object();
58
59  static DecimalFormat dfm = new DecimalFormat("####.000");
60  static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
61
62  public static String dfmt(double d) {
63    return dfm.format(d);
64  }
65
66  public static String ifmt(double d) {
67    return ifm.format(d);
68  }
69
70  public static String formatBytes(long numBytes) {
71    StringBuffer buf = new StringBuffer();
72    boolean bDetails = true;
73    double num = numBytes;
74
75    if (numBytes < KB) {
76      buf.append(numBytes + " B");
77      bDetails = false;
78    } else if (numBytes < MB) {
79      buf.append(dfmt(num / KB) + " KB");
80    } else if (numBytes < GB) {
81      buf.append(dfmt(num / MB) + " MB");
82    } else if (numBytes < TB) {
83      buf.append(dfmt(num / GB) + " GB");
84    } else if (numBytes < PB) {
85      buf.append(dfmt(num / TB) + " TB");
86    } else {
87      buf.append(dfmt(num / PB) + " PB");
88    }
89    if (bDetails) {
90      buf.append(" (" + ifmt(numBytes) + " bytes)");
91    }
92    return buf.toString();
93  }
94
95  public static String formatBytes2(long numBytes) {
96    StringBuffer buf = new StringBuffer();
97    long u = 0;
98    if (numBytes >= TB) {
99      u = numBytes / TB;
100      numBytes -= u * TB;
101      buf.append(u + " TB ");
102    }
103    if (numBytes >= GB) {
104      u = numBytes / GB;
105      numBytes -= u * GB;
106      buf.append(u + " GB ");
107    }
108    if (numBytes >= MB) {
109      u = numBytes / MB;
110      numBytes -= u * MB;
111      buf.append(u + " MB ");
112    }
113    if (numBytes >= KB) {
114      u = numBytes / KB;
115      numBytes -= u * KB;
116      buf.append(u + " KB ");
117    }
118    buf.append(u + " B"); //even if zero
119    return buf.toString();
120  }
121
122  static final String regexpSpecials = "[]()?*+|.!^-\\~@";
123
124  public static String regexpEscape(String plain) {
125    StringBuffer buf = new StringBuffer();
126    char[] ch = plain.toCharArray();
127    int csup = ch.length;
128    for (int c = 0; c < csup; c++) {
129      if (regexpSpecials.indexOf(ch[c]) != -1) {
130        buf.append("\\");
131      }
132      buf.append(ch[c]);
133    }
134    return buf.toString();
135  }
136
137  public static String safeGetCanonicalPath(File f) {
138    try {
139      String s = f.getCanonicalPath();
140      return (s == null) ? f.toString() : s;
141    } catch (IOException io) {
142      return f.toString();
143    }
144  }
145
146  static String slurp(File f) throws IOException {
147    int len = (int) f.length();
148    byte[] buf = new byte[len];
149    FileInputStream in = new FileInputStream(f);
150    String contents = null;
151    try {
152      in.read(buf, 0, len);
153      contents = new String(buf, "UTF-8");
154    } finally {
155      in.close();
156    }
157    return contents;
158  }
159
160  static String slurpHadoop(Path p, FileSystem fs) throws IOException {
161    int len = (int) fs.getLength(p);
162    byte[] buf = new byte[len];
163    InputStream in = fs.open(p);
164    String contents = null;
165    try {
166      in.read(buf, 0, len);
167      contents = new String(buf, "UTF-8");
168    } finally {
169      in.close();
170    }
171    return contents;
172  }
173
174  public static String rjustify(String s, int width) {
175    if (s == null) s = "null";
176    if (width > s.length()) {
177      s = getSpace(width - s.length()) + s;
178    }
179    return s;
180  }
181
182  public static String ljustify(String s, int width) {
183    if (s == null) s = "null";
184    if (width > s.length()) {
185      s = s + getSpace(width - s.length());
186    }
187    return s;
188  }
189
190  static char[] space;
191  static {
192    space = new char[300];
193    Arrays.fill(space, '\u0020');
194  }
195
196  public static String getSpace(int len) {
197    if (len > space.length) {
198      space = new char[Math.max(len, 2 * space.length)];
199      Arrays.fill(space, '\u0020');
200    }
201    return new String(space, 0, len);
202  }
203 
204  /**
205   * Gets job status from the jobtracker given the jobclient and the job id
206   */
207  static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
208    JobStatus[] statuses = jc.getAllJobs();
209    for (JobStatus jobStatus : statuses) {
210      if (jobStatus.getJobID().equals(id)) {
211        return jobStatus;
212      }
213    }
214    return null;
215  }
216 
217  /**
218   * A utility that waits for specified amount of time
219   */
220  static void waitFor(long duration) {
221    try {
222      synchronized (waitLock) {
223        waitLock.wait(duration);
224      }
225    } catch (InterruptedException ie) {}
226  }
227 
228  /**
229   * Wait for the jobtracker to be RUNNING.
230   */
231  static void waitForJobTracker(JobClient jobClient) {
232    while (true) {
233      try {
234        ClusterStatus status = jobClient.getClusterStatus();
235        while (status.getJobTrackerState() != JobTracker.State.RUNNING) {
236          waitFor(100);
237          status = jobClient.getClusterStatus();
238        }
239        break; // means that the jt is ready
240      } catch (IOException ioe) {}
241    }
242  }
243 
244  /**
245   * Waits until all the jobs at the jobtracker complete.
246   */
247  static void waitTillDone(JobClient jobClient) throws IOException {
248    // Wait for the last job to complete
249    while (true) {
250      boolean shouldWait = false;
251      for (JobStatus jobStatuses : jobClient.getAllJobs()) {
252        if (jobStatuses.getRunState() == JobStatus.RUNNING) {
253          shouldWait = true;
254          break;
255        }
256      }
257      if (shouldWait) {
258        waitFor(1000);
259      } else {
260        break;
261      }
262    }
263  }
264 
265  /**
266   * Configure a waiting job
267   */
268  static void configureWaitingJobConf(JobConf jobConf, Path inDir,
269                                      Path outputPath, int numMaps, int numRed,
270                                      String jobName, String mapSignalFilename,
271                                      String redSignalFilename)
272  throws IOException {
273    jobConf.setJobName(jobName);
274    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
275    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
276    FileInputFormat.setInputPaths(jobConf, inDir);
277    FileOutputFormat.setOutputPath(jobConf, outputPath);
278    jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
279    jobConf.setReducerClass(IdentityReducer.class);
280    jobConf.setOutputKeyClass(BytesWritable.class);
281    jobConf.setOutputValueClass(BytesWritable.class);
282    jobConf.setInputFormat(RandomInputFormat.class);
283    jobConf.setNumMapTasks(numMaps);
284    jobConf.setNumReduceTasks(numRed);
285    jobConf.setJar("build/test/testjar/testjob.jar");
286    jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
287    jobConf.set(getTaskSignalParameter(false), redSignalFilename);
288  }
289
290  /**
291   * Commonly used map and reduce classes
292   */
293 
294  /**
295   * Map is a Mapper that just waits for a file to be created on the dfs. The
296   * file creation is a signal to the mappers and hence acts as a waiting job.
297   */
298
299  static class WaitingMapper 
300  extends MapReduceBase
301  implements Mapper<WritableComparable, Writable, 
302                    WritableComparable, Writable> {
303
304    FileSystem fs = null;
305    Path signal;
306    int id = 0;
307    int totalMaps = 0;
308
309    /**
310     * Checks if the map task needs to wait. By default all the maps will wait.
311     * This method needs to be overridden to make a custom waiting mapper.
312     */
313    public boolean shouldWait(int id) {
314      return true;
315    }
316   
317    /**
318     * Returns a signal file on which the map task should wait. By default all
319     * the maps wait on a single file passed as test.mapred.map.waiting.target.
320     * This method needs to be overridden to make a custom waiting mapper
321     */
322    public Path getSignalFile(int id) {
323      return signal;
324    }
325   
326    /** The waiting function.  The map exits once it gets a signal. Here the
327     * signal is the file existence.
328     */
329    public void map(WritableComparable key, Writable val, 
330                    OutputCollector<WritableComparable, Writable> output,
331                    Reporter reporter)
332    throws IOException {
333      if (shouldWait(id)) {
334        if (fs != null) {
335          while (!fs.exists(getSignalFile(id))) {
336            try {
337              reporter.progress();
338              synchronized (this) {
339                this.wait(1000); // wait for 1 sec
340              }
341            } catch (InterruptedException ie) {
342              System.out.println("Interrupted while the map was waiting for "
343                                 + " the signal.");
344              break;
345            }
346          }
347        } else {
348          throw new IOException("Could not get the DFS!!");
349        }
350      }
351    }
352
353    public void configure(JobConf conf) {
354      try {
355        String taskId = conf.get("mapred.task.id");
356        id = Integer.parseInt(taskId.split("_")[4]);
357        totalMaps = Integer.parseInt(conf.get("mapred.map.tasks"));
358        fs = FileSystem.get(conf);
359        signal = new Path(conf.get(getTaskSignalParameter(true)));
360      } catch (IOException ioe) {
361        System.out.println("Got an exception while obtaining the filesystem");
362      }
363    }
364  }
365 
366  /** Only the later half of the maps wait for the signal while the rest
367   * complete immediately.
368   */
369  static class HalfWaitingMapper extends WaitingMapper {
370    @Override
371    public boolean shouldWait(int id) {
372      return id >= (totalMaps / 2);
373    }
374  }
375 
376  /**
377   * Reduce that just waits for a file to be created on the dfs. The
378   * file creation is a signal to the reduce.
379   */
380
381  static class WaitingReducer extends MapReduceBase
382  implements Reducer<WritableComparable, Writable, 
383                     WritableComparable, Writable> {
384
385    FileSystem fs = null;
386    Path signal;
387   
388    /** The waiting function.  The reduce exits once it gets a signal. Here the
389     * signal is the file existence.
390     */
391    public void reduce(WritableComparable key, Iterator<Writable> val, 
392                       OutputCollector<WritableComparable, Writable> output,
393                       Reporter reporter)
394    throws IOException {
395      if (fs != null) {
396        while (!fs.exists(signal)) {
397          try {
398            reporter.progress();
399            synchronized (this) {
400              this.wait(1000); // wait for 1 sec
401            }
402          } catch (InterruptedException ie) {
403            System.out.println("Interrupted while the map was waiting for the"
404                               + " signal.");
405            break;
406          }
407        }
408      } else {
409        throw new IOException("Could not get the DFS!!");
410      }
411    }
412
413    public void configure(JobConf conf) {
414      try {
415        fs = FileSystem.get(conf);
416        signal = new Path(conf.get(getTaskSignalParameter(false)));
417      } catch (IOException ioe) {
418        System.out.println("Got an exception while obtaining the filesystem");
419      }
420    }
421  }
422 
423  static String getTaskSignalParameter(boolean isMap) {
424    return isMap
425           ? "test.mapred.map.waiting.target" 
426           : "test.mapred.reduce.waiting.target";
427  }
428 
429  /**
430   * Signal the maps/reduces to start.
431   */
432  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
433                          String mapSignalFile, 
434                          String reduceSignalFile, int replication) 
435  throws IOException {
436    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), 
437              (short)replication);
438    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), 
439              (short)replication);
440  }
441 
442  /**
443   * Signal the maps/reduces to start.
444   */
445  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
446                          boolean isMap, String mapSignalFile, 
447                          String reduceSignalFile)
448  throws IOException {
449    //  signal the maps to complete
450    writeFile(dfs.getNameNode(), fileSys.getConf(),
451              isMap
452              ? new Path(mapSignalFile)
453              : new Path(reduceSignalFile), (short)1);
454  }
455 
456  static String getSignalFile(Path dir) {
457    return (new Path(dir, "signal")).toString();
458  }
459 
460  static String getMapSignalFile(Path dir) {
461    return (new Path(dir, "map-signal")).toString();
462  }
463
464  static String getReduceSignalFile(Path dir) {
465    return (new Path(dir, "reduce-signal")).toString();
466  }
467 
468  static void writeFile(NameNode namenode, Configuration conf, Path name, 
469      short replication) throws IOException {
470    FileSystem fileSys = FileSystem.get(conf);
471    SequenceFile.Writer writer = 
472      SequenceFile.createWriter(fileSys, conf, name, 
473                                BytesWritable.class, BytesWritable.class,
474                                CompressionType.NONE);
475    writer.append(new BytesWritable(), new BytesWritable());
476    writer.close();
477    fileSys.setReplication(name, replication);
478    DFSTestUtil.waitReplication(fileSys, name, replication);
479  }
480 
481  // Input formats
482  /**
483   * A custom input format that creates virtual inputs of a single string
484   * for each map. Using {@link RandomWriter} code.
485   */
486  public static class RandomInputFormat implements InputFormat<Text, Text> {
487   
488    public InputSplit[] getSplits(JobConf job, 
489                                  int numSplits) throws IOException {
490      InputSplit[] result = new InputSplit[numSplits];
491      Path outDir = FileOutputFormat.getOutputPath(job);
492      for(int i=0; i < result.length; ++i) {
493        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
494                                  0, 1, (String[])null);
495      }
496      return result;
497    }
498
499    static class RandomRecordReader implements RecordReader<Text, Text> {
500      Path name;
501      public RandomRecordReader(Path p) {
502        name = p;
503      }
504      public boolean next(Text key, Text value) {
505        if (name != null) {
506          key.set(name.getName());
507          name = null;
508          return true;
509        }
510        return false;
511      }
512      public Text createKey() {
513        return new Text();
514      }
515      public Text createValue() {
516        return new Text();
517      }
518      public long getPos() {
519        return 0;
520      }
521      public void close() {}
522      public float getProgress() {
523        return 0.0f;
524      }
525    }
526
527    public RecordReader<Text, Text> getRecordReader(InputSplit split,
528                                                    JobConf job, 
529                                                    Reporter reporter) 
530    throws IOException {
531      return new RandomRecordReader(((FileSplit) split).getPath());
532    }
533  }
534
535  // Start a job and return its RunningJob object
536  static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
537                    throws IOException {
538    return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks());
539  }
540
541  // Start a job and return its RunningJob object
542  static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
543                           int numReds) throws IOException {
544
545    FileSystem fs = FileSystem.get(conf);
546    fs.delete(outDir, true);
547    if (!fs.exists(inDir)) {
548      fs.mkdirs(inDir);
549    }
550    String input = "The quick brown fox\n" + "has many silly\n"
551        + "red fox sox\n";
552    for (int i = 0; i < numMaps; ++i) {
553      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
554      file.writeBytes(input);
555      file.close();
556    }   
557
558    conf.setInputFormat(TextInputFormat.class);
559    conf.setOutputKeyClass(LongWritable.class);
560    conf.setOutputValueClass(Text.class);
561
562    FileInputFormat.setInputPaths(conf, inDir);
563    FileOutputFormat.setOutputPath(conf, outDir);
564    conf.setNumMapTasks(numMaps);
565    conf.setNumReduceTasks(numReds);
566
567    JobClient jobClient = new JobClient(conf);
568    RunningJob job = jobClient.submitJob(conf);
569
570    return job;
571  }
572
573  // Run a job that will be succeeded and wait until it completes
574  static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
575         throws IOException {
576    conf.setJobName("test-job-succeed");
577    conf.setMapperClass(IdentityMapper.class);
578    conf.setReducerClass(IdentityReducer.class);
579   
580    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
581    while (!job.isComplete()) {
582      try {
583        Thread.sleep(100);
584      } catch (InterruptedException e) {
585        break;
586      }
587    }
588
589    return job;
590  }
591
592  // Run a job that will be failed and wait until it completes
593  static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
594         throws IOException {
595    conf.setJobName("test-job-fail");
596    conf.setMapperClass(FailMapper.class);
597    conf.setReducerClass(IdentityReducer.class);
598   
599    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
600    while (!job.isComplete()) {
601      try {
602        Thread.sleep(100);
603      } catch (InterruptedException e) {
604        break;
605      }
606    }
607
608    return job;
609  }
610
611  // Run a job that will be killed and wait until it completes
612  static RunningJob runJobKill(JobConf conf,  Path inDir, Path outDir)
613         throws IOException {
614
615    conf.setJobName("test-job-kill");
616    conf.setMapperClass(KillMapper.class);
617    conf.setReducerClass(IdentityReducer.class);
618   
619    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
620    while (job.getJobState() != JobStatus.RUNNING) {
621      try {
622        Thread.sleep(100);
623      } catch (InterruptedException e) {
624        break;
625      }
626    }
627    job.killJob();
628    while (job.cleanupProgress() == 0.0f) {
629      try {
630        Thread.sleep(10);
631      } catch (InterruptedException ie) {
632        break;
633      }
634    }
635
636    return job;
637  }
638
639  // Mapper that fails
640  static class FailMapper extends MapReduceBase implements
641      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
642
643    public void map(WritableComparable key, Writable value,
644        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
645        throws IOException {
646
647      throw new RuntimeException("failing map");
648    }
649  }
650
651  // Mapper that sleeps for a long time.
652  // Used for running a job that will be killed
653  static class KillMapper extends MapReduceBase implements
654      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
655
656    public void map(WritableComparable key, Writable value,
657        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
658        throws IOException {
659
660      try {
661        Thread.sleep(1000000);
662      } catch (InterruptedException e) {
663        // Do nothing
664      }
665    }
666  }
667}
Note: See TracBrowser for help on using the repository browser.