source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestEmptyJob.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 8.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.File;
22import java.io.IOException;
23import java.net.URI;
24
25import junit.framework.TestCase;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.fs.FileStatus;
31import org.apache.hadoop.fs.FileSystem;
32import org.apache.hadoop.fs.Path;
33import org.apache.hadoop.io.IntWritable;
34import org.apache.hadoop.io.Text;
35import org.apache.hadoop.mapred.lib.IdentityMapper;
36import org.apache.hadoop.mapred.lib.IdentityReducer;
37
38/**
39 * A JUnit test to test Map-Reduce empty jobs.
40 */
41public class TestEmptyJob extends TestCase {
42  private static final Log LOG =
43      LogFactory.getLog(TestEmptyJob.class.getName());
44
45  private static String TEST_ROOT_DIR =
46      new File(System.getProperty("test.build.data", "/tmp")).toURI()
47          .toString().replace(' ', '+');
48
49  MiniMRCluster mr = null;
50
51  /** Committer with cleanup waiting on a signal
52   */
53  static class CommitterWithDelayCleanup extends FileOutputCommitter {
54    @Override
55    public void cleanupJob(JobContext context) throws IOException {
56      Configuration conf = context.getConfiguration();
57      Path share = new Path(conf.get("share"));
58      FileSystem fs = FileSystem.get(conf);
59
60     
61      while (true) {
62        if (fs.exists(share)) {
63          break;
64        }
65        UtilsForTests.waitFor(100);
66      }
67      super.cleanupJob(context);
68    }
69  }
70
71  /**
72   * Simple method running a MapReduce job with no input data. Used to test that
73   * such a job is successful.
74   *
75   * @param fileSys
76   * @param numMaps
77   * @param numReduces
78   * @return true if the MR job is successful, otherwise false
79   * @throws IOException
80   */
81  private boolean launchEmptyJob(URI fileSys, int numMaps, int numReduces)
82      throws IOException {
83    // create an empty input dir
84    final Path inDir = new Path(TEST_ROOT_DIR, "testing/empty/input");
85    final Path outDir = new Path(TEST_ROOT_DIR, "testing/empty/output");
86    final Path inDir2 = new Path(TEST_ROOT_DIR, "testing/dummy/input");
87    final Path outDir2 = new Path(TEST_ROOT_DIR, "testing/dummy/output");
88    final Path share = new Path(TEST_ROOT_DIR, "share");
89
90    JobConf conf = mr.createJobConf();
91    FileSystem fs = FileSystem.get(fileSys, conf);
92    fs.delete(new Path(TEST_ROOT_DIR), true);
93    fs.delete(outDir, true);
94    if (!fs.mkdirs(inDir)) {
95      LOG.warn("Can't create " + inDir);
96      return false;
97    }
98
99    // use WordCount example
100    FileSystem.setDefaultUri(conf, fileSys);
101    conf.setJobName("empty");
102    // use an InputFormat which returns no split
103    conf.setInputFormat(EmptyInputFormat.class);
104    conf.setOutputCommitter(CommitterWithDelayCleanup.class);
105    conf.setOutputKeyClass(Text.class);
106    conf.setOutputValueClass(IntWritable.class);
107    conf.setMapperClass(IdentityMapper.class);
108    conf.setReducerClass(IdentityReducer.class);
109    FileInputFormat.setInputPaths(conf, inDir);
110    FileOutputFormat.setOutputPath(conf, outDir);
111    conf.setNumMapTasks(numMaps);
112    conf.setNumReduceTasks(numReduces);
113    conf.set("share", share.toString());
114
115    // run job and wait for completion
116    JobClient jc = new JobClient(conf);
117    RunningJob runningJob = jc.submitJob(conf);
118    JobInProgress job = mr.getJobTrackerRunner().getJobTracker().getJob(runningJob.getID());
119   
120    while (true) {
121      if (job.isCleanupLaunched()) {
122        LOG.info("Waiting for cleanup to be launched for job " 
123                 + runningJob.getID());
124        break;
125      }
126      UtilsForTests.waitFor(100);
127    }
128   
129    // submit another job so that the map load increases and scheduling happens
130    LOG.info("Launching dummy job ");
131    RunningJob dJob = null;
132    try {
133      JobConf dConf = new JobConf(conf);
134      dConf.setOutputCommitter(FileOutputCommitter.class);
135      dJob = UtilsForTests.runJob(dConf, inDir2, outDir2, 2, 0);
136    } catch (Exception e) {
137      LOG.info("Exception ", e);
138      throw new IOException(e);
139    }
140   
141    while (true) {
142      LOG.info("Waiting for job " + dJob.getID() + " to complete");
143      try {
144        Thread.sleep(100);
145      } catch (InterruptedException e) {
146      }
147      if (dJob.isComplete()) {
148        break;
149      }
150    }
151   
152    // check if the second job is successful
153    assertTrue(dJob.isSuccessful());
154
155    // signal the cleanup
156    fs.create(share).close();
157   
158    while (true) {
159      LOG.info("Waiting for job " + runningJob.getID() + " to complete");
160      try {
161        Thread.sleep(100);
162      } catch (InterruptedException e) {
163      }
164      if (runningJob.isComplete()) {
165        break;
166      }
167    }
168
169    assertTrue(runningJob.isComplete());
170    assertTrue(runningJob.isSuccessful());
171    JobID jobID = runningJob.getID();
172
173    TaskReport[] jobSetupTasks = jc.getSetupTaskReports(jobID);
174    assertTrue("Number of job-setup tips is not 2!", jobSetupTasks.length == 2);
175    assertTrue("Setup progress is " + runningJob.setupProgress()
176        + " and not 1.0", runningJob.setupProgress() == 1.0);
177    assertTrue("Setup task is not finished!", mr.getJobTrackerRunner()
178        .getJobTracker().getJob(jobID).isSetupFinished());
179
180    assertTrue("Number of maps is not zero!", jc.getMapTaskReports(runningJob
181        .getID()).length == 0);
182    assertTrue(
183        "Map progress is " + runningJob.mapProgress() + " and not 1.0!",
184        runningJob.mapProgress() == 1.0);
185
186    assertTrue("Reduce progress is " + runningJob.reduceProgress()
187        + " and not 1.0!", runningJob.reduceProgress() == 1.0);
188    assertTrue("Number of reduces is not " + numReduces, jc
189        .getReduceTaskReports(runningJob.getID()).length == numReduces);
190
191    TaskReport[] jobCleanupTasks = jc.getCleanupTaskReports(jobID);
192    assertTrue("Number of job-cleanup tips is not 2!",
193        jobCleanupTasks.length == 2);
194    assertTrue("Cleanup progress is " + runningJob.cleanupProgress()
195        + " and not 1.0", runningJob.cleanupProgress() == 1.0);
196
197    assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
198    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
199    assertTrue("Number of part-files is " + list.length + " and not "
200        + numReduces, list.length == numReduces);
201
202    // cleanup
203    fs.delete(outDir, true);
204
205    // return job result
206    LOG.info("job is complete: " + runningJob.isSuccessful());
207    return (runningJob.isSuccessful());
208  }
209
210  /**
211   * Test that a job with no input data (and thus with no input split and no map
212   * task to execute) is successful.
213   *
214   * @throws IOException
215   */
216  public void testEmptyJob()
217      throws IOException {
218    FileSystem fileSys = null;
219    try {
220      final int taskTrackers = 2;
221      JobConf conf = new JobConf();
222      fileSys = FileSystem.get(conf);
223
224      conf.set("mapred.job.tracker.handler.count", "1");
225      conf.set("mapred.job.tracker", "127.0.0.1:0");
226      conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
227      conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
228
229      mr =
230          new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
231              null, null, conf);
232
233      assertTrue(launchEmptyJob(fileSys.getUri(), 3, 1));
234      assertTrue(launchEmptyJob(fileSys.getUri(), 0, 0));
235    } finally {
236      if (fileSys != null) {
237        fileSys.close();
238      }
239      if (mr != null) {
240        mr.shutdown();
241      }
242    }
243  }
244}
Note: See TracBrowser for help on using the repository browser.