source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 11.3 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * A JUnit test that runs a mini Map-Reduce cluster against a mini DFS.
 */
public class TestMiniMRWithDFS extends TestCase {
  private static final Log LOG =
    LogFactory.getLog(TestMiniMRWithDFS.class.getName());

  static final int NUM_MAPS = 10;
  static final int NUM_SAMPLES = 100000;

  public static class TestResult {
    public String output;
    public RunningJob job;
    TestResult(RunningJob job, String output) {
      this.job = job;
      this.output = output;
    }
  }
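  /**
   * Writes the given input to a single file under inDir, then configures and
   * runs the standard WordCount example with the requested numbers of map
   * and reduce tasks. Returns the completed job together with its
   * concatenated text output.
   */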
  public static TestResult launchWordCount(JobConf conf,
                                           Path inDir,
                                           Path outDir,
                                           String input,
                                           int numMaps,
                                           int numReduces) throws IOException {
    FileSystem inFs = inDir.getFileSystem(conf);
    FileSystem outFs = outDir.getFileSystem(conf);
    outFs.delete(outDir, true);
    if (!inFs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
    conf.setJobName("wordcount");
    conf.setInputFormat(TextInputFormat.class);

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCount.MapClass.class);
    conf.setCombinerClass(WordCount.Reduce.class);
    conf.setReducerClass(WordCount.Reduce.class);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(numMaps);
    conf.setNumReduceTasks(numReduces);
    RunningJob job = JobClient.runJob(conf);
    return new TestResult(job, readOutput(outDir, conf));
  }

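  /**
   * Concatenates every line of every output file in outDir, skipping log
   * files via OutputLogFilter, and returns the result as a single string.
   */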
  public static String readOutput(Path outDir,
                                  JobConf conf) throws IOException {
    FileSystem fs = outDir.getFileSystem(conf);
    StringBuffer result = new StringBuffer();
    {
      Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
                                   new OutputLogFilter()));
      for(int i=0; i < fileList.length; ++i) {
        LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
        BufferedReader file =
          new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
        String line = file.readLine();
        while (line != null) {
          result.append(line);
          result.append("\n");
          line = file.readLine();
        }
        file.close();
      }
    }
    return result.toString();
  }

  /**
   * Make sure that exactly the directories we expect to find are present.
   * @param mr the map-reduce cluster
   * @param jobIds the ids of the jobs that own the kept task directories
   * @param taskDirs the task attempt ids whose directories should be present
   */
  static void checkTaskDirectories(MiniMRCluster mr,
                                   String[] jobIds,
                                   String[] taskDirs) {
    mr.waitUntilIdle();
    int trackers = mr.getNumTaskTrackers();
    List<String> neededDirs = new ArrayList<String>(Arrays.asList(taskDirs));
    boolean[] found = new boolean[taskDirs.length];
    for(int i=0; i < trackers; ++i) {
      File localDir = new File(mr.getTaskTrackerLocalDir(i));
      LOG.debug("Tracker directory: " + localDir);
      File trackerDir = new File(localDir, "taskTracker");
      assertTrue("local dir " + localDir + " does not exist.",
                 localDir.isDirectory());
      assertTrue("task tracker dir " + trackerDir + " does not exist.",
                 trackerDir.isDirectory());
      String contents[] = localDir.list();
      String trackerContents[] = trackerDir.list();
      for(int j=0; j < contents.length; ++j) {
        System.out.println("Local " + localDir + ": " + contents[j]);
      }
      for(int j=0; j < trackerContents.length; ++j) {
        System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
      }
      for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
        String name = contents[fileIdx];
        if (!("taskTracker".equals(contents[fileIdx]))) {
          LOG.debug("Looking at " + name);
          fail("Spurious directory " + name + " found in " + localDir);
        }
      }
      for (int idx = 0; idx < neededDirs.size(); ++idx) {
        String name = neededDirs.get(idx);
        if (new File(new File(new File(trackerDir, "jobcache"),
                              jobIds[idx]), name).isDirectory()) {
          found[idx] = true;
        }
      }
    }
    for(int i=0; i< found.length; i++) {
      assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
    }
  }

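  /**
   * Runs the PiEstimator example with NUM_MAPS maps and NUM_SAMPLES samples
   * per map, and asserts that the estimate is within 0.01 of Math.PI. No
   * task files are kept, so the directory check only verifies that nothing
   * spurious was left behind.
   */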
  public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
    LOG.info("runPI");
    double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
        NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
    double error = Math.abs(Math.PI - estimate);
    assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
    checkTaskDirectories(mr, new String[]{}, new String[]{});
  }

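  /**
   * Exercises WordCount three ways: with a keep-files pattern and 3 maps
   * (verifying the kept task directory), with 0 maps (verifying the HDFS
   * byte counters), and with input and output on the local filesystem.
   */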
  public static void runWordCount(MiniMRCluster mr, JobConf jobConf)
  throws IOException {
    LOG.info("runWordCount");
    // Run a word count example
    // Keeping tasks that match this pattern
    String pattern =
      TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null);
    jobConf.setKeepTaskFilesPattern(pattern);
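    // The pattern matches every attempt of map task 1 of any job, so that
    // attempt's directory should survive in the jobcache; checkTaskDirectories
    // verifies this below.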
    TestResult result;
    final Path inDir = new Path("./wc/input");
    final Path outDir = new Path("./wc/output");
    String input = "The quick brown fox\nhas many silly\nred fox sox\n";
    result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
    assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                 "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
    JobID jobid = result.job.getID();
    TaskAttemptID taskid = new TaskAttemptID(new TaskID(jobid, true, 1), 0);
    checkTaskDirectories(mr, new String[]{jobid.toString()},
                         new String[]{taskid.toString()});
    // test with maps=0
    jobConf = mr.createJobConf();
    input = "owen is oom";
    result = launchWordCount(jobConf, inDir, outDir, input, 0, 1);
    assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
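    // With a single reduce writing all of the output, the HDFS byte counters
    // should match the sizes of the job's output and input exactly.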
    Counters counters = result.job.getCounters();
    long hdfsRead =
      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
          Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
    long hdfsWrite =
      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
          Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
    assertEquals(result.output.length(), hdfsWrite);
    assertEquals(input.length(), hdfsRead);

    // Run a job with input and output going to localfs even though the
    // default fs is hdfs.
    {
      FileSystem localfs = FileSystem.getLocal(jobConf);
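      // test.build.data may contain spaces, which are not legal in a Path
      // URI, so replace them before building the local paths.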
      String TEST_ROOT_DIR =
        new File(System.getProperty("test.build.data","/tmp"))
        .toString().replace(' ', '+');
      Path localIn = localfs.makeQualified
                        (new Path(TEST_ROOT_DIR + "/local/in"));
      Path localOut = localfs.makeQualified
                        (new Path(TEST_ROOT_DIR + "/local/out"));
      result = launchWordCount(jobConf, localIn, localOut,
                               "all your base belong to us", 1, 1);
      assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n",
                   result.output);
      assertTrue("outputs on localfs", localfs.exists(localOut));
    }
  }

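  /**
   * Brings up a 4-datanode MiniDFSCluster and a 4-tracker MiniMRCluster,
   * then runs the PI and WordCount checks against them.
   */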
  public void testWithDFS() throws IOException {
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      final int taskTrackers = 4;

      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster(conf, 4, true, null);
      fileSys = dfs.getFileSystem();
      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);

      runPI(mr, mr.createJobConf());
      runWordCount(mr, mr.createJobConf());
    } finally {
      if (dfs != null) { dfs.shutdown(); }
      if (mr != null) { mr.shutdown(); }
    }
  }

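  /**
   * Same as testWithDFS, but with the namenode listening on its default
   * port, so that both fully-qualified and scheme-only hdfs paths can be
   * tested. Skipped if the default port is already in use.
   */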
  public void testWithDFSWithDefaultPort() throws IOException {
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      final int taskTrackers = 4;

      Configuration conf = new Configuration();
      // start a dfs with the default port number
      dfs = new MiniDFSCluster(
          NameNode.DEFAULT_PORT, conf, 4, true, true, null, null);
      fileSys = dfs.getFileSystem();
      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);

      JobConf jobConf = mr.createJobConf();
      TestResult result;
      final Path inDir = new Path("./wc/input");
      final Path outDir = new Path("hdfs://" +
          dfs.getNameNode().getNameNodeAddress().getHostName() +
          ":" + NameNode.DEFAULT_PORT + "/./wc/output");
      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
      result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
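      // A path with a scheme but no authority ("hdfs:/...") should resolve
      // against fs.default.name once it points at the default port.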
      final Path outDir2 = new Path("hdfs:/test/wc/output2");
      jobConf.set("fs.default.name", "hdfs://localhost:" + NameNode.DEFAULT_PORT);
      result = launchWordCount(jobConf, inDir, outDir2, input, 3, 1);
      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
    } catch (java.net.BindException be) {
      LOG.info("Skipping the test because the namenode could not be started "
          + "on port " + NameNode.DEFAULT_PORT, be);
    } finally {
      if (dfs != null) { dfs.shutdown(); }
      if (mr != null) { mr.shutdown(); }
    }
  }
}