/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.io.PrintStream; |
---|
23 | import java.util.ArrayList; |
---|
24 | import java.util.Iterator; |
---|
25 | import java.util.Random; |
---|
26 | |
---|
27 | import org.apache.commons.logging.Log; |
---|
28 | import org.apache.commons.logging.LogFactory; |
---|
29 | import org.apache.hadoop.fs.FileSystem; |
---|
30 | import org.apache.hadoop.fs.Path; |
---|
31 | import org.apache.hadoop.io.UTF8; |
---|
32 | import org.apache.hadoop.io.WritableComparable; |
---|
33 | import org.apache.hadoop.io.Text; |
---|
34 | |
---|
35 | /** |
---|
36 | * Runs a job multiple times and takes average of all runs. |
---|
37 | */ |
---|
38 | public class MRBench { |
---|
39 | |
---|
40 | private static final Log LOG = LogFactory.getLog(MRBench.class); |
---|
41 | private static Path BASE_DIR = |
---|
42 | new Path(System.getProperty("test.build.data","/benchmarks/MRBench")); |
---|
43 | private static Path INPUT_DIR = new Path(BASE_DIR, "mr_input"); |
---|
44 | private static Path OUTPUT_DIR = new Path(BASE_DIR, "mr_output"); |
---|
45 | |
---|
46 | public static enum Order {RANDOM, ASCENDING, DESCENDING}; |
---|
47 | |
---|
48 | /** |
---|
49 | * Takes input format as text lines, runs some processing on it and |
---|
50 | * writes out data as text again. |
---|
51 | */ |
---|
52 | public static class Map extends MapReduceBase |
---|
53 | implements Mapper<WritableComparable, Text, UTF8, UTF8> { |
---|
54 | |
---|
55 | public void map(WritableComparable key, Text value, |
---|
56 | OutputCollector<UTF8, UTF8> output, |
---|
57 | Reporter reporter) throws IOException |
---|
58 | { |
---|
59 | String line = value.toString(); |
---|
60 | output.collect(new UTF8(process(line)), new UTF8("")); |
---|
61 | } |
---|
62 | public String process(String line) { |
---|
63 | return line; |
---|
64 | } |
---|
65 | } |
---|
66 | |
---|
67 | /** |
---|
68 | * Ignores the key and writes values to the output. |
---|
69 | */ |
---|
70 | public static class Reduce extends MapReduceBase |
---|
71 | implements Reducer<UTF8, UTF8, UTF8, UTF8> { |
---|
72 | |
---|
73 | public void reduce(UTF8 key, Iterator<UTF8> values, |
---|
74 | OutputCollector<UTF8, UTF8> output, Reporter reporter) throws IOException |
---|
75 | { |
---|
76 | while(values.hasNext()) { |
---|
77 | output.collect(key, new UTF8(values.next().toString())); |
---|
78 | } |
---|
79 | } |
---|
80 | } |
---|
81 | |
---|
82 | /** |
---|
83 | * Generate a text file on the given filesystem with the given path name. |
---|
84 | * The text file will contain the given number of lines of generated data. |
---|
85 | * The generated data are string representations of numbers. Each line |
---|
86 | * is the same length, which is achieved by padding each number with |
---|
87 | * an appropriate number of leading '0' (zero) characters. The order of |
---|
88 | * generated data is one of ascending, descending, or random. |
---|
89 | */ |
---|
90 | public static void generateTextFile(FileSystem fs, Path inputFile, |
---|
91 | long numLines, Order sortOrder) throws IOException |
---|
92 | { |
---|
93 | LOG.info("creating control file: "+numLines+" numLines, "+sortOrder+" sortOrder"); |
---|
94 | PrintStream output = null; |
---|
95 | try { |
---|
96 | output = new PrintStream(fs.create(inputFile)); |
---|
97 | int padding = String.valueOf(numLines).length(); |
---|
98 | switch(sortOrder) { |
---|
99 | case RANDOM: |
---|
100 | for (long l = 0; l < numLines; l++) { |
---|
101 | output.println(pad((new Random()).nextLong(), padding)); |
---|
102 | } |
---|
103 | break; |
---|
104 | case ASCENDING: |
---|
105 | for (long l = 0; l < numLines; l++) { |
---|
106 | output.println(pad(l, padding)); |
---|
107 | } |
---|
108 | break; |
---|
109 | case DESCENDING: |
---|
110 | for (long l = numLines; l > 0; l--) { |
---|
111 | output.println(pad(l, padding)); |
---|
112 | } |
---|
113 | break; |
---|
114 | } |
---|
115 | } finally { |
---|
116 | if (output != null) |
---|
117 | output.close(); |
---|
118 | } |
---|
119 | LOG.info("created control file: " + inputFile); |
---|
120 | } |
---|
121 | |
---|
122 | /** |
---|
123 | * Convert the given number to a string and pad the number with |
---|
124 | * leading '0' (zero) characters so that the string is exactly |
---|
125 | * the given length. |
---|
126 | */ |
---|
127 | private static String pad(long number, int length) { |
---|
128 | String str = String.valueOf(number); |
---|
129 | StringBuffer value = new StringBuffer(); |
---|
130 | for (int i = str.length(); i < length; i++) { |
---|
131 | value.append("0"); |
---|
132 | } |
---|
133 | value.append(str); |
---|
134 | return value.toString(); |
---|
135 | } |
---|
136 | |
---|
137 | /** |
---|
138 | * Create the job configuration. |
---|
139 | */ |
---|
140 | private static JobConf setupJob(int numMaps, int numReduces, String jarFile) { |
---|
141 | JobConf jobConf = new JobConf(MRBench.class); |
---|
142 | FileInputFormat.addInputPath(jobConf, INPUT_DIR); |
---|
143 | |
---|
144 | jobConf.setInputFormat(TextInputFormat.class); |
---|
145 | jobConf.setOutputFormat(TextOutputFormat.class); |
---|
146 | |
---|
147 | jobConf.setOutputValueClass(UTF8.class); |
---|
148 | |
---|
149 | jobConf.setMapOutputKeyClass(UTF8.class); |
---|
150 | jobConf.setMapOutputValueClass(UTF8.class); |
---|
151 | |
---|
152 | if (null != jarFile) { |
---|
153 | jobConf.setJar(jarFile); |
---|
154 | } |
---|
155 | jobConf.setMapperClass(Map.class); |
---|
156 | jobConf.setReducerClass(Reduce.class); |
---|
157 | |
---|
158 | jobConf.setNumMapTasks(numMaps); |
---|
159 | jobConf.setNumReduceTasks(numReduces); |
---|
160 | |
---|
161 | return jobConf; |
---|
162 | } |
---|
163 | |
---|
164 | /** |
---|
165 | * Runs a MapReduce task, given number of times. The input to each run |
---|
166 | * is the same file. |
---|
167 | */ |
---|
168 | private static ArrayList<Long> runJobInSequence(JobConf masterJobConf, int numRuns) throws IOException { |
---|
169 | Path intrimData = null; |
---|
170 | Random rand = new Random(); |
---|
171 | ArrayList<Long> execTimes = new ArrayList<Long>(); |
---|
172 | |
---|
173 | for (int i = 0; i < numRuns; i++) { |
---|
174 | // create a new job conf every time, reusing same object does not work |
---|
175 | JobConf jobConf = new JobConf(masterJobConf); |
---|
176 | // reset the job jar because the copy constructor doesn't |
---|
177 | jobConf.setJar(masterJobConf.getJar()); |
---|
178 | // give a new random name to output of the mapred tasks |
---|
179 | FileOutputFormat.setOutputPath(jobConf, |
---|
180 | new Path(OUTPUT_DIR, "output_" + rand.nextInt())); |
---|
181 | |
---|
182 | LOG.info("Running job " + i + ":" + |
---|
183 | " input=" + FileInputFormat.getInputPaths(jobConf)[0] + |
---|
184 | " output=" + FileOutputFormat.getOutputPath(jobConf)); |
---|
185 | |
---|
186 | // run the mapred task now |
---|
187 | long curTime = System.currentTimeMillis(); |
---|
188 | JobClient.runJob(jobConf); |
---|
189 | execTimes.add(new Long(System.currentTimeMillis() - curTime)); |
---|
190 | } |
---|
191 | return execTimes; |
---|
192 | } |
---|
193 | |
---|
194 | /** |
---|
195 | * <pre> |
---|
196 | * Usage: mrbench |
---|
197 | * [-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>] |
---|
198 | * [-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>] |
---|
199 | * [-numRuns <number of times to run the job, default is 1>] |
---|
200 | * [-maps <number of maps for each run, default is 2>] |
---|
201 | * [-reduces <number of reduces for each run, default is 1>] |
---|
202 | * [-inputLines <number of input lines to generate, default is 1>] |
---|
203 | * [-inputType <type of input to generate, one of ascending (default), descending, random>] |
---|
204 | * [-verbose] |
---|
205 | * </pre> |
---|
206 | */ |
---|
207 | public static void main (String[] args) throws IOException { |
---|
208 | String version = "MRBenchmark.0.0.2"; |
---|
209 | System.out.println(version); |
---|
210 | |
---|
211 | String usage = |
---|
212 | "Usage: mrbench " + |
---|
213 | "[-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>] " + |
---|
214 | "[-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>] " + |
---|
215 | "[-numRuns <number of times to run the job, default is 1>] " + |
---|
216 | "[-maps <number of maps for each run, default is 2>] " + |
---|
217 | "[-reduces <number of reduces for each run, default is 1>] " + |
---|
218 | "[-inputLines <number of input lines to generate, default is 1>] " + |
---|
219 | "[-inputType <type of input to generate, one of ascending (default), descending, random>] " + |
---|
220 | "[-verbose]"; |
---|
221 | |
---|
222 | String jarFile = null; |
---|
223 | int inputLines = 1; |
---|
224 | int numRuns = 1; |
---|
225 | int numMaps = 2; |
---|
226 | int numReduces = 1; |
---|
227 | boolean verbose = false; |
---|
228 | Order inputSortOrder = Order.ASCENDING; |
---|
229 | for (int i = 0; i < args.length; i++) { // parse command line |
---|
230 | if (args[i].equals("-jar")) { |
---|
231 | jarFile = args[++i]; |
---|
232 | } else if (args[i].equals("-numRuns")) { |
---|
233 | numRuns = Integer.parseInt(args[++i]); |
---|
234 | } else if (args[i].equals("-baseDir")) { |
---|
235 | BASE_DIR = new Path(args[++i]); |
---|
236 | } else if (args[i].equals("-maps")) { |
---|
237 | numMaps = Integer.parseInt(args[++i]); |
---|
238 | } else if (args[i].equals("-reduces")) { |
---|
239 | numReduces = Integer.parseInt(args[++i]); |
---|
240 | } else if (args[i].equals("-inputLines")) { |
---|
241 | inputLines = Integer.parseInt(args[++i]); |
---|
242 | } else if (args[i].equals("-inputType")) { |
---|
243 | String s = args[++i]; |
---|
244 | if (s.equalsIgnoreCase("ascending")) { |
---|
245 | inputSortOrder = Order.ASCENDING; |
---|
246 | } else if (s.equalsIgnoreCase("descending")) { |
---|
247 | inputSortOrder = Order.DESCENDING; |
---|
248 | } else if (s.equalsIgnoreCase("random")) { |
---|
249 | inputSortOrder = Order.RANDOM; |
---|
250 | } else { |
---|
251 | inputSortOrder = null; |
---|
252 | } |
---|
253 | } else if (args[i].equals("-verbose")) { |
---|
254 | verbose = true; |
---|
255 | } else { |
---|
256 | System.err.println(usage); |
---|
257 | System.exit(-1); |
---|
258 | } |
---|
259 | } |
---|
260 | |
---|
261 | if (numRuns < 1 || // verify args |
---|
262 | numMaps < 1 || |
---|
263 | numReduces < 1 || |
---|
264 | inputLines < 0 || |
---|
265 | inputSortOrder == null) |
---|
266 | { |
---|
267 | System.err.println(usage); |
---|
268 | System.exit(-1); |
---|
269 | } |
---|
270 | |
---|
271 | JobConf jobConf = setupJob(numMaps, numReduces, jarFile); |
---|
272 | FileSystem fs = FileSystem.get(jobConf); |
---|
273 | Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt"); |
---|
274 | generateTextFile(fs, inputFile, inputLines, inputSortOrder); |
---|
275 | |
---|
276 | // setup test output directory |
---|
277 | fs.mkdirs(BASE_DIR); |
---|
278 | ArrayList<Long> execTimes = new ArrayList<Long>(); |
---|
279 | try { |
---|
280 | execTimes = runJobInSequence(jobConf, numRuns); |
---|
281 | } finally { |
---|
282 | // delete output -- should we really do this? |
---|
283 | fs.delete(BASE_DIR, true); |
---|
284 | } |
---|
285 | |
---|
286 | if (verbose) { |
---|
287 | // Print out a report |
---|
288 | System.out.println("Total MapReduce jobs executed: " + numRuns); |
---|
289 | System.out.println("Total lines of data per job: " + inputLines); |
---|
290 | System.out.println("Maps per job: " + numMaps); |
---|
291 | System.out.println("Reduces per job: " + numReduces); |
---|
292 | } |
---|
293 | int i = 0; |
---|
294 | long totalTime = 0; |
---|
295 | for (Long time : execTimes) { |
---|
296 | totalTime += time.longValue(); |
---|
297 | if (verbose) { |
---|
298 | System.out.println("Total milliseconds for task: " + (++i) + |
---|
299 | " = " + time); |
---|
300 | } |
---|
301 | } |
---|
302 | long avgTime = totalTime / numRuns; |
---|
303 | System.out.println("DataLines\tMaps\tReduces\tAvgTime (milliseconds)"); |
---|
304 | System.out.println(inputLines + "\t\t" + numMaps + "\t" + |
---|
305 | numReduces + "\t" + avgTime); |
---|
306 | } |
---|
307 | |
---|
308 | } |
---|