source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestReduceTask.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.IOException;
21
22import junit.framework.TestCase;
23
24import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.fs.FileSystem;
26import org.apache.hadoop.fs.LocalFileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.io.Text;
29import org.apache.hadoop.io.WritableComparator;
30import org.apache.hadoop.io.compress.CompressionCodec;
31import org.apache.hadoop.io.compress.DefaultCodec;
32import org.apache.hadoop.util.Progressable;
33
34/**
35 * This test exercises the ValueIterator.
36 */
37public class TestReduceTask extends TestCase {
38
39  static class NullProgress implements Progressable {
40    public void progress() { }
41  }
42
43  private static class Pair {
44    String key;
45    String value;
46    Pair(String k, String v) {
47      key = k;
48      value = v;
49    }
50  }
51  private static Pair[][] testCases =
52    new Pair[][]{
53      new Pair[]{
54                 new Pair("k1", "v1"),
55                 new Pair("k2", "v2"),
56                 new Pair("k3", "v3"),
57                 new Pair("k3", "v4"),
58                 new Pair("k4", "v5"),
59                 new Pair("k5", "v6"),
60      },
61      new Pair[]{
62                 new Pair("", "v1"),
63                 new Pair("k1", "v2"),
64                 new Pair("k2", "v3"),
65                 new Pair("k2", "v4"),
66      },
67      new Pair[] {},
68      new Pair[]{
69                 new Pair("k1", "v1"),
70                 new Pair("k1", "v2"),
71                 new Pair("k1", "v3"),
72                 new Pair("k1", "v4"),
73      }
74    };
75 
76  public void runValueIterator(Path tmpDir, Pair[] vals, 
77                               Configuration conf, 
78                               CompressionCodec codec) throws IOException {
79    FileSystem localFs = FileSystem.getLocal(conf);
80    FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
81    Path path = new Path(tmpDir, "data.in");
82    IFile.Writer<Text, Text> writer = 
83      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
84                                   codec, null);
85    for(Pair p: vals) {
86      writer.append(new Text(p.key), new Text(p.value));
87    }
88    writer.close();
89   
90    @SuppressWarnings("unchecked")
91    RawKeyValueIterator rawItr = 
92      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
93                   false, conf.getInt("io.sort.factor", 100), tmpDir, 
94                   new Text.Comparator(), new NullProgress(),null,null);
95    @SuppressWarnings("unchecked") // WritableComparators are not generic
96    ReduceTask.ValuesIterator valItr = 
97      new ReduceTask.ValuesIterator<Text,Text>(rawItr,
98          WritableComparator.get(Text.class), Text.class, Text.class,
99          conf, new NullProgress());
100    int i = 0;
101    while (valItr.more()) {
102      Object key = valItr.getKey();
103      String keyString = key.toString();
104      // make sure it matches!
105      assertEquals(vals[i].key, keyString);
106      // must have at least 1 value!
107      assertTrue(valItr.hasNext());
108      while (valItr.hasNext()) {
109        String valueString = valItr.next().toString();
110        // make sure the values match
111        assertEquals(vals[i].value, valueString);
112        // make sure the keys match
113        assertEquals(vals[i].key, valItr.getKey().toString());
114        i += 1;
115      }
116      // make sure the key hasn't changed under the hood
117      assertEquals(keyString, valItr.getKey().toString());
118      valItr.nextKey();
119    }
120    assertEquals(vals.length, i);
121    // make sure we have progress equal to 1.0
122    assertEquals(1.0f, rawItr.getProgress().get());
123  }
124
125  public void testValueIterator() throws Exception {
126    Path tmpDir = new Path("build/test/test.reduce.task");
127    Configuration conf = new Configuration();
128    for (Pair[] testCase: testCases) {
129      runValueIterator(tmpDir, testCase, conf, null);
130    }
131  }
132 
133  public void testValueIteratorWithCompression() throws Exception {
134    Path tmpDir = new Path("build/test/test.reduce.task.compression");
135    Configuration conf = new Configuration();
136    DefaultCodec codec = new DefaultCodec();
137    codec.setConf(conf);
138    for (Pair[] testCase: testCases) {
139      runValueIterator(tmpDir, testCase, conf, codec);
140    }
141  }
142}
Note: See TracBrowser for help on using the repository browser.