source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 6.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22
23import junit.extensions.TestSetup;
24import junit.framework.Test;
25import junit.framework.TestCase;
26import junit.framework.TestSuite;
27
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.hdfs.MiniDFSCluster;
30import org.apache.hadoop.io.BytesWritable;
31import org.apache.hadoop.io.Text;
32import org.apache.hadoop.mapred.lib.NullOutputFormat;
33import org.apache.hadoop.fs.FileSystem;
34import org.apache.hadoop.fs.Path;
35import org.apache.hadoop.util.ToolRunner;
36import org.apache.hadoop.examples.RandomWriter;
37import org.apache.hadoop.examples.Sort;
38
39/**
40 * A JUnit test to test the Map-Reduce framework's sort
41 * with a Mini Map-Reduce Cluster with a Mini HDFS Clusters.
42 */
43public class TestMiniMRDFSSort extends TestCase {
44  // Input/Output paths for sort
45  private static final Path SORT_INPUT_PATH = new Path("/sort/input");
46  private static final Path SORT_OUTPUT_PATH = new Path("/sort/output");
47
48  // Knobs to control randomwriter; and hence sort
49  private static final int NUM_HADOOP_SLAVES = 3;
50  // make it big enough to cause a spill in the map
51  private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
52  private static final int RW_MAPS_PER_HOST = 2;
53
54  private static MiniMRCluster mrCluster = null;
55  private static MiniDFSCluster dfsCluster = null;
56  private static FileSystem dfs = null;
57  public static Test suite() {
58    TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
59      protected void setUp() throws Exception {
60        Configuration conf = new Configuration();
61        dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
62        dfs = dfsCluster.getFileSystem();
63        mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES, 
64                                      dfs.getUri().toString(), 1);
65      }
66      protected void tearDown() throws Exception {
67        if (dfsCluster != null) { dfsCluster.shutdown(); }
68        if (mrCluster != null) { mrCluster.shutdown(); }
69      }
70    };
71    return setup;
72  }
73
74  private static void runRandomWriter(JobConf job, Path sortInput) 
75  throws Exception {
76    // Scale down the default settings for RandomWriter for the test-case
77    // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
78    job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
79    job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
80    String[] rwArgs = {sortInput.toString()};
81   
82    // Run RandomWriter
83    assertEquals(ToolRunner.run(job, new RandomWriter(), rwArgs), 0);
84  }
85 
86  private static void runSort(JobConf job, Path sortInput, Path sortOutput) 
87  throws Exception {
88
89    job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
90    job.setInt("io.sort.mb", 1);
91    job.setNumMapTasks(12);
92
93    // Setup command-line arguments to 'sort'
94    String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
95   
96    // Run Sort
97    Sort sort = new Sort();
98    assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
99    Counters counters = sort.getResult().getCounters();
100    long mapInput = counters.findCounter(Task.Counter.MAP_INPUT_BYTES
101    ).getValue();
102    long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
103                                         "HDFS_BYTES_READ").getValue();
104    // the hdfs read should be between 100% and 110% of the map input bytes
105    assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
106               (hdfsRead < (mapInput * 1.1)) &&
107               (hdfsRead > mapInput)); 
108  }
109 
110  private static void runSortValidator(JobConf job, 
111                                       Path sortInput, Path sortOutput) 
112  throws Exception {
113    String[] svArgs = {"-sortInput", sortInput.toString(), 
114                       "-sortOutput", sortOutput.toString()};
115
116    // Run Sort-Validator
117    assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
118  }
119 
120  private static class ReuseDetector extends MapReduceBase
121      implements Mapper<BytesWritable,BytesWritable, Text, Text> {
122    static int instances = 0;
123    Reporter reporter = null;
124
125    @Override
126    public void map(BytesWritable key, BytesWritable value,
127                    OutputCollector<Text, Text> output, 
128                    Reporter reporter) throws IOException {
129      this.reporter = reporter;
130    }
131   
132    public void close() throws IOException {
133      reporter.incrCounter("jvm", "use", ++instances);
134    }
135  }
136
137  private static void runJvmReuseTest(JobConf job,
138                                      boolean reuse) throws IOException {
139    // setup a map-only job that reads the input and only sets the counters
140    // based on how many times the jvm was reused.
141    job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
142    FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
143    job.setInputFormat(SequenceFileInputFormat.class);
144    job.setOutputFormat(NullOutputFormat.class);
145    job.setMapperClass(ReuseDetector.class);
146    job.setOutputKeyClass(Text.class);
147    job.setOutputValueClass(Text.class);
148    job.setNumMapTasks(24);
149    job.setNumReduceTasks(0);
150    RunningJob result = JobClient.runJob(job);
151    long uses = result.getCounters().findCounter("jvm", "use").getValue();
152    int maps = job.getNumMapTasks();
153    if (reuse) {
154      assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
155    } else {
156      assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
157    }
158  }
159
160  public void testMapReduceSort() throws Exception {
161    // Run randomwriter to generate input for 'sort'
162    runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
163
164    // Run sort
165    runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
166
167    // Run sort-validator to check if sort worked correctly
168    runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH, 
169                     SORT_OUTPUT_PATH);
170  }
171 
172  public void testJvmReuse() throws Exception {
173    runJvmReuseTest(mrCluster.createJobConf(), true);
174  }
175
176  public void testNoJvmReuse() throws Exception {
177    runJvmReuseTest(mrCluster.createJobConf(), false);
178  }
179}
Note: See TracBrowser for help on using the repository browser.