/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.record;

import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.conf.*;
import junit.framework.TestCase;
import java.io.*;
import java.util.*;

/**********************************************************
 * MapredLoadTest generates a bunch of work that exercises
 * a Hadoop Map-Reduce system (and DFS, too). It goes through
 * the following steps:
 *
 *   1) Take inputs 'range' and 'counts'.
 *   2) Generate 'counts' random integers between 0 and range-1.
 *   3) Create a file that lists each integer between 0 and range-1,
 *      and lists the number of times that integer was generated.
 *   4) Emit a (very large) file that contains all the integers
 *      in the order generated.
 *   5) After the file has been generated, read it back and count
 *      how many times each int was generated.
 *   6) Compare this big count-map against the original one. If
 *      they match, then SUCCESS! Otherwise, FAILURE!
 *
 * OK, that's how we can think about it. What are the map-reduce
 * steps that get the job done?
 *
 *   1) In a non-mapred thread, take the inputs 'range' and 'counts'.
 *   2) In a non-mapred thread, generate the answer-key and write it to disk.
 *   3) In a mapred job, divide the answer key into K jobs.
 *   4) A mapred 'generator' task consists of K map jobs. Each reads
 *      an individual "sub-key", and generates integers according
 *      to it (though with a random ordering).
 *   5) The generator's reduce task agglomerates all of those files
 *      into a single one.
 *   6) A mapred 'reader' task consists of M map jobs. The output
 *      file is cut into M pieces. Each of the M jobs counts the
 *      individual ints in its chunk and creates a map of all seen ints.
 *   7) A mapred job integrates all the count files into a single one.
 *
 **********************************************************/
public class TestRecordMR extends TestCase {
  /**
   * Modified to make it a junit test.
   * The RandomGen Job does the actual work of creating
   * a huge file of assorted numbers. It receives instructions
   * as to how many times each number should be counted. Then
   * it emits those numbers in a crazy order.
   *
   * The map() function takes a key/val pair that describes
   * a value-to-be-emitted (the key) and how many times it
   * should be emitted (the value), aka "numtimes". map() then
   * emits a series of intermediate key/val pairs. It emits
   * 'numtimes' of these. The key is a random number and the
   * value is the 'value-to-be-emitted'.
   *
   * The system collates and merges these pairs according to
   * the random number. The reduce() function takes in a key/value
   * pair that consists of a crazy random number and a series
   * of values that should be emitted. The random number key
   * is now dropped, and reduce() emits a pair for every intermediate value.
   * The emitted key is an intermediate value. The emitted value
   * is just a blank string. Thus, we've created a huge file
   * of numbers in random order, but where each number appears
   * as many times as we were instructed.
   */
  static public class RandomGenMapper implements Mapper<RecInt, RecInt,
                                                        RecInt, RecString> {
    Random r = new Random();

    public void configure(JobConf job) {
    }

    public void map(RecInt key,
                    RecInt val,
                    OutputCollector<RecInt, RecString> out,
                    Reporter reporter) throws IOException {
      int randomVal = key.getData();
      int randomCount = val.getData();

      for (int i = 0; i < randomCount; i++) {
        out.collect(new RecInt(Math.abs(r.nextInt())),
                    new RecString(Integer.toString(randomVal)));
      }
    }

    public void close() {
    }
  }

  /**
   */
  static public class RandomGenReducer implements Reducer<RecInt, RecString,
                                                          RecInt, RecString> {
    public void configure(JobConf job) {
    }

    public void reduce(RecInt key,
                       Iterator<RecString> it,
                       OutputCollector<RecInt, RecString> out,
                       Reporter reporter) throws IOException {
      int keyint = key.getData();
      while (it.hasNext()) {
        String val = it.next().getData();
        out.collect(new RecInt(Integer.parseInt(val)),
                    new RecString(""));
      }
    }

    public void close() {
    }
  }

  /**
   * The RandomCheck Job does a lot of our work. It takes
   * in a num/string keyspace, and transforms it into a
   * key/count(int) keyspace.
   *
   * The map() function just emits a num/1 pair for every
   * num/string input pair.
   *
   * The reduce() function sums up all the 1s that were
   * emitted for a single key. It then emits the key/total
   * pair.
   *
   * This is used to regenerate the random number "answer key".
   * Each key here is a random number, and the count is the
   * number of times the number was emitted.
   */
  static public class RandomCheckMapper implements Mapper<RecInt, RecString,
                                                          RecInt, RecString> {
    public void configure(JobConf job) {
    }

    public void map(RecInt key,
                    RecString val,
                    OutputCollector<RecInt, RecString> out,
                    Reporter reporter) throws IOException {
      int pos = key.getData();
      String str = val.getData();
      out.collect(new RecInt(pos), new RecString("1"));
    }

    public void close() {
    }
  }

  /**
   */
  static public class RandomCheckReducer implements Reducer<RecInt, RecString,
                                                            RecInt, RecString> {
    public void configure(JobConf job) {
    }

    public void reduce(RecInt key,
                       Iterator<RecString> it,
                       OutputCollector<RecInt, RecString> out,
                       Reporter reporter) throws IOException {
      int keyint = key.getData();
      int count = 0;
      while (it.hasNext()) {
        it.next();
        count++;
      }
      out.collect(new RecInt(keyint), new RecString(Integer.toString(count)));
    }

    public void close() {
    }
  }

  /**
   * The Merge Job is a really simple one. It takes in
   * an int/int key-value set, and emits the same set.
   * But it merges identical keys by adding their values.
   *
   * Thus, the map() function is just the identity function
   * and reduce() just sums. Nothing to see here!
   */
  static public class MergeMapper implements Mapper<RecInt, RecString,
                                                    RecInt, RecInt> {
    public void configure(JobConf job) {
    }

    public void map(RecInt key,
                    RecString val,
                    OutputCollector<RecInt, RecInt> out,
                    Reporter reporter) throws IOException {
      int keyint = key.getData();
      String valstr = val.getData();
      out.collect(new RecInt(keyint), new RecInt(Integer.parseInt(valstr)));
    }

    public void close() {
    }
  }

  static public class MergeReducer implements Reducer<RecInt, RecInt,
                                                      RecInt, RecInt> {
    public void configure(JobConf job) {
    }

    public void reduce(RecInt key,
                       Iterator<RecInt> it,
                       OutputCollector<RecInt, RecInt> out,
                       Reporter reporter) throws IOException {
      int keyint = key.getData();
      int total = 0;
      while (it.hasNext()) {
        total += it.next().getData();
      }
      out.collect(new RecInt(keyint), new RecInt(total));
    }

    public void close() {
    }
  }

  private static int range = 10;
  private static int counts = 100;
  private static Random r = new Random();
  private static Configuration conf = new Configuration();

  public void testMapred() throws Exception {
    launch();
  }

  /**
   *
   */
  public static void launch() throws Exception {
    //
    // Generate distribution of ints. This is the answer key.
    //
    int countsToGo = counts;
    int dist[] = new int[range];
    for (int i = 0; i < range; i++) {
      double avgInts = (1.0 * countsToGo) / (range - i);
      dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
      countsToGo -= dist[i];
    }
    if (countsToGo > 0) {
      dist[dist.length - 1] += countsToGo;
    }
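    // Illustrative arithmetic (not part of the test logic): with the default
    // range=10 and counts=100, each pass targets roughly countsToGo/(range-i)
    // ~= 10 occurrences per value, perturbed by a Gaussian with standard
    // deviation sqrt(10) (about 3.2) and clamped at zero; any positive
    // remainder is folded into the last bucket so no requested counts are lost.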

    //
    // Write the answer key to a file.
    //
    FileSystem fs = FileSystem.get(conf);
    Path testdir = new Path("mapred.loadtest");
    if (!fs.mkdirs(testdir)) {
      throw new IOException("Mkdirs failed to create directory " + testdir.toString());
    }

    Path randomIns = new Path(testdir, "genins");
    if (!fs.mkdirs(randomIns)) {
      throw new IOException("Mkdirs failed to create directory " + randomIns.toString());
    }

    Path answerkey = new Path(randomIns, "answer.key");
    SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
        answerkey, RecInt.class, RecInt.class,
        CompressionType.NONE);
    try {
      for (int i = 0; i < range; i++) {
        RecInt k = new RecInt();
        RecInt v = new RecInt();
        k.setData(i);
        v.setData(dist[i]);
        out.append(k, v);
      }
    } finally {
      out.close();
    }
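    // The answer key is now a SequenceFile of (RecInt, RecInt) records, one
    // per integer in [0, range): the key is the integer and the value is how
    // many times the generator job should emit it.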

    //
    // Now we need to generate the random numbers according to
    // the above distribution.
    //
    // We create a lot of map tasks, each of which takes at least
    // one "line" of the distribution. (That is, a certain number
    // X is to be generated Y number of times.)
    //
    // A map task emits Y key/val pairs. The val is X. The key
    // is a randomly-generated number.
    //
    // The reduce task gets its input sorted by key. That is, sorted
    // in random order. It then emits a single line of text
    // for the given values. It does not emit the key.
    //
    // Because there's just one reduce task, we emit a single big
    // file of random numbers.
    //
    Path randomOuts = new Path(testdir, "genouts");
    fs.delete(randomOuts, true);

    JobConf genJob = new JobConf(conf, TestRecordMR.class);
    FileInputFormat.setInputPaths(genJob, randomIns);
    genJob.setInputFormat(SequenceFileInputFormat.class);
    genJob.setMapperClass(RandomGenMapper.class);

    FileOutputFormat.setOutputPath(genJob, randomOuts);
    genJob.setOutputKeyClass(RecInt.class);
    genJob.setOutputValueClass(RecString.class);
    genJob.setOutputFormat(SequenceFileOutputFormat.class);
    genJob.setReducerClass(RandomGenReducer.class);
    genJob.setNumReduceTasks(1);

    JobClient.runJob(genJob);
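    // The generator output (one reducer, hence a single part file under
    // randomOuts) is a SequenceFile of (RecInt, RecString) records: every
    // generated integer appears as a key, paired with an empty string.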

    //
    // Next, we read the big file in and regenerate the
    // original map. It's split into a number of parts.
    // (That number is 'intermediateReduces'.)
    //
    // We have many map tasks, each of which reads at least one
    // of the output numbers. For each number read in, the
    // map task emits a key/value pair where the key is the
    // number and the value is "1".
    //
    // We have a single reduce task, which receives its input
    // sorted by the key emitted above. For each key, there will
    // be a certain number of "1" values. The reduce task sums
    // these values to compute how many times the given key was
    // emitted.
    //
    // The reduce task then emits a key/val pair where the key
    // is the number in question, and the value is the number of
    // times the key was emitted. This is the same format as the
    // original answer key (except that numbers emitted zero times
    // will not appear in the regenerated key.) The answer set
    // is split into a number of pieces. A final MapReduce job
    // will merge them.
    //
    // There's not really a need to go to 10 reduces here
    // instead of 1. But we want to test what happens when
    // you have multiple reduces at once.
    //
    int intermediateReduces = 10;
    Path intermediateOuts = new Path(testdir, "intermediateouts");
    fs.delete(intermediateOuts, true);
    JobConf checkJob = new JobConf(conf, TestRecordMR.class);
    FileInputFormat.setInputPaths(checkJob, randomOuts);
    checkJob.setInputFormat(SequenceFileInputFormat.class);
    checkJob.setMapperClass(RandomCheckMapper.class);

    FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
    checkJob.setOutputKeyClass(RecInt.class);
    checkJob.setOutputValueClass(RecString.class);
    checkJob.setOutputFormat(SequenceFileOutputFormat.class);
    checkJob.setReducerClass(RandomCheckReducer.class);
    checkJob.setNumReduceTasks(intermediateReduces);

    JobClient.runJob(checkJob);

    //
    // OK, now we take the output from the last job and
    // merge it down to a single file. The map() and reduce()
    // functions don't really do anything except reemit tuples.
    // But by having a single reduce task here, we end up merging
    // all the files.
    //
    Path finalOuts = new Path(testdir, "finalouts");
    fs.delete(finalOuts, true);
    JobConf mergeJob = new JobConf(conf, TestRecordMR.class);
    FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
    mergeJob.setInputFormat(SequenceFileInputFormat.class);
    mergeJob.setMapperClass(MergeMapper.class);

    FileOutputFormat.setOutputPath(mergeJob, finalOuts);
    mergeJob.setOutputKeyClass(RecInt.class);
    mergeJob.setOutputValueClass(RecInt.class);
    mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
    mergeJob.setReducerClass(MergeReducer.class);
    mergeJob.setNumReduceTasks(1);

    JobClient.runJob(mergeJob);

    //
    // Finally, we compare the reconstructed answer key with the
    // original one. Remember, we need to ignore zero-count items
    // in the original key.
    //
    boolean success = true;
    Path recomputedkey = new Path(finalOuts, "part-00000");
    SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
    int totalseen = 0;
    try {
      RecInt key = new RecInt();
      RecInt val = new RecInt();
      for (int i = 0; i < range; i++) {
        if (dist[i] == 0) {
          continue;
        }
        if (!in.next(key, val)) {
          System.err.println("Cannot read entry " + i);
          success = false;
          break;
        } else {
          if (!((key.getData() == i) && (val.getData() == dist[i]))) {
            System.err.println("Mismatch! Pos=" + key.getData() + ", i=" + i +
                               ", val=" + val.getData() + ", dist[i]=" + dist[i]);
            success = false;
          }
          totalseen += val.getData();
        }
      }
      if (success) {
        if (in.next(key, val)) {
          System.err.println("Unnecessary lines in recomputed key!");
          success = false;
        }
      }
    } finally {
      in.close();
    }
    int originalTotal = 0;
    for (int i = 0; i < dist.length; i++) {
      originalTotal += dist[i];
    }
    System.out.println("Original sum: " + originalTotal);
    System.out.println("Recomputed sum: " + totalseen);

    //
    // Write to "results" whether the test succeeded or not.
    //
    Path resultFile = new Path(testdir, "results");
    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
    try {
      bw.write("Success=" + success + "\n");
      System.out.println("Success=" + success);
    } finally {
      bw.close();
    }
    fs.delete(testdir, true);
  }

  /**
   * Launches all the tasks in order.
   */
  public static void main(String[] argv) throws Exception {
    if (argv.length < 2) {
      System.err.println("Usage: TestRecordMR <range> <counts>");
      System.err.println();
      System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
      return;
    }

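    // Example invocation (the values here are only illustrative): generate
    // 10000 ints drawn from [0, 100) and verify the round trip:
    //
    //   bin/hadoop org.apache.hadoop.record.TestRecordMR 100 10000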
    int i = 0;
    // Assign the static fields (rather than shadowing them with locals) so
    // the command-line arguments actually drive the test.
    range = Integer.parseInt(argv[i++]);
    counts = Integer.parseInt(argv[i++]);
    launch();
  }
}