1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import java.io.BufferedReader; |
---|
21 | import java.io.BufferedWriter; |
---|
22 | import java.io.DataInputStream; |
---|
23 | import java.io.DataOutputStream; |
---|
24 | import java.io.IOException; |
---|
25 | import java.io.InputStreamReader; |
---|
26 | import java.io.OutputStreamWriter; |
---|
27 | import java.util.EnumSet; |
---|
28 | import java.util.Iterator; |
---|
29 | import java.util.Random; |
---|
30 | |
---|
31 | import junit.framework.TestCase; |
---|
32 | |
---|
33 | import org.apache.hadoop.conf.Configuration; |
---|
34 | import org.apache.hadoop.fs.FileStatus; |
---|
35 | import org.apache.hadoop.fs.FileSystem; |
---|
36 | import org.apache.hadoop.fs.Path; |
---|
37 | import org.apache.hadoop.io.IntWritable; |
---|
38 | import org.apache.hadoop.io.LongWritable; |
---|
39 | import org.apache.hadoop.io.NullWritable; |
---|
40 | import org.apache.hadoop.io.SequenceFile; |
---|
41 | import org.apache.hadoop.io.Text; |
---|
42 | import org.apache.hadoop.io.WritableComparable; |
---|
43 | import org.apache.hadoop.io.SequenceFile.CompressionType; |
---|
44 | import org.apache.hadoop.mapred.lib.IdentityMapper; |
---|
45 | import org.apache.hadoop.mapred.lib.IdentityReducer; |
---|
46 | |
---|
47 | /********************************************************** |
---|
48 | * MapredLoadTest generates a bunch of work that exercises |
---|
49 | * a Hadoop Map-Reduce system (and DFS, too). It goes through |
---|
50 | * the following steps: |
---|
51 | * |
---|
52 | * 1) Take inputs 'range' and 'counts'. |
---|
53 | * 2) Generate 'counts' random integers between 0 and range-1. |
---|
54 | * 3) Create a file that lists each integer between 0 and range-1, |
---|
55 | * and lists the number of times that integer was generated. |
---|
56 | * 4) Emit a (very large) file that contains all the integers |
---|
57 | * in the order generated. |
---|
58 | * 5) After the file has been generated, read it back and count |
---|
59 | * how many times each int was generated. |
---|
60 | * 6) Compare this big count-map against the original one. If |
---|
61 | * they match, then SUCCESS! Otherwise, FAILURE! |
---|
62 | * |
---|
63 | * OK, that's how we can think about it. What are the map-reduce |
---|
64 | * steps that get the job done? |
---|
65 | * |
---|
66 | * 1) In a non-mapred thread, take the inputs 'range' and 'counts'. |
---|
67 | * 2) In a non-mapred thread, generate the answer-key and write to disk. |
---|
68 | * 3) In a mapred job, divide the answer key into K jobs. |
---|
69 | * 4) A mapred 'generator' task consists of K map jobs. Each reads |
---|
70 | * an individual "sub-key", and generates integers according to |
---|
71 | * it (though with a random ordering). |
---|
72 | * 5) The generator's reduce task agglomerates all of those files |
---|
73 | * into a single one. |
---|
74 | * 6) A mapred 'reader' task consists of M map jobs. The output |
---|
75 | * file is cut into M pieces. Each of the M jobs counts the |
---|
76 | * individual ints in its chunk and creates a map of all seen ints. |
---|
77 | * 7) A mapred job integrates all the count files into a single one. |
---|
78 | * |
---|
79 | **********************************************************/ |
---|
80 | public class TestMapRed extends TestCase { |
---|
81 | /** |
---|
82 | * Modified to make it a junit test. |
---|
83 | * The RandomGen Job does the actual work of creating |
---|
84 | * a huge file of assorted numbers. It receives instructions |
---|
85 | * as to how many times each number should be counted. Then |
---|
86 | * it emits those numbers in a crazy order. |
---|
87 | * |
---|
88 | * The map() function takes a key/val pair that describes |
---|
89 | * a value-to-be-emitted (the key) and how many times it |
---|
90 | * should be emitted (the value), aka "numtimes". map() then |
---|
91 | * emits a series of intermediate key/val pairs. It emits |
---|
92 | * 'numtimes' of these. The key is a random number and the |
---|
93 | * value is the 'value-to-be-emitted'. |
---|
94 | * |
---|
95 | * The system collates and merges these pairs according to |
---|
96 | * the random number. reduce() function takes in a key/value |
---|
97 | * pair that consists of a crazy random number and a series |
---|
98 | * of values that should be emitted. The random number key |
---|
99 | * is now dropped, and reduce() emits a pair for every intermediate value. |
---|
100 | * The emitted key is an intermediate value. The emitted value |
---|
101 | * is just a blank string. Thus, we've created a huge file |
---|
102 | * of numbers in random order, but where each number appears |
---|
103 | * as many times as we were instructed. |
---|
104 | */ |
---|
105 | static class RandomGenMapper |
---|
106 | implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> { |
---|
107 | |
---|
108 | public void configure(JobConf job) { |
---|
109 | } |
---|
110 | |
---|
111 | public void map(IntWritable key, IntWritable val, |
---|
112 | OutputCollector<IntWritable, IntWritable> out, |
---|
113 | Reporter reporter) throws IOException { |
---|
114 | int randomVal = key.get(); |
---|
115 | int randomCount = val.get(); |
---|
116 | |
---|
117 | for (int i = 0; i < randomCount; i++) { |
---|
118 | out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal)); |
---|
119 | } |
---|
120 | } |
---|
121 | public void close() { |
---|
122 | } |
---|
123 | } |
---|
124 | /** |
---|
125 | */ |
---|
126 | static class RandomGenReducer |
---|
127 | implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { |
---|
128 | |
---|
129 | public void configure(JobConf job) { |
---|
130 | } |
---|
131 | |
---|
132 | public void reduce(IntWritable key, Iterator<IntWritable> it, |
---|
133 | OutputCollector<IntWritable, IntWritable> out, |
---|
134 | Reporter reporter) throws IOException { |
---|
135 | while (it.hasNext()) { |
---|
136 | out.collect(it.next(), null); |
---|
137 | } |
---|
138 | } |
---|
139 | public void close() { |
---|
140 | } |
---|
141 | } |
---|
142 | |
---|
143 | /** |
---|
144 | * The RandomCheck Job does a lot of our work. It takes |
---|
145 | * in a num/string keyspace, and transforms it into a |
---|
146 | * key/count(int) keyspace. |
---|
147 | * |
---|
148 | * The map() function just emits a num/1 pair for every |
---|
149 | * num/string input pair. |
---|
150 | * |
---|
151 | * The reduce() function sums up all the 1s that were |
---|
152 | * emitted for a single key. It then emits the key/total |
---|
153 | * pair. |
---|
154 | * |
---|
155 | * This is used to regenerate the random number "answer key". |
---|
156 | * Each key here is a random number, and the count is the |
---|
157 | * number of times the number was emitted. |
---|
158 | */ |
---|
159 | static class RandomCheckMapper |
---|
160 | implements Mapper<WritableComparable, Text, IntWritable, IntWritable> { |
---|
161 | |
---|
162 | public void configure(JobConf job) { |
---|
163 | } |
---|
164 | |
---|
165 | public void map(WritableComparable key, Text val, |
---|
166 | OutputCollector<IntWritable, IntWritable> out, |
---|
167 | Reporter reporter) throws IOException { |
---|
168 | out.collect(new IntWritable(Integer.parseInt(val.toString().trim())), new IntWritable(1)); |
---|
169 | } |
---|
170 | public void close() { |
---|
171 | } |
---|
172 | } |
---|
173 | /** |
---|
174 | */ |
---|
175 | static class RandomCheckReducer |
---|
176 | implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { |
---|
177 | public void configure(JobConf job) { |
---|
178 | } |
---|
179 | |
---|
180 | public void reduce(IntWritable key, Iterator<IntWritable> it, |
---|
181 | OutputCollector<IntWritable, IntWritable> out, |
---|
182 | Reporter reporter) throws IOException { |
---|
183 | int keyint = key.get(); |
---|
184 | int count = 0; |
---|
185 | while (it.hasNext()) { |
---|
186 | it.next(); |
---|
187 | count++; |
---|
188 | } |
---|
189 | out.collect(new IntWritable(keyint), new IntWritable(count)); |
---|
190 | } |
---|
191 | public void close() { |
---|
192 | } |
---|
193 | } |
---|
194 | |
---|
195 | /** |
---|
196 | * The Merge Job is a really simple one. It takes in |
---|
197 | * an int/int key-value set, and emits the same set. |
---|
198 | * But it merges identical keys by adding their values. |
---|
199 | * |
---|
200 | * Thus, the map() function is just the identity function |
---|
201 | * and reduce() just sums. Nothing to see here! |
---|
202 | */ |
---|
203 | static class MergeMapper |
---|
204 | implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> { |
---|
205 | |
---|
206 | public void configure(JobConf job) { |
---|
207 | } |
---|
208 | |
---|
209 | public void map(IntWritable key, IntWritable val, |
---|
210 | OutputCollector<IntWritable, IntWritable> out, |
---|
211 | Reporter reporter) throws IOException { |
---|
212 | int keyint = key.get(); |
---|
213 | int valint = val.get(); |
---|
214 | |
---|
215 | out.collect(new IntWritable(keyint), new IntWritable(valint)); |
---|
216 | } |
---|
217 | public void close() { |
---|
218 | } |
---|
219 | } |
---|
220 | static class MergeReducer |
---|
221 | implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { |
---|
222 | public void configure(JobConf job) { |
---|
223 | } |
---|
224 | |
---|
225 | public void reduce(IntWritable key, Iterator<IntWritable> it, |
---|
226 | OutputCollector<IntWritable, IntWritable> out, |
---|
227 | Reporter reporter) throws IOException { |
---|
228 | int keyint = key.get(); |
---|
229 | int total = 0; |
---|
230 | while (it.hasNext()) { |
---|
231 | total += it.next().get(); |
---|
232 | } |
---|
233 | out.collect(new IntWritable(keyint), new IntWritable(total)); |
---|
234 | } |
---|
235 | public void close() { |
---|
236 | } |
---|
237 | } |
---|
238 | |
---|
239 | private static int range = 10; |
---|
240 | private static int counts = 100; |
---|
241 | private static Random r = new Random(); |
---|
242 | |
---|
243 | /** |
---|
244 | public TestMapRed(int range, int counts, Configuration conf) throws IOException { |
---|
245 | this.range = range; |
---|
246 | this.counts = counts; |
---|
247 | this.conf = conf; |
---|
248 | } |
---|
249 | **/ |
---|
250 | |
---|
251 | public void testMapred() throws Exception { |
---|
252 | launch(); |
---|
253 | } |
---|
254 | |
---|
255 | private static class MyMap |
---|
256 | implements Mapper<WritableComparable, Text, Text, Text> { |
---|
257 | |
---|
258 | public void configure(JobConf conf) { |
---|
259 | } |
---|
260 | |
---|
261 | public void map(WritableComparable key, Text value, |
---|
262 | OutputCollector<Text, Text> output, |
---|
263 | Reporter reporter) throws IOException { |
---|
264 | String str = value.toString().toLowerCase(); |
---|
265 | output.collect(new Text(str), value); |
---|
266 | } |
---|
267 | |
---|
268 | public void close() throws IOException { |
---|
269 | } |
---|
270 | } |
---|
271 | |
---|
272 | private static class MyReduce extends IdentityReducer { |
---|
273 | private JobConf conf; |
---|
274 | private boolean compressInput; |
---|
275 | private TaskAttemptID taskId; |
---|
276 | private boolean first = true; |
---|
277 | |
---|
278 | @Override |
---|
279 | public void configure(JobConf conf) { |
---|
280 | this.conf = conf; |
---|
281 | compressInput = conf.getCompressMapOutput(); |
---|
282 | taskId = TaskAttemptID.forName(conf.get("mapred.task.id")); |
---|
283 | } |
---|
284 | |
---|
285 | public void reduce(WritableComparable key, Iterator values, |
---|
286 | OutputCollector output, Reporter reporter |
---|
287 | ) throws IOException { |
---|
288 | if (first) { |
---|
289 | first = false; |
---|
290 | MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID()); |
---|
291 | mapOutputFile.setConf(conf); |
---|
292 | Path input = mapOutputFile.getInputFile(0, taskId); |
---|
293 | FileSystem fs = FileSystem.get(conf); |
---|
294 | assertTrue("reduce input exists " + input, fs.exists(input)); |
---|
295 | SequenceFile.Reader rdr = |
---|
296 | new SequenceFile.Reader(fs, input, conf); |
---|
297 | assertEquals("is reduce input compressed " + input, |
---|
298 | compressInput, |
---|
299 | rdr.isCompressed()); |
---|
300 | rdr.close(); |
---|
301 | } |
---|
302 | } |
---|
303 | |
---|
304 | } |
---|
305 | |
---|
306 | private static class BadPartitioner |
---|
307 | implements Partitioner<LongWritable,Text> { |
---|
308 | boolean low; |
---|
309 | public void configure(JobConf conf) { |
---|
310 | low = conf.getBoolean("test.testmapred.badpartition", true); |
---|
311 | } |
---|
312 | public int getPartition(LongWritable k, Text v, int numPartitions) { |
---|
313 | return low ? -1 : numPartitions; |
---|
314 | } |
---|
315 | } |
---|
316 | |
---|
317 | public void testPartitioner() throws Exception { |
---|
318 | JobConf conf = new JobConf(TestMapRed.class); |
---|
319 | conf.setPartitionerClass(BadPartitioner.class); |
---|
320 | FileSystem fs = FileSystem.getLocal(conf); |
---|
321 | Path testdir = new Path( |
---|
322 | System.getProperty("test.build.data","/tmp")).makeQualified(fs); |
---|
323 | Path inFile = new Path(testdir, "blah/blah"); |
---|
324 | DataOutputStream f = fs.create(inFile); |
---|
325 | f.writeBytes("blah blah blah\n"); |
---|
326 | f.close(); |
---|
327 | FileInputFormat.setInputPaths(conf, inFile); |
---|
328 | FileOutputFormat.setOutputPath(conf, new Path(testdir, "out")); |
---|
329 | conf.setMapperClass(IdentityMapper.class); |
---|
330 | conf.setReducerClass(IdentityReducer.class); |
---|
331 | conf.setOutputKeyClass(LongWritable.class); |
---|
332 | conf.setOutputValueClass(Text.class); |
---|
333 | |
---|
334 | // partition too low |
---|
335 | conf.setBoolean("test.testmapred.badpartition", true); |
---|
336 | boolean pass = true; |
---|
337 | try { |
---|
338 | JobClient.runJob(conf); |
---|
339 | } catch (IOException e) { |
---|
340 | pass = false; |
---|
341 | } |
---|
342 | assertFalse("should fail for partition < 0", pass); |
---|
343 | |
---|
344 | // partition too high |
---|
345 | conf.setBoolean("test.testmapred.badpartition", false); |
---|
346 | pass = true; |
---|
347 | try { |
---|
348 | JobClient.runJob(conf); |
---|
349 | } catch (IOException e) { |
---|
350 | pass = false; |
---|
351 | } |
---|
352 | assertFalse("should fail for partition >= numPartitions", pass); |
---|
353 | } |
---|
354 | |
---|
355 | public static class NullMapper |
---|
356 | implements Mapper<NullWritable,Text,NullWritable,Text> { |
---|
357 | public void map(NullWritable key, Text val, |
---|
358 | OutputCollector<NullWritable,Text> output, Reporter reporter) |
---|
359 | throws IOException { |
---|
360 | output.collect(NullWritable.get(), val); |
---|
361 | } |
---|
362 | public void configure(JobConf conf) { } |
---|
363 | public void close() { } |
---|
364 | } |
---|
365 | |
---|
366 | public void testNullKeys() throws Exception { |
---|
367 | JobConf conf = new JobConf(TestMapRed.class); |
---|
368 | FileSystem fs = FileSystem.getLocal(conf); |
---|
369 | Path testdir = new Path( |
---|
370 | System.getProperty("test.build.data","/tmp")).makeQualified(fs); |
---|
371 | fs.delete(testdir, true); |
---|
372 | Path inFile = new Path(testdir, "nullin/blah"); |
---|
373 | SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile, |
---|
374 | NullWritable.class, Text.class, SequenceFile.CompressionType.NONE); |
---|
375 | Text t = new Text(); |
---|
376 | t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t); |
---|
377 | t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t); |
---|
378 | t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t); |
---|
379 | t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t); |
---|
380 | t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t); |
---|
381 | t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t); |
---|
382 | t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t); |
---|
383 | t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t); |
---|
384 | w.close(); |
---|
385 | FileInputFormat.setInputPaths(conf, inFile); |
---|
386 | FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout")); |
---|
387 | conf.setMapperClass(NullMapper.class); |
---|
388 | conf.setReducerClass(IdentityReducer.class); |
---|
389 | conf.setOutputKeyClass(NullWritable.class); |
---|
390 | conf.setOutputValueClass(Text.class); |
---|
391 | conf.setInputFormat(SequenceFileInputFormat.class); |
---|
392 | conf.setOutputFormat(SequenceFileOutputFormat.class); |
---|
393 | conf.setNumReduceTasks(1); |
---|
394 | |
---|
395 | JobClient.runJob(conf); |
---|
396 | |
---|
397 | SequenceFile.Reader r = new SequenceFile.Reader(fs, |
---|
398 | new Path(testdir, "nullout/part-00000"), conf); |
---|
399 | String m = "AAAAAAAAAAAAAA"; |
---|
400 | for (int i = 1; r.next(NullWritable.get(), t); ++i) { |
---|
401 | assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString())); |
---|
402 | m = m.replace((char)('A' + i - 1), (char)('A' + i)); |
---|
403 | } |
---|
404 | } |
---|
405 | |
---|
406 | private void checkCompression(boolean compressMapOutputs, |
---|
407 | CompressionType redCompression, |
---|
408 | boolean includeCombine |
---|
409 | ) throws Exception { |
---|
410 | JobConf conf = new JobConf(TestMapRed.class); |
---|
411 | Path testdir = new Path("build/test/test.mapred.compress"); |
---|
412 | Path inDir = new Path(testdir, "in"); |
---|
413 | Path outDir = new Path(testdir, "out"); |
---|
414 | FileSystem fs = FileSystem.get(conf); |
---|
415 | fs.delete(testdir, true); |
---|
416 | FileInputFormat.setInputPaths(conf, inDir); |
---|
417 | FileOutputFormat.setOutputPath(conf, outDir); |
---|
418 | conf.setMapperClass(MyMap.class); |
---|
419 | conf.setReducerClass(MyReduce.class); |
---|
420 | conf.setOutputKeyClass(Text.class); |
---|
421 | conf.setOutputValueClass(Text.class); |
---|
422 | conf.setOutputFormat(SequenceFileOutputFormat.class); |
---|
423 | if (includeCombine) { |
---|
424 | conf.setCombinerClass(IdentityReducer.class); |
---|
425 | } |
---|
426 | conf.setCompressMapOutput(compressMapOutputs); |
---|
427 | SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression); |
---|
428 | try { |
---|
429 | if (!fs.mkdirs(testdir)) { |
---|
430 | throw new IOException("Mkdirs failed to create " + testdir.toString()); |
---|
431 | } |
---|
432 | if (!fs.mkdirs(inDir)) { |
---|
433 | throw new IOException("Mkdirs failed to create " + inDir.toString()); |
---|
434 | } |
---|
435 | Path inFile = new Path(inDir, "part0"); |
---|
436 | DataOutputStream f = fs.create(inFile); |
---|
437 | f.writeBytes("Owen was here\n"); |
---|
438 | f.writeBytes("Hadoop is fun\n"); |
---|
439 | f.writeBytes("Is this done, yet?\n"); |
---|
440 | f.close(); |
---|
441 | RunningJob rj = JobClient.runJob(conf); |
---|
442 | assertTrue("job was complete", rj.isComplete()); |
---|
443 | assertTrue("job was successful", rj.isSuccessful()); |
---|
444 | Path output = new Path(outDir, |
---|
445 | Task.getOutputName(0)); |
---|
446 | assertTrue("reduce output exists " + output, fs.exists(output)); |
---|
447 | SequenceFile.Reader rdr = |
---|
448 | new SequenceFile.Reader(fs, output, conf); |
---|
449 | assertEquals("is reduce output compressed " + output, |
---|
450 | redCompression != CompressionType.NONE, |
---|
451 | rdr.isCompressed()); |
---|
452 | rdr.close(); |
---|
453 | } finally { |
---|
454 | fs.delete(testdir, true); |
---|
455 | } |
---|
456 | } |
---|
457 | |
---|
458 | public void testCompression() throws Exception { |
---|
459 | EnumSet<SequenceFile.CompressionType> seq = |
---|
460 | EnumSet.allOf(SequenceFile.CompressionType.class); |
---|
461 | for (CompressionType redCompression : seq) { |
---|
462 | for(int combine=0; combine < 2; ++combine) { |
---|
463 | checkCompression(false, redCompression, combine == 1); |
---|
464 | checkCompression(true, redCompression, combine == 1); |
---|
465 | } |
---|
466 | } |
---|
467 | } |
---|
468 | |
---|
469 | |
---|
470 | /** |
---|
471 | * |
---|
472 | */ |
---|
473 | public static void launch() throws Exception { |
---|
474 | // |
---|
475 | // Generate distribution of ints. This is the answer key. |
---|
476 | // |
---|
477 | JobConf conf = new JobConf(TestMapRed.class); |
---|
478 | int countsToGo = counts; |
---|
479 | int dist[] = new int[range]; |
---|
480 | for (int i = 0; i < range; i++) { |
---|
481 | double avgInts = (1.0 * countsToGo) / (range - i); |
---|
482 | dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian()))); |
---|
483 | countsToGo -= dist[i]; |
---|
484 | } |
---|
485 | if (countsToGo > 0) { |
---|
486 | dist[dist.length-1] += countsToGo; |
---|
487 | } |
---|
488 | |
---|
489 | // |
---|
490 | // Write the answer key to a file. |
---|
491 | // |
---|
492 | FileSystem fs = FileSystem.get(conf); |
---|
493 | Path testdir = new Path("mapred.loadtest"); |
---|
494 | if (!fs.mkdirs(testdir)) { |
---|
495 | throw new IOException("Mkdirs failed to create " + testdir.toString()); |
---|
496 | } |
---|
497 | |
---|
498 | Path randomIns = new Path(testdir, "genins"); |
---|
499 | if (!fs.mkdirs(randomIns)) { |
---|
500 | throw new IOException("Mkdirs failed to create " + randomIns.toString()); |
---|
501 | } |
---|
502 | |
---|
503 | Path answerkey = new Path(randomIns, "answer.key"); |
---|
504 | SequenceFile.Writer out = |
---|
505 | SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class, |
---|
506 | IntWritable.class, |
---|
507 | SequenceFile.CompressionType.NONE); |
---|
508 | try { |
---|
509 | for (int i = 0; i < range; i++) { |
---|
510 | out.append(new IntWritable(i), new IntWritable(dist[i])); |
---|
511 | } |
---|
512 | } finally { |
---|
513 | out.close(); |
---|
514 | } |
---|
515 | //printFiles(randomIns, conf); |
---|
516 | |
---|
517 | // |
---|
518 | // Now we need to generate the random numbers according to |
---|
519 | // the above distribution. |
---|
520 | // |
---|
521 | // We create a lot of map tasks, each of which takes at least |
---|
522 | // one "line" of the distribution. (That is, a certain number |
---|
523 | // X is to be generated Y number of times.) |
---|
524 | // |
---|
525 | // A map task emits Y key/val pairs. The val is X. The key |
---|
526 | // is a randomly-generated number. |
---|
527 | // |
---|
528 | // The reduce task gets its input sorted by key. That is, sorted |
---|
529 | // in random order. It then emits a single line of text that |
---|
530 | // for the given values. It does not emit the key. |
---|
531 | // |
---|
532 | // Because there's just one reduce task, we emit a single big |
---|
533 | // file of random numbers. |
---|
534 | // |
---|
535 | Path randomOuts = new Path(testdir, "genouts"); |
---|
536 | fs.delete(randomOuts, true); |
---|
537 | |
---|
538 | |
---|
539 | JobConf genJob = new JobConf(conf, TestMapRed.class); |
---|
540 | FileInputFormat.setInputPaths(genJob, randomIns); |
---|
541 | genJob.setInputFormat(SequenceFileInputFormat.class); |
---|
542 | genJob.setMapperClass(RandomGenMapper.class); |
---|
543 | |
---|
544 | FileOutputFormat.setOutputPath(genJob, randomOuts); |
---|
545 | genJob.setOutputKeyClass(IntWritable.class); |
---|
546 | genJob.setOutputValueClass(IntWritable.class); |
---|
547 | genJob.setOutputFormat(TextOutputFormat.class); |
---|
548 | genJob.setReducerClass(RandomGenReducer.class); |
---|
549 | genJob.setNumReduceTasks(1); |
---|
550 | |
---|
551 | JobClient.runJob(genJob); |
---|
552 | //printFiles(randomOuts, conf); |
---|
553 | |
---|
554 | // |
---|
555 | // Next, we read the big file in and regenerate the |
---|
556 | // original map. It's split into a number of parts. |
---|
557 | // (That number is 'intermediateReduces'.) |
---|
558 | // |
---|
559 | // We have many map tasks, each of which read at least one |
---|
560 | // of the output numbers. For each number read in, the |
---|
561 | // map task emits a key/value pair where the key is the |
---|
562 | // number and the value is "1". |
---|
563 | // |
---|
564 | // We have a single reduce task, which receives its input |
---|
565 | // sorted by the key emitted above. For each key, there will |
---|
566 | // be a certain number of "1" values. The reduce task sums |
---|
567 | // these values to compute how many times the given key was |
---|
568 | // emitted. |
---|
569 | // |
---|
570 | // The reduce task then emits a key/val pair where the key |
---|
571 | // is the number in question, and the value is the number of |
---|
572 | // times the key was emitted. This is the same format as the |
---|
573 | // original answer key (except that numbers emitted zero times |
---|
574 | // will not appear in the regenerated key.) The answer set |
---|
575 | // is split into a number of pieces. A final MapReduce job |
---|
576 | // will merge them. |
---|
577 | // |
---|
578 | // There's not really a need to go to 10 reduces here |
---|
579 | // instead of 1. But we want to test what happens when |
---|
580 | // you have multiple reduces at once. |
---|
581 | // |
---|
582 | int intermediateReduces = 10; |
---|
583 | Path intermediateOuts = new Path(testdir, "intermediateouts"); |
---|
584 | fs.delete(intermediateOuts, true); |
---|
585 | JobConf checkJob = new JobConf(conf, TestMapRed.class); |
---|
586 | FileInputFormat.setInputPaths(checkJob, randomOuts); |
---|
587 | checkJob.setInputFormat(TextInputFormat.class); |
---|
588 | checkJob.setMapperClass(RandomCheckMapper.class); |
---|
589 | |
---|
590 | FileOutputFormat.setOutputPath(checkJob, intermediateOuts); |
---|
591 | checkJob.setOutputKeyClass(IntWritable.class); |
---|
592 | checkJob.setOutputValueClass(IntWritable.class); |
---|
593 | checkJob.setOutputFormat(MapFileOutputFormat.class); |
---|
594 | checkJob.setReducerClass(RandomCheckReducer.class); |
---|
595 | checkJob.setNumReduceTasks(intermediateReduces); |
---|
596 | |
---|
597 | JobClient.runJob(checkJob); |
---|
598 | //printFiles(intermediateOuts, conf); |
---|
599 | |
---|
600 | // |
---|
601 | // OK, now we take the output from the last job and |
---|
602 | // merge it down to a single file. The map() and reduce() |
---|
603 | // functions don't really do anything except reemit tuples. |
---|
604 | // But by having a single reduce task here, we end up merging |
---|
605 | // all the files. |
---|
606 | // |
---|
607 | Path finalOuts = new Path(testdir, "finalouts"); |
---|
608 | fs.delete(finalOuts, true); |
---|
609 | JobConf mergeJob = new JobConf(conf, TestMapRed.class); |
---|
610 | FileInputFormat.setInputPaths(mergeJob, intermediateOuts); |
---|
611 | mergeJob.setInputFormat(SequenceFileInputFormat.class); |
---|
612 | mergeJob.setMapperClass(MergeMapper.class); |
---|
613 | |
---|
614 | FileOutputFormat.setOutputPath(mergeJob, finalOuts); |
---|
615 | mergeJob.setOutputKeyClass(IntWritable.class); |
---|
616 | mergeJob.setOutputValueClass(IntWritable.class); |
---|
617 | mergeJob.setOutputFormat(SequenceFileOutputFormat.class); |
---|
618 | mergeJob.setReducerClass(MergeReducer.class); |
---|
619 | mergeJob.setNumReduceTasks(1); |
---|
620 | |
---|
621 | JobClient.runJob(mergeJob); |
---|
622 | //printFiles(finalOuts, conf); |
---|
623 | |
---|
624 | // |
---|
625 | // Finally, we compare the reconstructed answer key with the |
---|
626 | // original one. Remember, we need to ignore zero-count items |
---|
627 | // in the original key. |
---|
628 | // |
---|
629 | boolean success = true; |
---|
630 | Path recomputedkey = new Path(finalOuts, "part-00000"); |
---|
631 | SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf); |
---|
632 | int totalseen = 0; |
---|
633 | try { |
---|
634 | IntWritable key = new IntWritable(); |
---|
635 | IntWritable val = new IntWritable(); |
---|
636 | for (int i = 0; i < range; i++) { |
---|
637 | if (dist[i] == 0) { |
---|
638 | continue; |
---|
639 | } |
---|
640 | if (!in.next(key, val)) { |
---|
641 | System.err.println("Cannot read entry " + i); |
---|
642 | success = false; |
---|
643 | break; |
---|
644 | } else { |
---|
645 | if (!((key.get() == i) && (val.get() == dist[i]))) { |
---|
646 | System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + |
---|
647 | ", val=" + val.get() + ", dist[i]=" + dist[i]); |
---|
648 | success = false; |
---|
649 | } |
---|
650 | totalseen += val.get(); |
---|
651 | } |
---|
652 | } |
---|
653 | if (success) { |
---|
654 | if (in.next(key, val)) { |
---|
655 | System.err.println("Unnecessary lines in recomputed key!"); |
---|
656 | success = false; |
---|
657 | } |
---|
658 | } |
---|
659 | } finally { |
---|
660 | in.close(); |
---|
661 | } |
---|
662 | int originalTotal = 0; |
---|
663 | for (int i = 0; i < dist.length; i++) { |
---|
664 | originalTotal += dist[i]; |
---|
665 | } |
---|
666 | System.out.println("Original sum: " + originalTotal); |
---|
667 | System.out.println("Recomputed sum: " + totalseen); |
---|
668 | |
---|
669 | // |
---|
670 | // Write to "results" whether the test succeeded or not. |
---|
671 | // |
---|
672 | Path resultFile = new Path(testdir, "results"); |
---|
673 | BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile))); |
---|
674 | try { |
---|
675 | bw.write("Success=" + success + "\n"); |
---|
676 | System.out.println("Success=" + success); |
---|
677 | } finally { |
---|
678 | bw.close(); |
---|
679 | } |
---|
680 | assertTrue("testMapRed failed", success); |
---|
681 | fs.delete(testdir, true); |
---|
682 | } |
---|
683 | |
---|
684 | private static void printTextFile(FileSystem fs, Path p) throws IOException { |
---|
685 | BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p))); |
---|
686 | String line; |
---|
687 | while ((line = in.readLine()) != null) { |
---|
688 | System.out.println(" Row: " + line); |
---|
689 | } |
---|
690 | in.close(); |
---|
691 | } |
---|
692 | |
---|
693 | private static void printSequenceFile(FileSystem fs, Path p, |
---|
694 | Configuration conf) throws IOException { |
---|
695 | SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf); |
---|
696 | Object key = null; |
---|
697 | Object value = null; |
---|
698 | while ((key = r.next(key)) != null) { |
---|
699 | value = r.getCurrentValue(value); |
---|
700 | System.out.println(" Row: " + key + ", " + value); |
---|
701 | } |
---|
702 | r.close(); |
---|
703 | } |
---|
704 | |
---|
705 | private static boolean isSequenceFile(FileSystem fs, |
---|
706 | Path f) throws IOException { |
---|
707 | DataInputStream in = fs.open(f); |
---|
708 | byte[] seq = "SEQ".getBytes(); |
---|
709 | for(int i=0; i < seq.length; ++i) { |
---|
710 | if (seq[i] != in.read()) { |
---|
711 | return false; |
---|
712 | } |
---|
713 | } |
---|
714 | return true; |
---|
715 | } |
---|
716 | |
---|
717 | private static void printFiles(Path dir, |
---|
718 | Configuration conf) throws IOException { |
---|
719 | FileSystem fs = dir.getFileSystem(conf); |
---|
720 | for(FileStatus f: fs.listStatus(dir)) { |
---|
721 | System.out.println("Reading " + f.getPath() + ": "); |
---|
722 | if (f.isDir()) { |
---|
723 | System.out.println(" it is a map file."); |
---|
724 | printSequenceFile(fs, new Path(f.getPath(), "data"), conf); |
---|
725 | } else if (isSequenceFile(fs, f.getPath())) { |
---|
726 | System.out.println(" it is a sequence file."); |
---|
727 | printSequenceFile(fs, f.getPath(), conf); |
---|
728 | } else { |
---|
729 | System.out.println(" it is a text file."); |
---|
730 | printTextFile(fs, f.getPath()); |
---|
731 | } |
---|
732 | } |
---|
733 | } |
---|
734 | |
---|
735 | /** |
---|
736 | * Launches all the tasks in order. |
---|
737 | */ |
---|
738 | public static void main(String[] argv) throws Exception { |
---|
739 | if (argv.length < 2) { |
---|
740 | System.err.println("Usage: TestMapRed <range> <counts>"); |
---|
741 | System.err.println(); |
---|
742 | System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>"); |
---|
743 | return; |
---|
744 | } |
---|
745 | |
---|
746 | int i = 0; |
---|
747 | range = Integer.parseInt(argv[i++]); |
---|
748 | counts = Integer.parseInt(argv[i++]); |
---|
749 | launch(); |
---|
750 | } |
---|
751 | |
---|
752 | public void testSmallInput(){ |
---|
753 | runJob(100); |
---|
754 | } |
---|
755 | |
---|
756 | public void testBiggerInput(){ |
---|
757 | runJob(1000); |
---|
758 | } |
---|
759 | |
---|
760 | public void runJob(int items) { |
---|
761 | try { |
---|
762 | JobConf conf = new JobConf(TestMapRed.class); |
---|
763 | Path testdir = new Path("build/test/test.mapred.spill"); |
---|
764 | Path inDir = new Path(testdir, "in"); |
---|
765 | Path outDir = new Path(testdir, "out"); |
---|
766 | FileSystem fs = FileSystem.get(conf); |
---|
767 | fs.delete(testdir, true); |
---|
768 | conf.setInt("io.sort.mb", 1); |
---|
769 | conf.setInputFormat(SequenceFileInputFormat.class); |
---|
770 | FileInputFormat.setInputPaths(conf, inDir); |
---|
771 | FileOutputFormat.setOutputPath(conf, outDir); |
---|
772 | conf.setMapperClass(IdentityMapper.class); |
---|
773 | conf.setReducerClass(IdentityReducer.class); |
---|
774 | conf.setOutputKeyClass(Text.class); |
---|
775 | conf.setOutputValueClass(Text.class); |
---|
776 | conf.setOutputFormat(SequenceFileOutputFormat.class); |
---|
777 | if (!fs.mkdirs(testdir)) { |
---|
778 | throw new IOException("Mkdirs failed to create " + testdir.toString()); |
---|
779 | } |
---|
780 | if (!fs.mkdirs(inDir)) { |
---|
781 | throw new IOException("Mkdirs failed to create " + inDir.toString()); |
---|
782 | } |
---|
783 | Path inFile = new Path(inDir, "part0"); |
---|
784 | SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, |
---|
785 | Text.class, Text.class); |
---|
786 | |
---|
787 | StringBuffer content = new StringBuffer(); |
---|
788 | |
---|
789 | for (int i = 0; i < 1000; i++) { |
---|
790 | content.append(i).append(": This is one more line of content\n"); |
---|
791 | } |
---|
792 | |
---|
793 | Text text = new Text(content.toString()); |
---|
794 | |
---|
795 | for (int i = 0; i < items; i++) { |
---|
796 | writer.append(new Text("rec:" + i), text); |
---|
797 | } |
---|
798 | writer.close(); |
---|
799 | |
---|
800 | JobClient.runJob(conf); |
---|
801 | } catch (Exception e) { |
---|
802 | fail("Threw exception:" + e); |
---|
803 | } |
---|
804 | } |
---|
805 | } |
---|