source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobHistory.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 42.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.File;
22import java.io.IOException;
23import java.text.ParseException;
24import java.util.ArrayList;
25import java.util.List;
26import java.util.HashMap;
27import java.util.Map;
28import java.util.Iterator;
29import java.util.regex.Matcher;
30import java.util.regex.Pattern;
31
32import junit.framework.TestCase;
33
34import org.apache.hadoop.conf.Configuration;
35import org.apache.hadoop.fs.FileSystem;
36import org.apache.hadoop.fs.Path;
37import org.apache.hadoop.fs.permission.FsPermission;
38import org.apache.hadoop.mapred.JobHistory.*;
39import org.apache.commons.logging.Log;
40import org.apache.commons.logging.LogFactory;
41
42/**
43 * Tests the JobHistory files - to catch any changes to JobHistory that can
44 * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
45 *
46 * testJobHistoryFile
47 * Run a job that succeeds and validate its history file format and
48 * content.
49 *
50 * testJobHistoryUserLogLocation
51 * Run jobs with the given values of hadoop.job.history.user.location as
52 *   (1)null(default case), (2)"none", and (3)some dir like "/tmp".
53 *   Validate user history file location in each case.
54 *
55 * testJobHistoryJobStatus
56 * Run jobs that (1) succeed, (2) fail and (3) are killed.
57 *   Validate job status read from history file in each case.
58 *
59 * Future changes to job history are to be reflected here in this file.
60 */
61public class TestJobHistory extends TestCase {
62   private static final Log LOG = LogFactory.getLog(TestJobHistory.class);
63 
64  private static String TEST_ROOT_DIR = new File(System.getProperty(
65      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
66
67  private static final Pattern digitsPattern =
68                                     Pattern.compile(JobHistory.DIGITS);
69
70  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
71  private static final Pattern hostNamePattern = Pattern.compile(
72                                       "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");
73
74  private static final String IP_ADDR =
75                       "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";
76
77  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
78  private static final Pattern trackerNamePattern = Pattern.compile(
79                         "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
80                         IP_ADDR + ":" + JobHistory.DIGITS);
81
82  private static final Pattern splitsPattern = Pattern.compile(
83                              hostNamePattern + "(," + hostNamePattern + ")*");
84
85  private static Map<String, List<String>> taskIDsToAttemptIDs =
86                                     new HashMap<String, List<String>>();
87
88  //Each Task End seen from history file is added here
89  private static List<String> taskEnds = new ArrayList<String>();
90
91  // List of tasks that appear in history file after JT reatart. This is to
92  // allow START_TIME=0 for these tasks.
93  private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();
94
95  // List of potential tasks whose start time can be 0 because of JT restart
96  private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
97
98  /**
99   * Listener for history log file, it populates JobHistory.JobInfo
100   * object with data from log file and validates the data.
101   */
102  static class TestListener
103                    extends DefaultJobHistoryParser.JobTasksParseListener {
104    int lineNum;//line number of history log file
105    boolean isJobLaunched;
106    boolean isJTRestarted;
107
108    TestListener(JobInfo job) {
109      super(job);
110      lineNum = 0;
111      isJobLaunched = false;
112      isJTRestarted = false;
113    }
114
115    // TestListener implementation
116    public void handle(RecordTypes recType, Map<Keys, String> values)
117    throws IOException {
118
119      lineNum++;
120
121      // Check if the record is of type Meta
122      if (recType == JobHistory.RecordTypes.Meta) {
123        long version = Long.parseLong(values.get(Keys.VERSION));
124        assertTrue("Unexpected job history version ",
125                   (version >= 0 && version <= JobHistory.VERSION));
126      }
127      else if (recType.equals(RecordTypes.Job)) {
128        String jobid = values.get(Keys.JOBID);
129        assertTrue("record type 'Job' is seen without JOBID key" +
130                        " in history file at line " + lineNum, jobid != null);
131        JobID id = JobID.forName(jobid);
132        assertTrue("JobID in history file is in unexpected format " +
133                  "at line " + lineNum, id != null);
134        String time = values.get(Keys.LAUNCH_TIME);
135        if (time != null) {
136          if (isJobLaunched) {
137            // We assume that if we see LAUNCH_TIME again, it is because of JT restart
138            isJTRestarted = true;
139          }
140          else {// job launched first time
141            isJobLaunched = true;
142          }
143        }
144        time = values.get(Keys.FINISH_TIME);
145        if (time != null) {
146          assertTrue ("Job FINISH_TIME is seen in history file at line " +
147                      lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
148        }
149      }
150      else if (recType.equals(RecordTypes.Task)) {
151        String taskid = values.get(Keys.TASKID);
152        assertTrue("record type 'Task' is seen without TASKID key" +
153                        " in history file at line " + lineNum, taskid != null);
154        TaskID id = TaskID.forName(taskid);
155        assertTrue("TaskID in history file is in unexpected format " +
156                  "at line " + lineNum, id != null);
157       
158        String time = values.get(Keys.START_TIME);
159        if (time != null) {
160          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
161          assertTrue("Duplicate START_TIME seen for task " + taskid +
162                     " in history file at line " + lineNum, attemptIDs == null);
163          attemptIDs = new ArrayList<String>();
164          taskIDsToAttemptIDs.put(taskid, attemptIDs);
165
166          if (isJTRestarted) {
167            // This maintains a potential ignoreStartTimeTasks list
168            tempIgnoreStartTimeOfTasks.add(taskid);
169          }
170        }
171
172        time = values.get(Keys.FINISH_TIME);
173        if (time != null) {
174          String s = values.get(Keys.TASK_STATUS);
175          if (s != null) {
176            List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
177            assertTrue ("Task FINISH_TIME is seen in history file at line " +
178                    lineNum + " before START_TIME is seen", attemptIDs != null);
179
180            // Check if all the attemptIDs of this task are finished
181            assertTrue("TaskId " + taskid + " is finished at line " +
182                       lineNum + " but its attemptID is not finished.",
183                       (attemptIDs.size() <= 1));
184
185            // Check if at least 1 attempt of this task is seen
186            assertTrue("TaskId " + taskid + " is finished at line " +
187                       lineNum + " but no attemptID is seen before this.",
188                       attemptIDs.size() == 1);
189
190            if (s.equals("KILLED") || s.equals("FAILED")) {
191              // Task End with KILLED/FAILED status in history file is
192              // considered as TaskEnd, TaskStart. This is useful in checking
193              // the order of history lines.
194              attemptIDs = new ArrayList<String>();
195              taskIDsToAttemptIDs.put(taskid, attemptIDs);
196            }
197            else {
198              taskEnds.add(taskid);
199            }
200          }
201          else {
202            // This line of history file could be just an update to finish time
203          }
204        }
205      }
206      else if (recType.equals(RecordTypes.MapAttempt) ||
207                 recType.equals(RecordTypes.ReduceAttempt)) {
208        String taskid =  values.get(Keys.TASKID);
209        assertTrue("record type " + recType + " is seen without TASKID key" +
210                        " in history file at line " + lineNum, taskid != null);
211       
212        String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
213        TaskAttemptID id = TaskAttemptID.forName(attemptId);
214        assertTrue("AttemptID in history file is in unexpected format " +
215                   "at line " + lineNum, id != null);
216       
217        String time = values.get(Keys.START_TIME);
218        if (time != null) {
219          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
220          assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
221                      " before Task is seen", attemptIDs != null);
222          assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
223                      "file at line " + lineNum, attemptIDs.remove(attemptId));
224
225          if (attemptIDs.isEmpty()) {
226            //just a boolean whether any attempt is seen or not
227            attemptIDs.add("firstAttemptIsSeen");
228          }
229          attemptIDs.add(attemptId);
230
231          if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
232              (id.getId() < 1000)) {
233            // If Task line of this attempt is seen in history file after
234            // JT restart and if this attempt is < 1000(i.e. attempt is noti
235            // started after JT restart) - assuming single JT restart happened
236            ignoreStartTimeOfTasks.add(taskid);
237          }
238        }
239
240        time = values.get(Keys.FINISH_TIME);
241        if (time != null) {
242          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
243          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
244                      + lineNum + " before Task is seen", attemptIDs != null);
245
246          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
247                      + lineNum + " before TaskAttempt START_TIME is seen",
248                      attemptIDs.remove(attemptId));
249        }
250      }
251      super.handle(recType, values);
252    }
253  }
254
255  // Check if the time is in the expected format
256  private static boolean isTimeValid(String time) {
257    Matcher m = digitsPattern.matcher(time);
258    return m.matches() && (Long.parseLong(time) > 0);
259  }
260
261  private static boolean areTimesInOrder(String time1, String time2) {
262    return (Long.parseLong(time1) <= Long.parseLong(time2));
263  }
264
265  // Validate Format of Job Level Keys, Values read from history file
266  private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
267                                                      String status) {
268    String time = values.get(Keys.SUBMIT_TIME);
269    assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
270               " in history file", isTimeValid(time));
271
272    time = values.get(Keys.LAUNCH_TIME);
273    assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
274               " in history file", isTimeValid(time));
275
276    String time1 = values.get(Keys.FINISH_TIME);
277    assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
278               " in history file", isTimeValid(time1));
279    assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
280               areTimesInOrder(time, time1));
281
282    String stat = values.get(Keys.JOB_STATUS);
283    assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" +
284               " history file", (status.equals(stat)));
285
286    String priority = values.get(Keys.JOB_PRIORITY);
287    assertTrue("Unknown priority for the job in history file",
288               (priority.equals("HIGH") ||
289                priority.equals("LOW")  || priority.equals("NORMAL") ||
290                priority.equals("VERY_HIGH") || priority.equals("VERY_LOW")));
291  }
292
293  // Validate Format of Task Level Keys, Values read from history file
294  private static void validateTaskLevelKeyValuesFormat(JobInfo job,
295                                  boolean splitsCanBeEmpty) {
296    Map<String, JobHistory.Task> tasks = job.getAllTasks();
297
298    // validate info of each task
299    for (JobHistory.Task task : tasks.values()) {
300
301      String tid = task.get(Keys.TASKID);
302      String time = task.get(Keys.START_TIME);
303      // We allow START_TIME=0 for tasks seen in history after JT restart
304      if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
305        assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
306                 time + " in history file", isTimeValid(time));
307      }
308
309      String time1 = task.get(Keys.FINISH_TIME);
310      assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
311                 time1 + " in history file", isTimeValid(time1));
312      assertTrue("Task FINISH_TIME is < START_TIME in history file",
313                 areTimesInOrder(time, time1));
314
315      // Make sure that the Task type exists and it is valid
316      String type = task.get(Keys.TASK_TYPE);
317      assertTrue("Unknown Task type \"" + type + "\" is seen in " +
318                 "history file for task " + tid,
319                 (type.equals("MAP") || type.equals("REDUCE") ||
320                  type.equals("SETUP") || type.equals("CLEANUP")));
321
322      if (type.equals("MAP")) {
323        String splits = task.get(Keys.SPLITS);
324        //order in the condition OR check is important here
325        if (!splitsCanBeEmpty || splits.length() != 0) {
326          Matcher m = splitsPattern.matcher(splits);
327          assertTrue("Unexpected format of SPLITS \"" + splits + "\" is seen" +
328                     " in history file for task " + tid, m.matches());
329        }
330      }
331
332      // Validate task status
333      String status = task.get(Keys.TASK_STATUS);
334      assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
335                 " history file for task " + tid, (status.equals("SUCCESS") ||
336                 status.equals("FAILED") || status.equals("KILLED")));
337    }
338  }
339
340  // Validate foramt of Task Attempt Level Keys, Values read from history file
341  private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
342    Map<String, JobHistory.Task> tasks = job.getAllTasks();
343
344    // For each task
345    for (JobHistory.Task task : tasks.values()) {
346      // validate info of each attempt
347      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
348
349        String id = attempt.get(Keys.TASK_ATTEMPT_ID);
350        String time = attempt.get(Keys.START_TIME);
351        assertTrue("START_TIME of task attempt " + id +
352                   " is in unexpected format:" + time +
353                   " in history file", isTimeValid(time));
354
355        String time1 = attempt.get(Keys.FINISH_TIME);
356        assertTrue("FINISH_TIME of task attempt " + id +
357                   " is in unexpected format:" + time1 +
358                   " in history file", isTimeValid(time1));
359        assertTrue("Task FINISH_TIME is < START_TIME in history file",
360                   areTimesInOrder(time, time1));
361
362        // Make sure that the Task type exists and it is valid
363        String type = attempt.get(Keys.TASK_TYPE);
364        assertTrue("Unknown Task type \"" + type + "\" is seen in " +
365                   "history file for task attempt " + id,
366                   (type.equals("MAP") || type.equals("REDUCE") ||
367                    type.equals("SETUP") || type.equals("CLEANUP")));
368
369        // Validate task status
370        String status = attempt.get(Keys.TASK_STATUS);
371        assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
372                   " history file for task attempt " + id,
373                   (status.equals("SUCCESS") || status.equals("FAILED") ||
374                    status.equals("KILLED")));
375
376        // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
377        // SORT_FINISHED time
378        if (type.equals("REDUCE") && status.equals("SUCCESS")) {
379          time1 = attempt.get(Keys.SHUFFLE_FINISHED);
380          assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
381                     " is in unexpected format:" + time1 +
382                     " in history file", isTimeValid(time1));
383          assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
384                     "in history file", areTimesInOrder(time, time1));
385          time = attempt.get(Keys.SORT_FINISHED);
386          assertTrue("SORT_FINISHED of task attempt " + id +
387                     " is in unexpected format:" + time +
388                     " in history file", isTimeValid(time));
389          assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
390                     " in history file", areTimesInOrder(time1, time));
391        }
392
393        // check if hostname is valid
394        String hostname = attempt.get(Keys.HOSTNAME);
395        Matcher m = hostNamePattern.matcher(hostname);
396        assertTrue("Unexpected Host name of task attempt " + id, m.matches());
397
398        // check if trackername is valid
399        String trackerName = attempt.get(Keys.TRACKER_NAME);
400        m = trackerNamePattern.matcher(trackerName);
401        assertTrue("Unexpected tracker name of task attempt " + id,
402                   m.matches());
403
404        if (!status.equals("KILLED")) {
405          // check if http port is valid
406          String httpPort = attempt.get(Keys.HTTP_PORT);
407          m = digitsPattern.matcher(httpPort);
408          assertTrue("Unexpected http port of task attempt " + id, m.matches());
409        }
410       
411        // check if counters are parsable
412        String counters = attempt.get(Keys.COUNTERS);
413        try {
414          Counters readCounters = Counters.fromEscapedCompactString(counters);
415          assertTrue("Counters of task attempt " + id + " are not parsable",
416                     readCounters != null);
417        } catch (ParseException pe) {
418          LOG.warn("While trying to parse counters of task attempt " + id +
419                   ", " + pe);
420        }
421      }
422    }
423  }
424
425  /**
426   *  Validates the format of contents of history file
427   *  (1) history file exists and in correct location
428   *  (2) Verify if the history file is parsable
429   *  (3) Validate the contents of history file
430   *     (a) Format of all TIMEs are checked against a regex
431   *     (b) validate legality/format of job level key, values
432   *     (c) validate legality/format of task level key, values
433   *     (d) validate legality/format of attempt level key, values
434   *     (e) check if all the TaskAttempts, Tasks started are finished.
435   *         Check finish of each TaskAttemptID against its start to make sure
436   *         that all TaskAttempts, Tasks started are indeed finished and the
437   *         history log lines are in the proper order.
438   *         We want to catch ordering of history lines like
439   *            Task START
440   *            Attempt START
441   *            Task FINISH
442   *            Attempt FINISH
443   *         (speculative execution is turned off for this).
444   * @param id job id
445   * @param conf job conf
446   */
447  static void validateJobHistoryFileFormat(JobID id, JobConf conf,
448                 String status, boolean splitsCanBeEmpty) throws IOException  {
449
450    // Get the history file name
451    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
452
453    // Framework history log file location
454    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
455    FileSystem fileSys = logFile.getFileSystem(conf);
456 
457    // Check if the history file exists
458    assertTrue("History file does not exist", fileSys.exists(logFile));
459
460
461    // check if the history file is parsable
462    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
463                                                   logFileName).split("_");
464
465    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
466    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
467
468    TestListener l = new TestListener(jobInfo);
469    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
470
471
472    // validate format of job level key, values
473    validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
474
475    // validate format of task level key, values
476    validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
477
478    // validate format of attempt level key, values
479    validateTaskAttemptLevelKeyValuesFormat(jobInfo);
480
481    // check if all the TaskAttempts, Tasks started are finished for
482    // successful jobs
483    if (status.equals("SUCCESS")) {
484      // Make sure that the lists in taskIDsToAttemptIDs are empty.
485      for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
486        String taskid = it.next();
487        assertTrue("There are some Tasks which are not finished in history " +
488                   "file.", taskEnds.contains(taskid));
489        List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
490        if(attemptIDs != null) {
491          assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" +
492                     " that are not finished.", (attemptIDs.size() == 1));
493        }
494      }
495    }
496  }
497
498  // Validate Job Level Keys, Values read from history file by
499  // comparing them with the actual values from JT.
500  private static void validateJobLevelKeyValues(MiniMRCluster mr,
501          RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException  {
502
503    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
504    JobInProgress jip = jt.getJob(job.getID());
505
506    Map<Keys, String> values = jobInfo.getValues();
507
508    assertTrue("SUBMIT_TIME of job obtained from history file did not " +
509               "match the expected value", jip.getStartTime() ==
510               Long.parseLong(values.get(Keys.SUBMIT_TIME)));
511
512    assertTrue("LAUNCH_TIME of job obtained from history file did not " +
513               "match the expected value", jip.getLaunchTime() ==
514               Long.parseLong(values.get(Keys.LAUNCH_TIME)));
515
516    assertTrue("FINISH_TIME of job obtained from history file did not " +
517               "match the expected value", jip.getFinishTime() ==
518               Long.parseLong(values.get(Keys.FINISH_TIME)));
519
520    assertTrue("Job Status of job obtained from history file did not " +
521               "match the expected value",
522               values.get(Keys.JOB_STATUS).equals("SUCCESS"));
523
524    assertTrue("Job Priority of job obtained from history file did not " +
525               "match the expected value", jip.getPriority().toString().equals(
526               values.get(Keys.JOB_PRIORITY)));
527
528    assertTrue("Job Name of job obtained from history file did not " +
529               "match the expected value", JobInfo.getJobName(conf).equals(
530               values.get(Keys.JOBNAME)));
531
532    assertTrue("User Name of job obtained from history file did not " +
533               "match the expected value", JobInfo.getUserName(conf).equals(
534               values.get(Keys.USER)));
535
536    // Validate job counters
537    Counters c = jip.getCounters();
538    assertTrue("Counters of job obtained from history file did not " +
539               "match the expected value",
540               c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
541
542    // Validate number of total maps, total reduces, finished maps,
543    // finished reduces, failed maps, failed recudes
544    String totalMaps = values.get(Keys.TOTAL_MAPS);
545    assertTrue("Unexpected number of total maps in history file",
546               Integer.parseInt(totalMaps) == jip.desiredMaps());
547
548    String totalReduces = values.get(Keys.TOTAL_REDUCES);
549    assertTrue("Unexpected number of total reduces in history file",
550               Integer.parseInt(totalReduces) == jip.desiredReduces());
551
552    String finMaps = values.get(Keys.FINISHED_MAPS);
553    assertTrue("Unexpected number of finished maps in history file",
554               Integer.parseInt(finMaps) == jip.finishedMaps());
555
556    String finReduces = values.get(Keys.FINISHED_REDUCES);
557    assertTrue("Unexpected number of finished reduces in history file",
558               Integer.parseInt(finReduces) == jip.finishedReduces());
559
560    String failedMaps = values.get(Keys.FAILED_MAPS);
561    assertTrue("Unexpected number of failed maps in history file",
562               Integer.parseInt(failedMaps) == jip.failedMapTasks);
563
564    String failedReduces = values.get(Keys.FAILED_REDUCES);
565    assertTrue("Unexpected number of failed reduces in history file",
566               Integer.parseInt(failedReduces) == jip.failedReduceTasks);
567  }
568
569  // Validate Task Level Keys, Values read from history file by
570  // comparing them with the actual values from JT.
571  private static void validateTaskLevelKeyValues(MiniMRCluster mr,
572                      RunningJob job, JobInfo jobInfo) throws IOException  {
573
574    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
575    JobInProgress jip = jt.getJob(job.getID());
576
577    // Get the 1st map, 1st reduce, cleanup & setup taskIDs and
578    // validate their history info
579    TaskID mapTaskId = new TaskID(job.getID(), true, 0);
580    TaskID reduceTaskId = new TaskID(job.getID(), false, 0);
581
582    TaskInProgress cleanups[] = jip.getCleanupTasks();
583    TaskID cleanupTaskId;
584    if (cleanups[0].isComplete()) {
585      cleanupTaskId = cleanups[0].getTIPId();
586    }
587    else {
588      cleanupTaskId = cleanups[1].getTIPId();
589    }
590
591    TaskInProgress setups[] = jip.getSetupTasks();
592    TaskID setupTaskId;
593    if (setups[0].isComplete()) {
594      setupTaskId = setups[0].getTIPId();
595    }
596    else {
597      setupTaskId = setups[1].getTIPId();
598    }
599
600    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
601
602    // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
603    for (JobHistory.Task task : tasks.values()) {
604
605      String tid = task.get(Keys.TASKID);
606      if (tid.equals(mapTaskId.toString()) ||
607          tid.equals(reduceTaskId.toString()) ||
608          tid.equals(cleanupTaskId.toString()) ||
609          tid.equals(setupTaskId.toString())) {
610
611        TaskID taskId = null;
612        if (tid.equals(mapTaskId.toString())) {
613          taskId = mapTaskId;
614        }
615        else if (tid.equals(reduceTaskId.toString())) {
616          taskId = reduceTaskId;
617        }
618        else if (tid.equals(cleanupTaskId.toString())) {
619          taskId = cleanupTaskId;
620        }
621        else if (tid.equals(setupTaskId.toString())) {
622          taskId = setupTaskId;
623        }
624        TaskInProgress tip = jip.getTaskInProgress(taskId);
625        assertTrue("START_TIME of Task " + tid + " obtained from history " +
626             "file did not match the expected value", tip.getExecStartTime() ==
627             Long.parseLong(task.get(Keys.START_TIME)));
628
629        assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
630             "file did not match the expected value", tip.getExecFinishTime() ==
631             Long.parseLong(task.get(Keys.FINISH_TIME)));
632
633        if (taskId == mapTaskId) {//check splits only for map task
634          assertTrue("Splits of Task " + tid + " obtained from history file " +
635                     " did not match the expected value",
636                     tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
637        }
638
639        TaskAttemptID attemptId = tip.getSuccessfulTaskid();
640        TaskStatus ts = tip.getTaskStatus(attemptId);
641
642        // Validate task counters
643        Counters c = ts.getCounters();
644        assertTrue("Counters of Task " + tid + " obtained from history file " +
645                   " did not match the expected value",
646                  c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
647      }
648    }
649  }
650
651  // Validate Task Attempt Level Keys, Values read from history file by
652  // comparing them with the actual values from JT.
653  private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
654                      RunningJob job, JobInfo jobInfo) throws IOException  {
655
656    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
657    JobInProgress jip = jt.getJob(job.getID());
658
659    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
660
661    // For each task
662    for (JobHistory.Task task : tasks.values()) {
663      // validate info of each attempt
664      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
665
666        String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
667        TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
668        TaskID tid = attemptId.getTaskID();
669
670        // Validate task id
671        assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
672                   "history file did not match the expected value",
673                   tid.toString().equals(attempt.get(Keys.TASKID)));
674
675        TaskInProgress tip = jip.getTaskInProgress(tid);
676        TaskStatus ts = tip.getTaskStatus(attemptId);
677
678        // Validate task attempt start time
679        assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
680                   "history file did not match the expected value",
681            ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
682
683        // Validate task attempt finish time
684        assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
685                   "history file did not match the expected value",
686            ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
687
688
689        TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker());
690
691        if (ttStatus != null) {
692          assertTrue("http port of task attempt " + idStr + " obtained from " +
693                     "history file did not match the expected value",
694                     ttStatus.getHttpPort() ==
695                     Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
696
697          if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
698            String ttHostname = jt.getNode(ttStatus.getHost()).toString();
699
700            // check if hostname is valid
701            assertTrue("Host name of task attempt " + idStr + " obtained from" +
702                       " history file did not match the expected value",
703                       ttHostname.equals(attempt.get(Keys.HOSTNAME)));
704          }
705        }
706        if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
707          // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
708          // Reduce Task Attempts
709          if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
710            assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
711                     " obtained from history file did not match the expected" +
712                     " value", ts.getShuffleFinishTime() ==
713                     Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
714            assertTrue("SORT_FINISHED time of task attempt " + idStr +
715                     " obtained from history file did not match the expected" +
716                     " value", ts.getSortFinishTime() ==
717                     Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
718          }
719
720          //Validate task counters
721          Counters c = ts.getCounters();
722          assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
723                     "history file did not match the expected value",
724               c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
725        }
726       
727        // check if tracker name is valid
728        assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
729                   "history file did not match the expected value",
730                   ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
731      }
732    }
733  }
734
735  /**
736   * Checks if the history file content is as expected comparing with the
737   * actual values obtained from JT.
738   * Job Level, Task Level and Task Attempt Level Keys, Values are validated.
739   * @param job RunningJob object of the job whose history is to be validated
740   * @param conf job conf
741   */
742  static void validateJobHistoryFileContent(MiniMRCluster mr,
743                              RunningJob job, JobConf conf) throws IOException  {
744
745    JobID id = job.getID();
746    // Get the history file name
747    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
748
749    // Framework history log file location
750    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
751    FileSystem fileSys = logFile.getFileSystem(conf);
752 
753    // Check if the history file exists
754    assertTrue("History file does not exist", fileSys.exists(logFile));
755
756
757    // check if the history file is parsable
758    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
759                                                   logFileName).split("_");
760
761    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
762    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
763
764    DefaultJobHistoryParser.JobTasksParseListener l =
765                   new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
766    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
767
768    // Now the history file contents are available in jobInfo. Let us compare
769    // them with the actual values from JT.
770    validateJobLevelKeyValues(mr, job, jobInfo, conf);
771    validateTaskLevelKeyValues(mr, job, jobInfo);
772    validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
773  }
774
775  /** Run a job that will be succeeded and validate its history file format
776   *  and its content.
777   */
778  public void testJobHistoryFile() throws IOException {
779    MiniMRCluster mr = null;
780    try {
781      JobConf conf = new JobConf();
782      // keep for less time
783      conf.setLong("mapred.jobtracker.retirejob.check", 1000);
784      conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
785      mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
786
787      // run the TCs
788      conf = mr.createJobConf();
789
790      FileSystem fs = FileSystem.get(conf);
791      // clean up
792      fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
793
794      Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input");
795      Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output");
796
797      //Disable speculative execution
798      conf.setSpeculativeExecution(false);
799
800      // Make sure that the job is not removed from memory until we do finish
801      // the validation of history file content
802      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
803
804      // Run a job that will be succeeded and validate its history file
805      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
806      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
807      validateJobHistoryFileContent(mr, job, conf);
808
809      // get the job conf filename
810      String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID());
811      File file = new File(name);
812
813      // check if the file get deleted
814      while (file.exists()) {
815        LOG.info("Waiting for " + file + " to be deleted");
816        UtilsForTests.waitFor(100);
817      }
818    } finally {
819      if (mr != null) {
820        cleanupLocalFiles(mr);
821        mr.shutdown();
822      }
823    }
824  }
825
826  // Returns the output path where user history log file is written to with
827  // default configuration setting for hadoop.job.history.user.location
828  private static Path getLogLocationInOutputPath(String logFileName,
829                                                      JobConf conf) {
830    JobConf jobConf = new JobConf(true);//default JobConf
831    FileOutputFormat.setOutputPath(jobConf,
832                     FileOutputFormat.getOutputPath(conf));
833    return JobHistory.JobInfo.getJobHistoryLogLocationForUser(
834                                             logFileName, jobConf);
835  }
836
837  /**
838   * Checks if the user history file exists in the correct dir
839   * @param id job id
840   * @param conf job conf
841   */
842  private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
843          throws IOException  {
844    // Get the history file name
845    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
846
847    // User history log file location
848    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
849                                                     logFileName, conf);
850    if(logFile == null) {
851      // get the output path where history file is written to when
852      // hadoop.job.history.user.location is not set
853      logFile = getLogLocationInOutputPath(logFileName, conf);
854    }
855    FileSystem fileSys = null;
856    fileSys = logFile.getFileSystem(conf);
857
858    // Check if the user history file exists in the correct dir
859    if (conf.get("hadoop.job.history.user.location") == null) {
860      assertTrue("User log file " + logFile + " does not exist",
861                 fileSys.exists(logFile));
862    }
863    else if ("none".equals(conf.get("hadoop.job.history.user.location"))) {
864      // history file should not exist in the output path
865      assertFalse("Unexpected. User log file exists in output dir when " +
866                 "hadoop.job.history.user.location is set to \"none\"",
867                 fileSys.exists(logFile));
868    }
869    else {
870      //hadoop.job.history.user.location is set to a specific location.
871      // User log file should exist in that location
872      assertTrue("User log file " + logFile + " does not exist",
873                 fileSys.exists(logFile));
874
875      // User log file should not exist in output path.
876
877      // get the output path where history file is written to when
878      // hadoop.job.history.user.location is not set
879      Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
880     
881      if (logFile != logFile1) {
882        fileSys = logFile1.getFileSystem(conf);
883        assertFalse("Unexpected. User log file exists in output dir when " +
884              "hadoop.job.history.user.location is set to a different location",
885              fileSys.exists(logFile1));
886      }
887    }
888  }
889
890  // Validate user history file location for the given values of
891  // hadoop.job.history.user.location as
892  // (1)null(default case), (2)"none", and (3)some dir "/tmp"
893  public void testJobHistoryUserLogLocation() throws IOException {
894    MiniMRCluster mr = null;
895    try {
896      mr = new MiniMRCluster(2, "file:///", 3);
897
898      // run the TCs
899      JobConf conf = mr.createJobConf();
900
901      FileSystem fs = FileSystem.get(conf);
902      // clean up
903      fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
904
905      Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1");
906      Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1");
907
908      // validate for the case of null(default)
909      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
910      validateJobHistoryUserLogLocation(job.getID(), conf);
911
912      inDir = new Path(TEST_ROOT_DIR + "/succeed/input2");
913      outDir = new Path(TEST_ROOT_DIR + "/succeed/output2");
914      // validate for the case of "none"
915      conf.set("hadoop.job.history.user.location", "none");
916      job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
917      validateJobHistoryUserLogLocation(job.getID(), conf);
918 
919      inDir = new Path(TEST_ROOT_DIR + "/succeed/input3");
920      outDir = new Path(TEST_ROOT_DIR + "/succeed/output3");
921      // validate for the case of any dir
922      conf.set("hadoop.job.history.user.location", "/tmp");
923      job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
924      validateJobHistoryUserLogLocation(job.getID(), conf);
925
926    } finally {
927      if (mr != null) {
928        cleanupLocalFiles(mr);
929        mr.shutdown();
930      }
931    }
932  }
933
934  private void cleanupLocalFiles(MiniMRCluster mr) 
935  throws IOException {
936    Configuration conf = mr.createJobConf();
937    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
938    Path sysDir = new Path(jt.getSystemDir());
939    FileSystem fs = sysDir.getFileSystem(conf);
940    fs.delete(sysDir, true);
941    Path jobHistoryDir = JobHistory.getJobHistoryLocation();
942    fs = jobHistoryDir.getFileSystem(conf);
943    fs.delete(jobHistoryDir, true);
944  }
945
946  /**
947   * Checks if the history file has expected job status
948   * @param id job id
949   * @param conf job conf
950   */
951  private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
952          String status) throws IOException  {
953
954    // Get the history file name
955    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
956
957    // Framework history log file location
958    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
959    FileSystem fileSys = logFile.getFileSystem(conf);
960 
961    // Check if the history file exists
962    assertTrue("History file does not exist", fileSys.exists(logFile));
963
964    // check history file permission
965    assertTrue("History file permissions does not match", 
966    fileSys.getFileStatus(logFile).getPermission().equals(
967       new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)));
968   
969    // check if the history file is parsable
970    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
971                                                   logFileName).split("_");
972
973    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
974    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
975
976    DefaultJobHistoryParser.JobTasksParseListener l =
977                  new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
978    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
979
980    assertTrue("Job Status read from job history file is not the expected" +
981         " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));
982  }
983
984  // run jobs that will be (1) succeeded (2) failed (3) killed
985  // and validate job status read from history file in each case
986  public void testJobHistoryJobStatus() throws IOException {
987    MiniMRCluster mr = null;
988    try {
989      mr = new MiniMRCluster(2, "file:///", 3);
990
991      // run the TCs
992      JobConf conf = mr.createJobConf();
993
994      FileSystem fs = FileSystem.get(conf);
995      // clean up
996      fs.delete(new Path(TEST_ROOT_DIR + "/succeedfailkilljob"), true);
997
998      Path inDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/input");
999      Path outDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/output");
1000
1001      // Run a job that will be succeeded and validate its job status
1002      // existing in history file
1003      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
1004      validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
1005      long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
1006      assertTrue(historyCleanerRanAt != 0);
1007     
1008      // Run a job that will be failed and validate its job status
1009      // existing in history file
1010      job = UtilsForTests.runJobFail(conf, inDir, outDir);
1011      validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
1012      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
1013     
1014      // Run a job that will be killed and validate its job status
1015      // existing in history file
1016      job = UtilsForTests.runJobKill(conf, inDir, outDir);
1017      validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
1018      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
1019     
1020    } finally {
1021      if (mr != null) {
1022        cleanupLocalFiles(mr);
1023        mr.shutdown();
1024      }
1025    }
1026  }
1027}
Note: See TracBrowser for help on using the repository browser.