source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestReduceFetch.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 5.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.util.Arrays;
23
24import junit.framework.Test;
25import junit.framework.TestCase;
26import junit.framework.TestSuite;
27import junit.extensions.TestSetup;
28
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.fs.FileSystem;
31import org.apache.hadoop.fs.Path;
32import org.apache.hadoop.hdfs.MiniDFSCluster;
33import org.apache.hadoop.io.NullWritable;
34import org.apache.hadoop.io.Text;
35import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
36import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
37import org.apache.hadoop.mapred.lib.IdentityReducer;
38
39public class TestReduceFetch extends TestCase {
40
41  private static MiniMRCluster mrCluster = null;
42  private static MiniDFSCluster dfsCluster = null;
43  public static Test suite() {
44    TestSetup setup = new TestSetup(new TestSuite(TestReduceFetch.class)) {
45      protected void setUp() throws Exception {
46        Configuration conf = new Configuration();
47        dfsCluster = new MiniDFSCluster(conf, 2, true, null);
48        mrCluster = new MiniMRCluster(2,
49            dfsCluster.getFileSystem().getUri().toString(), 1);
50      }
51      protected void tearDown() throws Exception {
52        if (dfsCluster != null) { dfsCluster.shutdown(); }
53        if (mrCluster != null) { mrCluster.shutdown(); }
54      }
55    };
56    return setup;
57  }
58
59  public static class MapMB
60      implements Mapper<NullWritable,NullWritable,Text,Text> {
61
62    public void map(NullWritable nk, NullWritable nv,
63        OutputCollector<Text, Text> output, Reporter reporter)
64        throws IOException {
65      Text key = new Text();
66      Text val = new Text();
67      key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
68      byte[] b = new byte[1000];
69      Arrays.fill(b, (byte)'V');
70      val.set(b);
71      b = null;
72      for (int i = 0; i < 4 * 1024; ++i) {
73        output.collect(key, val);
74      }
75    }
76    public void configure(JobConf conf) { }
77    public void close() throws IOException { }
78  }
79
80  public static Counters runJob(JobConf conf) throws Exception {
81    conf.setMapperClass(MapMB.class);
82    conf.setReducerClass(IdentityReducer.class);
83    conf.setOutputKeyClass(Text.class);
84    conf.setOutputValueClass(Text.class);
85    conf.setNumReduceTasks(1);
86    conf.setInputFormat(FakeIF.class);
87    FileInputFormat.setInputPaths(conf, new Path("/in"));
88    final Path outp = new Path("/out");
89    FileOutputFormat.setOutputPath(conf, outp);
90    RunningJob job = null;
91    try {
92      job = JobClient.runJob(conf);
93      assertTrue(job.isSuccessful());
94    } finally {
95      FileSystem fs = dfsCluster.getFileSystem();
96      if (fs.exists(outp)) {
97        fs.delete(outp, true);
98      }
99    }
100    return job.getCounters();
101  }
102
103  public void testReduceFromDisk() throws Exception {
104    JobConf job = mrCluster.createJobConf();
105    job.set("mapred.job.reduce.input.buffer.percent", "0.0");
106    job.setNumMapTasks(3);
107    Counters c = runJob(job);
108    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
109        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
110    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
111        Task.getFileSystemCounterNames("file")[0]).getCounter();
112    assertTrue("Expected more bytes read from local (" +
113        localRead + ") than written to HDFS (" + hdfsWritten + ")",
114        hdfsWritten <= localRead);
115  }
116
117  public void testReduceFromPartialMem() throws Exception {
118    JobConf job = mrCluster.createJobConf();
119    job.setNumMapTasks(5);
120    job.setInt("mapred.inmem.merge.threshold", 0);
121    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
122    job.setInt("mapred.reduce.parallel.copies", 1);
123    job.setInt("io.sort.mb", 10);
124    job.set("mapred.child.java.opts", "-Xmx128m");
125    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
126    job.setNumTasksToExecutePerJvm(1);
127    job.set("mapred.job.shuffle.merge.percent", "1.0");
128    Counters c = runJob(job);
129    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
130        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
131    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
132        Task.getFileSystemCounterNames("file")[0]).getCounter();
133    assertTrue("Expected at least 1MB fewer bytes read from local (" +
134        localRead + ") than written to HDFS (" + hdfsWritten + ")",
135        hdfsWritten >= localRead + 1024 * 1024);
136  }
137
138  public void testReduceFromMem() throws Exception {
139    JobConf job = mrCluster.createJobConf();
140    job.set("mapred.job.reduce.input.buffer.percent", "1.0");
141    job.setNumMapTasks(3);
142    Counters c = runJob(job);
143    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
144        Task.getFileSystemCounterNames("file")[0]).getCounter();
145    assertTrue("Non-zero read from local: " + localRead, localRead == 0);
146  }
147
148}
Note: See TracBrowser for help on using the repository browser.