/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 18 | |
---|
| 19 | package org.apache.hadoop.mapred; |
---|
| 20 | |
---|
| 21 | import java.io.IOException; |
---|
| 22 | import java.util.Date; |
---|
| 23 | import java.util.Random; |
---|
| 24 | |
---|
| 25 | import org.apache.commons.logging.Log; |
---|
| 26 | import org.apache.commons.logging.LogFactory; |
---|
| 27 | import org.apache.hadoop.conf.Configuration; |
---|
| 28 | import org.apache.hadoop.conf.Configured; |
---|
| 29 | import org.apache.hadoop.fs.FileSystem; |
---|
| 30 | import org.apache.hadoop.fs.FileStatus; |
---|
| 31 | import org.apache.hadoop.fs.Path; |
---|
| 32 | import org.apache.hadoop.io.BytesWritable; |
---|
| 33 | import org.apache.hadoop.io.SequenceFile; |
---|
| 34 | import org.apache.hadoop.io.SequenceFile.CompressionType; |
---|
| 35 | import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; |
---|
| 36 | import org.apache.hadoop.mapred.lib.IdentityMapper; |
---|
| 37 | import org.apache.hadoop.mapred.lib.IdentityReducer; |
---|
| 38 | import org.apache.hadoop.util.Tool; |
---|
| 39 | import org.apache.hadoop.util.ToolRunner; |
---|
| 40 | |
---|
| 41 | public class BigMapOutput extends Configured implements Tool { |
---|
| 42 | public static final Log LOG = |
---|
| 43 | LogFactory.getLog(BigMapOutput.class.getName()); |
---|
| 44 | private static Random random = new Random(); |
---|
| 45 | |
---|
| 46 | private static void randomizeBytes(byte[] data, int offset, int length) { |
---|
| 47 | for(int i=offset + length - 1; i >= offset; --i) { |
---|
| 48 | data[i] = (byte) random.nextInt(256); |
---|
| 49 | } |
---|
| 50 | } |
---|
| 51 | |
---|
| 52 | private static void createBigMapInputFile(Configuration conf, FileSystem fs, |
---|
| 53 | Path dir, long fileSizeInMB) |
---|
| 54 | throws IOException { |
---|
| 55 | // Check if the input path exists and is non-empty |
---|
| 56 | if (fs.exists(dir)) { |
---|
| 57 | FileStatus[] list = fs.listStatus(dir); |
---|
| 58 | if (list != null && list.length > 0) { |
---|
| 59 | throw new IOException("Input path: " + dir + " already exists... "); |
---|
| 60 | } |
---|
| 61 | } |
---|
| 62 | |
---|
| 63 | Path file = new Path(dir, "part-0"); |
---|
| 64 | SequenceFile.Writer writer = |
---|
| 65 | SequenceFile.createWriter(fs, conf, file, |
---|
| 66 | BytesWritable.class, BytesWritable.class, |
---|
| 67 | CompressionType.NONE); |
---|
| 68 | long numBytesToWrite = fileSizeInMB * 1024 * 1024; |
---|
| 69 | int minKeySize = conf.getInt("test.bmo.min_key", 10);; |
---|
| 70 | int keySizeRange = |
---|
| 71 | conf.getInt("test.bmo.max_key", 1000) - minKeySize; |
---|
| 72 | int minValueSize = conf.getInt("test.bmo.min_value", 0); |
---|
| 73 | int valueSizeRange = |
---|
| 74 | conf.getInt("test.bmo.max_value", 20000) - minValueSize; |
---|
| 75 | BytesWritable randomKey = new BytesWritable(); |
---|
| 76 | BytesWritable randomValue = new BytesWritable(); |
---|
| 77 | |
---|
| 78 | LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " + |
---|
| 79 | "minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange + |
---|
| 80 | " minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange); |
---|
| 81 | long start = System.currentTimeMillis(); |
---|
| 82 | while (numBytesToWrite > 0) { |
---|
| 83 | int keyLength = minKeySize + |
---|
| 84 | (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0); |
---|
| 85 | randomKey.setSize(keyLength); |
---|
| 86 | randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength()); |
---|
| 87 | int valueLength = minValueSize + |
---|
| 88 | (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0); |
---|
| 89 | randomValue.setSize(valueLength); |
---|
| 90 | randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength()); |
---|
| 91 | writer.append(randomKey, randomValue); |
---|
| 92 | numBytesToWrite -= keyLength + valueLength; |
---|
| 93 | } |
---|
| 94 | writer.close(); |
---|
| 95 | long end = System.currentTimeMillis(); |
---|
| 96 | |
---|
| 97 | LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " + |
---|
| 98 | (end-start)/1000 + "secs"); |
---|
| 99 | } |
---|
| 100 | |
---|
| 101 | private static void usage() { |
---|
| 102 | System.err.println("BigMapOutput -input <input-dir> -output <output-dir> " + |
---|
| 103 | "[-create <filesize in MB>]"); |
---|
| 104 | ToolRunner.printGenericCommandUsage(System.err); |
---|
| 105 | System.exit(1); |
---|
| 106 | } |
---|
| 107 | public int run(String[] args) throws Exception { |
---|
| 108 | if (args.length < 4) { //input-dir should contain a huge file ( > 2GB) |
---|
| 109 | usage(); |
---|
| 110 | } |
---|
| 111 | Path bigMapInput = null; |
---|
| 112 | Path outputPath = null; |
---|
| 113 | boolean createInput = false; |
---|
| 114 | long fileSizeInMB = 3 * 1024; // default of 3GB (>2GB) |
---|
| 115 | for(int i=0; i < args.length; ++i) { |
---|
| 116 | if ("-input".equals(args[i])){ |
---|
| 117 | bigMapInput = new Path(args[++i]); |
---|
| 118 | } else if ("-output".equals(args[i])){ |
---|
| 119 | outputPath = new Path(args[++i]); |
---|
| 120 | } else if ("-create".equals(args[i])) { |
---|
| 121 | createInput = true; |
---|
| 122 | fileSizeInMB = Long.parseLong(args[++i]); |
---|
| 123 | } else { |
---|
| 124 | usage(); |
---|
| 125 | } |
---|
| 126 | } |
---|
| 127 | |
---|
| 128 | FileSystem fs = FileSystem.get(getConf()); |
---|
| 129 | JobConf jobConf = new JobConf(getConf(), BigMapOutput.class); |
---|
| 130 | |
---|
| 131 | jobConf.setJobName("BigMapOutput"); |
---|
| 132 | jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); |
---|
| 133 | jobConf.setOutputFormat(SequenceFileOutputFormat.class); |
---|
| 134 | FileInputFormat.setInputPaths(jobConf, bigMapInput); |
---|
| 135 | if (fs.exists(outputPath)) { |
---|
| 136 | fs.delete(outputPath, true); |
---|
| 137 | } |
---|
| 138 | FileOutputFormat.setOutputPath(jobConf, outputPath); |
---|
| 139 | jobConf.setMapperClass(IdentityMapper.class); |
---|
| 140 | jobConf.setReducerClass(IdentityReducer.class); |
---|
| 141 | jobConf.setOutputKeyClass(BytesWritable.class); |
---|
| 142 | jobConf.setOutputValueClass(BytesWritable.class); |
---|
| 143 | |
---|
| 144 | if (createInput) { |
---|
| 145 | createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB); |
---|
| 146 | } |
---|
| 147 | |
---|
| 148 | Date startTime = new Date(); |
---|
| 149 | System.out.println("Job started: " + startTime); |
---|
| 150 | JobClient.runJob(jobConf); |
---|
| 151 | Date end_time = new Date(); |
---|
| 152 | System.out.println("Job ended: " + end_time); |
---|
| 153 | |
---|
| 154 | return 0; |
---|
| 155 | } |
---|
| 156 | |
---|
| 157 | public static void main(String argv[]) throws Exception { |
---|
| 158 | int res = ToolRunner.run(new Configuration(), new BigMapOutput(), argv); |
---|
| 159 | System.exit(res); |
---|
| 160 | } |
---|
| 161 | |
---|
| 162 | } |
---|