source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java

Last change on this file was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 7.0 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.io.BufferedWriter;
import java.io.IOException;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * This is a wordcount application that tests the count of records
 * spilled to disk. It generates simple text input files, then runs
 * the wordcount map/reduce application on (1) 3 input files (with 3 maps
 * and 1 reduce) and verifies the counters, and (2) 4 input files (with 4 maps
 * and 1 reduce) and verifies the counters. The wordcount application reads the
 * text input files, breaks each line into words and counts them. The output
 * is a locally sorted list of words and the count of how often they occurred.
 *
 */
public class TestSpilledRecordsCounter extends TestCase {

  private void validateCounters(Counters counter, long spillRecCnt) {
      // Check if the number of Spilled Records is the same as expected
      assertEquals(counter.findCounter(Task.Counter.SPILLED_RECORDS).
                     getCounter(), spillRecCnt);
  }

  private void createWordsFile(File inpFile) throws Exception {
    Writer out = new BufferedWriter(new FileWriter(inpFile));
    try {
      // 500*4 unique words --- repeated 5 times => 5*2K words
      int REPLICAS=5, NUMLINES=500, NUMWORDSPERLINE=4;
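      // i.e. 500 lines x 4 words per replica, written 5 times, so each generated
      // file holds 10,000 (5*2k) words in total, drawn from 2,000 distinct keys.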

      for (int i = 0; i < REPLICAS; i++) {
        for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
          out.write("word" + j + " word" + (j+1) + " word" + (j+2) + " word" + (j+3) + '\n');
        }
      }
    } finally {
      out.close();
    }
  }

  /**
   * The main driver for the word count map/reduce program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there are communication problems with the
   *                     job tracker.
   */
  public void testSpillCounter() throws Exception {
    JobConf conf = new JobConf(TestSpilledRecordsCounter.class);
    conf.setJobName("wordcountSpilledRecordsCounter");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCount.MapClass.class);
    conf.setCombinerClass(WordCount.Reduce.class);
    conf.setReducerClass(WordCount.Reduce.class);

    conf.setNumMapTasks(3);
    conf.setNumReduceTasks(1);
    conf.setInt("io.sort.mb", 1);
    conf.setInt("io.sort.factor", 2);
    conf.set("io.sort.record.percent", "0.05");
    conf.set("io.sort.spill.percent", "0.80");

    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
                                      File.separator + "tmp"))
                               .toString().replace(' ', '+');
    conf.set("test.build.data", TEST_ROOT_DIR);
    String IN_DIR = TEST_ROOT_DIR + File.separator +
                      "spilledRecords.countertest" +  File.separator +
                      "genins" + File.separator;
    String OUT_DIR = TEST_ROOT_DIR + File.separator +
                      "spilledRecords.countertest" + File.separator;

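    // Nothing here overrides the job tracker or the default filesystem, so under the
    // Hadoop 0.20 defaults the job runs with the local job runner against the local
    // file system.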
    FileSystem fs = FileSystem.get(conf);
    Path testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
    try {
      if (fs.exists(testdir)) {
        fs.delete(testdir, true);
      }
      if (!fs.mkdirs(testdir)) {
        throw new IOException("Mkdirs failed to create " + testdir.toString());
      }

      Path wordsIns = new Path(testdir, "genins");
      if (!fs.mkdirs(wordsIns)) {
        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
      }

      //create 3 input files each with 5*2k words
      File inpFile = new File(IN_DIR + "input5_2k_1");
      createWordsFile(inpFile);
      inpFile = new File(IN_DIR + "input5_2k_2");
      createWordsFile(inpFile);
      inpFile = new File(IN_DIR + "input5_2k_3");
      createWordsFile(inpFile);

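      // All of the generated files contain the same 2,000 distinct keys, which is
      // what underlies the 2k units used in the spilled-record arithmetic in the
      // comments below.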
      FileInputFormat.setInputPaths(conf, IN_DIR);
      Path outputPath1 = new Path(OUT_DIR, "output5_2k_3");
      FileOutputFormat.setOutputPath(conf, outputPath1);

      RunningJob myJob = JobClient.runJob(conf);
      Counters c1 = myJob.getCounters();
      // 3 maps & in each map, 4 first level spills --- So total 12.
      // spilled records count:
      // Each Map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
      //           3rd level: 2k (4k from 1st level & 4k from 2nd level & combineAndSpill)
      //           So total 8k+8k+2k = 18k
      // For 3 Maps, total = 3*18 = 54k
      // Reduce: each of the 3 map o/p's (2k each) will be spilled in shuffleToDisk()
      //         So 3*2k = 6k in 1st level; 2nd level: 4k (2k+2k);
      //         3rd level directly given to reduce (4k+2k --- combineAndSpill => 2k.
      //         So 0 records spilled to disk in 3rd level)
      //         So total of 6k+4k = 10k
      // Total job counter will be 54k+10k = 64k
      validateCounters(c1, 64000);

      //create 4th input file each with 5*2k words and test with 4 maps
      inpFile = new File(IN_DIR + "input5_2k_4");
      createWordsFile(inpFile);
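      // The new file lands in the same input directory, so the input paths configured
      // above pick it up without another setInputPaths call.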
      conf.setNumMapTasks(4);
      Path outputPath2 = new Path(OUT_DIR, "output5_2k_4");
      FileOutputFormat.setOutputPath(conf, outputPath2);

      myJob = JobClient.runJob(conf);
      c1 = myJob.getCounters();
      // 4 maps & in each map 4 first level spills --- So total 16.
      // spilled records count:
      // Each Map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
      //           3rd level: 2k (4k from 1st level & 4k from 2nd level & combineAndSpill)
      //           So total 8k+8k+2k = 18k
      // For 4 Maps, total = 4*18 = 72k
      // Reduce: each of the 4 map o/p's (2k each) will be spilled in shuffleToDisk()
      //         So 4*2k = 8k in 1st level; 2nd level: 4k+4k = 8k;
      //         3rd level directly given to reduce (4k+4k --- combineAndSpill => 2k.
      //         So 0 records spilled to disk in 3rd level)
      //         So total of 8k+8k = 16k
      // Total job counter will be 72k+16k = 88k
      validateCounters(c1, 88000);
    } finally {
      //clean up the input and output files
      if (fs.exists(testdir)) {
        fs.delete(testdir, true);
      }
    }
  }
}