source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.2 KB
Line 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18package org.apache.hadoop.mapred.lib;
19
20import org.apache.hadoop.fs.FileStatus;
21import org.apache.hadoop.fs.FileSystem;
22import org.apache.hadoop.fs.Path;
23import org.apache.hadoop.io.LongWritable;
24import org.apache.hadoop.io.SequenceFile;
25import org.apache.hadoop.io.Text;
26import org.apache.hadoop.mapred.*;
27
28import java.io.BufferedReader;
29import java.io.DataOutputStream;
30import java.io.IOException;
31import java.io.InputStreamReader;
32import java.util.Iterator;
33
34public class TestMultipleOutputs extends HadoopTestCase {
35
36  public TestMultipleOutputs() throws IOException {
37    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
38  }
39
40  public void testWithoutCounters() throws Exception {
41    _testMultipleOutputs(false);
42  }
43
44  public void testWithCounters() throws Exception {
45    _testMultipleOutputs(true);
46  }
47
48  private static final Path ROOT_DIR = new Path("testing/mo");
49  private static final Path IN_DIR = new Path(ROOT_DIR, "input");
50  private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
51
52  private Path getDir(Path dir) {
53    // Hack for local FS that does not have the concept of a 'mounting point'
54    if (isLocalFS()) {
55      String localPathRoot = System.getProperty("test.build.data", "/tmp")
56        .replace(' ', '+');
57      dir = new Path(localPathRoot, dir);
58    }
59    return dir;
60  }
61
62  public void setUp() throws Exception {
63    super.setUp();
64    Path rootDir = getDir(ROOT_DIR);
65    Path inDir = getDir(IN_DIR);
66
67    JobConf conf = createJobConf();
68    FileSystem fs = FileSystem.get(conf);
69    fs.delete(rootDir, true);
70    if (!fs.mkdirs(inDir)) {
71      throw new IOException("Mkdirs failed to create " + inDir.toString());
72    }
73  }
74
75  public void tearDown() throws Exception {
76    Path rootDir = getDir(ROOT_DIR);
77
78    JobConf conf = createJobConf();
79    FileSystem fs = FileSystem.get(conf);
80    fs.delete(rootDir, true);
81    super.tearDown();
82  }
83
84  protected void _testMultipleOutputs(boolean withCounters) throws Exception {
85    Path inDir = getDir(IN_DIR);
86    Path outDir = getDir(OUT_DIR);
87
88    JobConf conf = createJobConf();
89    FileSystem fs = FileSystem.get(conf);
90
91    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
92    file.writeBytes("a\nb\n\nc\nd\ne");
93    file.close();
94
95    file = fs.create(new Path(inDir, "part-1"));
96    file.writeBytes("a\nb\n\nc\nd\ne");
97    file.close();
98
99    conf.setJobName("mo");
100    conf.setInputFormat(TextInputFormat.class);
101
102    conf.setOutputKeyClass(LongWritable.class);
103    conf.setOutputValueClass(Text.class);
104
105    conf.setMapOutputKeyClass(LongWritable.class);
106    conf.setMapOutputValueClass(Text.class);
107
108    conf.setOutputFormat(TextOutputFormat.class);
109    conf.setOutputKeyClass(LongWritable.class);
110    conf.setOutputValueClass(Text.class);
111
112    MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
113      LongWritable.class, Text.class);
114    MultipleOutputs.addMultiNamedOutput(conf, "sequence",
115      SequenceFileOutputFormat.class, LongWritable.class, Text.class);
116
117    MultipleOutputs.setCountersEnabled(conf, withCounters);
118
119    conf.setMapperClass(MOMap.class);
120    conf.setReducerClass(MOReduce.class);
121
122    FileInputFormat.setInputPaths(conf, inDir);
123    FileOutputFormat.setOutputPath(conf, outDir);
124
125    JobClient jc = new JobClient(conf);
126    RunningJob job = jc.submitJob(conf);
127    while (!job.isComplete()) {
128      Thread.sleep(100);
129    }
130
131    // assert number of named output part files
132    int namedOutputCount = 0;
133    FileStatus[] statuses = fs.listStatus(outDir);
134    for (FileStatus status : statuses) {
135      if (status.getPath().getName().equals("text-m-00000") ||
136        status.getPath().getName().equals("text-m-00001") ||
137        status.getPath().getName().equals("text-r-00000") ||
138        status.getPath().getName().equals("sequence_A-m-00000") ||
139        status.getPath().getName().equals("sequence_A-m-00001") ||
140        status.getPath().getName().equals("sequence_B-m-00000") ||
141        status.getPath().getName().equals("sequence_B-m-00001") ||
142        status.getPath().getName().equals("sequence_B-r-00000") ||
143        status.getPath().getName().equals("sequence_C-r-00000")) {
144        namedOutputCount++;
145      }
146    }
147    assertEquals(9, namedOutputCount);
148
149    // assert TextOutputFormat files correctness
150    BufferedReader reader = new BufferedReader(
151      new InputStreamReader(fs.open(
152        new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
153    int count = 0;
154    String line = reader.readLine();
155    while (line != null) {
156      assertTrue(line.endsWith("text"));
157      line = reader.readLine();
158      count++;
159    }
160    reader.close();
161    assertFalse(count == 0);
162
163    // assert SequenceOutputFormat files correctness
164    SequenceFile.Reader seqReader =
165      new SequenceFile.Reader(fs, new Path(FileOutputFormat.getOutputPath(conf),
166        "sequence_B-r-00000"), conf);
167
168    assertEquals(LongWritable.class, seqReader.getKeyClass());
169    assertEquals(Text.class, seqReader.getValueClass());
170
171    count = 0;
172    LongWritable key = new LongWritable();
173    Text value = new Text();
174    while (seqReader.next(key, value)) {
175      assertEquals("sequence", value.toString());
176      count++;
177    }
178    seqReader.close();
179    assertFalse(count == 0);
180
181    Counters.Group counters =
182      job.getCounters().getGroup(MultipleOutputs.class.getName());
183    if (!withCounters) {
184      assertEquals(0, counters.size());
185    }
186    else {
187      assertEquals(4, counters.size());
188      assertEquals(4, counters.getCounter("text"));
189      assertEquals(2, counters.getCounter("sequence_A"));
190      assertEquals(4, counters.getCounter("sequence_B"));
191      assertEquals(2, counters.getCounter("sequence_C"));
192
193    }
194
195  }
196
197  @SuppressWarnings({"unchecked"})
198  public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
199    Text> {
200
201    private MultipleOutputs mos;
202
203    public void configure(JobConf conf) {
204      mos = new MultipleOutputs(conf);
205    }
206
207    public void map(LongWritable key, Text value,
208                    OutputCollector<LongWritable, Text> output,
209                    Reporter reporter)
210      throws IOException {
211      if (!value.toString().equals("a")) {
212        output.collect(key, value);
213      } else {
214        mos.getCollector("text", reporter).collect(key, new Text("text"));
215        mos.getCollector("sequence", "A", reporter).collect(key,
216          new Text("sequence"));
217        mos.getCollector("sequence", "B", reporter).collect(key,
218          new Text("sequence"));
219      }
220    }
221
222    public void close() throws IOException {
223      mos.close();
224    }
225  }
226
227  @SuppressWarnings({"unchecked"})
228  public static class MOReduce implements Reducer<LongWritable, Text,
229    LongWritable, Text> {
230
231    private MultipleOutputs mos;
232
233    public void configure(JobConf conf) {
234      mos = new MultipleOutputs(conf);
235    }
236
237    public void reduce(LongWritable key, Iterator<Text> values,
238                       OutputCollector<LongWritable, Text> output,
239                       Reporter reporter)
240      throws IOException {
241      while (values.hasNext()) {
242        Text value = values.next();
243        if (!value.toString().equals("b")) {
244          output.collect(key, value);
245        } else {
246          mos.getCollector("text", reporter).collect(key, new Text("text"));
247          mos.getCollector("sequence", "B", reporter).collect(key,
248            new Text("sequence"));
249          mos.getCollector("sequence", "C", reporter).collect(key,
250            new Text("sequence"));
251        }
252      }
253    }
254
255    public void close() throws IOException {
256      mos.close();
257    }
258  }
259
260}
Note: See TracBrowser for help on using the repository browser.