source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.util.ArrayList;
22import java.io.File;
23import java.io.IOException;
24import java.util.List;
25
26import org.apache.hadoop.hdfs.MiniDFSCluster;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.fs.FileSystem;
29import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
30import org.apache.commons.logging.Log;
31import org.apache.commons.logging.LogFactory;
32
33import junit.framework.TestCase;
34
35/**
36 * Test whether the JobInProgressListeners are informed as expected.
37 */
38public class TestJobInProgressListener extends TestCase {
39  private static final Log LOG = 
40    LogFactory.getLog(TestJobInProgressListener.class);
41  private final Path testDir = new Path("test-jip-listener-update");
42 
43  private static String TEST_ROOT_DIR = new File(System.getProperty(
44          "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
45
46  private JobConf configureJob(JobConf conf, int m, int r, 
47                               Path inDir, Path outputDir,
48                               String mapSignalFile, String redSignalFile) 
49  throws IOException {
50    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,  m, r, 
51        "job-listener-test", mapSignalFile, redSignalFile);
52    return conf; 
53  }
54 
55  /**
56   * This test case tests if external updates to JIP do not result into
57   * undesirable effects
58   * Test is as follows
59   *   - submit 2 jobs of normal priority. job1 is a waiting job which waits and
60   *     blocks the cluster
61   *   - change one parameter of job2 such that the job bumps up in the queue
62   *   - check if the queue looks ok
63   *   
64   */
65  public void testJobQueueChanges() throws IOException {
66    LOG.info("Testing job queue changes");
67    JobConf conf = new JobConf();
68    MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null, null);
69    dfs.waitActive();
70    FileSystem fileSys = dfs.getFileSystem();
71   
72    dfs.startDataNodes(conf, 1, true, null, null, null, null);
73    dfs.waitActive();
74   
75    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
76                      + (dfs.getFileSystem()).getUri().getPort();
77    MiniMRCluster mr = new MiniMRCluster(1, namenode, 1);
78    JobClient jobClient = new JobClient(mr.createJobConf());
79   
80    // clean up
81    fileSys.delete(testDir, true);
82   
83    if (!fileSys.mkdirs(testDir)) {
84      throw new IOException("Mkdirs failed to create " + testDir.toString());
85    }
86
87    // Write the input file
88    Path inDir = new Path(testDir, "input");
89    Path shareDir = new Path(testDir, "share");
90    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
91    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
92    UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), 
93                            (short)1);
94   
95    JobQueueJobInProgressListener myListener = 
96      new JobQueueJobInProgressListener();
97   
98    // add the listener
99    mr.getJobTrackerRunner().getJobTracker()
100      .addJobInProgressListener(myListener);
101   
102    // big blocking job
103    Path outputDir = new Path(testDir, "output");
104    Path newOutputDir = outputDir.suffix("0");
105    JobConf job1 = configureJob(mr.createJobConf(), 10, 0, inDir, newOutputDir,
106                                mapSignalFile, redSignalFile);
107   
108    // short blocked job
109    newOutputDir = outputDir.suffix("1");
110    JobConf job2 = configureJob(mr.createJobConf(), 1, 0, inDir, newOutputDir,
111                                mapSignalFile, redSignalFile);
112   
113    RunningJob rJob1 = jobClient.submitJob(job1);
114    LOG.info("Running job " + rJob1.getID().toString());
115   
116    RunningJob rJob2 = jobClient.submitJob(job2);
117    LOG.info("Running job " + rJob2.getID().toString());
118   
119    // I. Check job-priority change
120    LOG.info("Testing job priority changes");
121   
122    // bump up job2's priority
123    LOG.info("Increasing job2's priority to HIGH");
124    rJob2.setJobPriority("HIGH");
125   
126    // check if the queue is sane
127    assertTrue("Priority change garbles the queue", 
128               myListener.getJobQueue().size() == 2);
129   
130    JobInProgress[] queue = 
131      myListener.getJobQueue().toArray(new JobInProgress[0]);
132   
133    // check if the bump has happened
134    assertTrue("Priority change failed to bump up job2 in the queue", 
135               queue[0].getJobID().equals(rJob2.getID()));
136   
137    assertTrue("Priority change failed to bump down job1 in the queue", 
138               queue[1].getJobID().equals(rJob1.getID()));
139   
140    assertEquals("Priority change has garbled the queue", 
141                 2, queue.length);
142   
143    // II. Check start-time change
144    LOG.info("Testing job start-time changes");
145   
146    // reset the priority which will make the order as
147    //  - job1
148    //  - job2
149    // this will help in bumping job2 on start-time change
150    LOG.info("Increasing job2's priority to NORMAL"); 
151    rJob2.setJobPriority("NORMAL");
152   
153    // create the change event
154    JobInProgress jip2 = mr.getJobTrackerRunner().getJobTracker()
155                          .getJob(rJob2.getID());
156    JobInProgress jip1 = mr.getJobTrackerRunner().getJobTracker()
157                           .getJob(rJob1.getID());
158   
159    JobStatus prevStatus = (JobStatus)jip2.getStatus().clone();
160   
161    // change job2's start-time and the status
162    jip2.startTime =  jip1.startTime - 1;
163    jip2.status.setStartTime(jip2.startTime);
164   
165   
166    JobStatus newStatus = (JobStatus)jip2.getStatus().clone();
167   
168    // inform the listener
169    LOG.info("Updating the listener about job2's start-time change");
170    JobStatusChangeEvent event = 
171      new JobStatusChangeEvent(jip2, EventType.START_TIME_CHANGED, 
172                              prevStatus, newStatus);
173    myListener.jobUpdated(event);
174   
175    // check if the queue is sane
176    assertTrue("Start time change garbles the queue", 
177               myListener.getJobQueue().size() == 2);
178   
179    queue = myListener.getJobQueue().toArray(new JobInProgress[0]);
180   
181    // check if the bump has happened
182    assertTrue("Start time change failed to bump up job2 in the queue", 
183               queue[0].getJobID().equals(rJob2.getID()));
184   
185    assertTrue("Start time change failed to bump down job1 in the queue", 
186               queue[1].getJobID().equals(rJob1.getID()));
187   
188    assertEquals("Start time change has garbled the queue", 
189                 2, queue.length);
190   
191    // signal the maps to complete
192    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
193   
194    // check if job completion leaves the queue sane
195    while (rJob2.getJobState() != JobStatus.SUCCEEDED) {
196      UtilsForTests.waitFor(10);
197    }
198   
199    while (rJob1.getJobState() != JobStatus.SUCCEEDED) {
200      UtilsForTests.waitFor(10);
201    }
202   
203    assertTrue("Job completion garbles the queue", 
204               myListener.getJobQueue().size() == 0);
205  }
206 
207  // A listener that inits the tasks one at a time and also listens to the
208  // events
209  public static class MyListener extends JobInProgressListener {
210    private List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
211    private List<JobInProgress> jobs = new ArrayList<JobInProgress>(); 
212   
213    public boolean contains (JobID id) {
214      return contains(id, true) || contains(id, false);
215    }
216   
217    public boolean contains (JobID id, boolean waiting) {
218      List<JobInProgress> queue = waiting ? wjobs : jobs;
219      for (JobInProgress job : queue) {
220        if (job.getJobID().equals(id)) {
221          return true;
222        }
223      }
224      return false;
225    }
226   
227    public void jobAdded(JobInProgress job) {
228      LOG.info("Job " + job.getJobID().toString() + " added");
229      wjobs.add(job);
230    }
231   
232    public void jobRemoved(JobInProgress job) {
233      LOG.info("Job " + job.getJobID().toString() + " removed");
234    }
235   
236    public void jobUpdated(JobChangeEvent event) {
237      LOG.info("Job " + event.getJobInProgress().getJobID().toString() + " updated");
238      // remove the job is the event is for a completed job
239      if (event instanceof JobStatusChangeEvent) {
240        JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
241        if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
242          // check if the state changes from
243          // RUNNING->COMPLETE(SUCCESS/KILLED/FAILED)
244          JobInProgress jip = event.getJobInProgress();
245          String jobId = jip.getJobID().toString();
246          if (jip.isComplete()) {
247            LOG.info("Job " +  jobId + " deleted from the running queue");
248            if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
249              wjobs.remove(jip);
250            } else {
251              jobs.remove(jip);
252            }
253          } else {
254            // PREP->RUNNING
255            LOG.info("Job " +  jobId + " deleted from the waiting queue");
256            wjobs.remove(jip);
257            jobs.add(jip);
258          }
259        }
260      }
261    }
262  }
263 
264  public void testJobFailure() throws Exception {
265    LOG.info("Testing job-success");
266   
267    MyListener myListener = new MyListener();
268    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
269   
270    JobConf job = mr.createJobConf();
271   
272    mr.getJobTrackerRunner().getJobTracker()
273      .addJobInProgressListener(myListener);
274
275    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
276    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
277
278    // submit a job that fails
279    RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir);
280    JobID id = rJob.getID();
281
282    // check if the job failure was notified
283    assertFalse("Missing event notification on failing a running job", 
284                myListener.contains(id));
285   
286  }
287 
288  public void testJobKill() throws Exception {
289    LOG.info("Testing job-kill");
290   
291    MyListener myListener = new MyListener();
292    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
293   
294    JobConf job = mr.createJobConf();
295   
296    mr.getJobTrackerRunner().getJobTracker()
297      .addJobInProgressListener(myListener);
298   
299    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
300    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
301
302    // submit and kill the job   
303    RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir);
304    JobID id = rJob.getID();
305
306    // check if the job failure was notified
307    assertFalse("Missing event notification on killing a running job", 
308                myListener.contains(id));
309   
310  }
311 
312  public void testJobSuccess() throws Exception {
313    LOG.info("Testing job-success");
314    MyListener myListener = new MyListener();
315   
316    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
317   
318    JobConf job = mr.createJobConf();
319   
320    mr.getJobTrackerRunner().getJobTracker()
321      .addJobInProgressListener(myListener);
322   
323    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
324    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
325
326    // submit the job   
327    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
328   
329    // wait for the job to be running
330    while (rJob.getJobState() != JobStatus.RUNNING) {
331      UtilsForTests.waitFor(10);
332    }
333   
334    LOG.info("Job " +  rJob.getID().toString() + " started running");
335   
336    // check if the listener was updated about this change
337    assertFalse("Missing event notification for a running job", 
338                myListener.contains(rJob.getID(), true));
339   
340    while (rJob.getJobState() != JobStatus.SUCCEEDED) {
341      UtilsForTests.waitFor(10);
342    }
343   
344    // check if the job success was notified
345    assertFalse("Missing event notification for a successful job", 
346                myListener.contains(rJob.getID(), false));
347  }
348 
349  /**
350   * This scheduler never schedules any task as it doesnt init any task. So all
351   * the jobs are queued forever.
352   */
353  public static class MyScheduler extends JobQueueTaskScheduler {
354
355    @Override
356    public synchronized void start() throws IOException {
357      super.start();
358      // Remove the eager task initializer
359      taskTrackerManager.removeJobInProgressListener(
360          eagerTaskInitializationListener);
361      // terminate it
362      eagerTaskInitializationListener.terminate();
363    }
364  }
365 
366  public void testQueuedJobKill() throws Exception {
367    LOG.info("Testing queued-job-kill");
368   
369    MyListener myListener = new MyListener();
370   
371    JobConf job = new JobConf();
372    job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
373                 TaskScheduler.class);
374    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
375   
376    job = mr.createJobConf();
377   
378    mr.getJobTrackerRunner().getJobTracker()
379      .addJobInProgressListener(myListener);
380   
381    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
382    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
383
384    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
385    JobID id = rJob.getID();
386    LOG.info("Job : " + id.toString() + " submitted");
387   
388    // check if the job is in the waiting queue
389    assertTrue("Missing event notification on submiting a job", 
390                myListener.contains(id, true));
391   
392    // kill the job
393    LOG.info("Killing job : " + id.toString());
394    rJob.killJob();
395   
396    // check if the job is killed
397    assertEquals("Job status doesnt reflect the kill-job action", 
398                 JobStatus.KILLED, rJob.getJobState());
399
400    // check if the job is correctly moved
401    // from the waiting list
402    assertFalse("Missing event notification on killing a waiting job", 
403                myListener.contains(id, true));
404  }
405}
Note: See TracBrowser for help on using the repository browser.