1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import org.apache.hadoop.fs.*; |
---|
21 | import org.apache.hadoop.io.*; |
---|
22 | import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat; |
---|
23 | |
---|
24 | import junit.framework.TestCase; |
---|
25 | import java.io.*; |
---|
26 | import java.util.*; |
---|
27 | |
---|
28 | /** |
---|
29 | * TestCollect checks if the collect can handle simultaneous invocations. |
---|
30 | */ |
---|
31 | public class TestCollect extends TestCase |
---|
32 | { |
---|
33 | final static Path OUTPUT_DIR = new Path("build/test/test.collect.output"); |
---|
34 | static final int NUM_FEEDERS = 10; |
---|
35 | static final int NUM_COLLECTS_PER_THREAD = 1000; |
---|
36 | |
---|
37 | /** |
---|
38 | * Map is a Mapper that spawns threads which simultaneously call collect. |
---|
39 | * Each thread has a specific range to write to the buffer and is unique to |
---|
40 | * the thread. This is a synchronization test for the map's collect. |
---|
41 | */ |
---|
42 | |
---|
43 | static class Map |
---|
44 | implements Mapper<Text, Text, IntWritable, IntWritable> { |
---|
45 | |
---|
46 | public void configure(JobConf job) { |
---|
47 | } |
---|
48 | |
---|
49 | public void map(Text key, Text val, |
---|
50 | final OutputCollector<IntWritable, IntWritable> out, |
---|
51 | Reporter reporter) throws IOException { |
---|
52 | // Class for calling collect in separate threads |
---|
53 | class CollectFeeder extends Thread { |
---|
54 | int id; // id for the thread |
---|
55 | |
---|
56 | public CollectFeeder(int id) { |
---|
57 | this.id = id; |
---|
58 | } |
---|
59 | |
---|
60 | public void run() { |
---|
61 | for (int j = 1; j <= NUM_COLLECTS_PER_THREAD; j++) { |
---|
62 | try { |
---|
63 | out.collect(new IntWritable((id * NUM_COLLECTS_PER_THREAD) + j), |
---|
64 | new IntWritable(0)); |
---|
65 | } catch (IOException ioe) { } |
---|
66 | } |
---|
67 | } |
---|
68 | } |
---|
69 | |
---|
70 | CollectFeeder [] feeders = new CollectFeeder[NUM_FEEDERS]; |
---|
71 | |
---|
72 | // start the feeders |
---|
73 | for (int i = 0; i < NUM_FEEDERS; i++) { |
---|
74 | feeders[i] = new CollectFeeder(i); |
---|
75 | feeders[i].start(); |
---|
76 | } |
---|
77 | // wait for them to finish |
---|
78 | for (int i = 0; i < NUM_FEEDERS; i++) { |
---|
79 | try { |
---|
80 | feeders[i].join(); |
---|
81 | } catch (InterruptedException ie) { |
---|
82 | throw new IOException(ie.toString()); |
---|
83 | } |
---|
84 | } |
---|
85 | } |
---|
86 | |
---|
87 | public void close() { |
---|
88 | } |
---|
89 | } |
---|
90 | |
---|
91 | static class Reduce |
---|
92 | implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { |
---|
93 | |
---|
94 | static int numSeen; |
---|
95 | static int actualSum; |
---|
96 | public void configure(JobConf job) { } |
---|
97 | |
---|
98 | public void reduce(IntWritable key, Iterator<IntWritable> val, |
---|
99 | OutputCollector<IntWritable, IntWritable> out, |
---|
100 | Reporter reporter) throws IOException { |
---|
101 | actualSum += key.get(); // keep the running count of the seen values |
---|
102 | numSeen++; // number of values seen so far |
---|
103 | |
---|
104 | // using '1+2+3+...n = n*(n+1)/2' to validate |
---|
105 | int expectedSum = numSeen * (numSeen + 1) / 2; |
---|
106 | if (expectedSum != actualSum) { |
---|
107 | throw new IOException("Collect test failed!! Ordering mismatch."); |
---|
108 | } |
---|
109 | } |
---|
110 | |
---|
111 | public void close() { } |
---|
112 | } |
---|
113 | |
---|
114 | public void configure(JobConf conf) throws IOException { |
---|
115 | conf.setJobName("TestCollect"); |
---|
116 | conf.setJarByClass(TestCollect.class); |
---|
117 | |
---|
118 | conf.setInputFormat(RandomInputFormat.class); // for self data generation |
---|
119 | conf.setOutputKeyClass(IntWritable.class); |
---|
120 | conf.setOutputValueClass(IntWritable.class); |
---|
121 | FileOutputFormat.setOutputPath(conf, OUTPUT_DIR); |
---|
122 | |
---|
123 | conf.setMapperClass(Map.class); |
---|
124 | conf.setReducerClass(Reduce.class); |
---|
125 | conf.setNumMapTasks(1); |
---|
126 | conf.setNumReduceTasks(1); |
---|
127 | } |
---|
128 | |
---|
129 | public void testCollect() throws IOException { |
---|
130 | JobConf conf = new JobConf(); |
---|
131 | configure(conf); |
---|
132 | try { |
---|
133 | JobClient.runJob(conf); |
---|
134 | // check if all the values were seen by the reducer |
---|
135 | if (Reduce.numSeen != (NUM_COLLECTS_PER_THREAD * NUM_FEEDERS)) { |
---|
136 | throw new IOException("Collect test failed!! Total does not match."); |
---|
137 | } |
---|
138 | } catch (IOException ioe) { |
---|
139 | throw ioe; |
---|
140 | } finally { |
---|
141 | FileSystem fs = FileSystem.get(conf); |
---|
142 | fs.delete(OUTPUT_DIR, true); |
---|
143 | } |
---|
144 | } |
---|
145 | |
---|
146 | public static void main(String[] args) throws IOException { |
---|
147 | new TestCollect().testCollect(); |
---|
148 | } |
---|
149 | } |
---|
150 | |
---|