source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobInProgress.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 9.0 KB
Line 
1package org.apache.hadoop.mapred;
2
3import java.io.DataOutputStream;
4import java.io.IOException;
5import java.util.Iterator;
6import java.util.Map;
7import java.util.Set;
8import java.util.HashSet;
9
10import org.apache.commons.logging.Log;
11import org.apache.commons.logging.LogFactory;
12import org.apache.hadoop.conf.Configuration;
13import org.apache.hadoop.examples.RandomWriter;
14import org.apache.hadoop.fs.FileSystem;
15import org.apache.hadoop.fs.Path;
16import org.apache.hadoop.hdfs.MiniDFSCluster;
17import org.apache.hadoop.io.IntWritable;
18import org.apache.hadoop.io.LongWritable;
19import org.apache.hadoop.io.Text;
20import org.apache.hadoop.mapred.UtilsForTests;
21import org.apache.hadoop.mapred.lib.IdentityMapper;
22import org.apache.hadoop.mapred.lib.IdentityReducer;
23import org.apache.hadoop.net.Node;
24
25import junit.framework.TestCase;
26
27public class TestJobInProgress extends TestCase {
28  static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
29
30  private MiniMRCluster mrCluster;
31
32  private MiniDFSCluster dfsCluster;
33  JobTracker jt;
34  private static Path TEST_DIR = 
35    new Path(System.getProperty("test.build.data","/tmp"), "jip-testing");
36  private static int numSlaves = 4;
37
38  public static class FailMapTaskJob extends MapReduceBase implements
39      Mapper<LongWritable, Text, Text, IntWritable> {
40
41    @Override
42    public void map(LongWritable key, Text value,
43        OutputCollector<Text, IntWritable> output, Reporter reporter)
44        throws IOException {
45      // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
46      try {
47        Thread.sleep(1000);
48      } catch (InterruptedException e) {
49        throw new IllegalArgumentException("Interrupted MAP task");
50      }
51      throw new IllegalArgumentException("Failing MAP task");
52    }
53  }
54
55  // Suppressing waring as we just need to write a failing reduce task job
56  // We don't need to bother about the actual key value pairs which are passed.
57  @SuppressWarnings("unchecked")
58  public static class FailReduceTaskJob extends MapReduceBase implements
59      Reducer {
60
61    @Override
62    public void reduce(Object key, Iterator values, OutputCollector output,
63        Reporter reporter) throws IOException {
64      // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
65      try {
66        Thread.sleep(1000);
67      } catch (InterruptedException e) {
68        throw new IllegalArgumentException("Failing Reduce task");
69      }
70      throw new IllegalArgumentException("Failing Reduce task");
71    }
72
73  }
74
75  @Override
76  protected void setUp() throws Exception {
77    // TODO Auto-generated method stub
78    super.setUp();
79    Configuration conf = new Configuration();
80    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
81    mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
82        .getUri().toString(), 1);
83    jt = mrCluster.getJobTrackerRunner().getJobTracker();
84  }
85
86  public void testPendingMapTaskCount() throws Exception {
87    launchTask(FailMapTaskJob.class, IdentityReducer.class);
88    checkTaskCounts();
89  }
90 
91  public void testPendingReduceTaskCount() throws Exception {
92    launchTask(IdentityMapper.class, FailReduceTaskJob.class);
93    checkTaskCounts();
94  }
95
96  /**
97   * Test if running tasks are correctly maintained for various types of jobs
98   */
99  private void testRunningTaskCount(boolean speculation, boolean locality)
100  throws Exception {
101    LOG.info("Testing running jobs with speculation : " + speculation
102             + ", locality : " + locality);
103    // cleanup
104    dfsCluster.getFileSystem().delete(TEST_DIR, true);
105   
106    final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
107    final Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
108   
109    // configure a waiting job with 2 maps and 2 reducers
110    JobConf job = 
111      configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1,
112                locality);
113    job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
114    job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
115   
116    // Disable slow-start for reduces since this maps don't complete
117    // in these test-cases...
118    job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
119   
120    // test jobs with speculation
121    job.setSpeculativeExecution(speculation);
122    JobClient jc = new JobClient(job);
123    RunningJob running = jc.submitJob(job);
124    JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker();
125    JobInProgress jip = jobtracker.getJob(running.getID());
126    LOG.info("Running job " + jip.getJobID());
127   
128    // wait
129    LOG.info("Waiting for job " + jip.getJobID() + " to be ready");
130    waitTillReady(jip, job);
131   
132    // check if the running structures are populated
133    Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>();
134    for (Map.Entry<Node, Set<TaskInProgress>> s : 
135           jip.getRunningMapCache().entrySet()) {
136      uniqueTasks.addAll(s.getValue());
137    }
138   
139    // add non local map tasks
140    uniqueTasks.addAll(jip.getNonLocalRunningMaps());
141   
142    assertEquals("Running map count doesnt match for jobs with speculation " 
143                 + speculation + ", and locality " + locality,
144                 jip.runningMaps(), uniqueTasks.size());
145
146    assertEquals("Running reducer count doesnt match for jobs with speculation "
147                 + speculation + ", and locality " + locality,
148                 jip.runningReduces(), jip.getRunningReduces().size());
149   
150    // signal the tasks
151    LOG.info("Signaling the tasks");
152    UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
153                              mapSignalFile.toString(), 
154                              redSignalFile.toString(), numSlaves);
155   
156    // wait for the job to complete
157    LOG.info("Waiting for job " + jip.getJobID() + " to be complete");
158    UtilsForTests.waitTillDone(jc);
159   
160    // cleanup
161    dfsCluster.getFileSystem().delete(TEST_DIR, true);
162  }
163 
164  // wait for the job to start
165  private void waitTillReady(JobInProgress jip, JobConf job) {
166    // wait for all the maps to get scheduled
167    while (jip.runningMaps() < job.getNumMapTasks()) {
168      UtilsForTests.waitFor(10);
169    }
170   
171    // wait for all the reducers to get scheduled
172    while (jip.runningReduces() < job.getNumReduceTasks()) {
173      UtilsForTests.waitFor(10);
174    }
175  }
176 
177  public void testRunningTaskCount() throws Exception {
178    // test with spec = false and locality=true
179    testRunningTaskCount(false, true);
180   
181    // test with spec = true and locality=true
182    testRunningTaskCount(true, true);
183   
184    // test with spec = false and locality=false
185    testRunningTaskCount(false, false);
186   
187    // test with spec = true and locality=false
188    testRunningTaskCount(true, false);
189  }
190 
191  @Override
192  protected void tearDown() throws Exception {
193    mrCluster.shutdown();
194    dfsCluster.shutdown();
195    super.tearDown();
196  }
197 
198
199  void launchTask(Class MapClass,Class ReduceClass) throws Exception{
200    JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
201    try {
202      JobClient.runJob(job);
203    } catch (IOException ioe) {}
204  }
205 
206  @SuppressWarnings("unchecked")
207  JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers,
208                    boolean locality) 
209  throws Exception {
210    JobConf jobConf = mrCluster.createJobConf();
211    final Path inDir = new Path("./failjob/input");
212    final Path outDir = new Path("./failjob/output");
213    String input = "Test failing job.\n One more line";
214    FileSystem inFs = inDir.getFileSystem(jobConf);
215    FileSystem outFs = outDir.getFileSystem(jobConf);
216    outFs.delete(outDir, true);
217    if (!inFs.mkdirs(inDir)) {
218      throw new IOException("create directory failed" + inDir.toString());
219    }
220
221    DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
222    file.writeBytes(input);
223    file.close();
224    jobConf.setJobName("failmaptask");
225    if (locality) {
226      jobConf.setInputFormat(TextInputFormat.class);
227    } else {
228      jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
229    }
230    jobConf.setOutputKeyClass(Text.class);
231    jobConf.setOutputValueClass(Text.class);
232    jobConf.setMapperClass(MapClass);
233    jobConf.setCombinerClass(ReduceClass);
234    jobConf.setReducerClass(ReduceClass);
235    FileInputFormat.setInputPaths(jobConf, inDir);
236    FileOutputFormat.setOutputPath(jobConf, outDir);
237    jobConf.setNumMapTasks(maps);
238    jobConf.setNumReduceTasks(reducers);
239    return jobConf; 
240  }
241
242  void checkTaskCounts() {
243    JobStatus[] status = jt.getAllJobs();
244    for (JobStatus js : status) {
245      JobInProgress jip = jt.getJob(js.getJobID());
246      Counters counter = jip.getJobCounters();
247      long totalTaskCount = counter
248          .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS)
249          + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
250      while (jip.getNumTaskCompletionEvents() < totalTaskCount) {
251        assertEquals(true, (jip.runningMaps() >= 0));
252        assertEquals(true, (jip.pendingMaps() >= 0));
253        assertEquals(true, (jip.runningReduces() >= 0));
254        assertEquals(true, (jip.pendingReduces() >= 0));
255      }
256    }
257  }
258 
259}
Note: See TracBrowser for help on using the repository browser.