source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 5.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.DataOutputStream;
22import java.io.IOException;
23
24import junit.framework.TestCase;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.apache.hadoop.hdfs.MiniDFSCluster;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.io.IntWritable;
32import org.apache.hadoop.io.Text;
33
34/**
35 * A JUnit test to test Job System Directory with Mini-DFS.
36 */
37public class TestJobSysDirWithDFS extends TestCase {
38  private static final Log LOG =
39    LogFactory.getLog(TestJobSysDirWithDFS.class.getName());
40 
41  static final int NUM_MAPS = 10;
42  static final int NUM_SAMPLES = 100000;
43 
44  public static class TestResult {
45    public String output;
46    public RunningJob job;
47    TestResult(RunningJob job, String output) {
48      this.job = job;
49      this.output = output;
50    }
51  }
52
53  public static TestResult launchWordCount(JobConf conf,
54                                           Path inDir,
55                                           Path outDir,
56                                           String input,
57                                           int numMaps,
58                                           int numReduces) throws IOException {
59    FileSystem inFs = inDir.getFileSystem(conf);
60    FileSystem outFs = outDir.getFileSystem(conf);
61    outFs.delete(outDir, true);
62    if (!inFs.mkdirs(inDir)) {
63      throw new IOException("Mkdirs failed to create " + inDir.toString());
64    }
65    {
66      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
67      file.writeBytes(input);
68      file.close();
69    }
70    conf.setJobName("wordcount");
71    conf.setInputFormat(TextInputFormat.class);
72   
73    // the keys are words (strings)
74    conf.setOutputKeyClass(Text.class);
75    // the values are counts (ints)
76    conf.setOutputValueClass(IntWritable.class);
77   
78    conf.setMapperClass(WordCount.MapClass.class);       
79    conf.setCombinerClass(WordCount.Reduce.class);
80    conf.setReducerClass(WordCount.Reduce.class);
81    FileInputFormat.setInputPaths(conf, inDir);
82    FileOutputFormat.setOutputPath(conf, outDir);
83    conf.setNumMapTasks(numMaps);
84    conf.setNumReduceTasks(numReduces);
85    conf.set("mapred.system.dir", "/tmp/subru/mapred/system");
86    JobClient jobClient = new JobClient(conf);
87    RunningJob job = jobClient.runJob(conf);
88    // Checking that the Job Client system dir is not used
89    assertFalse(FileSystem.get(conf).exists(new Path(conf.get("mapred.system.dir")))); 
90    // Check if the Job Tracker system dir is propogated to client
91    String sysDir = jobClient.getSystemDir().toString();
92    System.out.println("Job sys dir -->" + sysDir);
93    assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
94    assertTrue(sysDir.contains("custom"));
95    return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
96  }
97
98 static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
99    LOG.info("runWordCount");
100    // Run a word count example
101    // Keeping tasks that match this pattern
102    TestResult result;
103    final Path inDir = new Path("./wc/input");
104    final Path outDir = new Path("./wc/output");
105    result = launchWordCount(jobConf, inDir, outDir,
106                             "The quick brown fox\nhas many silly\n" + 
107                             "red fox sox\n",
108                             3, 1);
109    assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
110                 "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
111    // Checking if the Job ran successfully in spite of different system dir config
112    //  between Job Client & Job Tracker
113    assertTrue(result.job.isSuccessful());
114  }
115
116  public void testWithDFS() throws IOException {
117    MiniDFSCluster dfs = null;
118    MiniMRCluster mr = null;
119    FileSystem fileSys = null;
120    try {
121      final int taskTrackers = 4;
122
123      JobConf conf = new JobConf();
124      conf.set("mapred.system.dir", "/tmp/custom/mapred/system");
125      dfs = new MiniDFSCluster(conf, 4, true, null);
126      fileSys = dfs.getFileSystem();
127      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
128
129      runWordCount(mr, mr.createJobConf());
130    } finally {
131      if (dfs != null) { dfs.shutdown(); }
132      if (mr != null) { mr.shutdown();
133      }
134    }
135  }
136 
137}
Note: See TracBrowser for help on using the repository browser.