source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred.join;
19
20import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
23import java.util.Iterator;
24
25import junit.framework.Test;
26import junit.framework.TestCase;
27import junit.framework.TestSuite;
28import junit.extensions.TestSetup;
29
30import org.apache.hadoop.conf.Configuration;
31import org.apache.hadoop.fs.FileStatus;
32import org.apache.hadoop.fs.Path;
33import org.apache.hadoop.hdfs.MiniDFSCluster;
34import org.apache.hadoop.io.IntWritable;
35import org.apache.hadoop.io.NullWritable;
36import org.apache.hadoop.io.SequenceFile;
37import org.apache.hadoop.io.Text;
38import org.apache.hadoop.io.Writable;
39import org.apache.hadoop.io.WritableComparable;
40import org.apache.hadoop.mapred.FileOutputFormat;
41import org.apache.hadoop.mapred.InputFormat;
42import org.apache.hadoop.mapred.InputSplit;
43import org.apache.hadoop.mapred.JobClient;
44import org.apache.hadoop.mapred.JobConf;
45import org.apache.hadoop.mapred.JobConfigurable;
46import org.apache.hadoop.mapred.Mapper;
47import org.apache.hadoop.mapred.OutputCollector;
48import org.apache.hadoop.mapred.RecordReader;
49import org.apache.hadoop.mapred.Reducer;
50import org.apache.hadoop.mapred.Reporter;
51import org.apache.hadoop.mapred.SequenceFileInputFormat;
52import org.apache.hadoop.mapred.SequenceFileOutputFormat;
53import org.apache.hadoop.mapred.lib.IdentityMapper;
54import org.apache.hadoop.mapred.lib.IdentityReducer;
55import org.apache.hadoop.util.ReflectionUtils;
56
57public class TestDatamerge extends TestCase {
58
59  private static MiniDFSCluster cluster = null;
60  public static Test suite() {
61    TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) {
62      protected void setUp() throws Exception {
63        Configuration conf = new Configuration();
64        cluster = new MiniDFSCluster(conf, 2, true, null);
65      }
66      protected void tearDown() throws Exception {
67        if (cluster != null) {
68          cluster.shutdown();
69        }
70      }
71    };
72    return setup;
73  }
74
75  private static SequenceFile.Writer[] createWriters(Path testdir,
76      Configuration conf, int srcs, Path[] src) throws IOException {
77    for (int i = 0; i < srcs; ++i) {
78      src[i] = new Path(testdir, Integer.toString(i + 10, 36));
79    }
80    SequenceFile.Writer out[] = new SequenceFile.Writer[srcs];
81    for (int i = 0; i < srcs; ++i) {
82      out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
83          src[i], IntWritable.class, IntWritable.class);
84    }
85    return out;
86  }
87
88  private static Path[] writeSimpleSrc(Path testdir, Configuration conf,
89      int srcs) throws IOException {
90    SequenceFile.Writer out[] = null;
91    Path[] src = new Path[srcs];
92    try {
93      out = createWriters(testdir, conf, srcs, src);
94      final int capacity = srcs * 2 + 1;
95      IntWritable key = new IntWritable();
96      IntWritable val = new IntWritable();
97      for (int k = 0; k < capacity; ++k) {
98        for (int i = 0; i < srcs; ++i) {
99          key.set(k % srcs == 0 ? k * srcs : k * srcs + i);
100          val.set(10 * k + i);
101          out[i].append(key, val);
102          if (i == k) {
103            // add duplicate key
104            out[i].append(key, val);
105          }
106        }
107      }
108    } finally {
109      if (out != null) {
110        for (int i = 0; i < srcs; ++i) {
111          if (out[i] != null)
112            out[i].close();
113        }
114      }
115    }
116    return src;
117  }
118
119  private static String stringify(IntWritable key, Writable val) {
120    StringBuilder sb = new StringBuilder();
121    sb.append("(" + key);
122    sb.append("," + val + ")");
123    return sb.toString();
124  }
125
126  private static abstract class SimpleCheckerBase<V extends Writable>
127      implements Mapper<IntWritable, V, IntWritable, IntWritable>,
128                 Reducer<IntWritable, IntWritable, Text, Text> {
129    protected final static IntWritable one = new IntWritable(1);
130    int srcs;
131    public void close() { }
132    public void configure(JobConf job) {
133      srcs = job.getInt("testdatamerge.sources", 0);
134      assertTrue("Invalid src count: " + srcs, srcs > 0);
135    }
136    public abstract void map(IntWritable key, V val,
137        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
138        throws IOException;
139    public void reduce(IntWritable key, Iterator<IntWritable> values,
140                       OutputCollector<Text, Text> output,
141                       Reporter reporter) throws IOException {
142      int seen = 0;
143      while (values.hasNext()) {
144        seen += values.next().get();
145      }
146      assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
147    }
148    public abstract boolean verify(int key, int occ);
149  }
150
151  private static class InnerJoinChecker
152      extends SimpleCheckerBase<TupleWritable> {
153    public void map(IntWritable key, TupleWritable val,
154        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
155        throws IOException {
156      int k = key.get();
157      final String kvstr = "Unexpected tuple: " + stringify(key, val);
158      assertTrue(kvstr, 0 == k % (srcs * srcs));
159      for (int i = 0; i < val.size(); ++i) {
160        final int vali = ((IntWritable)val.get(i)).get();
161        assertTrue(kvstr, (vali - i) * srcs == 10 * k);
162      }
163      out.collect(key, one);
164    }
165    public boolean verify(int key, int occ) {
166      return (key == 0 && occ == 2) ||
167             (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
168    }
169  }
170
171  private static class OuterJoinChecker
172      extends SimpleCheckerBase<TupleWritable> {
173    public void map(IntWritable key, TupleWritable val,
174        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
175        throws IOException {
176      int k = key.get();
177      final String kvstr = "Unexpected tuple: " + stringify(key, val);
178      if (0 == k % (srcs * srcs)) {
179        for (int i = 0; i < val.size(); ++i) {
180          assertTrue(kvstr, val.get(i) instanceof IntWritable);
181          final int vali = ((IntWritable)val.get(i)).get();
182          assertTrue(kvstr, (vali - i) * srcs == 10 * k);
183        }
184      } else {
185        for (int i = 0; i < val.size(); ++i) {
186          if (i == k % srcs) {
187            assertTrue(kvstr, val.get(i) instanceof IntWritable);
188            final int vali = ((IntWritable)val.get(i)).get();
189            assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
190          } else {
191            assertTrue(kvstr, !val.has(i));
192          }
193        }
194      }
195      out.collect(key, one);
196    }
197    public boolean verify(int key, int occ) {
198      if (key < srcs * srcs && (key % (srcs + 1)) == 0)
199        return 2 == occ;
200      return 1 == occ;
201    }
202  }
203
204  private static class OverrideChecker
205      extends SimpleCheckerBase<IntWritable> {
206    public void map(IntWritable key, IntWritable val,
207        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
208        throws IOException {
209      int k = key.get();
210      final int vali = val.get();
211      final String kvstr = "Unexpected tuple: " + stringify(key, val);
212      if (0 == k % (srcs * srcs)) {
213        assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
214      } else {
215        final int i = k % srcs;
216        assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
217      }
218      out.collect(key, one);
219    }
220    public boolean verify(int key, int occ) {
221      if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0)
222        return 2 == occ;
223      return 1 == occ;
224    }
225  }
226
227  private static void joinAs(String jointype,
228      Class<? extends SimpleCheckerBase> c) throws Exception {
229    final int srcs = 4;
230    Configuration conf = new Configuration();
231    JobConf job = new JobConf(conf, c);
232    Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
233    Path[] src = writeSimpleSrc(base, conf, srcs);
234    job.set("mapred.join.expr", CompositeInputFormat.compose(jointype,
235        SequenceFileInputFormat.class, src));
236    job.setInt("testdatamerge.sources", srcs);
237    job.setInputFormat(CompositeInputFormat.class);
238    FileOutputFormat.setOutputPath(job, new Path(base, "out"));
239
240    job.setMapperClass(c);
241    job.setReducerClass(c);
242    job.setOutputKeyClass(IntWritable.class);
243    job.setOutputValueClass(IntWritable.class);
244    JobClient.runJob(job);
245    base.getFileSystem(job).delete(base, true);
246  }
247
248  public void testSimpleInnerJoin() throws Exception {
249    joinAs("inner", InnerJoinChecker.class);
250  }
251
252  public void testSimpleOuterJoin() throws Exception {
253    joinAs("outer", OuterJoinChecker.class);
254  }
255
256  public void testSimpleOverride() throws Exception {
257    joinAs("override", OverrideChecker.class);
258  }
259
260  public void testNestedJoin() throws Exception {
261    // outer(inner(S1,...,Sn),outer(S1,...Sn))
262    final int SOURCES = 3;
263    final int ITEMS = (SOURCES + 1) * (SOURCES + 1);
264    JobConf job = new JobConf();
265    Path base = cluster.getFileSystem().makeQualified(new Path("/nested"));
266    int[][] source = new int[SOURCES][];
267    for (int i = 0; i < SOURCES; ++i) {
268      source[i] = new int[ITEMS];
269      for (int j = 0; j < ITEMS; ++j) {
270        source[i][j] = (i + 2) * (j + 1);
271      }
272    }
273    Path[] src = new Path[SOURCES];
274    SequenceFile.Writer out[] = createWriters(base, job, SOURCES, src);
275    IntWritable k = new IntWritable();
276    for (int i = 0; i < SOURCES; ++i) {
277      IntWritable v = new IntWritable();
278      v.set(i);
279      for (int j = 0; j < ITEMS; ++j) {
280        k.set(source[i][j]);
281        out[i].append(k, v);
282      }
283      out[i].close();
284    }
285    out = null;
286
287    StringBuilder sb = new StringBuilder();
288    sb.append("outer(inner(");
289    for (int i = 0; i < SOURCES; ++i) {
290      sb.append(
291          CompositeInputFormat.compose(SequenceFileInputFormat.class,
292            src[i].toString()));
293      if (i + 1 != SOURCES) sb.append(",");
294    }
295    sb.append("),outer(");
296    sb.append(CompositeInputFormat.compose(Fake_IF.class,"foobar"));
297    sb.append(",");
298    for (int i = 0; i < SOURCES; ++i) {
299      sb.append(
300          CompositeInputFormat.compose(SequenceFileInputFormat.class,
301            src[i].toString()));
302      sb.append(",");
303    }
304    sb.append(CompositeInputFormat.compose(Fake_IF.class,"raboof") + "))");
305    job.set("mapred.join.expr", sb.toString());
306    job.setInputFormat(CompositeInputFormat.class);
307    Path outf = new Path(base, "out");
308    FileOutputFormat.setOutputPath(job, outf);
309    Fake_IF.setKeyClass(job, IntWritable.class);
310    Fake_IF.setValClass(job, IntWritable.class);
311
312    job.setMapperClass(IdentityMapper.class);
313    job.setReducerClass(IdentityReducer.class);
314    job.setNumReduceTasks(0);
315    job.setOutputKeyClass(IntWritable.class);
316    job.setOutputValueClass(TupleWritable.class);
317    job.setOutputFormat(SequenceFileOutputFormat.class);
318    JobClient.runJob(job);
319
320    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
321    assertEquals(1, outlist.length);
322    assertTrue(0 < outlist[0].getLen());
323    SequenceFile.Reader r =
324      new SequenceFile.Reader(cluster.getFileSystem(),
325          outlist[0].getPath(), job);
326    TupleWritable v = new TupleWritable();
327    while (r.next(k, v)) {
328      assertFalse(((TupleWritable)v.get(1)).has(0));
329      assertFalse(((TupleWritable)v.get(1)).has(SOURCES + 1));
330      boolean chk = true;
331      int ki = k.get();
332      for (int i = 2; i < SOURCES + 2; ++i) {
333        if ((ki % i) == 0 && ki <= i * ITEMS) {
334          assertEquals(i - 2, ((IntWritable)
335                              ((TupleWritable)v.get(1)).get((i - 1))).get());
336        } else chk = false;
337      }
338      if (chk) { // present in all sources; chk inner
339        assertTrue(v.has(0));
340        for (int i = 0; i < SOURCES; ++i)
341          assertTrue(((TupleWritable)v.get(0)).has(i));
342      } else { // should not be present in inner join
343        assertFalse(v.has(0));
344      }
345    }
346    r.close();
347    base.getFileSystem(job).delete(base, true);
348
349  }
350
351  public void testEmptyJoin() throws Exception {
352    JobConf job = new JobConf();
353    Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
354    Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
355    job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
356        Fake_IF.class, src));
357    job.setInputFormat(CompositeInputFormat.class);
358    FileOutputFormat.setOutputPath(job, new Path(base, "out"));
359
360    job.setMapperClass(IdentityMapper.class);
361    job.setReducerClass(IdentityReducer.class);
362    job.setOutputKeyClass(IncomparableKey.class);
363    job.setOutputValueClass(NullWritable.class);
364
365    JobClient.runJob(job);
366    base.getFileSystem(job).delete(base, true);
367  }
368
369  public static class Fake_IF<K,V>
370      implements InputFormat<K,V>, JobConfigurable {
371
372    public static class FakeSplit implements InputSplit {
373      public void write(DataOutput out) throws IOException { }
374      public void readFields(DataInput in) throws IOException { }
375      public long getLength() { return 0L; }
376      public String[] getLocations() { return new String[0]; }
377    }
378
379    public static void setKeyClass(JobConf job, Class<?> k) {
380      job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
381    }
382
383    public static void setValClass(JobConf job, Class<?> v) {
384      job.setClass("test.fakeif.valclass", v, Writable.class);
385    }
386
387    private Class<? extends K> keyclass;
388    private Class<? extends V> valclass;
389
390    @SuppressWarnings("unchecked")
391    public void configure(JobConf job) {
392      keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
393    IncomparableKey.class, WritableComparable.class);
394      valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
395    NullWritable.class, WritableComparable.class);
396    }
397
398    public Fake_IF() { }
399
400    public InputSplit[] getSplits(JobConf conf, int splits) {
401      return new InputSplit[] { new FakeSplit() };
402    }
403
404    public RecordReader<K,V> getRecordReader(
405        InputSplit ignored, JobConf conf, Reporter reporter) {
406      return new RecordReader<K,V>() {
407        public boolean next(K key, V value) throws IOException { return false; }
408        public K createKey() {
409          return ReflectionUtils.newInstance(keyclass, null);
410        }
411        public V createValue() {
412          return ReflectionUtils.newInstance(valclass, null);
413        }
414        public long getPos() throws IOException { return 0L; }
415        public void close() throws IOException { }
416        public float getProgress() throws IOException { return 0.0f; }
417      };
418    }
419  }
420}
Note: See TracBrowser for help on using the repository browser.