source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 11.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.io.File;
23import java.util.Random;
24
25import org.apache.commons.logging.Log;
26import org.apache.commons.logging.LogFactory;
27import org.apache.hadoop.conf.Configured;
28import org.apache.hadoop.examples.RandomWriter;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.io.BytesWritable;
32import org.apache.hadoop.io.Text;
33import org.apache.hadoop.io.Writable;
34import org.apache.hadoop.io.WritableComparable;
35import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
36import org.apache.hadoop.mapred.lib.IdentityMapper;
37import org.apache.hadoop.mapred.lib.IdentityReducer;
38import org.apache.hadoop.util.Tool;
39import org.apache.hadoop.util.ToolRunner;
40
/**
 * Distributed threaded map benchmark.
 * <p>
 * This benchmark generates random data per map and tests the performance
 * of having multiple spills (using multiple threads) over having just one
 * spill. The following parameters can be specified:
 * <ul>
 * <li>File size per map.</li>
 * <li>Number of spills per map.</li>
 * <li>Number of maps per host.</li>
 * </ul>
 * <p>
 * Sort is used for benchmarking the performance.
 */
53
54public class ThreadedMapBenchmark extends Configured implements Tool {
55
56  private static final Log LOG = LogFactory.getLog(ThreadedMapBenchmark.class);
57  private static Path BASE_DIR =
58    new Path(System.getProperty("test.build.data", 
59                                File.separator + "benchmarks" + File.separator 
60                                + "ThreadedMapBenchmark"));
61  private static Path INPUT_DIR = new Path(BASE_DIR, "input");
62  private static Path OUTPUT_DIR = new Path(BASE_DIR, "output");
63  private static final float FACTOR = 2.3f; // io.sort.mb set to
64                                            // (FACTOR * data_size) should
65                                            // result in only 1 spill
66
67  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
68 
69  /**
70   * Generates random input data of given size with keys and values of given
71   * sizes. By default it generates 128mb input data with 10 byte keys and 10
72   * byte values.
73   */
74  public static class Map extends MapReduceBase
75  implements Mapper<WritableComparable, Writable,
76                    BytesWritable, BytesWritable> {
77 
78  private long numBytesToWrite;
79  private int minKeySize;
80  private int keySizeRange;
81  private int minValueSize;
82  private int valueSizeRange;
83  private Random random = new Random();
84  private BytesWritable randomKey = new BytesWritable();
85  private BytesWritable randomValue = new BytesWritable();
86 
87  private void randomizeBytes(byte[] data, int offset, int length) {
88    for(int i = offset + length - 1; i >= offset; --i) {
89      data[i] = (byte) random.nextInt(256);
90    }
91  }
92 
93  public void map(WritableComparable key, 
94                  Writable value,
95                  OutputCollector<BytesWritable, BytesWritable> output, 
96                  Reporter reporter) throws IOException {
97    int itemCount = 0;
98    while (numBytesToWrite > 0) {
99      int keyLength = minKeySize
100                      + (keySizeRange != 0 
101                         ? random.nextInt(keySizeRange) 
102                         : 0);
103      randomKey.setSize(keyLength);
104      randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
105      int valueLength = minValueSize
106                        + (valueSizeRange != 0 
107                           ? random.nextInt(valueSizeRange) 
108                           : 0);
109      randomValue.setSize(valueLength);
110      randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
111      output.collect(randomKey, randomValue);
112      numBytesToWrite -= keyLength + valueLength;
113      reporter.incrCounter(Counters.BYTES_WRITTEN, 1);
114      reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
115      if (++itemCount % 200 == 0) {
116        reporter.setStatus("wrote record " + itemCount + ". " 
117                           + numBytesToWrite + " bytes left.");
118      }
119    }
120    reporter.setStatus("done with " + itemCount + " records.");
121  }
122 
123  @Override
124  public void configure(JobConf job) {
125    numBytesToWrite = job.getLong("test.tmb.bytes_per_map",
126                                  128 * 1024 * 1024);
127    minKeySize = job.getInt("test.tmb.min_key", 10);
128    keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize;
129    minValueSize = job.getInt("test.tmb.min_value", 10);
130    valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize;
131  }
132}
133
134  /**
135   * Generate input data for the benchmark
136   */
137  public static void generateInputData(int dataSizePerMap, 
138                                       int numSpillsPerMap, 
139                                       int numMapsPerHost, 
140                                       JobConf masterConf) 
141  throws Exception { 
142    JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
143    job.setJobName("threaded-map-benchmark-random-writer");
144    job.setJarByClass(ThreadedMapBenchmark.class);
145    job.setInputFormat(UtilsForTests.RandomInputFormat.class);
146    job.setOutputFormat(SequenceFileOutputFormat.class);
147   
148    job.setMapperClass(Map.class);
149    job.setReducerClass(IdentityReducer.class);
150   
151    job.setOutputKeyClass(BytesWritable.class);
152    job.setOutputValueClass(BytesWritable.class);
153   
154    JobClient client = new JobClient(job);
155    ClusterStatus cluster = client.getClusterStatus();
156    long totalDataSize = dataSizePerMap * numMapsPerHost
157                         * cluster.getTaskTrackers();
158    job.set("test.tmb.bytes_per_map", 
159            String.valueOf(dataSizePerMap * 1024 * 1024));
160    job.setNumReduceTasks(0); // none reduce
161    job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
162    FileOutputFormat.setOutputPath(job, INPUT_DIR);
163   
164    FileSystem fs = FileSystem.get(job);
165    fs.delete(BASE_DIR, true);
166   
167    LOG.info("Generating random input for the benchmark");
168    LOG.info("Total data : " + totalDataSize + " mb");
169    LOG.info("Data per map: " + dataSizePerMap + " mb");
170    LOG.info("Number of spills : " + numSpillsPerMap);
171    LOG.info("Number of maps per host : " + numMapsPerHost);
172    LOG.info("Number of hosts : " + cluster.getTaskTrackers());
173   
174    JobClient.runJob(job); // generates the input for the benchmark
175  }
176
177  /**
178   * This is the main routine for launching the benchmark. It generates random
179   * input data. The input is non-splittable. Sort is used for benchmarking.
180   * This benchmark reports the effect of having multiple sort and spill
181   * cycles over a single sort and spill.
182   *
183   * @throws IOException
184   */
185  public int run (String[] args) throws Exception {
186    LOG.info("Starting the benchmark for threaded spills");
187    String version = "ThreadedMapBenchmark.0.0.1";
188    System.out.println(version);
189   
190    String usage = 
191      "Usage: threadedmapbenchmark " +
192      "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " + 
193      "[-numSpillsPerMap <number of spills per map, default is 2>] " +
194      "[-numMapsPerHost <number of maps per host, default is 1>]";
195   
196    int dataSizePerMap = 128; // in mb
197    int numSpillsPerMap = 2;
198    int numMapsPerHost = 1;
199    JobConf masterConf = new JobConf(getConf());
200   
201    for (int i = 0; i < args.length; i++) { // parse command line
202      if (args[i].equals("-dataSizePerMap")) {
203        dataSizePerMap = Integer.parseInt(args[++i]);
204      } else if (args[i].equals("-numSpillsPerMap")) {
205        numSpillsPerMap = Integer.parseInt(args[++i]);
206      } else if (args[i].equals("-numMapsPerHost")) {
207        numMapsPerHost = Integer.parseInt(args[++i]);
208      } else {
209        System.err.println(usage);
210        System.exit(-1);
211      }
212    }
213   
214    if (dataSizePerMap <  1 ||  // verify arguments
215        numSpillsPerMap < 1 ||
216        numMapsPerHost < 1)
217      {
218        System.err.println(usage);
219        System.exit(-1);
220      }
221   
222    FileSystem fs = null;
223    try {
224      // using random-writer to generate the input data
225      generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost, 
226                        masterConf);
227     
228      // configure job for sorting
229      JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
230      job.setJobName("threaded-map-benchmark-unspilled");
231      job.setJarByClass(ThreadedMapBenchmark.class);
232
233      job.setInputFormat(NonSplitableSequenceFileInputFormat.class);
234      job.setOutputFormat(SequenceFileOutputFormat.class);
235     
236      job.setOutputKeyClass(BytesWritable.class);
237      job.setOutputValueClass(BytesWritable.class);
238     
239      job.setMapperClass(IdentityMapper.class);       
240      job.setReducerClass(IdentityReducer.class);
241     
242      FileInputFormat.addInputPath(job, INPUT_DIR);
243      FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
244     
245      JobClient client = new JobClient(job);
246      ClusterStatus cluster = client.getClusterStatus();
247      job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
248      job.setNumReduceTasks(1);
249     
250      // set io.sort.mb to avoid spill
251      int ioSortMb = (int)Math.ceil(FACTOR * dataSizePerMap);
252      job.set("io.sort.mb", String.valueOf(ioSortMb));
253      fs = FileSystem.get(job);
254     
255      LOG.info("Running sort with 1 spill per map");
256      long startTime = System.currentTimeMillis();
257      JobClient.runJob(job);
258      long endTime = System.currentTimeMillis();
259     
260      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
261               + " millisec");
262      fs.delete(OUTPUT_DIR, true);
263     
264      // set io.sort.mb to have multiple spills
265      JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class);
266      ioSortMb = (int)Math.ceil(FACTOR
267                                * Math.ceil((double)dataSizePerMap
268                                            / numSpillsPerMap));
269      spilledJob.set("io.sort.mb", String.valueOf(ioSortMb));
270      spilledJob.setJobName("threaded-map-benchmark-spilled");
271      spilledJob.setJarByClass(ThreadedMapBenchmark.class);
272     
273      LOG.info("Running sort with " + numSpillsPerMap + " spills per map");
274      startTime = System.currentTimeMillis();
275      JobClient.runJob(spilledJob);
276      endTime = System.currentTimeMillis();
277     
278      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
279               + " millisec");
280    } finally {
281      if (fs != null) {
282        fs.delete(BASE_DIR, true);
283      }
284    }
285    return 0;
286  }
287
288  public static void main(String[] args) throws Exception {
289    int res = ToolRunner.run(new ThreadedMapBenchmark(), args);
290    System.exit(res);
291  }
292}
Note: See TracBrowser for help on using the repository browser.