/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.util.Date; |
---|
23 | import java.util.Random; |
---|
24 | |
---|
25 | import org.apache.commons.logging.Log; |
---|
26 | import org.apache.commons.logging.LogFactory; |
---|
27 | import org.apache.hadoop.conf.Configuration; |
---|
28 | import org.apache.hadoop.conf.Configured; |
---|
29 | import org.apache.hadoop.fs.FileSystem; |
---|
30 | import org.apache.hadoop.fs.FileStatus; |
---|
31 | import org.apache.hadoop.fs.Path; |
---|
32 | import org.apache.hadoop.io.BytesWritable; |
---|
33 | import org.apache.hadoop.io.SequenceFile; |
---|
34 | import org.apache.hadoop.io.SequenceFile.CompressionType; |
---|
35 | import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; |
---|
36 | import org.apache.hadoop.mapred.lib.IdentityMapper; |
---|
37 | import org.apache.hadoop.mapred.lib.IdentityReducer; |
---|
38 | import org.apache.hadoop.util.Tool; |
---|
39 | import org.apache.hadoop.util.ToolRunner; |
---|
40 | |
---|
41 | public class BigMapOutput extends Configured implements Tool { |
---|
42 | public static final Log LOG = |
---|
43 | LogFactory.getLog(BigMapOutput.class.getName()); |
---|
44 | private static Random random = new Random(); |
---|
45 | |
---|
46 | private static void randomizeBytes(byte[] data, int offset, int length) { |
---|
47 | for(int i=offset + length - 1; i >= offset; --i) { |
---|
48 | data[i] = (byte) random.nextInt(256); |
---|
49 | } |
---|
50 | } |
---|
51 | |
---|
52 | private static void createBigMapInputFile(Configuration conf, FileSystem fs, |
---|
53 | Path dir, long fileSizeInMB) |
---|
54 | throws IOException { |
---|
55 | // Check if the input path exists and is non-empty |
---|
56 | if (fs.exists(dir)) { |
---|
57 | FileStatus[] list = fs.listStatus(dir); |
---|
58 | if (list != null && list.length > 0) { |
---|
59 | throw new IOException("Input path: " + dir + " already exists... "); |
---|
60 | } |
---|
61 | } |
---|
62 | |
---|
63 | Path file = new Path(dir, "part-0"); |
---|
64 | SequenceFile.Writer writer = |
---|
65 | SequenceFile.createWriter(fs, conf, file, |
---|
66 | BytesWritable.class, BytesWritable.class, |
---|
67 | CompressionType.NONE); |
---|
68 | long numBytesToWrite = fileSizeInMB * 1024 * 1024; |
---|
69 | int minKeySize = conf.getInt("test.bmo.min_key", 10);; |
---|
70 | int keySizeRange = |
---|
71 | conf.getInt("test.bmo.max_key", 1000) - minKeySize; |
---|
72 | int minValueSize = conf.getInt("test.bmo.min_value", 0); |
---|
73 | int valueSizeRange = |
---|
74 | conf.getInt("test.bmo.max_value", 20000) - minValueSize; |
---|
75 | BytesWritable randomKey = new BytesWritable(); |
---|
76 | BytesWritable randomValue = new BytesWritable(); |
---|
77 | |
---|
78 | LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " + |
---|
79 | "minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange + |
---|
80 | " minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange); |
---|
81 | long start = System.currentTimeMillis(); |
---|
82 | while (numBytesToWrite > 0) { |
---|
83 | int keyLength = minKeySize + |
---|
84 | (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0); |
---|
85 | randomKey.setSize(keyLength); |
---|
86 | randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength()); |
---|
87 | int valueLength = minValueSize + |
---|
88 | (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0); |
---|
89 | randomValue.setSize(valueLength); |
---|
90 | randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength()); |
---|
91 | writer.append(randomKey, randomValue); |
---|
92 | numBytesToWrite -= keyLength + valueLength; |
---|
93 | } |
---|
94 | writer.close(); |
---|
95 | long end = System.currentTimeMillis(); |
---|
96 | |
---|
97 | LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " + |
---|
98 | (end-start)/1000 + "secs"); |
---|
99 | } |
---|
100 | |
---|
101 | private static void usage() { |
---|
102 | System.err.println("BigMapOutput -input <input-dir> -output <output-dir> " + |
---|
103 | "[-create <filesize in MB>]"); |
---|
104 | ToolRunner.printGenericCommandUsage(System.err); |
---|
105 | System.exit(1); |
---|
106 | } |
---|
107 | public int run(String[] args) throws Exception { |
---|
108 | if (args.length < 4) { //input-dir should contain a huge file ( > 2GB) |
---|
109 | usage(); |
---|
110 | } |
---|
111 | Path bigMapInput = null; |
---|
112 | Path outputPath = null; |
---|
113 | boolean createInput = false; |
---|
114 | long fileSizeInMB = 3 * 1024; // default of 3GB (>2GB) |
---|
115 | for(int i=0; i < args.length; ++i) { |
---|
116 | if ("-input".equals(args[i])){ |
---|
117 | bigMapInput = new Path(args[++i]); |
---|
118 | } else if ("-output".equals(args[i])){ |
---|
119 | outputPath = new Path(args[++i]); |
---|
120 | } else if ("-create".equals(args[i])) { |
---|
121 | createInput = true; |
---|
122 | fileSizeInMB = Long.parseLong(args[++i]); |
---|
123 | } else { |
---|
124 | usage(); |
---|
125 | } |
---|
126 | } |
---|
127 | |
---|
128 | FileSystem fs = FileSystem.get(getConf()); |
---|
129 | JobConf jobConf = new JobConf(getConf(), BigMapOutput.class); |
---|
130 | |
---|
131 | jobConf.setJobName("BigMapOutput"); |
---|
132 | jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); |
---|
133 | jobConf.setOutputFormat(SequenceFileOutputFormat.class); |
---|
134 | FileInputFormat.setInputPaths(jobConf, bigMapInput); |
---|
135 | if (fs.exists(outputPath)) { |
---|
136 | fs.delete(outputPath, true); |
---|
137 | } |
---|
138 | FileOutputFormat.setOutputPath(jobConf, outputPath); |
---|
139 | jobConf.setMapperClass(IdentityMapper.class); |
---|
140 | jobConf.setReducerClass(IdentityReducer.class); |
---|
141 | jobConf.setOutputKeyClass(BytesWritable.class); |
---|
142 | jobConf.setOutputValueClass(BytesWritable.class); |
---|
143 | |
---|
144 | if (createInput) { |
---|
145 | createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB); |
---|
146 | } |
---|
147 | |
---|
148 | Date startTime = new Date(); |
---|
149 | System.out.println("Job started: " + startTime); |
---|
150 | JobClient.runJob(jobConf); |
---|
151 | Date end_time = new Date(); |
---|
152 | System.out.println("Job ended: " + end_time); |
---|
153 | |
---|
154 | return 0; |
---|
155 | } |
---|
156 | |
---|
157 | public static void main(String argv[]) throws Exception { |
---|
158 | int res = ToolRunner.run(new Configuration(), new BigMapOutput(), argv); |
---|
159 | System.exit(res); |
---|
160 | } |
---|
161 | |
---|
162 | } |
---|