source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 10.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred.pipes;
20
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.util.List;
24import java.util.ArrayList;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.hdfs.MiniDFSCluster;
30import org.apache.hadoop.fs.FileUtil;
31import org.apache.hadoop.fs.FileSystem;
32import org.apache.hadoop.fs.Path;
33import org.apache.hadoop.mapred.Counters;
34import org.apache.hadoop.mapred.FileInputFormat;
35import org.apache.hadoop.mapred.FileOutputFormat;
36import org.apache.hadoop.mapred.JobConf;
37import org.apache.hadoop.mapred.MiniMRCluster;
38import org.apache.hadoop.mapred.OutputLogFilter;
39import org.apache.hadoop.mapred.RunningJob;
40import org.apache.hadoop.mapred.TestMiniMRWithDFS;
41import org.apache.hadoop.mapred.Counters.Counter;
42import org.apache.hadoop.util.StringUtils;
43import org.apache.hadoop.util.ToolRunner;
44
45import junit.framework.TestCase;
46
47public class TestPipes extends TestCase {
48  private static final Log LOG =
49    LogFactory.getLog(TestPipes.class.getName());
50
51  static void cleanup(FileSystem fs, Path p) throws IOException {
52    fs.delete(p, true);
53    assertFalse("output not cleaned up", fs.exists(p));
54  }
55
56  public void testPipes() throws IOException {
57    if (System.getProperty("compile.c++") == null) {
58      LOG.info("compile.c++ is not defined, so skipping TestPipes");
59      return;
60    }
61    MiniDFSCluster dfs = null;
62    MiniMRCluster mr = null;
63    Path cppExamples = new Path(System.getProperty("install.c++.examples"));
64    Path inputPath = new Path("/testing/in");
65    Path outputPath = new Path("/testing/out");
66    try {
67      final int numSlaves = 2;
68      Configuration conf = new Configuration();
69      dfs = new MiniDFSCluster(conf, numSlaves, true, null);
70      mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
71      writeInputFile(dfs.getFileSystem(), inputPath);
72      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
73                 inputPath, outputPath, 3, 2, twoSplitOutput);
74      cleanup(dfs.getFileSystem(), outputPath);
75
76      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
77                 inputPath, outputPath, 3, 0, noSortOutput);
78      cleanup(dfs.getFileSystem(), outputPath);
79
80      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
81                 inputPath, outputPath, 3, 2, fixedPartitionOutput);
82      runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
83      mr.waitUntilIdle();
84    } finally {
85      mr.shutdown();
86      dfs.shutdown();
87    }
88  }
89
90  final static String[] twoSplitOutput = new String[] {
91    "`and\t1\na\t1\nand\t1\nbeginning\t1\nbook\t1\nbut\t1\nby\t1\n" +
92    "conversation?'\t1\ndo:\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\nit\t1\n"+
93    "it,\t1\nno\t1\nnothing\t1\nof\t3\non\t1\nonce\t1\nor\t3\npeeped\t1\n"+
94    "pictures\t2\nthe\t3\nthought\t1\nto\t2\nuse\t1\nwas\t2\n",
95
96    "Alice\t2\n`without\t1\nbank,\t1\nbook,'\t1\nconversations\t1\nget\t1\n" +
97    "into\t1\nis\t1\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\ntired\t1\n" +
98    "twice\t1\nvery\t1\nwhat\t1\n"
99  };
100
101  final static String[] noSortOutput = new String[] {
102    "it,\t1\n`and\t1\nwhat\t1\nis\t1\nthe\t1\nuse\t1\nof\t1\na\t1\n" +
103    "book,'\t1\nthought\t1\nAlice\t1\n`without\t1\npictures\t1\nor\t1\n"+
104    "conversation?'\t1\n",
105
106    "Alice\t1\nwas\t1\nbeginning\t1\nto\t1\nget\t1\nvery\t1\ntired\t1\n"+
107    "of\t1\nsitting\t1\nby\t1\nher\t1\nsister\t1\non\t1\nthe\t1\nbank,\t1\n"+
108    "and\t1\nof\t1\nhaving\t1\nnothing\t1\nto\t1\ndo:\t1\nonce\t1\n", 
109
110    "or\t1\ntwice\t1\nshe\t1\nhad\t1\npeeped\t1\ninto\t1\nthe\t1\nbook\t1\n"+
111    "her\t1\nsister\t1\nwas\t1\nreading,\t1\nbut\t1\nit\t1\nhad\t1\nno\t1\n"+
112    "pictures\t1\nor\t1\nconversations\t1\nin\t1\n"
113  };
114 
115  final static String[] fixedPartitionOutput = new String[] {
116    "Alice\t2\n`and\t1\n`without\t1\na\t1\nand\t1\nbank,\t1\nbeginning\t1\n" +
117    "book\t1\nbook,'\t1\nbut\t1\nby\t1\nconversation?'\t1\nconversations\t1\n"+
118    "do:\t1\nget\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\ninto\t1\nis\t1\n" +
119    "it\t1\nit,\t1\nno\t1\nnothing\t1\nof\t3\non\t1\nonce\t1\nor\t3\n" +
120    "peeped\t1\npictures\t2\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\n" +
121    "the\t3\nthought\t1\ntired\t1\nto\t2\ntwice\t1\nuse\t1\n" +
122    "very\t1\nwas\t2\nwhat\t1\n",
123   
124    ""                                                   
125  };
126 
127  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
128    DataOutputStream out = fs.create(new Path(dir, "part0"));
129    out.writeBytes("Alice was beginning to get very tired of sitting by her\n");
130    out.writeBytes("sister on the bank, and of having nothing to do: once\n");
131    out.writeBytes("or twice she had peeped into the book her sister was\n");
132    out.writeBytes("reading, but it had no pictures or conversations in\n");
133    out.writeBytes("it, `and what is the use of a book,' thought Alice\n");
134    out.writeBytes("`without pictures or conversation?'\n");
135    out.close();
136  }
137
138  private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs, 
139                          Path program, Path inputPath, Path outputPath,
140                          int numMaps, int numReduces, String[] expectedResults
141                         ) throws IOException {
142    Path wordExec = new Path("/testing/bin/application");
143    JobConf job = mr.createJobConf();
144    job.setNumMapTasks(numMaps);
145    job.setNumReduceTasks(numReduces);
146    {
147      FileSystem fs = dfs.getFileSystem();
148      fs.delete(wordExec.getParent(), true);
149      fs.copyFromLocalFile(program, wordExec);                                         
150      Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
151      Submitter.setIsJavaRecordReader(job, true);
152      Submitter.setIsJavaRecordWriter(job, true);
153      FileInputFormat.setInputPaths(job, inputPath);
154      FileOutputFormat.setOutputPath(job, outputPath);
155      RunningJob rJob = null;
156      if (numReduces == 0) {
157        rJob = Submitter.jobSubmit(job);
158       
159        while (!rJob.isComplete()) {
160          try {
161            Thread.sleep(1000);
162          } catch (InterruptedException ie) {
163            throw new RuntimeException(ie);
164          }
165        }
166      } else {
167        rJob = Submitter.runJob(job);
168      }
169      assertTrue("pipes job failed", rJob.isSuccessful());
170     
171      Counters counters = rJob.getCounters();
172      Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
173      int numCounters = 0;
174      for (Counter c : wordCountCounters) {
175        System.out.println(c);
176        ++numCounters;
177      }
178      assertTrue("No counters found!", (numCounters > 0));
179    }
180
181    List<String> results = new ArrayList<String>();
182    for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
183                                        new OutputLogFilter()))) {
184      results.add(TestMiniMRWithDFS.readOutput(p, job));
185    }
186    assertEquals("number of reduces is wrong", 
187                 expectedResults.length, results.size());
188    for(int i=0; i < results.size(); i++) {
189      assertEquals("pipes program " + program + " output " + i + " wrong",
190                   expectedResults[i], results.get(i));
191    }
192  }
193 
194  /**
195   * Run a map/reduce word count that does all of the map input and reduce
196   * output directly rather than sending it back up to Java.
197   * @param mr The mini mr cluster
198   * @param dfs the dfs cluster
199   * @param program the program to run
200   * @throws IOException
201   */
202  private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
203                                  Path program) throws IOException {
204    JobConf job = mr.createJobConf();
205    job.setInputFormat(WordCountInputFormat.class);
206    FileSystem local = FileSystem.getLocal(job);
207    Path testDir = new Path("file:" + System.getProperty("test.build.data"), 
208                            "pipes");
209    Path inDir = new Path(testDir, "input");
210    Path outDir = new Path(testDir, "output");
211    Path wordExec = new Path("/testing/bin/application");
212    Path jobXml = new Path(testDir, "job.xml");
213    {
214      FileSystem fs = dfs.getFileSystem();
215      fs.delete(wordExec.getParent(), true);
216      fs.copyFromLocalFile(program, wordExec);
217    }
218    DataOutputStream out = local.create(new Path(inDir, "part0"));
219    out.writeBytes("i am a silly test\n");
220    out.writeBytes("you are silly\n");
221    out.writeBytes("i am a cat test\n");
222    out.writeBytes("you is silly\n");
223    out.writeBytes("i am a billy test\n");
224    out.writeBytes("hello are silly\n");
225    out.close();
226    out = local.create(new Path(inDir, "part1"));
227    out.writeBytes("mall world things drink java\n");
228    out.writeBytes("hall silly cats drink java\n");
229    out.writeBytes("all dogs bow wow\n");
230    out.writeBytes("hello drink java\n");
231    out.close();
232    local.delete(outDir, true);
233    local.mkdirs(outDir);
234    out = local.create(jobXml);
235    job.writeXml(out);
236    out.close();
237    System.err.println("About to run: Submitter -conf " + jobXml + 
238                       " -input " + inDir + " -output " + outDir + 
239                       " -program " + 
240                       dfs.getFileSystem().makeQualified(wordExec));
241    try {
242      int ret = ToolRunner.run(new Submitter(),
243                               new String[]{"-conf", jobXml.toString(),
244                                  "-input", inDir.toString(),
245                                  "-output", outDir.toString(),
246                                  "-program", 
247                        dfs.getFileSystem().makeQualified(wordExec).toString(),
248                                  "-reduces", "2"});
249      assertEquals(0, ret);
250    } catch (Exception e) {
251      assertTrue("got exception: " + StringUtils.stringifyException(e), false);
252    }
253  }
254}
Note: See TracBrowser for help on using the repository browser.