source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 8.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred.lib;
19
20import org.apache.hadoop.fs.FileSystem;
21import org.apache.hadoop.fs.Path;
22import org.apache.hadoop.io.LongWritable;
23import org.apache.hadoop.io.Text;
24import org.apache.hadoop.mapred.*;
25
26import java.io.DataOutputStream;
27import java.io.IOException;
28import java.util.Iterator;
29
30public class TestChainMapReduce extends HadoopTestCase {
31
32  private static Path getFlagDir(boolean local) {
33    Path flagDir = new Path("testing/chain/flags");
34
35    // Hack for local FS that does not have the concept of a 'mounting point'
36    if (local) {
37      String localPathRoot = System.getProperty("test.build.data", "/tmp")
38        .replace(' ', '+');
39      flagDir = new Path(localPathRoot, flagDir);
40    }
41    return flagDir;
42  }
43
44  private static void cleanFlags(JobConf conf) throws IOException {
45    FileSystem fs = FileSystem.get(conf);
46    fs.delete(getFlagDir(conf.getBoolean("localFS", true)), true);
47    fs.mkdirs(getFlagDir(conf.getBoolean("localFS", true)));
48  }
49
50  private static void writeFlag(JobConf conf, String flag) throws IOException {
51    FileSystem fs = FileSystem.get(conf);
52    if (getFlag(conf, flag)) {
53      fail("Flag " + flag + " already exists");
54    }
55    DataOutputStream file =
56      fs.create(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
57    file.close();
58  }
59
60  private static boolean getFlag(JobConf conf, String flag) throws IOException {
61    FileSystem fs = FileSystem.get(conf);
62    return fs
63      .exists(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
64  }
65
66  public TestChainMapReduce() throws IOException {
67    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
68  }
69
70  public void testChain() throws Exception {
71    Path inDir = new Path("testing/chain/input");
72    Path outDir = new Path("testing/chain/output");
73
74    // Hack for local FS that does not have the concept of a 'mounting point'
75    if (isLocalFS()) {
76      String localPathRoot = System.getProperty("test.build.data", "/tmp")
77        .replace(' ', '+');
78      inDir = new Path(localPathRoot, inDir);
79      outDir = new Path(localPathRoot, outDir);
80    }
81
82
83    JobConf conf = createJobConf();
84    conf.setBoolean("localFS", isLocalFS());
85
86    cleanFlags(conf);
87
88    FileSystem fs = FileSystem.get(conf);
89
90    fs.delete(outDir, true);
91    if (!fs.mkdirs(inDir)) {
92      throw new IOException("Mkdirs failed to create " + inDir.toString());
93    }
94
95    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
96    file.writeBytes("1\n2\n");
97    file.close();
98
99    conf.setJobName("chain");
100    conf.setInputFormat(TextInputFormat.class);
101    conf.setOutputFormat(TextOutputFormat.class);
102
103    conf.set("a", "X");
104
105    JobConf mapAConf = new JobConf(false);
106    mapAConf.set("a", "A");
107    ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
108                          LongWritable.class, Text.class, true, mapAConf);
109
110    ChainMapper.addMapper(conf, BMap.class, LongWritable.class, Text.class,
111                          LongWritable.class, Text.class, false, null);
112
113    JobConf reduceConf = new JobConf(false);
114    reduceConf.set("a", "C");
115    ChainReducer.setReducer(conf, CReduce.class, LongWritable.class, Text.class,
116                            LongWritable.class, Text.class, true, reduceConf);
117
118    ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
119                           LongWritable.class, Text.class, false, null);
120
121    JobConf mapEConf = new JobConf(false);
122    mapEConf.set("a", "E");
123    ChainReducer.addMapper(conf, EMap.class, LongWritable.class, Text.class,
124                           LongWritable.class, Text.class, true, mapEConf);
125
126    FileInputFormat.setInputPaths(conf, inDir);
127    FileOutputFormat.setOutputPath(conf, outDir);
128
129    JobClient jc = new JobClient(conf);
130    RunningJob job = jc.submitJob(conf);
131    while (!job.isComplete()) {
132      Thread.sleep(100);
133    }
134
135    assertTrue(getFlag(conf, "configure.A"));
136    assertTrue(getFlag(conf, "configure.B"));
137    assertTrue(getFlag(conf, "configure.C"));
138    assertTrue(getFlag(conf, "configure.D"));
139    assertTrue(getFlag(conf, "configure.E"));
140
141    assertTrue(getFlag(conf, "map.A.value.1"));
142    assertTrue(getFlag(conf, "map.A.value.2"));
143    assertTrue(getFlag(conf, "map.B.value.1"));
144    assertTrue(getFlag(conf, "map.B.value.2"));
145    assertTrue(getFlag(conf, "reduce.C.value.2"));
146    assertTrue(getFlag(conf, "reduce.C.value.1"));
147    assertTrue(getFlag(conf, "map.D.value.1"));
148    assertTrue(getFlag(conf, "map.D.value.2"));
149    assertTrue(getFlag(conf, "map.E.value.1"));
150    assertTrue(getFlag(conf, "map.E.value.2"));
151
152    assertTrue(getFlag(conf, "close.A"));
153    assertTrue(getFlag(conf, "close.B"));
154    assertTrue(getFlag(conf, "close.C"));
155    assertTrue(getFlag(conf, "close.D"));
156    assertTrue(getFlag(conf, "close.E"));
157  }
158
159  public static class AMap extends IDMap {
160    public AMap() {
161      super("A", "A", true);
162    }
163  }
164
165  public static class BMap extends IDMap {
166    public BMap() {
167      super("B", "X", false);
168    }
169  }
170
171  public static class CReduce extends IDReduce {
172    public CReduce() {
173      super("C", "C");
174    }
175  }
176
177  public static class DMap extends IDMap {
178    public DMap() {
179      super("D", "X", false);
180    }
181  }
182
183  public static class EMap extends IDMap {
184    public EMap() {
185      super("E", "E", true);
186    }
187  }
188
189  public static class IDMap
190    implements Mapper<LongWritable, Text, LongWritable, Text> {
191    private JobConf conf;
192    private String name;
193    private String prop;
194    private boolean byValue;
195
196    public IDMap(String name, String prop, boolean byValue) {
197      this.name = name;
198      this.prop = prop;
199      this.byValue = byValue;
200    }
201
202    public void configure(JobConf conf) {
203      this.conf = conf;
204      assertEquals(prop, conf.get("a"));
205      try {
206        writeFlag(conf, "configure." + name);
207      } catch (IOException ex) {
208        throw new RuntimeException(ex);
209      }
210    }
211
212    public void map(LongWritable key, Text value,
213                    OutputCollector<LongWritable, Text> output,
214                    Reporter reporter) throws IOException {
215      writeFlag(conf, "map." + name + ".value." + value);
216      key.set(10);
217      output.collect(key, value);
218      if (byValue) {
219        assertEquals(10, key.get());
220      } else {
221        assertNotSame(10, key.get());
222      }
223      key.set(11);
224    }
225
226    public void close() throws IOException {
227      try {
228        writeFlag(conf, "close." + name);
229      } catch (IOException ex) {
230        throw new RuntimeException(ex);
231      }
232    }
233  }
234
235  public static class IDReduce
236    implements Reducer<LongWritable, Text, LongWritable, Text> {
237
238    private JobConf conf;
239    private String name;
240    private String prop;
241    private boolean byValue = false;
242
243    public IDReduce(String name, String prop) {
244      this.name = name;
245      this.prop = prop;
246    }
247
248    public void configure(JobConf conf) {
249      this.conf = conf;
250      assertEquals(prop, conf.get("a"));
251      try {
252        writeFlag(conf, "configure." + name);
253      } catch (IOException ex) {
254        throw new RuntimeException(ex);
255      }
256    }
257
258    public void reduce(LongWritable key, Iterator<Text> values,
259                       OutputCollector<LongWritable, Text> output,
260                       Reporter reporter) throws IOException {
261      while (values.hasNext()) {
262        Text value = values.next();
263        writeFlag(conf, "reduce." + name + ".value." + value);
264        key.set(10);
265        output.collect(key, value);
266        if (byValue) {
267          assertEquals(10, key.get());
268        } else {
269          assertNotSame(10, key.get());
270        }
271        key.set(11);
272      }
273    }
274
275    public void close() throws IOException {
276      try {
277        writeFlag(conf, "close." + name);
278      } catch (IOException ex) {
279        throw new RuntimeException(ex);
280      }
281    }
282  }
283
284}
Note: See TracBrowser for help on using the repository browser.