/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import junit.framework.TestCase; |
---|
22 | |
---|
23 | import java.io.IOException; |
---|
24 | import java.io.DataInput; |
---|
25 | import java.io.DataOutput; |
---|
26 | import java.util.Arrays; |
---|
27 | import java.util.Iterator; |
---|
28 | |
---|
29 | import junit.framework.TestCase; |
---|
30 | import org.apache.commons.logging.Log; |
---|
31 | import org.apache.commons.logging.LogFactory; |
---|
32 | |
---|
33 | import org.apache.hadoop.conf.Configuration; |
---|
34 | import org.apache.hadoop.io.*; |
---|
35 | import org.apache.hadoop.mapred.lib.NullOutputFormat; |
---|
36 | |
---|
37 | public class TestMapCollection extends TestCase { |
---|
38 | |
---|
39 | private static final Log LOG = LogFactory.getLog( |
---|
40 | TestMapCollection.class.getName()); |
---|
41 | |
---|
42 | public static class KeyWritable |
---|
43 | implements WritableComparable<KeyWritable>, JobConfigurable { |
---|
44 | |
---|
45 | private final byte c = (byte)('K' & 0xFF); |
---|
46 | static private boolean pedantic = false; |
---|
47 | protected int expectedlen; |
---|
48 | |
---|
49 | public void configure(JobConf conf) { |
---|
50 | expectedlen = conf.getInt("test.keywritable.length", 1); |
---|
51 | pedantic = conf.getBoolean("test.pedantic.verification", false); |
---|
52 | } |
---|
53 | |
---|
54 | public KeyWritable() { } |
---|
55 | |
---|
56 | public KeyWritable(int len) { |
---|
57 | this(); |
---|
58 | expectedlen = len; |
---|
59 | } |
---|
60 | |
---|
61 | public int getLength() { |
---|
62 | return expectedlen; |
---|
63 | } |
---|
64 | |
---|
65 | public int compareTo(KeyWritable o) { |
---|
66 | if (o == this) return 0; |
---|
67 | return expectedlen - o.getLength(); |
---|
68 | } |
---|
69 | |
---|
70 | public boolean equals(Object o) { |
---|
71 | if (o == this) return true; |
---|
72 | if (!(o instanceof KeyWritable)) return false; |
---|
73 | return 0 == compareTo((KeyWritable)o); |
---|
74 | } |
---|
75 | |
---|
76 | public int hashCode() { |
---|
77 | return 37 * expectedlen; |
---|
78 | } |
---|
79 | |
---|
80 | public void readFields(DataInput in) throws IOException { |
---|
81 | if (expectedlen != 0) { |
---|
82 | int bytesread; |
---|
83 | if (pedantic) { |
---|
84 | for (int i = 0; i < expectedlen; ++i) |
---|
85 | assertEquals("Invalid byte at " + i, c, in.readByte()); |
---|
86 | bytesread = expectedlen; |
---|
87 | } else { |
---|
88 | bytesread = in.skipBytes(expectedlen); |
---|
89 | } |
---|
90 | assertEquals("Too few bytes in record", expectedlen, bytesread); |
---|
91 | } |
---|
92 | // cannot verify that the stream has been exhausted |
---|
93 | } |
---|
94 | |
---|
95 | public void write(DataOutput out) throws IOException { |
---|
96 | if (expectedlen != 0) { |
---|
97 | if (expectedlen > 1024) { |
---|
98 | byte[] b = new byte[expectedlen]; |
---|
99 | Arrays.fill(b, c); |
---|
100 | out.write(b); |
---|
101 | } else { |
---|
102 | for (int i = 0; i < expectedlen; ++i) { |
---|
103 | out.write(c); |
---|
104 | } |
---|
105 | } |
---|
106 | } |
---|
107 | } |
---|
108 | |
---|
109 | public static class Comparator extends WritableComparator { |
---|
110 | public Comparator() { |
---|
111 | super(KeyWritable.class); |
---|
112 | } |
---|
113 | |
---|
114 | public int compare(byte[] b1, int s1, int l1, |
---|
115 | byte[] b2, int s2, int l2) { |
---|
116 | if (pedantic) { |
---|
117 | for (int i = s1; i < l1; ++i) { |
---|
118 | assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF)); |
---|
119 | } |
---|
120 | for (int i = s2; i < l2; ++i) { |
---|
121 | assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF)); |
---|
122 | } |
---|
123 | } |
---|
124 | return l1 - l2; |
---|
125 | } |
---|
126 | } |
---|
127 | |
---|
128 | |
---|
129 | static { |
---|
130 | WritableComparator.define(KeyWritable.class, new Comparator()); |
---|
131 | } |
---|
132 | } |
---|
133 | |
---|
134 | public static class ValWritable extends KeyWritable { |
---|
135 | |
---|
136 | private final byte c = (byte)('V' & 0xFF); |
---|
137 | |
---|
138 | public ValWritable() { } |
---|
139 | |
---|
140 | public ValWritable(int len) { |
---|
141 | this(); |
---|
142 | expectedlen = len; |
---|
143 | } |
---|
144 | |
---|
145 | public void configure(JobConf conf) { |
---|
146 | expectedlen = conf.getInt("test.valwritable.length", 1); |
---|
147 | } |
---|
148 | } |
---|
149 | |
---|
150 | public static class SpillMapper |
---|
151 | implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> { |
---|
152 | |
---|
153 | private int keylen = 1; |
---|
154 | private int vallen = 1; |
---|
155 | private int numrecs = 100; |
---|
156 | |
---|
157 | public void configure(JobConf job) { |
---|
158 | keylen = job.getInt("test.keywritable.length", 1); |
---|
159 | vallen = job.getInt("test.valwritable.length", 1); |
---|
160 | numrecs = job.getInt("test.spillmap.records", 100); |
---|
161 | } |
---|
162 | |
---|
163 | public void map(NullWritable key, NullWritable value, |
---|
164 | OutputCollector<KeyWritable,ValWritable> out, Reporter reporter) |
---|
165 | throws IOException { |
---|
166 | KeyWritable k = new KeyWritable(keylen); |
---|
167 | ValWritable v = new ValWritable(vallen); |
---|
168 | for (int i = 0; i < numrecs; ++i) { |
---|
169 | if ((i % 1000) == 0) { |
---|
170 | reporter.progress(); |
---|
171 | } |
---|
172 | out.collect(k, v); |
---|
173 | } |
---|
174 | } |
---|
175 | |
---|
176 | public void close() { } |
---|
177 | |
---|
178 | } |
---|
179 | |
---|
180 | public static class SpillReducer |
---|
181 | implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> { |
---|
182 | |
---|
183 | private int numrecs = 100; |
---|
184 | |
---|
185 | public void configure(JobConf job) { |
---|
186 | numrecs = job.getInt("test.spillmap.records", 100); |
---|
187 | } |
---|
188 | |
---|
189 | public void reduce(KeyWritable k, Iterator<ValWritable> values, |
---|
190 | OutputCollector<NullWritable,NullWritable> out, Reporter reporter) { |
---|
191 | int i = 0; |
---|
192 | while (values.hasNext()) { |
---|
193 | values.next(); |
---|
194 | ++i; |
---|
195 | } |
---|
196 | assertEquals("Unexpected record count (" + i + "/" + |
---|
197 | numrecs + ")", numrecs, i); |
---|
198 | } |
---|
199 | |
---|
200 | public void close() { } |
---|
201 | |
---|
202 | } |
---|
203 | |
---|
204 | public static class FakeSplit implements InputSplit { |
---|
205 | public void write(DataOutput out) throws IOException { } |
---|
206 | public void readFields(DataInput in) throws IOException { } |
---|
207 | public long getLength() { return 0L; } |
---|
208 | public String[] getLocations() { return new String[0]; } |
---|
209 | } |
---|
210 | |
---|
211 | public static class FakeIF |
---|
212 | implements InputFormat<NullWritable,NullWritable> { |
---|
213 | |
---|
214 | public FakeIF() { } |
---|
215 | |
---|
216 | public InputSplit[] getSplits(JobConf conf, int numSplits) { |
---|
217 | InputSplit[] splits = new InputSplit[numSplits]; |
---|
218 | for (int i = 0; i < splits.length; ++i) { |
---|
219 | splits[i] = new FakeSplit(); |
---|
220 | } |
---|
221 | return splits; |
---|
222 | } |
---|
223 | |
---|
224 | public RecordReader<NullWritable,NullWritable> getRecordReader( |
---|
225 | InputSplit ignored, JobConf conf, Reporter reporter) { |
---|
226 | return new RecordReader<NullWritable,NullWritable>() { |
---|
227 | private boolean done = false; |
---|
228 | public boolean next(NullWritable key, NullWritable value) |
---|
229 | throws IOException { |
---|
230 | if (done) |
---|
231 | return false; |
---|
232 | done = true; |
---|
233 | return true; |
---|
234 | } |
---|
235 | public NullWritable createKey() { return NullWritable.get(); } |
---|
236 | public NullWritable createValue() { return NullWritable.get(); } |
---|
237 | public long getPos() throws IOException { return 0L; } |
---|
238 | public void close() throws IOException { } |
---|
239 | public float getProgress() throws IOException { return 0.0f; } |
---|
240 | }; |
---|
241 | } |
---|
242 | } |
---|
243 | |
---|
  /**
   * Configure and run a single-map, single-reduce job that pushes
   * fixed-size records through the map-side collection buffer.
   *
   * @param name     label for logging
   * @param keylen   serialized key length, in bytes
   * @param vallen   serialized value length, in bytes
   * @param records  number of records the mapper emits
   * @param ioSortMB map-side sort buffer size (io.sort.mb)
   * @param recPer   fraction of the buffer reserved for record metadata
   * @param spillPer buffer-fill fraction that triggers a spill
   * @param pedantic if true, verify every byte of every record
   */
  private static void runTest(String name, int keylen, int vallen,
      int records, int ioSortMB, float recPer, float spillPer,
      boolean pedantic) throws Exception {
    JobConf conf = new JobConf(new Configuration(), SpillMapper.class);

    // Collection/spill tuning under test.
    conf.setInt("io.sort.mb", ioSortMB);
    conf.set("io.sort.record.percent", Float.toString(recPer));
    conf.set("io.sort.spill.percent", Float.toString(spillPer));

    // Parameters consumed by the test writables, mapper, and reducer.
    conf.setInt("test.keywritable.length", keylen);
    conf.setInt("test.valwritable.length", vallen);
    conf.setInt("test.spillmap.records", records);
    conf.setBoolean("test.pedantic.verification", pedantic);

    // One map, one reduce, synthetic input, discarded output.
    conf.setNumMapTasks(1);
    conf.setNumReduceTasks(1);
    conf.setInputFormat(FakeIF.class);
    conf.setOutputFormat(NullOutputFormat.class);
    conf.setMapperClass(SpillMapper.class);
    conf.setReducerClass(SpillReducer.class);
    conf.setMapOutputKeyClass(KeyWritable.class);
    conf.setMapOutputValueClass(ValWritable.class);

    LOG.info("Running " + name);
    JobClient.runJob(conf);
  }
