source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java @ 120

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.*;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * A JUnit test that runs a mini Map-Reduce cluster with multiple local
 * directories and checks that user classes shipped in the job jar are
 * resolved through the correct classpath.
 */
public class TestMiniMRClasspath extends TestCase {
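  /**
   * Writes {@code input} to DFS, runs a word-count job whose mapper,
   * combiner, and reducer are loaded by name from the job jar, and
   * returns the concatenated contents of the output files.
   */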
  static String launchWordCount(String fileSys,
                                String jobTracker,
                                JobConf conf,
                                String input,
                                int numMaps,
                                int numReduces) throws IOException {
    final Path inDir = new Path("/testing/wc/input");
    final Path outDir = new Path("/testing/wc/output");
    FileSystem fs = FileSystem.getNamed(fileSys, conf);
    fs.delete(outDir, true);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
    FileSystem.setDefaultUri(conf, fileSys);
    conf.set("mapred.job.tracker", jobTracker);
    conf.setJobName("wordcount");
    conf.setInputFormat(TextInputFormat.class);

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

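    // These classes live only in testjob.jar, so they are configured by
    // name and must be resolved from the job jar on the task side; that
    // is the classpath behavior this test exercises.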
    conf.set("mapred.mapper.class", "testjar.ClassWordCount$MapClass");
    conf.set("mapred.combine.class", "testjar.ClassWordCount$Reduce");
    conf.set("mapred.reducer.class", "testjar.ClassWordCount$Reduce");
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(numMaps);
    conf.setNumReduceTasks(numReduces);
    //pass a job.jar already included in the hadoop build
    conf.setJar("build/test/testjar/testjob.jar");
    JobClient.runJob(conf);
    StringBuffer result = new StringBuffer();
    {
      Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
              new OutputLogFilter()));
      for (int i = 0; i < fileList.length; ++i) {
        BufferedReader file =
          new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
        String line = file.readLine();
        while (line != null) {
          result.append(line);
          result.append("\n");
          line = file.readLine();
        }
        file.close();
      }
    }
    return result.toString();
  }
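  /**
   * Like {@link #launchWordCount}, but uses {@code testjar.ExternalWritable}
   * (present only in the job jar) as the output key type, so the job
   * classpath must also supply user-defined key classes.
   */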
  static String launchExternal(String fileSys, String jobTracker, JobConf conf,
                               String input, int numMaps, int numReduces)
    throws IOException {

    final Path inDir = new Path("/testing/ext/input");
    final Path outDir = new Path("/testing/ext/output");
    FileSystem fs = FileSystem.getNamed(fileSys, conf);
    fs.delete(outDir, true);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
    FileSystem.setDefaultUri(conf, fileSys);
    conf.set("mapred.job.tracker", jobTracker);
    conf.setJobName("external");
    conf.setInputFormat(TextInputFormat.class);

    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
    // the keys are the messages; ExternalWritable is only in the job jar,
    // so it has to be configured by name
    conf.set("mapred.output.key.class", "testjar.ExternalWritable");

    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(numMaps);
    conf.setNumReduceTasks(numReduces);

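    // The same job-jar class serves as both mapper and reducer; again it
    // is referenced by name rather than by a Class object.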
    conf.set("mapred.mapper.class", "testjar.ExternalMapperReducer");
    conf.set("mapred.reducer.class", "testjar.ExternalMapperReducer");

    //pass a job.jar already included in the hadoop build
    conf.setJar("build/test/testjar/testjob.jar");
    JobClient.runJob(conf);
    StringBuffer result = new StringBuffer();
    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
                                 new OutputLogFilter()));
    for (int i = 0; i < fileList.length; ++i) {
      BufferedReader file = new BufferedReader(
          new InputStreamReader(fs.open(fileList[i])));
      String line = file.readLine();
      while (line != null) {
        result.append(line);
        result.append("\n");
        line = file.readLine();
      }
      file.close();
    }

    return result.toString();
  }
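  /**
   * Brings up a one-node DFS and a four-tracker mini MR cluster, runs the
   * word-count job, and checks its output.
   */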
  public void testClassPath() throws IOException {
    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      final int taskTrackers = 4;

      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster(conf, 1, true, null);
      fileSys = dfs.getFileSystem();
      namenode = fileSys.getName();
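      // three local directories per task tracker: the "multiple
      // directories" case named in the class comment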
      mr = new MiniMRCluster(taskTrackers, namenode, 3);
      JobConf jobConf = new JobConf();
      String result;
      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
      result = launchWordCount(namenode, jobTrackerName, jobConf,
                               "The quick brown fox\nhas many silly\n" +
                               "red fox sox\n",
                               3, 1);
      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
    } finally {
      if (dfs != null) { dfs.shutdown(); }
      if (mr != null) { mr.shutdown(); }
    }
  }
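  /**
   * Same cluster setup as {@link #testClassPath}, but runs the job whose
   * key type is the external writable from the job jar.
   */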
  public void testExternalWritable()
    throws IOException {

    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;

    try {
      final int taskTrackers = 4;

      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster(conf, 1, true, null);
      fileSys = dfs.getFileSystem();
      namenode = fileSys.getName();
      mr = new MiniMRCluster(taskTrackers, namenode, 3);
      JobConf jobConf = new JobConf();
      String result;
      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();

      result = launchExternal(namenode, jobTrackerName, jobConf,
                              "Dennis was here!\nDennis again!",
                              3, 1);
      assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result);
    } finally {
      if (dfs != null) { dfs.shutdown(); }
      if (mr != null) { mr.shutdown(); }
    }
  }

}