source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMapCollection.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 9.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import junit.framework.TestCase;
22
23import java.io.IOException;
24import java.io.DataInput;
25import java.io.DataOutput;
26import java.util.Arrays;
27import java.util.Iterator;
28
29import junit.framework.TestCase;
30import org.apache.commons.logging.Log;
31import org.apache.commons.logging.LogFactory;
32
33import org.apache.hadoop.conf.Configuration;
34import org.apache.hadoop.io.*;
35import org.apache.hadoop.mapred.lib.NullOutputFormat;
36
37public class TestMapCollection extends TestCase {
38
39  private static final Log LOG = LogFactory.getLog(
40      TestMapCollection.class.getName());
41
42  public static class KeyWritable
43      implements WritableComparable<KeyWritable>, JobConfigurable {
44
45    private final byte c = (byte)('K' & 0xFF);
46    static private boolean pedantic = false;
47    protected int expectedlen;
48
49    public void configure(JobConf conf) {
50      expectedlen = conf.getInt("test.keywritable.length", 1);
51      pedantic = conf.getBoolean("test.pedantic.verification", false);
52    }
53
54    public KeyWritable() { }
55
56    public KeyWritable(int len) {
57      this();
58      expectedlen = len;
59    }
60
61    public int getLength() {
62      return expectedlen;
63    }
64
65    public int compareTo(KeyWritable o) {
66      if (o == this) return 0;
67      return expectedlen - o.getLength();
68    }
69
70    public boolean equals(Object o) {
71      if (o == this) return true;
72      if (!(o instanceof KeyWritable)) return false;
73      return 0 == compareTo((KeyWritable)o);
74    }
75
76    public int hashCode() {
77      return 37 * expectedlen;
78    }
79
80    public void readFields(DataInput in) throws IOException {
81      if (expectedlen != 0) {
82        int bytesread;
83        if (pedantic) {
84          for (int i = 0; i < expectedlen; ++i)
85            assertEquals("Invalid byte at " + i, c, in.readByte());
86          bytesread = expectedlen;
87        } else {
88          bytesread = in.skipBytes(expectedlen);
89        }
90        assertEquals("Too few bytes in record", expectedlen, bytesread);
91      }
92      // cannot verify that the stream has been exhausted
93    }
94
95    public void write(DataOutput out) throws IOException {
96      if (expectedlen != 0) {
97        if (expectedlen > 1024) {
98          byte[] b = new byte[expectedlen];
99          Arrays.fill(b, c);
100          out.write(b);
101        } else {
102          for (int i = 0; i < expectedlen; ++i) {
103            out.write(c);
104          }
105        }
106      }
107    }
108
109    public static class Comparator extends WritableComparator {
110      public Comparator() {
111        super(KeyWritable.class);
112      }
113
114      public int compare(byte[] b1, int s1, int l1,
115                         byte[] b2, int s2, int l2) {
116        if (pedantic) {
117          for (int i = s1; i < l1; ++i) {
118            assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF));
119          }
120          for (int i = s2; i < l2; ++i) {
121            assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF));
122          }
123        }
124        return l1 - l2;
125      }
126    }
127
128
129    static {
130      WritableComparator.define(KeyWritable.class, new Comparator());
131    }
132  }
133
134  public static class ValWritable extends KeyWritable {
135
136    private final byte c = (byte)('V' & 0xFF);
137
138    public ValWritable() { }
139
140    public ValWritable(int len) {
141      this();
142      expectedlen = len;
143    }
144
145    public void configure(JobConf conf) {
146      expectedlen = conf.getInt("test.valwritable.length", 1);
147    }
148  }
149
150  public static class SpillMapper
151      implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> {
152
153    private int keylen = 1;
154    private int vallen = 1;
155    private int numrecs = 100;
156
157    public void configure(JobConf job) {
158      keylen = job.getInt("test.keywritable.length", 1);
159      vallen = job.getInt("test.valwritable.length", 1);
160      numrecs = job.getInt("test.spillmap.records", 100);
161    }
162
163    public void map(NullWritable key, NullWritable value,
164        OutputCollector<KeyWritable,ValWritable> out, Reporter reporter)
165        throws IOException {
166      KeyWritable k = new KeyWritable(keylen);
167      ValWritable v = new ValWritable(vallen);
168      for (int i = 0; i < numrecs; ++i) {
169        if ((i % 1000) == 0) {
170          reporter.progress();
171        }
172        out.collect(k, v);
173      }
174    }
175
176    public void close() { }
177
178  }
179
180  public static class SpillReducer
181      implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {
182
183    private int numrecs = 100;
184
185    public void configure(JobConf job) {
186      numrecs = job.getInt("test.spillmap.records", 100);
187    }
188
189    public void reduce(KeyWritable k, Iterator<ValWritable> values,
190        OutputCollector<NullWritable,NullWritable> out, Reporter reporter) {
191      int i = 0;
192      while (values.hasNext()) {
193        values.next();
194        ++i;
195      }
196      assertEquals("Unexpected record count (" + i + "/" +
197                   numrecs + ")", numrecs, i);
198    }
199
200    public void close() { }
201
202  }
203
204  public static class FakeSplit implements InputSplit {
205    public void write(DataOutput out) throws IOException { }
206    public void readFields(DataInput in) throws IOException { }
207    public long getLength() { return 0L; }
208    public String[] getLocations() { return new String[0]; }
209  }
210
211  public static class FakeIF
212      implements InputFormat<NullWritable,NullWritable> {
213
214    public FakeIF() { }
215
216    public InputSplit[] getSplits(JobConf conf, int numSplits) {
217      InputSplit[] splits = new InputSplit[numSplits];
218      for (int i = 0; i < splits.length; ++i) {
219        splits[i] = new FakeSplit();
220      }
221      return splits;
222    }
223
224    public RecordReader<NullWritable,NullWritable> getRecordReader(
225        InputSplit ignored, JobConf conf, Reporter reporter) {
226      return new RecordReader<NullWritable,NullWritable>() {
227        private boolean done = false;
228        public boolean next(NullWritable key, NullWritable value)
229            throws IOException {
230          if (done)
231            return false;
232          done = true;
233          return true;
234        }
235        public NullWritable createKey() { return NullWritable.get(); }
236        public NullWritable createValue() { return NullWritable.get(); }
237        public long getPos() throws IOException { return 0L; }
238        public void close() throws IOException { }
239        public float getProgress() throws IOException { return 0.0f; }
240      };
241    }
242  }
243
244  private static void runTest(String name, int keylen, int vallen,
245      int records, int ioSortMB, float recPer, float spillPer,
246      boolean pedantic) throws Exception {
247    JobConf conf = new JobConf(new Configuration(), SpillMapper.class);
248
249    conf.setInt("io.sort.mb", ioSortMB);
250    conf.set("io.sort.record.percent", Float.toString(recPer));
251    conf.set("io.sort.spill.percent", Float.toString(spillPer));
252
253    conf.setInt("test.keywritable.length", keylen);
254    conf.setInt("test.valwritable.length", vallen);
255    conf.setInt("test.spillmap.records", records);
256    conf.setBoolean("test.pedantic.verification", pedantic);
257
258    conf.setNumMapTasks(1);
259    conf.setNumReduceTasks(1);
260    conf.setInputFormat(FakeIF.class);
261    conf.setOutputFormat(NullOutputFormat.class);
262    conf.setMapperClass(SpillMapper.class);
263    conf.setReducerClass(SpillReducer.class);
264    conf.setMapOutputKeyClass(KeyWritable.class);
265    conf.setMapOutputValueClass(ValWritable.class);
266
267    LOG.info("Running " + name);
268    JobClient.runJob(conf);
269  }
270
271  private static void runTest(String name, int keylen, int vallen, int records,
272      boolean pedantic) throws Exception {
273    runTest(name, keylen, vallen, records, 1, 0.05f, .8f, pedantic);
274  }
275
276  public void testLastFill() throws Exception {
277    // last byte of record/key is the last/first byte in the spill buffer
278    runTest("vallastbyte", 128, 896, 1344, 1, 0.125f, 0.5f, true);
279    runTest("keylastbyte", 512, 1024, 896, 1, 0.125f, 0.5f, true);
280  }
281
282  public void testLargeRecords() throws Exception {
283    // maps emitting records larger than io.sort.mb
284    runTest("largerec", 100, 1024*1024, 5, false);
285    runTest("largekeyzeroval", 1024*1024, 0, 5, false);
286  }
287
288  public void testSpillPer() throws Exception {
289    // set non-default, 100% speculative spill boundary
290    runTest("fullspill2B", 1, 1, 10000, 1, 0.05f, 1.0f, true);
291    runTest("fullspill200B", 100, 100, 10000, 1, 0.05f, 1.0f, true);
292    runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 0.3f, 1.0f, true);
293    runTest("lt50perspill", 100, 100, 10000, 1, 0.05f, 0.3f, true);
294  }
295
296  public void testZeroLength() throws Exception {
297    // test key/value at zero-length
298    runTest("zeroval", 1, 0, 10000, true);
299    runTest("zerokey", 0, 1, 10000, true);
300    runTest("zerokeyval", 0, 0, 10000, false);
301    runTest("zerokeyvalfull", 0, 0, 10000, 1, 0.05f, 1.0f, false);
302  }
303
304}
Note: See TracBrowser for help on using the repository browser.