---|
270 | |
---|
  /**
   * Run with default buffer settings: 1MB io.sort.mb, 5% of the buffer
   * for record metadata, and an 80% spill threshold.
   */
  private static void runTest(String name, int keylen, int vallen, int records,
      boolean pedantic) throws Exception {
    runTest(name, keylen, vallen, records, 1, 0.05f, .8f, pedantic);
  }
---|
275 | |
---|
  /**
   * Record sizes chosen so the final byte of a value/key lands exactly
   * on a spill-buffer boundary; pedantic mode verifies every byte.
   */
  public void testLastFill() throws Exception {
    // last byte of record/key is the last/first byte in the spill buffer
    runTest("vallastbyte", 128, 896, 1344, 1, 0.125f, 0.5f, true);
    runTest("keylastbyte", 512, 1024, 896, 1, 0.125f, 0.5f, true);
  }
---|
281 | |
---|
282 | public void testLargeRecords() throws Exception { |
---|
283 | // maps emitting records larger than io.sort.mb |
---|
284 | runTest("largerec", 100, 1024*1024, 5, false); |
---|
285 | runTest("largekeyzeroval", 1024*1024, 0, 5, false); |
---|
286 | } |
---|
287 | |
---|
  /**
   * Exercise non-default spill thresholds, including a 100% (full
   * buffer) boundary and a low 30% boundary.
   */
  public void testSpillPer() throws Exception {
    // set non-default, 100% speculative spill boundary
    runTest("fullspill2B", 1, 1, 10000, 1, 0.05f, 1.0f, true);
    runTest("fullspill200B", 100, 100, 10000, 1, 0.05f, 1.0f, true);
    runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 0.3f, 1.0f, true);
    runTest("lt50perspill", 100, 100, 10000, 1, 0.05f, 0.3f, true);
  }
---|
295 | |
---|
  /**
   * Degenerate record sizes: zero-length keys, values, and both —
   * including a run with the 100% spill threshold.
   */
  public void testZeroLength() throws Exception {
    // test key/value at zero-length
    runTest("zeroval", 1, 0, 10000, true);
    runTest("zerokey", 0, 1, 10000, true);
    runTest("zerokeyval", 0, 0, 10000, false);
    runTest("zerokeyvalfull", 0, 0, 10000, 1, 0.05f, 1.0f, false);
  }
---|
303 | |
---|
304 | } |
---|