source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 36.1 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20
21import java.io.IOException;
22import java.util.ArrayList;
23import java.util.Arrays;
24import java.util.Comparator;
25import java.util.Iterator;
26import java.util.List;
27import java.util.Map;
28import java.util.TreeMap;
29import java.util.TreeSet;
30
31import org.apache.commons.logging.Log;
32import org.apache.commons.logging.LogFactory;
33import org.apache.hadoop.io.BytesWritable;
34import org.apache.hadoop.mapred.JobClient.RawSplit;
35import org.apache.hadoop.mapred.SortedRanges.Range;
36import org.apache.hadoop.net.Node;
37
38
39/*************************************************************
40 * TaskInProgress maintains all the info needed for a
41 * Task in the lifetime of its owning Job.  A given Task
42 * might be speculatively executed or reexecuted, so we
43 * need a level of indirection above the running-id itself.
44 * <br>
45 * A given TaskInProgress contains multiple taskids,
46 * 0 or more of which might be executing at any one time.
47 * (That's what allows speculative execution.)  A taskid
48 * is now *never* recycled.  A TIP allocates enough taskids
49 * to account for all the speculation and failures it will
50 * ever have to handle.  Once those are up, the TIP is dead.
51 * **************************************************************
52 */
53class TaskInProgress {
54  static final int MAX_TASK_EXECS = 1;
55  int maxTaskAttempts = 4;   
56  static final double SPECULATIVE_GAP = 0.2;
57  static final long SPECULATIVE_LAG = 60 * 1000;
58  private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
59
60  public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
61
62  // Defines the TIP
63  private String jobFile = null;
64  private RawSplit rawSplit;
65  private int numMaps;
66  private int partition;
67  private JobTracker jobtracker;
68  private TaskID id;
69  private JobInProgress job;
70
71  // Status of the TIP
72  private int successEventNumber = -1;
73  private int numTaskFailures = 0;
74  private int numKilledTasks = 0;
75  private double progress = 0;
76  private String state = "";
77  private long startTime = 0;
78  private long execStartTime = 0;
79  private long execFinishTime = 0;
80  private int completes = 0;
81  private boolean failed = false;
82  private boolean killed = false;
83  private long maxSkipRecords = 0;
84  private FailedRanges failedRanges = new FailedRanges();
85  private volatile boolean skipping = false;
86  private boolean jobCleanup = false; 
87  private boolean jobSetup = false;
88   
89  // The 'next' usable taskid of this tip
90  int nextTaskId = 0;
91   
92  // The taskid that took this TIP to SUCCESS
93  private TaskAttemptID successfulTaskId;
94
95  // The first taskid of this tip
96  private TaskAttemptID firstTaskId;
97 
98  // Map from task Id -> TaskTracker Id, contains tasks that are
99  // currently runnings
100  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
101  // All attempt Ids of this TIP
102  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
103  private JobConf conf;
104  private Map<TaskAttemptID,List<String>> taskDiagnosticData =
105    new TreeMap<TaskAttemptID,List<String>>();
106  /**
107   * Map from taskId -> TaskStatus
108   */
109  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
110    new TreeMap<TaskAttemptID,TaskStatus>();
111
112  // Map from taskId -> TaskTracker Id,
113  // contains cleanup attempts and where they ran, if any
114  private TreeMap<TaskAttemptID, String> cleanupTasks =
115    new TreeMap<TaskAttemptID, String>();
116
117  private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
118  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
119 
120  //list of tasks to kill, <taskid> -> <shouldFail>
121  private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
122 
123  //task to commit, <taskattemptid> 
124  private TaskAttemptID taskToCommit;
125 
126  private Counters counters = new Counters();
127 
128
/**
 * Builds the TIP for a map task.
 *
 * @param jobid id of the owning job
 * @param jobFile job file handed to every task created by this TIP
 * @param rawSplit input split of the map; a non-null split is what makes
 *                 {@link #isMapTask()} answer true
 * @param jobtracker tracker that is notified about new attempts
 * @param conf job configuration
 * @param job owning job
 * @param partition index of this TIP within the job
 */
public TaskInProgress(JobID jobid, String jobFile,
                      RawSplit rawSplit,
                      JobTracker jobtracker, JobConf conf,
                      JobInProgress job, int partition) {
  this.job = job;
  this.jobtracker = jobtracker;
  this.conf = conf;
  this.jobFile = jobFile;
  this.partition = partition;
  this.rawSplit = rawSplit;
  this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
  // Both helpers read the fields assigned above, so they run last.
  setMaxTaskAttempts();
  init(jobid);
}
146       
/**
 * Builds the TIP for a reduce task.
 *
 * @param jobid id of the owning job
 * @param jobFile job file handed to every task created by this TIP
 * @param numMaps number of maps, passed on to each reduce task
 * @param partition index of this TIP within the job
 * @param jobtracker tracker that is notified about new attempts
 * @param conf job configuration
 * @param job owning job
 */
public TaskInProgress(JobID jobid, String jobFile,
                      int numMaps,
                      int partition, JobTracker jobtracker, JobConf conf,
                      JobInProgress job) {
  this.job = job;
  this.jobtracker = jobtracker;
  this.conf = conf;
  this.jobFile = jobFile;
  this.partition = partition;
  this.numMaps = numMaps;
  this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
  // Both helpers read the fields assigned above, so they run last.
  setMaxTaskAttempts();
  init(jobid);
}
164 
165  /**
166   * Set the max number of attempts before we declare a TIP as "failed"
167   */
168  private void setMaxTaskAttempts() {
169    if (isMapTask()) {
170      this.maxTaskAttempts = conf.getMaxMapAttempts();
171    } else {
172      this.maxTaskAttempts = conf.getMaxReduceAttempts();
173    }
174  }
175   
176  /**
177   * Return the index of the tip within the job, so
178   * "task_200707121733_1313_0002_m_012345" would return 12345;
179   * @return int the tip index
180   */
181  public int idWithinJob() {
182    return partition;
183  }   
184
185  public boolean isJobCleanupTask() {
186   return jobCleanup;
187  }
188 
189  public void setJobCleanupTask() {
190    jobCleanup = true;
191  }
192
193  public boolean isJobSetupTask() {
194    return jobSetup;
195  }
196         
197  public void setJobSetupTask() {
198    jobSetup = true;
199  }
200
201  public boolean isOnlyCommitPending() {
202    for (TaskStatus t : taskStatuses.values()) {
203      if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
204        return true;
205      }
206    }
207    return false;
208  }
209 
210  public boolean isCommitPending(TaskAttemptID taskId) {
211    TaskStatus t = taskStatuses.get(taskId);
212    if (t == null) {
213      return false;
214    }
215    return t.getRunState() ==  TaskStatus.State.COMMIT_PENDING;
216  }
217 
218  /**
219   * Initialization common to Map and Reduce
220   */
221  void init(JobID jobId) {
222    this.startTime = System.currentTimeMillis();
223    this.id = new TaskID(jobId, isMapTask(), partition);
224    this.skipping = startSkipping();
225  }
226
227  ////////////////////////////////////
228  // Accessors, info, profiles, etc.
229  ////////////////////////////////////
230
231  /**
232   * Return the start time
233   */
234  public long getStartTime() {
235    return startTime;
236  }
237 
238  /**
239   * Return the exec start time
240   */
241  public long getExecStartTime() {
242    return execStartTime;
243  }
244 
245  /**
246   * Set the exec start time
247   */
248  public void setExecStartTime(long startTime) {
249    execStartTime = startTime;
250  }
251 
252  /**
253   * Return the exec finish time
254   */
255  public long getExecFinishTime() {
256    return execFinishTime;
257  }
258
259  /**
260   * Set the exec finish time
261   */
262  public void setExecFinishTime(long finishTime) {
263    execFinishTime = finishTime;
264    JobHistory.Task.logUpdates(id, execFinishTime); // log the update
265  }
266 
267  /**
268   * Return the parent job
269   */
270  public JobInProgress getJob() {
271    return job;
272  }
273  /**
274   * Return an ID for this task, not its component taskid-threads
275   */
276  public TaskID getTIPId() {
277    return this.id;
278  }
279  /**
280   * Whether this is a map task
281   */
282  public boolean isMapTask() {
283    return rawSplit != null;
284  }
285   
286  /**
287   * Is the Task associated with taskid is the first attempt of the tip?
288   * @param taskId
289   * @return Returns true if the Task is the first attempt of the tip
290   */ 
291  public boolean isFirstAttempt(TaskAttemptID taskId) {
292    return firstTaskId == null ? false : firstTaskId.equals(taskId); 
293  }
294 
295  /**
296   * Is this tip currently running any tasks?
297   * @return true if any tasks are running
298   */
299  public boolean isRunning() {
300    return !activeTasks.isEmpty();
301  }
302   
303  TaskAttemptID getSuccessfulTaskid() {
304    return successfulTaskId;
305  }
306 
307  private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) {
308    this.successfulTaskId = successfulTaskId; 
309  }
310 
311  private void resetSuccessfulTaskid() {
312    this.successfulTaskId = null; 
313  }
314 
315  /**
316   * Is this tip complete?
317   *
318   * @return <code>true</code> if the tip is complete, else <code>false</code>
319   */
320  public synchronized boolean isComplete() {
321    return (completes > 0);
322  }
323
324  /**
325   * Is the given taskid the one that took this tip to completion?
326   *
327   * @param taskid taskid of attempt to check for completion
328   * @return <code>true</code> if taskid is complete, else <code>false</code>
329   */
330  public boolean isComplete(TaskAttemptID taskid) {
331    return ((completes > 0) 
332            && taskid.equals(getSuccessfulTaskid()));
333  }
334
335  /**
336   * Is the tip a failure?
337   *
338   * @return <code>true</code> if tip has failed, else <code>false</code>
339   */
340  public boolean isFailed() {
341    return failed;
342  }
343
344  /**
345   * Number of times the TaskInProgress has failed.
346   */
347  public int numTaskFailures() {
348    return numTaskFailures;
349  }
350
351  /**
352   * Number of times the TaskInProgress has been killed by the framework.
353   */
354  public int numKilledTasks() {
355    return numKilledTasks;
356  }
357
358  /**
359   * Get the overall progress (from 0 to 1.0) for this TIP
360   */
361  public double getProgress() {
362    return progress;
363  }
364   
365  /**
366   * Get the task's counters
367   */
368  public Counters getCounters() {
369    return counters;
370  }
371
372  /**
373   * Returns whether a component task-thread should be
374   * closed because the containing JobInProgress has completed
375   * or the task is killed by the user
376   */
377  public boolean shouldClose(TaskAttemptID taskid) {
378    /**
379     * If the task hasn't been closed yet, and it belongs to a completed
380     * TaskInProgress close it.
381     *
382     * However, for completed map tasks we do not close the task which
383     * actually was the one responsible for _completing_ the TaskInProgress.
384     */
385    boolean close = false;
386    TaskStatus ts = taskStatuses.get(taskid);
387    if ((ts != null) &&
388        (!tasksReportedClosed.contains(taskid)) &&
389        ((this.failed) ||
390        ((job.getStatus().getRunState() != JobStatus.RUNNING &&
391         (job.getStatus().getRunState() != JobStatus.PREP))))) {
392      tasksReportedClosed.add(taskid);
393      close = true;
394    } else if (isComplete() && 
395               !(isMapTask() && !jobSetup && 
396                   !jobCleanup && isComplete(taskid)) &&
397               !tasksReportedClosed.contains(taskid)) {
398      tasksReportedClosed.add(taskid);
399      close = true; 
400    } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
401               !tasksReportedClosed.contains(taskid)) {
402      tasksReportedClosed.add(taskid);
403      close = true; 
404    } else {
405      close = tasksToKill.keySet().contains(taskid);
406    }   
407    return close;
408  }
409
410  /**
411   * Commit this task attempt for the tip.
412   * @param taskid
413   */
414  public void doCommit(TaskAttemptID taskid) {
415    taskToCommit = taskid;
416  }
417
418  /**
419   * Returns whether the task attempt should be committed or not
420   */
421  public boolean shouldCommit(TaskAttemptID taskid) {
422    return !isComplete() && isCommitPending(taskid) && 
423           taskToCommit.equals(taskid);
424  }
425
426  /**
427   * Creates a "status report" for this task.  Includes the
428   * task ID and overall status, plus reports for all the
429   * component task-threads that have ever been started.
430   */
431  synchronized TaskReport generateSingleReport() {
432    ArrayList<String> diagnostics = new ArrayList<String>();
433    for (List<String> l : taskDiagnosticData.values()) {
434      diagnostics.addAll(l);
435    }
436    TIPStatus currentStatus = null;
437    if (isRunning() && !isComplete()) {
438      currentStatus = TIPStatus.RUNNING;
439    } else if (isComplete()) {
440      currentStatus = TIPStatus.COMPLETE;
441    } else if (wasKilled()) {
442      currentStatus = TIPStatus.KILLED;
443    } else if (isFailed()) {
444      currentStatus = TIPStatus.FAILED;
445    } else if (!(isComplete() || isRunning() || wasKilled())) {
446      currentStatus = TIPStatus.PENDING;
447    }
448   
449    TaskReport report = new TaskReport
450      (getTIPId(), (float)progress, state,
451       diagnostics.toArray(new String[diagnostics.size()]),
452       currentStatus, execStartTime, execFinishTime, counters);
453    if (currentStatus == TIPStatus.RUNNING) {
454      report.setRunningTaskAttempts(activeTasks.keySet());
455    } else if (currentStatus == TIPStatus.COMPLETE) {
456      report.setSuccessfulAttempt(getSuccessfulTaskid());
457    }
458    return report;
459  }
460
461  /**
462   * Get the diagnostic messages for a given task within this tip.
463   *
464   * @param taskId the id of the required task
465   * @return the list of diagnostics for that task
466   */
467  synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) {
468    return taskDiagnosticData.get(taskId);
469  }
470   
471  ////////////////////////////////////////////////
472  // Update methods, usually invoked by the owning
473  // job.
474  ////////////////////////////////////////////////
475 
476  /**
477   * Save diagnostic information for a given task.
478   *
479   * @param taskId id of the task
480   * @param diagInfo diagnostic information for the task
481   */
482  public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) {
483    List<String> diagHistory = taskDiagnosticData.get(taskId);
484    if (diagHistory == null) {
485      diagHistory = new ArrayList<String>();
486      taskDiagnosticData.put(taskId, diagHistory);
487    }
488    diagHistory.add(diagInfo);
489  }
490 
491  /**
492   * A status message from a client has arrived.
493   * It updates the status of a single component-thread-task,
494   * which might result in an overall TaskInProgress status update.
495   * @return has the task changed its state noticably?
496   */
497  synchronized boolean updateStatus(TaskStatus status) {
498    TaskAttemptID taskid = status.getTaskID();
499    String diagInfo = status.getDiagnosticInfo();
500    TaskStatus oldStatus = taskStatuses.get(taskid);
501    boolean changed = true;
502    if (diagInfo != null && diagInfo.length() > 0) {
503      LOG.info("Error from "+taskid+": "+diagInfo);
504      addDiagnosticInfo(taskid, diagInfo);
505    }
506   
507    if(skipping) {
508      failedRanges.updateState(status);
509    }
510   
511    if (oldStatus != null) {
512      TaskStatus.State oldState = oldStatus.getRunState();
513      TaskStatus.State newState = status.getRunState();
514         
515      // We should never recieve a duplicate success/failure/killed
516      // status update for the same taskid! This is a safety check,
517      // and is addressed better at the TaskTracker to ensure this.
518      // @see {@link TaskTracker.transmitHeartbeat()}
519      if ((newState != TaskStatus.State.RUNNING && 
520           newState != TaskStatus.State.COMMIT_PENDING && 
521           newState != TaskStatus.State.FAILED_UNCLEAN && 
522           newState != TaskStatus.State.KILLED_UNCLEAN && 
523           newState != TaskStatus.State.UNASSIGNED) && 
524          (oldState == newState)) {
525        LOG.warn("Recieved duplicate status update of '" + newState + 
526                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
527        return false;
528      }
529
530      // The task is not allowed to move from completed back to running.
531      // We have seen out of order status messagesmoving tasks from complete
532      // to running. This is a spot fix, but it should be addressed more
533      // globally.
534      if ((newState == TaskStatus.State.RUNNING || 
535          newState == TaskStatus.State.UNASSIGNED) &&
536          (oldState == TaskStatus.State.FAILED || 
537           oldState == TaskStatus.State.KILLED || 
538           oldState == TaskStatus.State.FAILED_UNCLEAN || 
539           oldState == TaskStatus.State.KILLED_UNCLEAN || 
540           oldState == TaskStatus.State.SUCCEEDED ||
541           oldState == TaskStatus.State.COMMIT_PENDING)) {
542        return false;
543      }
544     
545      //Do not accept any status once the task is marked FAILED/KILLED
546      //This is to handle the case of the JobTracker timing out a task
547      //due to launch delay, but the TT comes back with any state or
548      //TT got expired
549      if (oldState == TaskStatus.State.FAILED ||
550          oldState == TaskStatus.State.KILLED) {
551        tasksToKill.put(taskid, true);
552        return false;     
553      }
554         
555      changed = oldState != newState;
556    }
557    // if task is a cleanup attempt, do not replace the complete status,
558    // update only specific fields.
559    // For example, startTime should not be updated,
560    // but finishTime has to be updated.
561    if (!isCleanupAttempt(taskid)) {
562      taskStatuses.put(taskid, status);
563    } else {
564      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
565        status.getProgress(), status.getStateString(), status.getPhase(),
566        status.getFinishTime());
567    }
568
569    // Recompute progress
570    recomputeProgress();
571    return changed;
572  }
573
574  /**
575   * Indicate that one of the taskids in this TaskInProgress
576   * has failed.
577   */
578  public void incompleteSubTask(TaskAttemptID taskid, 
579                                JobStatus jobStatus) {
580    //
581    // Note the failure and its location
582    //
583    TaskStatus status = taskStatuses.get(taskid);
584    String trackerName;
585    String trackerHostName = null;
586    TaskStatus.State taskState = TaskStatus.State.FAILED;
587    if (status != null) {
588      trackerName = status.getTaskTracker();
589      trackerHostName = 
590        JobInProgress.convertTrackerNameToHostName(trackerName);
591      // Check if the user manually KILLED/FAILED this task-attempt...
592      Boolean shouldFail = tasksToKill.remove(taskid);
593      if (shouldFail != null) {
594        if (status.getRunState() == TaskStatus.State.FAILED ||
595            status.getRunState() == TaskStatus.State.KILLED) {
596          taskState = (shouldFail) ? TaskStatus.State.FAILED :
597                                     TaskStatus.State.KILLED;
598        } else {
599          taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
600                                     TaskStatus.State.KILLED_UNCLEAN;
601         
602        }
603        status.setRunState(taskState);
604        addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
605      }
606 
607      taskState = status.getRunState();
608      if (taskState != TaskStatus.State.FAILED && 
609          taskState != TaskStatus.State.KILLED &&
610          taskState != TaskStatus.State.FAILED_UNCLEAN &&
611          taskState != TaskStatus.State.KILLED_UNCLEAN) {
612        LOG.info("Task '" + taskid + "' running on '" + trackerName + 
613                "' in state: '" + taskState + "' being failed!");
614        status.setRunState(TaskStatus.State.FAILED);
615        taskState = TaskStatus.State.FAILED;
616      }
617
618      // tasktracker went down and failed time was not reported.
619      if (0 == status.getFinishTime()){
620        status.setFinishTime(System.currentTimeMillis());
621      }
622    }
623
624    this.activeTasks.remove(taskid);
625   
626    // Since we do not fail completed reduces (whose outputs go to hdfs), we
627    // should note this failure only for completed maps, only if this taskid;
628    // completed this map. however if the job is done, there is no need to
629    // manipulate completed maps
630    if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && 
631        jobStatus.getRunState() != JobStatus.SUCCEEDED) {
632      this.completes--;
633     
634      // Reset the successfulTaskId since we don't have a SUCCESSFUL task now
635      resetSuccessfulTaskid();
636    }
637
638    // Note that there can be failures of tasks that are hosted on a machine
639    // that has not yet registered with restarted jobtracker
640    // recalculate the counts only if its a genuine failure
641    if (tasks.contains(taskid)) {
642      if (taskState == TaskStatus.State.FAILED) {
643        numTaskFailures++;
644        machinesWhereFailed.add(trackerHostName);
645        if(maxSkipRecords>0) {
646          //skipping feature enabled
647          LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
648          failedRanges.add(status.getNextRecordRange());
649          skipping = startSkipping();
650        }
651
652      } else if (taskState == TaskStatus.State.KILLED) {
653        numKilledTasks++;
654      }
655    }
656
657    if (numTaskFailures >= maxTaskAttempts) {
658      LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
659      kill();
660    }
661  }
662 
663  /**
664   * Get whether to start skipping mode.
665   */
666  private boolean startSkipping() {
667    if(maxSkipRecords>0 && 
668        numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
669      return true;
670    }
671    return false;
672  }
673
674  /**
675   * Finalize the <b>completed</b> task; note that this might not be the first
676   * task-attempt of the {@link TaskInProgress} and hence might be declared
677   * {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED}
678   *
679   * @param taskId id of the completed task-attempt
680   * @param finalTaskState final {@link TaskStatus.State} of the task-attempt
681   */
682  private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) {
683    TaskStatus status = taskStatuses.get(taskId);
684    status.setRunState(finalTaskState);
685    activeTasks.remove(taskId);
686  }
687 
688  /**
689   * Indicate that one of the taskids in this already-completed
690   * TaskInProgress has successfully completed; hence we mark this
691   * taskid as {@link TaskStatus.State.KILLED}.
692   */
693  void alreadyCompletedTask(TaskAttemptID taskid) {
694    // 'KILL' the task
695    completedTask(taskid, TaskStatus.State.KILLED);
696   
697    // Note the reason for the task being 'KILLED'
698    addDiagnosticInfo(taskid, "Already completed TIP");
699   
700    LOG.info("Already complete TIP " + getTIPId() + 
701             " has completed task " + taskid);
702  }
703
704  /**
705   * Indicate that one of the taskids in this TaskInProgress
706   * has successfully completed!
707   */
708  public void completed(TaskAttemptID taskid) {
709    //
710    // Record that this taskid is complete
711    //
712    completedTask(taskid, TaskStatus.State.SUCCEEDED);
713       
714    // Note the successful taskid
715    setSuccessfulTaskid(taskid);
716   
717    //
718    // Now that the TIP is complete, the other speculative
719    // subtasks will be closed when the owning tasktracker
720    // reports in and calls shouldClose() on this object.
721    //
722
723    this.completes++;
724    this.execFinishTime = System.currentTimeMillis();
725    recomputeProgress();
726   
727  }
728
729  /**
730   * Get the split locations
731   */
732  public String[] getSplitLocations() {
733    if (isMapTask() && !jobSetup && !jobCleanup) {
734      return rawSplit.getLocations();
735    }
736    return new String[0];
737  }
738 
739  /**
740   * Get the Status of the tasks managed by this TIP
741   */
742  public TaskStatus[] getTaskStatuses() {
743    return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
744  }
745
746  /**
747   * Get the status of the specified task
748   * @param taskid
749   * @return
750   */
751  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
752    return taskStatuses.get(taskid);
753  }
754  /**
755   * The TIP's been ordered kill()ed.
756   */
757  public void kill() {
758    if (isComplete() || failed) {
759      return;
760    }
761    this.failed = true;
762    killed = true;
763    this.execFinishTime = System.currentTimeMillis();
764    recomputeProgress();
765  }
766
767  /**
768   * Was the task killed?
769   * @return true if the task killed
770   */
771  public boolean wasKilled() {
772    return killed;
773  }
774 
775  /**
776   * Kill the given task
777   */
778  boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
779    TaskStatus st = taskStatuses.get(taskId);
780    if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
781        || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
782        st.inTaskCleanupPhase() ||
783        st.getRunState() == TaskStatus.State.UNASSIGNED)
784        && tasksToKill.put(taskId, shouldFail) == null ) {
785      String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
786                      + " task '" + taskId + "' by user";
787      addDiagnosticInfo(taskId, logStr);
788      LOG.info(logStr);
789      return true;
790    }
791    return false;
792  }
793
794  /**
795   * This method is called whenever there's a status change
796   * for one of the TIP's sub-tasks.  It recomputes the overall
797   * progress for the TIP.  We examine all sub-tasks and find
798   * the one that's most advanced (and non-failed).
799   */
800  void recomputeProgress() {
801    if (isComplete()) {
802      this.progress = 1;
803      // update the counters and the state
804      TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid());
805      this.counters = completedStatus.getCounters();
806      this.state = completedStatus.getStateString();
807    } else if (failed) {
808      this.progress = 0;
809      // reset the counters and the state
810      this.state = "";
811      this.counters = new Counters();
812    } else {
813      double bestProgress = 0;
814      String bestState = "";
815      Counters bestCounters = new Counters();
816      for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
817        TaskAttemptID taskid = it.next();
818        TaskStatus status = taskStatuses.get(taskid);
819        if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
820          bestProgress = 1;
821          bestState = status.getStateString();
822          bestCounters = status.getCounters();
823          break;
824        } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
825          //for COMMIT_PENDING, we take the last state that we recorded
826          //when the task was RUNNING
827          bestProgress = this.progress;
828          bestState = this.state;
829          bestCounters = this.counters;
830        } else if (status.getRunState() == TaskStatus.State.RUNNING) {
831          if (status.getProgress() >= bestProgress) {
832            bestProgress = status.getProgress();
833            bestState = status.getStateString();
834            if (status.getIncludeCounters()) {
835              bestCounters = status.getCounters();
836            } else {
837              bestCounters = this.counters;
838            }
839          }
840        }
841      }
842      this.progress = bestProgress;
843      this.state = bestState;
844      this.counters = bestCounters;
845    }
846  }
847
848  /////////////////////////////////////////////////
849  // "Action" methods that actually require the TIP
850  // to do something.
851  /////////////////////////////////////////////////
852
853  /**
854   * Return whether this TIP still needs to run
855   */
856  boolean isRunnable() {
857    return !failed && (completes == 0);
858  }
859   
860  /**
861   * Return whether the TIP has a speculative task to run.  We
862   * only launch a speculative task if the current TIP is really
863   * far behind, and has been behind for a non-trivial amount of
864   * time.
865   */
866  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
867    //
868    // REMIND - mjc - these constants should be examined
869    // in more depth eventually...
870    //
871     
872    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
873        (averageProgress - progress >= SPECULATIVE_GAP) &&
874        (currentTime - startTime >= SPECULATIVE_LAG) 
875        && completes == 0 && !isOnlyCommitPending()) {
876      return true;
877    }
878    return false;
879  }
880   
881  /**
882   * Return a Task that can be sent to a TaskTracker for execution.
883   */
884  public Task getTaskToRun(String taskTracker) throws IOException {
885    if (0 == execStartTime){
886      // assume task starts running now
887      execStartTime = System.currentTimeMillis();
888    }
889
890    // Create the 'taskid'; do not count the 'killed' tasks against the job!
891    TaskAttemptID taskid = null;
892    if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
893      // Make sure that the attempts are unqiue across restarts
894      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
895      taskid = new TaskAttemptID( id, attemptId);
896      ++nextTaskId;
897    } else {
898      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
899              " (plus " + numKilledTasks + " killed)"  + 
900              " attempts for the tip '" + getTIPId() + "'");
901      return null;
902    }
903
904    return addRunningTask(taskid, taskTracker);
905  }
906 
907  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
908    return addRunningTask(taskid, taskTracker, false);
909  }
910 
911  /**
912   * Adds a previously running task to this tip. This is used in case of
913   * jobtracker restarts.
914   */
915  public Task addRunningTask(TaskAttemptID taskid, 
916                             String taskTracker,
917                             boolean taskCleanup) {
918    // create the task
919    Task t = null;
920    if (isMapTask()) {
921      LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
922          + failedRanges.getIndicesCount());
923      String splitClass = null;
924      BytesWritable split;
925      if (!jobSetup && !jobCleanup) {
926        splitClass = rawSplit.getClassName();
927        split = rawSplit.getBytes();
928      } else {
929        split = new BytesWritable();
930      }
931      t = new MapTask(jobFile, taskid, partition, splitClass, split);
932    } else {
933      t = new ReduceTask(jobFile, taskid, partition, numMaps);
934    }
935    if (jobCleanup) {
936      t.setJobCleanupTask();
937    }
938    if (jobSetup) {
939      t.setJobSetupTask();
940    }
941    if (taskCleanup) {
942      t.setTaskCleanupTask();
943      t.setState(taskStatuses.get(taskid).getRunState());
944      cleanupTasks.put(taskid, taskTracker);
945    }
946    t.setConf(conf);
947    LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
948    t.setSkipRanges(failedRanges.getSkipRanges());
949    t.setSkipping(skipping);
950    if(failedRanges.isTestAttempt()) {
951      t.setWriteSkipRecs(false);
952    }
953
954    activeTasks.put(taskid, taskTracker);
955    tasks.add(taskid);
956
957    // Ask JobTracker to note that the task exists
958    jobtracker.createTaskEntry(taskid, taskTracker, this);
959
960    // check and set the first attempt
961    if (firstTaskId == null) {
962      firstTaskId = taskid;
963    }
964    return t;
965  }
966
967  boolean isRunningTask(TaskAttemptID taskid) {
968    TaskStatus status = taskStatuses.get(taskid);
969    return status != null && status.getRunState() == TaskStatus.State.RUNNING;
970  }
971 
972  boolean isCleanupAttempt(TaskAttemptID taskid) {
973    return cleanupTasks.containsKey(taskid);
974  }
975 
976  String machineWhereCleanupRan(TaskAttemptID taskid) {
977    return cleanupTasks.get(taskid);
978  }
979 
980  String machineWhereTaskRan(TaskAttemptID taskid) {
981    return taskStatuses.get(taskid).getTaskTracker();
982  }
983   
984  boolean wasKilled(TaskAttemptID taskid) {
985    return tasksToKill.containsKey(taskid);
986  }
987 
988  /**
989   * Has this task already failed on this machine?
990   * @param trackerHost The task tracker hostname
991   * @return Has it failed?
992   */
993  public boolean hasFailedOnMachine(String trackerHost) {
994    return machinesWhereFailed.contains(trackerHost);
995  }
996   
997  /**
998   * Was this task ever scheduled to run on this machine?
999   * @param trackerHost The task tracker hostname
1000   * @param trackerName The tracker name
1001   * @return Was task scheduled on the tracker?
1002   */
1003  public boolean hasRunOnMachine(String trackerHost, String trackerName) {
1004    return this.activeTasks.values().contains(trackerName) || 
1005      hasFailedOnMachine(trackerHost);
1006  }
1007  /**
1008   * Get the number of machines where this task has failed.
1009   * @return the size of the failed machine set
1010   */
1011  public int getNumberOfFailedMachines() {
1012    return machinesWhereFailed.size();
1013  }
1014   
1015  /**
1016   * Get the id of this map or reduce task.
1017   * @return The index of this tip in the maps/reduces lists.
1018   */
1019  public int getIdWithinJob() {
1020    return partition;
1021  }
1022   
1023  /**
1024   * Set the event number that was raised for this tip
1025   */
1026  public void setSuccessEventNumber(int eventNumber) {
1027    successEventNumber = eventNumber;
1028  }
1029       
1030  /**
1031   * Get the event number that was raised for this tip
1032   */
1033  public int getSuccessEventNumber() {
1034    return successEventNumber;
1035  }
1036 
1037  /**
1038   * Gets the Node list of input split locations sorted in rack order.
1039   */ 
1040  public String getSplitNodes() {
1041    if (!isMapTask() || jobSetup || jobCleanup) {
1042      return "";
1043    }
1044    String[] splits = rawSplit.getLocations();
1045    Node[] nodes = new Node[splits.length];
1046    for (int i = 0; i < splits.length; i++) {
1047      nodes[i] = jobtracker.getNode(splits[i]);
1048    }
1049    // sort nodes on rack location
1050    Arrays.sort(nodes, new Comparator<Node>() {
1051      public int compare(Node a, Node b) {
1052        String left = a.getNetworkLocation();
1053        String right = b.getNetworkLocation();
1054        return left.compareTo(right);
1055      }
1056    }); 
1057    return nodeToString(nodes);
1058  }
1059
1060  private static String nodeToString(Node[] nodes) {
1061    if (nodes == null || nodes.length == 0) {
1062      return "";
1063    }
1064    StringBuffer ret = new StringBuffer(nodes[0].toString());
1065    for(int i = 1; i < nodes.length;i++) {
1066      ret.append(",");
1067      ret.append(nodes[i].toString());
1068    }
1069    return ret.toString();
1070  }
1071
1072  public long getMapInputSize() {
1073    if(isMapTask() && !jobSetup && !jobCleanup) {
1074      return rawSplit.getDataLength();
1075    } else {
1076      return 0;
1077    }
1078  }
1079 
1080  public void clearSplit() {
1081    rawSplit.clearBytes();
1082  }
1083 
1084  /**
1085   * This class keeps the records to be skipped during further executions
1086   * based on failed records from all the previous attempts.
1087   * It also narrow down the skip records if it is more than the
1088   * acceptable value by dividing the failed range into half. In this case one
1089   * half is executed in the next attempt (test attempt).
1090   * In the test attempt, only the test range gets executed, others get skipped.
1091   * Based on the success/failure of the test attempt, the range is divided
1092   * further.
1093   */
1094  private class FailedRanges {
1095    private SortedRanges skipRanges = new SortedRanges();
1096    private Divide divide;
1097   
1098    synchronized SortedRanges getSkipRanges() {
1099      if(divide!=null) {
1100        return divide.skipRange;
1101      }
1102      return skipRanges;
1103    }
1104   
1105    synchronized boolean isTestAttempt() {
1106      return divide!=null;
1107    }
1108   
1109    synchronized long getIndicesCount() {
1110      if(isTestAttempt()) {
1111        return divide.skipRange.getIndicesCount();
1112      }
1113      return skipRanges.getIndicesCount();
1114    }
1115   
1116    synchronized void updateState(TaskStatus status){
1117      if (isTestAttempt() && 
1118          (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
1119        divide.testPassed = true;
1120        //since it was the test attempt we need to set it to failed
1121        //as it worked only on the test range
1122        status.setRunState(TaskStatus.State.FAILED);
1123       
1124      }
1125    }
1126   
1127    synchronized void add(Range failedRange) {
1128      LOG.warn("FailedRange:"+ failedRange);
1129      if(divide!=null) {
1130        LOG.warn("FailedRange:"+ failedRange +"  test:"+divide.test +
1131            "  pass:"+divide.testPassed);
1132        if(divide.testPassed) {
1133          //test range passed
1134          //other range would be bad. test it
1135          failedRange = divide.other;
1136        }
1137        else {
1138          //test range failed
1139          //other range would be good.
1140          failedRange = divide.test;
1141        }
1142        //reset
1143        divide = null;
1144      }
1145     
1146      if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) {
1147        skipRanges.add(failedRange);
1148      } else {
1149        //start dividing the range to narrow down the skipped
1150        //records until maxSkipRecords are met OR all attempts
1151        //get exhausted
1152        divide = new Divide(failedRange);
1153      }
1154    }
1155   
1156    class Divide {
1157      private final SortedRanges skipRange;
1158      private final Range test;
1159      private final Range other;
1160      private boolean testPassed;
1161      Divide(Range range){
1162        long half = range.getLength()/2;
1163        test = new Range(range.getStartIndex(), half);
1164        other = new Range(test.getEndIndex(), range.getLength()-half);
1165        //construct the skip range from the skipRanges
1166        skipRange = new SortedRanges();
1167        for(Range r : skipRanges.getRanges()) {
1168          skipRange.add(r);
1169        }
1170        skipRange.add(new Range(0,test.getStartIndex()));
1171        skipRange.add(new Range(test.getEndIndex(), 
1172            (Long.MAX_VALUE-test.getEndIndex())));
1173      }
1174    }
1175   
1176  }
1177
1178  TreeMap<TaskAttemptID, String> getActiveTasks() {
1179    return activeTasks;
1180  }
1181}
Note: See TracBrowser for help on using the repository browser.