source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/MRBench.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 10.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.io.PrintStream;
23import java.util.ArrayList;
24import java.util.Iterator;
25import java.util.Random;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.io.UTF8;
32import org.apache.hadoop.io.WritableComparable;
33import org.apache.hadoop.io.Text;
34
35/**
36 * Runs a job multiple times and takes average of all runs.
37 */
38public class MRBench {
39 
40  private static final Log LOG = LogFactory.getLog(MRBench.class);
41  private static Path BASE_DIR =
42    new Path(System.getProperty("test.build.data","/benchmarks/MRBench"));
43  private static Path INPUT_DIR = new Path(BASE_DIR, "mr_input");
44  private static Path OUTPUT_DIR = new Path(BASE_DIR, "mr_output");
45 
46  public static enum Order {RANDOM, ASCENDING, DESCENDING}; 
47 
48  /**
49   * Takes input format as text lines, runs some processing on it and
50   * writes out data as text again.
51   */
52  public static class Map extends MapReduceBase
53    implements Mapper<WritableComparable, Text, UTF8, UTF8> {
54   
55    public void map(WritableComparable key, Text value,
56                    OutputCollector<UTF8, UTF8> output,
57                    Reporter reporter) throws IOException
58    {
59      String line = value.toString();
60      output.collect(new UTF8(process(line)), new UTF8(""));           
61    }
62    public String process(String line) {
63      return line; 
64    }
65  }
66
67  /**
68   * Ignores the key and writes values to the output.
69   */
70  public static class Reduce extends MapReduceBase
71    implements Reducer<UTF8, UTF8, UTF8, UTF8> {
72   
73    public void reduce(UTF8 key, Iterator<UTF8> values,
74                       OutputCollector<UTF8, UTF8> output, Reporter reporter) throws IOException
75    {
76      while(values.hasNext()) {
77        output.collect(key, new UTF8(values.next().toString()));
78      }
79    }
80  }
81
82  /**
83   * Generate a text file on the given filesystem with the given path name.
84   * The text file will contain the given number of lines of generated data.
85   * The generated data are string representations of numbers.  Each line
86   * is the same length, which is achieved by padding each number with
87   * an appropriate number of leading '0' (zero) characters.  The order of
88   * generated data is one of ascending, descending, or random.
89   */
90  public static void generateTextFile(FileSystem fs, Path inputFile, 
91                                      long numLines, Order sortOrder) throws IOException
92  {
93    LOG.info("creating control file: "+numLines+" numLines, "+sortOrder+" sortOrder");
94    PrintStream output = null;
95    try {
96      output = new PrintStream(fs.create(inputFile));
97      int padding = String.valueOf(numLines).length();
98      switch(sortOrder) {
99      case RANDOM:
100        for (long l = 0; l < numLines; l++) {
101          output.println(pad((new Random()).nextLong(), padding));
102        }
103        break; 
104      case ASCENDING: 
105        for (long l = 0; l < numLines; l++) {
106          output.println(pad(l, padding));
107        }
108        break;
109      case DESCENDING: 
110        for (long l = numLines; l > 0; l--) {
111          output.println(pad(l, padding));
112        }
113        break;
114      }
115    } finally {
116      if (output != null)
117        output.close();
118    }
119    LOG.info("created control file: " + inputFile);
120  }
121 
122  /**
123   * Convert the given number to a string and pad the number with
124   * leading '0' (zero) characters so that the string is exactly
125   * the given length.
126   */
127  private static String pad(long number, int length) {
128    String str = String.valueOf(number);
129    StringBuffer value = new StringBuffer(); 
130    for (int i = str.length(); i < length; i++) {
131      value.append("0"); 
132    }
133    value.append(str); 
134    return value.toString();
135  }
136 
137  /**
138   * Create the job configuration.
139   */
140  private static JobConf setupJob(int numMaps, int numReduces, String jarFile) {
141    JobConf jobConf = new JobConf(MRBench.class);
142    FileInputFormat.addInputPath(jobConf, INPUT_DIR);
143   
144    jobConf.setInputFormat(TextInputFormat.class);
145    jobConf.setOutputFormat(TextOutputFormat.class);
146   
147    jobConf.setOutputValueClass(UTF8.class);
148   
149    jobConf.setMapOutputKeyClass(UTF8.class);
150    jobConf.setMapOutputValueClass(UTF8.class);
151   
152    if (null != jarFile) {
153      jobConf.setJar(jarFile);
154    }
155    jobConf.setMapperClass(Map.class);
156    jobConf.setReducerClass(Reduce.class);
157   
158    jobConf.setNumMapTasks(numMaps);
159    jobConf.setNumReduceTasks(numReduces);
160   
161    return jobConf; 
162  }
163 
164  /**
165   * Runs a MapReduce task, given number of times. The input to each run
166   * is the same file.
167   */
168  private static ArrayList<Long> runJobInSequence(JobConf masterJobConf, int numRuns) throws IOException {
169    Path intrimData = null; 
170    Random rand = new Random();
171    ArrayList<Long> execTimes = new ArrayList<Long>(); 
172   
173    for (int i = 0; i < numRuns; i++) {
174      // create a new job conf every time, reusing same object does not work
175      JobConf jobConf = new JobConf(masterJobConf);
176      // reset the job jar because the copy constructor doesn't
177      jobConf.setJar(masterJobConf.getJar());
178      // give a new random name to output of the mapred tasks
179      FileOutputFormat.setOutputPath(jobConf, 
180                         new Path(OUTPUT_DIR, "output_" + rand.nextInt()));
181
182      LOG.info("Running job " + i + ":" +
183               " input=" + FileInputFormat.getInputPaths(jobConf)[0] + 
184               " output=" + FileOutputFormat.getOutputPath(jobConf));
185     
186      // run the mapred task now
187      long curTime = System.currentTimeMillis();
188      JobClient.runJob(jobConf);
189      execTimes.add(new Long(System.currentTimeMillis() - curTime));
190    }
191    return execTimes;
192  }
193 
194  /**
195   * <pre>
196   * Usage: mrbench
197   *    [-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>]
198   *    [-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>]
199   *    [-numRuns <number of times to run the job, default is 1>]
200   *    [-maps <number of maps for each run, default is 2>]
201   *    [-reduces <number of reduces for each run, default is 1>]
202   *    [-inputLines <number of input lines to generate, default is 1>]
203   *    [-inputType <type of input to generate, one of ascending (default), descending, random>]
204   *    [-verbose]
205   * </pre>
206   */
207  public static void main (String[] args) throws IOException {
208    String version = "MRBenchmark.0.0.2";
209    System.out.println(version);
210
211    String usage = 
212      "Usage: mrbench " +
213      "[-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>] " + 
214      "[-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>] " + 
215      "[-numRuns <number of times to run the job, default is 1>] " +
216      "[-maps <number of maps for each run, default is 2>] " +
217      "[-reduces <number of reduces for each run, default is 1>] " +
218      "[-inputLines <number of input lines to generate, default is 1>] " +
219      "[-inputType <type of input to generate, one of ascending (default), descending, random>] " + 
220      "[-verbose]";
221   
222    String jarFile = null;
223    int inputLines = 1; 
224    int numRuns = 1;
225    int numMaps = 2; 
226    int numReduces = 1;
227    boolean verbose = false;         
228    Order inputSortOrder = Order.ASCENDING;     
229    for (int i = 0; i < args.length; i++) { // parse command line
230      if (args[i].equals("-jar")) {
231        jarFile = args[++i];
232      } else if (args[i].equals("-numRuns")) {
233        numRuns = Integer.parseInt(args[++i]);
234      } else if (args[i].equals("-baseDir")) {
235        BASE_DIR = new Path(args[++i]);
236      } else if (args[i].equals("-maps")) {
237        numMaps = Integer.parseInt(args[++i]);
238      } else if (args[i].equals("-reduces")) {
239        numReduces = Integer.parseInt(args[++i]);
240      } else if (args[i].equals("-inputLines")) {
241        inputLines = Integer.parseInt(args[++i]);
242      } else if (args[i].equals("-inputType")) {
243        String s = args[++i]; 
244        if (s.equalsIgnoreCase("ascending")) {
245          inputSortOrder = Order.ASCENDING;
246        } else if (s.equalsIgnoreCase("descending")) {
247          inputSortOrder = Order.DESCENDING; 
248        } else if (s.equalsIgnoreCase("random")) {
249          inputSortOrder = Order.RANDOM;
250        } else {
251          inputSortOrder = null;
252        }
253      } else if (args[i].equals("-verbose")) {
254        verbose = true;
255      } else {
256        System.err.println(usage);
257        System.exit(-1);
258      }
259    }
260   
261    if (numRuns < 1 ||  // verify args
262        numMaps < 1 ||
263        numReduces < 1 ||
264        inputLines < 0 ||
265        inputSortOrder == null)
266      {
267        System.err.println(usage);
268        System.exit(-1);
269      }
270
271    JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
272    FileSystem fs = FileSystem.get(jobConf);
273    Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
274    generateTextFile(fs, inputFile, inputLines, inputSortOrder);
275
276    // setup test output directory
277    fs.mkdirs(BASE_DIR); 
278    ArrayList<Long> execTimes = new ArrayList<Long>();
279    try {
280      execTimes = runJobInSequence(jobConf, numRuns);
281    } finally {
282      // delete output -- should we really do this?
283      fs.delete(BASE_DIR, true);
284    }
285   
286    if (verbose) {
287      // Print out a report
288      System.out.println("Total MapReduce jobs executed: " + numRuns);
289      System.out.println("Total lines of data per job: " + inputLines);
290      System.out.println("Maps per job: " + numMaps);
291      System.out.println("Reduces per job: " + numReduces);
292    }
293    int i = 0;
294    long totalTime = 0; 
295    for (Long time : execTimes) {
296      totalTime += time.longValue(); 
297      if (verbose) {
298        System.out.println("Total milliseconds for task: " + (++i) + 
299                           " = " +  time);
300      }
301    }
302    long avgTime = totalTime / numRuns;   
303    System.out.println("DataLines\tMaps\tReduces\tAvgTime (milliseconds)");
304    System.out.println(inputLines + "\t\t" + numMaps + "\t" + 
305                       numReduces + "\t" + avgTime);
306  }
307 
308}
Note: See TracBrowser for help on using the repository browser.