source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 21.2 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;

import junit.framework.TestCase;
import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * TestJobTrackerRestart checks if the jobtracker can restart. The JobTracker
 * should be able to continue running previously running jobs and also
 * recover previously submitted jobs.
 */
public class TestJobTrackerRestart extends TestCase {
  static final Path testDir = 
    new Path(System.getProperty("test.build.data","/tmp"), 
             "jt-restart-testing");
  final Path inDir = new Path(testDir, "input");
  static final Path shareDir = new Path(testDir, "share");
  final Path outputDir = new Path(testDir, "output");
  private static int numJobsSubmitted = 0;

  /**
   * Returns job confs configured with the given priorities and numbers of
   * mappers and reducers.
   * @param conf The default conf
   * @param priorities priorities for the jobs
   * @param numMaps number of maps for the jobs
   * @param numReds number of reducers for the jobs
   * @param outputDir output dir
   * @param inDir input dir
   * @param mapSignalFile filename that acts as a signal for maps
   * @param reduceSignalFile filename that acts as a signal for reducers
   * @return an array of jobconfs configured as needed
   * @throws IOException
   */
  private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, 
                           int[] numMaps, int[] numReds,
                           Path outputDir, Path inDir,
                           String mapSignalFile, String reduceSignalFile) 
  throws IOException {
    JobConf[] jobs = new JobConf[priorities.length];
    for (int i = 0; i < jobs.length; ++i) {
      jobs[i] = new JobConf(conf);
      Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
      UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir, 
          numMaps[i], numReds[i], "jt restart test job", mapSignalFile, 
          reduceSignalFile);
      jobs[i].setJobPriority(priorities[i]);
    }
    return jobs;
  }

  /**
   * Clean up the signals.
   */
  private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
    // Delete the map signal file
    fileSys.delete(new Path(getMapSignalFile(dir)), false);
    // Delete the reduce signal file
    fileSys.delete(new Path(getReduceSignalFile(dir)), false);
  }
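
  /**
   * Illustrative sketch only (not part of the original test logic): the jobs
   * built by getJobs() use waiting mappers/reducers that block until the
   * signal files above are created. The real waiting behaviour lives in the
   * tasks configured by UtilsForTests.configureWaitingJobConf(); this
   * hypothetical helper merely shows the polling pattern those tasks rely on.
   */
  private static void waitForSignal(FileSystem fileSys, String signalFile)
  throws IOException {
    // Poll the shared directory until the signal file appears
    while (!fileSys.exists(new Path(signalFile))) {
      UtilsForTests.waitFor(100);
    }
  }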

  /**
   * Tests the jobtracker with restart-recovery turned off.
   * Submit a job with normal priority, maps = 2, reducers = 0.
   *
   * Wait for the job to complete 50%.
   *
   * Restart the jobtracker with recovery turned off.
   *
   * Check if the job is missing.
   */
  public void testRestartWithoutRecovery(MiniDFSCluster dfs, 
                                         MiniMRCluster mr) 
  throws IOException {
    // III. Test a job with waiting mapper and recovery turned off
   
    FileSystem fileSys = dfs.getFileSystem();
   
    cleanUp(fileSys, shareDir);
   
    JobConf newConf = getJobs(mr.createJobConf(), 
                              new JobPriority[] {JobPriority.NORMAL}, 
                              new int[] {2}, new int[] {0},
                              outputDir, inDir, 
                              getMapSignalFile(shareDir), 
                              getReduceSignalFile(shareDir))[0];
   
    JobClient jobClient = new JobClient(newConf);
    RunningJob job = jobClient.submitJob(newConf);
    JobID id = job.getID();
   
    // make sure that the job is 50% completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
      UtilsForTests.waitFor(100);
    }
   
    mr.stopJobTracker();
   
    // Turn off the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                      false);
   
    // Wait for a minute before restarting the jobtracker
    UtilsForTests.waitFor(60 * 1000);
   
    mr.startJobTracker();
   
    // Signal the tasks
    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
                              getReduceSignalFile(shareDir));
   
    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);
   
    UtilsForTests.waitTillDone(jobClient);
   
    // The submitted job should not exist
    assertTrue("Submitted job was detected with recovery disabled", 
               UtilsForTests.getJobStatus(jobClient, id) == null);
  }
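
  /**
   * Illustrative sketch only (not part of the original file): the
   * stop / set-recovery-flag / wait / start sequence used above (and again in
   * the recovery test below) could be factored into a helper along these
   * lines. This is a hypothetical refactoring, shown only to make the restart
   * protocol explicit.
   */
  private static void restartJobTracker(MiniMRCluster mr, boolean recover)
  throws IOException {
    mr.stopJobTracker();
    // Toggle restart-recovery before the jobtracker comes back up
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                      recover);
    // Wait for a minute before restarting the jobtracker, as the tests do
    UtilsForTests.waitFor(60 * 1000);
    mr.startJobTracker();
  }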

  /** Tests a job on jobtracker with restart-recovery turned on.
   * Preparation :
   *    - Configure a job with
   *       - num-maps : 50
   *       - num-reducers : 1
   *    - Configure the cluster to run 1 reducer
   *    - Lower the history file block size and buffer
   *   
   * Wait for the job to complete 50%. Note that the job is configured to
   * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So the job will
   * eventually wait at 50%.
   *
   * Make a note of the following things
   *    - Task completion events
   *    - Cluster status
   *    - Task Reports
   *    - Job start time
   *   
   * Restart the jobtracker
   *
   * Wait for the job to finish all the maps and note the TaskCompletion
   * events at the tracker.
   *
   * Wait for all the jobs to finish and note the following
   *    - New task completion events at the jobtracker
   *    - Task reports
   *    - Cluster status
   *
   * Check for the following
   *    - Task completion events for recovered tasks should match
   *    - Task completion events at the tasktracker and the restarted
   *      jobtracker should be the same
   *    - Cluster status should be fine.
   *    - Task Reports for recovered tasks should match
   *      Checks
   *        - start time
   *        - finish time
   *        - counters
   *        - http-location
   *        - task-id
   *    - Job start time should match
   *    - Check if the counters can be accessed
   *    - Check if the history files are (re)named properly
   */
  public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs, 
                                                   MiniMRCluster mr) 
  throws IOException {
    // II. Test a tasktracker with waiting mapper and recovery turned on.
    //     Ideally the tracker should SYNC with the new/restarted jobtracker
   
    FileSystem fileSys = dfs.getFileSystem();
    final int numMaps = 50;
    final int numReducers = 1;
   
   
    cleanUp(fileSys, shareDir);
   
    JobConf newConf = getJobs(mr.createJobConf(), 
                              new JobPriority[] {JobPriority.NORMAL}, 
                              new int[] {numMaps}, new int[] {numReducers},
                              outputDir, inDir, 
                              getMapSignalFile(shareDir), 
                              getReduceSignalFile(shareDir))[0];
   
    JobClient jobClient = new JobClient(newConf);
    RunningJob job = jobClient.submitJob(newConf);
    JobID id = job.getID();
   
    // change the job priority
    mr.setJobPriority(id, JobPriority.HIGH);
   
    mr.initializeJob(id);
   
    // make sure that at least one reducer is spawned
    while (jobClient.getClusterStatus().getReduceTasks() == 0) {
      UtilsForTests.waitFor(100);
    }
   
    while (true) {
      // Since we are using a half waiting mapper, maps should be stuck at 50%
      TaskCompletionEvent[] trackerEvents = 
        mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
          .getMapTaskCompletionEvents();
      if (trackerEvents.length < numMaps / 2) {
        UtilsForTests.waitFor(1000);
      } else {
        break;
      }
    }
   
    TaskCompletionEvent[] prevEvents = 
      mr.getTaskCompletionEvents(id, 0, numMaps);
    TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
    TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
    ClusterStatus prevStatus = jobClient.getClusterStatus();
   
    mr.stopJobTracker();
   
    // Turn on the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                      true);
   
    // Wait for a minute before restarting the jobtracker
    UtilsForTests.waitFor(60 * 1000);
   
    mr.startJobTracker();
   
    // Signal the map tasks
    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
                              getReduceSignalFile(shareDir));
   
    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);
   
    int numToMatch = mr.getNumEventsRecovered() / 2;
   
    // make sure that the maps are completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
      UtilsForTests.waitFor(100);
    }
   
    // Get the new jobtracker's events
    TaskCompletionEvent[] jtEvents = 
      mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
   
    // Test if all the events that were recovered match exactly
    testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
   
    // Check the task reports
    // The reports should match exactly if the attempts are the same
    TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
    TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
    testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
    testTaskReports(prevSetupReports, afterSetupReports, 1);
   
    // check the job priority
    assertEquals("Job priority change is not reflected", 
                 JobPriority.HIGH, mr.getJobPriority(id));
   
    List<TaskCompletionEvent> jtMapEvents =
      new ArrayList<TaskCompletionEvent>();
    for (TaskCompletionEvent tce : jtEvents) {
      if (tce.isMapTask()) {
        jtMapEvents.add(tce);
      }
    }
   
    TaskCompletionEvent[] trackerEvents; 
    while (true) {
      // Wait for the tracker to pull all the map events
      trackerEvents =
        mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
          .getMapTaskCompletionEvents();
      if (trackerEvents.length < jtMapEvents.size()) {
        UtilsForTests.waitFor(1000);
      } else {
        break;
      }
    }

    // Signal the reduce tasks
    UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
                              getReduceSignalFile(shareDir));
   
    UtilsForTests.waitTillDone(jobClient);
   
    testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]), 
                             trackerEvents, true, -1);
   
    // validate the history file
    TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
    TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);
   
    // check that the cluster status is sane
    ClusterStatus status = jobClient.getClusterStatus();
    assertTrue("Cluster status is insane", 
               checkClusterStatusOnCompletion(status, prevStatus));
  }

  /**
   * Checks if the history files are as expected
   * @param id job id
   * @param conf job conf
   */
  private void testJobHistoryFiles(JobID id, JobConf conf) 
  throws IOException  {
    // Get the history files for users
    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
    String tempLogFileName = 
      JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
   
    // I. User files
    Path logFile = 
      JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
    FileSystem fileSys = logFile.getFileSystem(conf);
   
    // Check if the history file exists
    assertTrue("User log file does not exist", fileSys.exists(logFile));
   
    // Check if the temporary file is deleted
    Path tempLogFile = 
      JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, 
                                                         conf);
    assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));
   
    // II. Framework files
    // Get the history file
    logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
    fileSys = logFile.getFileSystem(conf);
   
    // Check if the history file exists
    assertTrue("Log file does not exist", fileSys.exists(logFile));
   
    // Check if the temporary file is deleted
    tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
    assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
  }

  /**
   * Matches specified number of task reports.
   * @param source the reports to be matched
   * @param target reports to match with
   * @param numToMatch num reports to match
   */
  private void testTaskReports(TaskReport[] source, TaskReport[] target, 
                               int numToMatch) {
    for (int i = 0; i < numToMatch; ++i) {
      // Check if the task reports were recovered correctly
      assertTrue("Task reports for the same attempt have changed", 
                 source[i].equals(target[i]));
    }
  }

  /**
   * Matches the task completion events.
   * @param source the events to be matched
   * @param target events to match with
   * @param fullMatch whether to match the events completely or partially
   * @param numToMatch number of events to match in case full match is not
   *        desired
   */
  private void testTaskCompletionEvents(TaskCompletionEvent[] source, 
                                        TaskCompletionEvent[] target, 
                                        boolean fullMatch,
                                        int numToMatch) {
    // Check if the event list size matches
    // The lengths should match only in case of a full match
    if (fullMatch) {
      assertEquals("Map task completion events mismatch", 
                   source.length, target.length);
      numToMatch = source.length;
    }
    // Check if the events match
    for (int i = 0; i < numToMatch; ++i) {
      if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){
        assertTrue("Map task completion events ordering mismatch", 
                   source[i].equals(target[i]));
      }
    }
  }

  private boolean checkClusterStatusOnCompletion(ClusterStatus status, 
                                                 ClusterStatus prevStatus) {
    return status.getJobTrackerState() == prevStatus.getJobTrackerState()
           && status.getMapTasks() == 0
           && status.getReduceTasks() == 0;
  }

  /** Committer with setup waiting
   */
  static class CommitterWithDelaySetup extends FileOutputCommitter {
    @Override
    public void setupJob(JobContext context) throws IOException {
      FileSystem fs = FileSystem.get(context.getConfiguration());
      while (true) {
        if (fs.exists(shareDir)) {
          break;
        }
        UtilsForTests.waitFor(100);
      }
      super.cleanupJob(context);
    }
  }

  /** Tests a job on jobtracker with restart-recovery turned on and empty
   *  jobhistory file.
   * Preparation :
   *    - Configure a job with
   *       - num-maps : 0 (long waiting setup)
   *       - num-reducers : 0
   *   
   * Check if the job succeeds after restart.
   *
   * Assumes that map slots are given first for setup.
   */
  public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, 
                                              MiniMRCluster mr) 
  throws IOException {
    mr.startTaskTracker(null, null, 1, 1);
    FileSystem fileSys = dfs.getFileSystem();
   
    cleanUp(fileSys, shareDir);
    cleanUp(fileSys, inDir);
    cleanUp(fileSys, outputDir);
   
    JobConf conf = mr.createJobConf();
    conf.setNumReduceTasks(0);
    conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
    fileSys.delete(outputDir, false);
    RunningJob job1 = 
      UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
   
    conf.setNumReduceTasks(0);
    conf.setOutputCommitter(CommitterWithDelaySetup.class);
    Path inDir2 = new Path(testDir, "input2");
    fileSys.mkdirs(inDir2);
    Path outDir2 = new Path(testDir, "output2");
    fileSys.delete(outDir2, false);
    JobConf newConf = getJobs(mr.createJobConf(),
                              new JobPriority[] {JobPriority.NORMAL},
                              new int[] {10}, new int[] {0},
                              outDir2, inDir2,
                              getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir))[0];

    JobClient jobClient = new JobClient(newConf);
    RunningJob job2 = jobClient.submitJob(newConf);
    JobID id = job2.getID();

    /*RunningJob job2 =
      UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
   
    JobID id = job2.getID();*/
    JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
   
    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
   
    // find out the history filename
    String history = 
      JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
    Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
   
    // make sure that setup is launched
    while (jip.runningMaps() == 0) {
      UtilsForTests.waitFor(100);
    }
   
    id = job1.getID();
    jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
   
    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
   
    // make sure that cleanup is launched and is waiting
    while (!jip.isCleanupLaunched()) {
      UtilsForTests.waitFor(100);
    }
   
    mr.stopJobTracker();
   
    // delete the history file .. just to be safe.
    FileSystem historyFS = historyPath.getFileSystem(conf);
    historyFS.delete(historyPath, false);
    historyFS.create(historyPath).close(); // create an empty file
   
   
    UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), 
                              getReduceSignalFile(shareDir), (short)1);

    // Turn on the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                      true);
   
    mr.startJobTracker();
   
    job1.waitForCompletion();
    job2.waitForCompletion();
  }

  public void testJobTrackerRestart() throws IOException {
    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;

    try {
      Configuration conf = new Configuration();
      conf.setBoolean("dfs.replication.considerLoad", false);
      dfs = new MiniDFSCluster(conf, 1, true, null, null);
      dfs.waitActive();
      fileSys = dfs.getFileSystem();
     
      // clean up
      fileSys.delete(testDir, true);
     
      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }

      // Write the input file
      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
                              new Path(inDir + "/file"), (short)1);

      dfs.startDataNodes(conf, 1, true, null, null, null, null);
      dfs.waitActive();

      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
                 + (dfs.getFileSystem()).getUri().getPort();

      // Make sure that jobhistory leads to a proper job restart
      // So keep the blocksize and the buffer size small
      JobConf jtConf = new JobConf();
      jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
      jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
      jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
      jtConf.setBoolean("mapred.acls.enabled", true);
      // get the user group info
      UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
      jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName());
     
      mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
     
      // Test the tasktracker SYNC
      testTaskEventsAndReportsWithRecovery(dfs, mr);
     
      // Test jobtracker with restart-recovery turned off
      testRestartWithoutRecovery(dfs, mr);
     
      // test recovery with empty file
      testJobRecoveryWithEmptyHistory(dfs, mr);
    } finally {
      if (mr != null) {
        try {
          mr.shutdown();
        } catch (Exception e) {}
      }
      if (dfs != null) {
        try {
          dfs.shutdown();
        } catch (Exception e) {}
      }
    }
  }
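
  /* Illustrative note (not in the original file): the working directory for
   * these tests comes from the "test.build.data" system property used by
   * testDir above, so a standalone run through main() can be redirected to a
   * scratch location roughly like this (hypothetical driver code):
   *
   *   System.setProperty("test.build.data", "/tmp/jt-restart-testing");
   *   new TestJobTrackerRestart().testJobTrackerRestart();
   */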

  private static String getMapSignalFile(Path dir) {
    return (new Path(dir, "jt-restart-map-signal")).toString();
  }

  private static String getReduceSignalFile(Path dir) {
    return (new Path(dir, "jt-restart-reduce-signal")).toString();
  }

  public static void main(String[] args) throws IOException {
    new TestJobTrackerRestart().testJobTrackerRestart();
  }
}