1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.io.DataInput; |
---|
22 | import java.io.DataOutput; |
---|
23 | import java.io.DataOutputStream; |
---|
24 | import java.io.File; |
---|
25 | import java.io.IOException; |
---|
26 | import java.util.Iterator; |
---|
27 | |
---|
28 | import junit.framework.TestCase; |
---|
29 | |
---|
30 | import org.apache.hadoop.conf.Configuration; |
---|
31 | import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator; |
---|
32 | import org.apache.hadoop.examples.SecondarySort.FirstPartitioner; |
---|
33 | import org.apache.hadoop.examples.SecondarySort.IntPair; |
---|
34 | import org.apache.hadoop.examples.SecondarySort; |
---|
35 | import org.apache.hadoop.examples.WordCount; |
---|
36 | import org.apache.hadoop.fs.FileSystem; |
---|
37 | import org.apache.hadoop.fs.Path; |
---|
38 | import org.apache.hadoop.io.IntWritable; |
---|
39 | import org.apache.hadoop.io.Text; |
---|
40 | import org.apache.hadoop.io.Writable; |
---|
41 | import org.apache.hadoop.io.WritableComparable; |
---|
42 | import org.apache.hadoop.io.WritableUtils; |
---|
43 | import org.apache.hadoop.mapred.MRCaching.TestResult; |
---|
44 | import org.apache.hadoop.mapreduce.Job; |
---|
45 | import org.apache.hadoop.mapreduce.TestMapReduceLocal; |
---|
46 | import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
---|
47 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
---|
48 | import org.apache.hadoop.util.Progressable; |
---|
49 | |
---|
50 | /** |
---|
51 | * A JUnit test that exercises a mini map-reduce cluster on the local file system. |
---|
52 | */ |
---|
53 | public class TestMiniMRLocalFS extends TestCase { |
---|
54 | private static String TEST_ROOT_DIR = |
---|
55 | new File(System.getProperty("test.build.data","/tmp")) |
---|
56 | .toURI().toString().replace(' ', '+'); |
---|
57 | |
---|
/**
 * Starts a mini map-reduce cluster backed by the local file system and runs,
 * in order: the PI job, the cached word count job (checking its task reports
 * and counters), the custom input/output format job and the secondary sort
 * job. The cluster is always shut down afterwards.
 */
public void testWithLocal()
    throws IOException, InterruptedException, ClassNotFoundException {
  MiniMRCluster cluster = null;
  try {
    cluster = new MiniMRCluster(2, "file:///", 3);
    TestMiniMRWithDFS.runPI(cluster, cluster.createJobConf());

    // word count with caching; input, output and cache dir all sit under the test root
    final JobConf cacheConf = cluster.createJobConf();
    final String wcInput = TEST_ROOT_DIR + "/wc/input";
    final String wcOutput = TEST_ROOT_DIR + "/wc/output";
    final String cacheDir = TEST_ROOT_DIR + "/cachedir";
    final String wcText = "The quick brown fox\n"
                          + "has many silly\n"
                          + "red fox sox\n";
    final TestResult cacheResult =
      MRCaching.launchMRCache(wcInput, wcOutput, cacheDir, cacheConf, wcText);
    // the launcher reports whether the cached data matched what was expected
    assertTrue("Failed test archives not matching", cacheResult.isOutputOk);

    // exercise each of the task report fetchers for the job that just ran
    final JobClient jobClient = new JobClient(cacheConf);
    final JobID cacheJobId = cacheResult.job.getID();
    assertEquals("number of setups", 2,
                 jobClient.getSetupTaskReports(cacheJobId).length);
    assertEquals("number of maps", 1,
                 jobClient.getMapTaskReports(cacheJobId).length);
    assertEquals("number of reduces", 1,
                 jobClient.getReduceTaskReports(cacheJobId).length);
    assertEquals("number of cleanups", 2,
                 jobClient.getCleanupTaskReports(cacheJobId).length);

    // three input lines in, nine distinct words out
    final Counters jobCounters = cacheResult.job.getCounters();
    assertEquals("number of map inputs", 3,
                 jobCounters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
    assertEquals("number of reduce outputs", 9,
                 jobCounters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));

    runCustomFormats(cluster);
    runSecondarySort(cluster.createJobConf());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
---|
99 | |
---|
100 | private void runCustomFormats(MiniMRCluster mr) throws IOException { |
---|
101 | JobConf job = mr.createJobConf(); |
---|
102 | FileSystem fileSys = FileSystem.get(job); |
---|
103 | Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local"); |
---|
104 | Path outDir = new Path(testDir, "out"); |
---|
105 | System.out.println("testDir= " + testDir); |
---|
106 | fileSys.delete(testDir, true); |
---|
107 | |
---|
108 | job.setInputFormat(MyInputFormat.class); |
---|
109 | job.setOutputFormat(MyOutputFormat.class); |
---|
110 | job.setOutputKeyClass(Text.class); |
---|
111 | job.setOutputValueClass(IntWritable.class); |
---|
112 | |
---|
113 | job.setMapperClass(MyMapper.class); |
---|
114 | job.setReducerClass(MyReducer.class); |
---|
115 | job.setNumMapTasks(100); |
---|
116 | job.setNumReduceTasks(1); |
---|
117 | // explicitly do not use "normal" job.setOutputPath to make sure |
---|
118 | // that it is not hardcoded anywhere in the framework. |
---|
119 | job.set("non.std.out", outDir.toString()); |
---|
120 | try { |
---|
121 | JobClient.runJob(job); |
---|
122 | String result = |
---|
123 | TestMiniMRWithDFS.readOutput(outDir, job); |
---|
124 | assertEquals("output", ("aunt annie\t1\n" + |
---|
125 | "bumble boat\t4\n" + |
---|
126 | "crocodile pants\t0\n" + |
---|
127 | "duck-dog\t5\n"+ |
---|
128 | "eggs\t2\n" + |
---|
129 | "finagle the agent\t3\n"), result); |
---|
130 | } finally { |
---|
131 | fileSys.delete(testDir, true); |
---|
132 | } |
---|
133 | |
---|
134 | } |
---|
135 | |
---|
136 | private static class MyInputFormat |
---|
137 | implements InputFormat<IntWritable, Text> { |
---|
138 | |
---|
139 | static final String[] data = new String[]{ |
---|
140 | "crocodile pants", |
---|
141 | "aunt annie", |
---|
142 | "eggs", |
---|
143 | "finagle the agent", |
---|
144 | "bumble boat", |
---|
145 | "duck-dog", |
---|
146 | }; |
---|
147 | |
---|
148 | private static class MySplit implements InputSplit { |
---|
149 | int first; |
---|
150 | int length; |
---|
151 | |
---|
152 | public MySplit() { } |
---|
153 | |
---|
154 | public MySplit(int first, int length) { |
---|
155 | this.first = first; |
---|
156 | this.length = length; |
---|
157 | } |
---|
158 | |
---|
159 | public String[] getLocations() { |
---|
160 | return new String[0]; |
---|
161 | } |
---|
162 | |
---|
163 | public long getLength() { |
---|
164 | return length; |
---|
165 | } |
---|
166 | |
---|
167 | public void write(DataOutput out) throws IOException { |
---|
168 | WritableUtils.writeVInt(out, first); |
---|
169 | WritableUtils.writeVInt(out, length); |
---|
170 | } |
---|
171 | |
---|
172 | public void readFields(DataInput in) throws IOException { |
---|
173 | first = WritableUtils.readVInt(in); |
---|
174 | length = WritableUtils.readVInt(in); |
---|
175 | } |
---|
176 | } |
---|
177 | |
---|
178 | static class MyRecordReader implements RecordReader<IntWritable, Text> { |
---|
179 | int index; |
---|
180 | int past; |
---|
181 | int length; |
---|
182 | |
---|
183 | MyRecordReader(int index, int length) { |
---|
184 | this.index = index; |
---|
185 | this.past = index + length; |
---|
186 | this.length = length; |
---|
187 | } |
---|
188 | |
---|
189 | public boolean next(IntWritable key, Text value) throws IOException { |
---|
190 | if (index < past) { |
---|
191 | key.set(index); |
---|
192 | value.set(data[index]); |
---|
193 | index += 1; |
---|
194 | return true; |
---|
195 | } |
---|
196 | return false; |
---|
197 | } |
---|
198 | |
---|
199 | public IntWritable createKey() { |
---|
200 | return new IntWritable(); |
---|
201 | } |
---|
202 | |
---|
203 | public Text createValue() { |
---|
204 | return new Text(); |
---|
205 | } |
---|
206 | |
---|
207 | public long getPos() throws IOException { |
---|
208 | return index; |
---|
209 | } |
---|
210 | |
---|
211 | public void close() throws IOException {} |
---|
212 | |
---|
213 | public float getProgress() throws IOException { |
---|
214 | return 1.0f - (past-index)/length; |
---|
215 | } |
---|
216 | } |
---|
217 | |
---|
218 | public InputSplit[] getSplits(JobConf job, |
---|
219 | int numSplits) throws IOException { |
---|
220 | return new MySplit[]{new MySplit(0, 1), new MySplit(1, 3), |
---|
221 | new MySplit(4, 2)}; |
---|
222 | } |
---|
223 | |
---|
224 | public RecordReader<IntWritable, Text> getRecordReader(InputSplit split, |
---|
225 | JobConf job, |
---|
226 | Reporter reporter) |
---|
227 | throws IOException { |
---|
228 | MySplit sp = (MySplit) split; |
---|
229 | return new MyRecordReader(sp.first, sp.length); |
---|
230 | } |
---|
231 | |
---|
232 | } |
---|
233 | |
---|
234 | static class MyMapper extends MapReduceBase |
---|
235 | implements Mapper<WritableComparable, Writable, |
---|
236 | WritableComparable, Writable> { |
---|
237 | |
---|
238 | public void map(WritableComparable key, Writable value, |
---|
239 | OutputCollector<WritableComparable, Writable> out, |
---|
240 | Reporter reporter) throws IOException { |
---|
241 | System.out.println("map: " + key + ", " + value); |
---|
242 | out.collect((WritableComparable) value, key); |
---|
243 | InputSplit split = reporter.getInputSplit(); |
---|
244 | if (split.getClass() != MyInputFormat.MySplit.class) { |
---|
245 | throw new IOException("Got wrong split in MyMapper! " + |
---|
246 | split.getClass().getName()); |
---|
247 | } |
---|
248 | } |
---|
249 | } |
---|
250 | |
---|
251 | static class MyReducer extends MapReduceBase |
---|
252 | implements Reducer<WritableComparable, Writable, |
---|
253 | WritableComparable, Writable> { |
---|
254 | public void reduce(WritableComparable key, Iterator<Writable> values, |
---|
255 | OutputCollector<WritableComparable, Writable> output, |
---|
256 | Reporter reporter) throws IOException { |
---|
257 | try { |
---|
258 | InputSplit split = reporter.getInputSplit(); |
---|
259 | throw new IOException("Got an input split of " + split); |
---|
260 | } catch (UnsupportedOperationException e) { |
---|
261 | // expected result |
---|
262 | } |
---|
263 | while (values.hasNext()) { |
---|
264 | Writable value = values.next(); |
---|
265 | System.out.println("reduce: " + key + ", " + value); |
---|
266 | output.collect(key, value); |
---|
267 | } |
---|
268 | } |
---|
269 | } |
---|
270 | |
---|
271 | static class MyOutputFormat implements OutputFormat { |
---|
272 | static class MyRecordWriter implements RecordWriter<Object, Object> { |
---|
273 | private DataOutputStream out; |
---|
274 | |
---|
275 | public MyRecordWriter(Path outputFile, JobConf job) throws IOException { |
---|
276 | out = outputFile.getFileSystem(job).create(outputFile); |
---|
277 | } |
---|
278 | |
---|
279 | public void write(Object key, Object value) throws IOException { |
---|
280 | out.writeBytes(key.toString() + "\t" + value.toString() + "\n"); |
---|
281 | } |
---|
282 | |
---|
283 | public void close(Reporter reporter) throws IOException { |
---|
284 | out.close(); |
---|
285 | } |
---|
286 | } |
---|
287 | |
---|
288 | public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, |
---|
289 | String name, |
---|
290 | Progressable progress |
---|
291 | ) throws IOException { |
---|
292 | return new MyRecordWriter(new Path(job.get("non.std.out")), job); |
---|
293 | } |
---|
294 | |
---|
295 | public void checkOutputSpecs(FileSystem ignored, |
---|
296 | JobConf job) throws IOException { |
---|
297 | } |
---|
298 | } |
---|
299 | |
---|
300 | private void runSecondarySort(Configuration conf) throws IOException, |
---|
301 | InterruptedException, |
---|
302 | ClassNotFoundException { |
---|
303 | FileSystem localFs = FileSystem.getLocal(conf); |
---|
304 | localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true); |
---|
305 | localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true); |
---|
306 | TestMapReduceLocal.writeFile |
---|
307 | ("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" + |
---|
308 | "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n"); |
---|
309 | Job job = new Job(conf, "word count"); |
---|
310 | job.setJarByClass(WordCount.class); |
---|
311 | job.setNumReduceTasks(2); |
---|
312 | job.setMapperClass(SecondarySort.MapClass.class); |
---|
313 | job.setReducerClass(SecondarySort.Reduce.class); |
---|
314 | // group and partition by the first int in the pair |
---|
315 | job.setPartitionerClass(FirstPartitioner.class); |
---|
316 | job.setGroupingComparatorClass(FirstGroupingComparator.class); |
---|
317 | |
---|
318 | // the map output is IntPair, IntWritable |
---|
319 | job.setMapOutputKeyClass(IntPair.class); |
---|
320 | job.setMapOutputValueClass(IntWritable.class); |
---|
321 | |
---|
322 | // the reduce output is Text, IntWritable |
---|
323 | job.setOutputKeyClass(Text.class); |
---|
324 | job.setOutputValueClass(IntWritable.class); |
---|
325 | |
---|
326 | FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in")); |
---|
327 | FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out")); |
---|
328 | assertTrue(job.waitForCompletion(true)); |
---|
329 | String out = TestMapReduceLocal.readFile("out/part-r-00000"); |
---|
330 | assertEquals("------------------------------------------------\n" + |
---|
331 | "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" + |
---|
332 | "------------------------------------------------\n" + |
---|
333 | "10\t20\n10\t25\n10\t30\n", out); |
---|
334 | out = TestMapReduceLocal.readFile("out/part-r-00001"); |
---|
335 | assertEquals("------------------------------------------------\n" + |
---|
336 | "-3\t23\n" + |
---|
337 | "------------------------------------------------\n" + |
---|
338 | "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" + |
---|
339 | "------------------------------------------------\n" + |
---|
340 | "5\t10\n", out); |
---|
341 | } |
---|
342 | } |
---|