source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.3 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.lib.aggregate;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import junit.framework.TestCase;
import java.io.*;
import java.util.*;
import java.text.NumberFormat;

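/**
 * End-to-end test of the aggregate framework: runs a single-reducer
 * MapReduce job whose mapper, combiner and reducer are the generic
 * ValueAggregator* classes, driven by a user-defined descriptor.
 */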
public class TestAggregates extends TestCase {

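  // Zero-pad ids to four digits with grouping disabled, so 19 formats as
  // "0019" rather than "0,019" and tokens sort lexicographically in
  // numeric order.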
  private static NumberFormat idFormat = NumberFormat.getInstance();
  static {
    idFormat.setMinimumIntegerDigits(4);
    idFormat.setGroupingUsed(false);
  }

  public void testAggregates() throws Exception {
    launch();
  }

  public static void launch() throws Exception {
    JobConf conf = new JobConf(TestAggregates.class);
    FileSystem fs = FileSystem.get(conf);
    int numOfInputLines = 20;

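    // Stage input under build/test on the test's default file system and
    // clear out anything left over from a previous run.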
    Path OUTPUT_DIR = new Path("build/test/output_for_aggregates_test");
    Path INPUT_DIR = new Path("build/test/input_for_aggregates_test");
    String inputFile = "input.txt";
    fs.delete(INPUT_DIR, true);
    fs.mkdirs(INPUT_DIR);
    fs.delete(OUTPUT_DIR, true);

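    // Build the input and the expected reducer output in parallel: input
    // line i holds the token idFormat.format(i) repeated i times, so each
    // count_<id> sums to i, the numeric max is 19 and the numeric min is 1.
    // The expected ordering mirrors the reducer's sort over the internal
    // "type:id" map-output keys (LongValueMax:max, LongValueMin:min,
    // LongValueSum:count_*, String..., Uniq...).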
    StringBuffer inputData = new StringBuffer();
    StringBuffer expectedOutput = new StringBuffer();
    expectedOutput.append("max\t19\n");
    expectedOutput.append("min\t1\n");

    FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
    for (int i = 1; i < numOfInputLines; i++) {
      expectedOutput.append("count_").append(idFormat.format(i));
      expectedOutput.append("\t").append(i).append("\n");

      inputData.append(idFormat.format(i));
      for (int j = 1; j < i; j++) {
        inputData.append(" ").append(idFormat.format(i));
      }
      inputData.append("\n");
    }
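    // The value_as_string_* aggregates compare the numbers as plain
    // strings, so the max is "9" (not "19"); uniq_count sees 19 distinct
    // tokens but reports 15, one more than the cap of 14 configured below
    // via aggregate.max.num.unique.values.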
    expectedOutput.append("value_as_string_max\t9\n");
    expectedOutput.append("value_as_string_min\t1\n");
    expectedOutput.append("uniq_count\t15\n");

    fileOut.write(inputData.toString().getBytes("utf-8"));
    fileOut.close();

    System.out.println("inputData:");
    System.out.println(inputData.toString());
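    // Plain-text job with Text keys/values throughout and a single reduce
    // task, so all aggregates land in one part file in key-sorted order.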
    JobConf job = new JobConf(conf, TestAggregates.class);
    FileInputFormat.setInputPaths(job, INPUT_DIR);
    job.setInputFormat(TextInputFormat.class);

    FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
    job.setOutputFormat(TextOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);

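    // The generic aggregate harness: the mapper asks each registered
    // descriptor to turn a record into ("type:id", value) pairs, and the
    // combiner and reducer fold the values with the aggregator named by
    // "type".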
    job.setMapperClass(ValueAggregatorMapper.class);
    job.setReducerClass(ValueAggregatorReducer.class);
    job.setCombinerClass(ValueAggregatorCombiner.class);

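    // Register one user-defined descriptor: AggregatorTests (in this
    // package) emits the count_*, max/min, value_as_string_* and
    // uniq_count entries that the expected output above checks.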
    job.setInt("aggregator.descriptor.num", 1);
    job.set("aggregator.descriptor.0",
        "UserDefined,org.apache.hadoop.mapred.lib.aggregate.AggregatorTests");
    job.setLong("aggregate.max.num.unique.values", 14);

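    // Runs the job synchronously; runJob throws an IOException if the
    // job fails.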
    JobClient.runJob(job);

    //
    // Compare the job output with the expected output.  The actual
    // output is truncated to the expected length first, so any trailing
    // records beyond the checked prefix are ignored.
    //
    Path outPath = new Path(OUTPUT_DIR, "part-00000");
    String outdata = TestMiniMRWithDFS.readOutput(outPath, job);
    System.out.println("full out data:");
    System.out.println(outdata);
    outdata = outdata.substring(0, expectedOutput.toString().length());

    assertEquals(expectedOutput.toString(), outdata);
    //fs.delete(OUTPUT_DIR);
    fs.delete(INPUT_DIR, true);
  }

  /**
   * Runs the test standalone from the command line.
   */
  public static void main(String[] argv) throws Exception {
    launch();
  }
}