[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.mapred; |
---|
| 20 | |
---|
| 21 | import junit.framework.TestCase; |
---|
| 22 | |
---|
| 23 | import java.io.IOException; |
---|
| 24 | import java.io.DataInput; |
---|
| 25 | import java.io.DataOutput; |
---|
| 26 | import java.util.Arrays; |
---|
| 27 | import java.util.Iterator; |
---|
| 28 | |
---|
| 29 | import junit.framework.TestCase; |
---|
| 30 | import org.apache.commons.logging.Log; |
---|
| 31 | import org.apache.commons.logging.LogFactory; |
---|
| 32 | |
---|
| 33 | import org.apache.hadoop.conf.Configuration; |
---|
| 34 | import org.apache.hadoop.io.*; |
---|
| 35 | import org.apache.hadoop.mapred.lib.NullOutputFormat; |
---|
| 36 | |
---|
| 37 | public class TestMapCollection extends TestCase { |
---|
| 38 | |
---|
| 39 | private static final Log LOG = LogFactory.getLog( |
---|
| 40 | TestMapCollection.class.getName()); |
---|
| 41 | |
---|
| 42 | public static class KeyWritable |
---|
| 43 | implements WritableComparable<KeyWritable>, JobConfigurable { |
---|
| 44 | |
---|
| 45 | private final byte c = (byte)('K' & 0xFF); |
---|
| 46 | static private boolean pedantic = false; |
---|
| 47 | protected int expectedlen; |
---|
| 48 | |
---|
| 49 | public void configure(JobConf conf) { |
---|
| 50 | expectedlen = conf.getInt("test.keywritable.length", 1); |
---|
| 51 | pedantic = conf.getBoolean("test.pedantic.verification", false); |
---|
| 52 | } |
---|
| 53 | |
---|
| 54 | public KeyWritable() { } |
---|
| 55 | |
---|
| 56 | public KeyWritable(int len) { |
---|
| 57 | this(); |
---|
| 58 | expectedlen = len; |
---|
| 59 | } |
---|
| 60 | |
---|
| 61 | public int getLength() { |
---|
| 62 | return expectedlen; |
---|
| 63 | } |
---|
| 64 | |
---|
| 65 | public int compareTo(KeyWritable o) { |
---|
| 66 | if (o == this) return 0; |
---|
| 67 | return expectedlen - o.getLength(); |
---|
| 68 | } |
---|
| 69 | |
---|
| 70 | public boolean equals(Object o) { |
---|
| 71 | if (o == this) return true; |
---|
| 72 | if (!(o instanceof KeyWritable)) return false; |
---|
| 73 | return 0 == compareTo((KeyWritable)o); |
---|
| 74 | } |
---|
| 75 | |
---|
| 76 | public int hashCode() { |
---|
| 77 | return 37 * expectedlen; |
---|
| 78 | } |
---|
| 79 | |
---|
| 80 | public void readFields(DataInput in) throws IOException { |
---|
| 81 | if (expectedlen != 0) { |
---|
| 82 | int bytesread; |
---|
| 83 | if (pedantic) { |
---|
| 84 | for (int i = 0; i < expectedlen; ++i) |
---|
| 85 | assertEquals("Invalid byte at " + i, c, in.readByte()); |
---|
| 86 | bytesread = expectedlen; |
---|
| 87 | } else { |
---|
| 88 | bytesread = in.skipBytes(expectedlen); |
---|
| 89 | } |
---|
| 90 | assertEquals("Too few bytes in record", expectedlen, bytesread); |
---|
| 91 | } |
---|
| 92 | // cannot verify that the stream has been exhausted |
---|
| 93 | } |
---|
| 94 | |
---|
| 95 | public void write(DataOutput out) throws IOException { |
---|
| 96 | if (expectedlen != 0) { |
---|
| 97 | if (expectedlen > 1024) { |
---|
| 98 | byte[] b = new byte[expectedlen]; |
---|
| 99 | Arrays.fill(b, c); |
---|
| 100 | out.write(b); |
---|
| 101 | } else { |
---|
| 102 | for (int i = 0; i < expectedlen; ++i) { |
---|
| 103 | out.write(c); |
---|
| 104 | } |
---|
| 105 | } |
---|
| 106 | } |
---|
| 107 | } |
---|
| 108 | |
---|
| 109 | public static class Comparator extends WritableComparator { |
---|
| 110 | public Comparator() { |
---|
| 111 | super(KeyWritable.class); |
---|
| 112 | } |
---|
| 113 | |
---|
| 114 | public int compare(byte[] b1, int s1, int l1, |
---|
| 115 | byte[] b2, int s2, int l2) { |
---|
| 116 | if (pedantic) { |
---|
| 117 | for (int i = s1; i < l1; ++i) { |
---|
| 118 | assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF)); |
---|
| 119 | } |
---|
| 120 | for (int i = s2; i < l2; ++i) { |
---|
| 121 | assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF)); |
---|
| 122 | } |
---|
| 123 | } |
---|
| 124 | return l1 - l2; |
---|
| 125 | } |
---|
| 126 | } |
---|
| 127 | |
---|
| 128 | |
---|
| 129 | static { |
---|
| 130 | WritableComparator.define(KeyWritable.class, new Comparator()); |
---|
| 131 | } |
---|
| 132 | } |
---|
| 133 | |
---|
| 134 | public static class ValWritable extends KeyWritable { |
---|
| 135 | |
---|
| 136 | private final byte c = (byte)('V' & 0xFF); |
---|
| 137 | |
---|
| 138 | public ValWritable() { } |
---|
| 139 | |
---|
| 140 | public ValWritable(int len) { |
---|
| 141 | this(); |
---|
| 142 | expectedlen = len; |
---|
| 143 | } |
---|
| 144 | |
---|
| 145 | public void configure(JobConf conf) { |
---|
| 146 | expectedlen = conf.getInt("test.valwritable.length", 1); |
---|
| 147 | } |
---|
| 148 | } |
---|
| 149 | |
---|
| 150 | public static class SpillMapper |
---|
| 151 | implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> { |
---|
| 152 | |
---|
| 153 | private int keylen = 1; |
---|
| 154 | private int vallen = 1; |
---|
| 155 | private int numrecs = 100; |
---|
| 156 | |
---|
| 157 | public void configure(JobConf job) { |
---|
| 158 | keylen = job.getInt("test.keywritable.length", 1); |
---|
| 159 | vallen = job.getInt("test.valwritable.length", 1); |
---|
| 160 | numrecs = job.getInt("test.spillmap.records", 100); |
---|
| 161 | } |
---|
| 162 | |
---|
| 163 | public void map(NullWritable key, NullWritable value, |
---|
| 164 | OutputCollector<KeyWritable,ValWritable> out, Reporter reporter) |
---|
| 165 | throws IOException { |
---|
| 166 | KeyWritable k = new KeyWritable(keylen); |
---|
| 167 | ValWritable v = new ValWritable(vallen); |
---|
| 168 | for (int i = 0; i < numrecs; ++i) { |
---|
| 169 | if ((i % 1000) == 0) { |
---|
| 170 | reporter.progress(); |
---|
| 171 | } |
---|
| 172 | out.collect(k, v); |
---|
| 173 | } |
---|
| 174 | } |
---|
| 175 | |
---|
| 176 | public void close() { } |
---|
| 177 | |
---|
| 178 | } |
---|
| 179 | |
---|
| 180 | public static class SpillReducer |
---|
| 181 | implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> { |
---|
| 182 | |
---|
| 183 | private int numrecs = 100; |
---|
| 184 | |
---|
| 185 | public void configure(JobConf job) { |
---|
| 186 | numrecs = job.getInt("test.spillmap.records", 100); |
---|
| 187 | } |
---|
| 188 | |
---|
| 189 | public void reduce(KeyWritable k, Iterator<ValWritable> values, |
---|
| 190 | OutputCollector<NullWritable,NullWritable> out, Reporter reporter) { |
---|
| 191 | int i = 0; |
---|
| 192 | while (values.hasNext()) { |
---|
| 193 | values.next(); |
---|
| 194 | ++i; |
---|
| 195 | } |
---|
| 196 | assertEquals("Unexpected record count (" + i + "/" + |
---|
| 197 | numrecs + ")", numrecs, i); |
---|
| 198 | } |
---|
| 199 | |
---|
| 200 | public void close() { } |
---|
| 201 | |
---|
| 202 | } |
---|
| 203 | |
---|
| 204 | public static class FakeSplit implements InputSplit { |
---|
| 205 | public void write(DataOutput out) throws IOException { } |
---|
| 206 | public void readFields(DataInput in) throws IOException { } |
---|
| 207 | public long getLength() { return 0L; } |
---|
| 208 | public String[] getLocations() { return new String[0]; } |
---|
| 209 | } |
---|
| 210 | |
---|
| 211 | public static class FakeIF |
---|
| 212 | implements InputFormat<NullWritable,NullWritable> { |
---|
| 213 | |
---|
| 214 | public FakeIF() { } |
---|
| 215 | |
---|
| 216 | public InputSplit[] getSplits(JobConf conf, int numSplits) { |
---|
| 217 | InputSplit[] splits = new InputSplit[numSplits]; |
---|
| 218 | for (int i = 0; i < splits.length; ++i) { |
---|
| 219 | splits[i] = new FakeSplit(); |
---|
| 220 | } |
---|
| 221 | return splits; |
---|
| 222 | } |
---|
| 223 | |
---|
| 224 | public RecordReader<NullWritable,NullWritable> getRecordReader( |
---|
| 225 | InputSplit ignored, JobConf conf, Reporter reporter) { |
---|
| 226 | return new RecordReader<NullWritable,NullWritable>() { |
---|
| 227 | private boolean done = false; |
---|
| 228 | public boolean next(NullWritable key, NullWritable value) |
---|
| 229 | throws IOException { |
---|
| 230 | if (done) |
---|
| 231 | return false; |
---|
| 232 | done = true; |
---|
| 233 | return true; |
---|
| 234 | } |
---|
| 235 | public NullWritable createKey() { return NullWritable.get(); } |
---|
| 236 | public NullWritable createValue() { return NullWritable.get(); } |
---|
| 237 | public long getPos() throws IOException { return 0L; } |
---|
| 238 | public void close() throws IOException { } |
---|
| 239 | public float getProgress() throws IOException { return 0.0f; } |
---|
| 240 | }; |
---|
| 241 | } |
---|
| 242 | } |
---|
| 243 | |
---|
| 244 | private static void runTest(String name, int keylen, int vallen, |
---|
| 245 | int records, int ioSortMB, float recPer, float spillPer, |
---|
| 246 | boolean pedantic) throws Exception { |
---|
| 247 | JobConf conf = new JobConf(new Configuration(), SpillMapper.class); |
---|
| 248 | |
---|
| 249 | conf.setInt("io.sort.mb", ioSortMB); |
---|
| 250 | conf.set("io.sort.record.percent", Float.toString(recPer)); |
---|
| 251 | conf.set("io.sort.spill.percent", Float.toString(spillPer)); |
---|
| 252 | |
---|
| 253 | conf.setInt("test.keywritable.length", keylen); |
---|
| 254 | conf.setInt("test.valwritable.length", vallen); |
---|
| 255 | conf.setInt("test.spillmap.records", records); |
---|
| 256 | conf.setBoolean("test.pedantic.verification", pedantic); |
---|
| 257 | |
---|
| 258 | conf.setNumMapTasks(1); |
---|
| 259 | conf.setNumReduceTasks(1); |
---|
| 260 | conf.setInputFormat(FakeIF.class); |
---|
| 261 | conf.setOutputFormat(NullOutputFormat.class); |
---|
| 262 | conf.setMapperClass(SpillMapper.class); |
---|
| 263 | conf.setReducerClass(SpillReducer.class); |
---|
| 264 | conf.setMapOutputKeyClass(KeyWritable.class); |
---|
| 265 | conf.setMapOutputValueClass(ValWritable.class); |
---|
| 266 | |
---|
| 267 | LOG.info("Running " + name); |
---|
| 268 | JobClient.runJob(conf); |
---|
| 269 | } |
---|
| 270 | |
---|
| 271 | private static void runTest(String name, int keylen, int vallen, int records, |
---|
| 272 | boolean pedantic) throws Exception { |
---|
| 273 | runTest(name, keylen, vallen, records, 1, 0.05f, .8f, pedantic); |
---|
| 274 | } |
---|
| 275 | |
---|
| 276 | public void testLastFill() throws Exception { |
---|
| 277 | // last byte of record/key is the last/first byte in the spill buffer |
---|
| 278 | runTest("vallastbyte", 128, 896, 1344, 1, 0.125f, 0.5f, true); |
---|
| 279 | runTest("keylastbyte", 512, 1024, 896, 1, 0.125f, 0.5f, true); |
---|
| 280 | } |
---|
| 281 | |
---|
| 282 | public void testLargeRecords() throws Exception { |
---|
| 283 | // maps emitting records larger than io.sort.mb |
---|
| 284 | runTest("largerec", 100, 1024*1024, 5, false); |
---|
| 285 | runTest("largekeyzeroval", 1024*1024, 0, 5, false); |
---|
| 286 | } |
---|
| 287 | |
---|
| 288 | public void testSpillPer() throws Exception { |
---|
| 289 | // set non-default, 100% speculative spill boundary |
---|
| 290 | runTest("fullspill2B", 1, 1, 10000, 1, 0.05f, 1.0f, true); |
---|
| 291 | runTest("fullspill200B", 100, 100, 10000, 1, 0.05f, 1.0f, true); |
---|
| 292 | runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 0.3f, 1.0f, true); |
---|
| 293 | runTest("lt50perspill", 100, 100, 10000, 1, 0.05f, 0.3f, true); |
---|
| 294 | } |
---|
| 295 | |
---|
| 296 | public void testZeroLength() throws Exception { |
---|
| 297 | // test key/value at zero-length |
---|
| 298 | runTest("zeroval", 1, 0, 10000, true); |
---|
| 299 | runTest("zerokey", 0, 1, 10000, true); |
---|
| 300 | runTest("zerokeyval", 0, 0, 10000, false); |
---|
| 301 | runTest("zerokeyvalfull", 0, 0, 10000, 1, 0.05f, 1.0f, false); |
---|
| 302 | } |
---|
| 303 | |
---|
| 304 | } |
---|