1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.util.ArrayList; |
---|
23 | import java.util.Arrays; |
---|
24 | import java.util.Comparator; |
---|
25 | import java.util.Iterator; |
---|
26 | import java.util.List; |
---|
27 | import java.util.Map; |
---|
28 | import java.util.TreeMap; |
---|
29 | import java.util.TreeSet; |
---|
30 | |
---|
31 | import org.apache.commons.logging.Log; |
---|
32 | import org.apache.commons.logging.LogFactory; |
---|
33 | import org.apache.hadoop.io.BytesWritable; |
---|
34 | import org.apache.hadoop.mapred.JobClient.RawSplit; |
---|
35 | import org.apache.hadoop.mapred.SortedRanges.Range; |
---|
36 | import org.apache.hadoop.net.Node; |
---|
37 | |
---|
38 | |
---|
/*************************************************************
 * TaskInProgress maintains all the info needed for a
 * Task in the lifetime of its owning Job.  A given Task
 * might be speculatively executed or re-executed, so we
 * need a level of indirection above the running-id itself.
 * <br>
 * A given TaskInProgress contains multiple taskids,
 * 0 or more of which might be executing at any one time.
 * (That's what allows speculative execution.)  A taskid
 * is now *never* recycled.  A TIP allocates enough taskids
 * to account for all the speculation and failures it will
 * ever have to handle.  Once those are up, the TIP is dead.
 * **************************************************************
 */
---|
53 | class TaskInProgress { |
---|
54 | static final int MAX_TASK_EXECS = 1; |
---|
55 | int maxTaskAttempts = 4; |
---|
56 | static final double SPECULATIVE_GAP = 0.2; |
---|
57 | static final long SPECULATIVE_LAG = 60 * 1000; |
---|
58 | private static final int NUM_ATTEMPTS_PER_RESTART = 1000; |
---|
59 | |
---|
60 | public static final Log LOG = LogFactory.getLog(TaskInProgress.class); |
---|
61 | |
---|
62 | // Defines the TIP |
---|
63 | private String jobFile = null; |
---|
64 | private RawSplit rawSplit; |
---|
65 | private int numMaps; |
---|
66 | private int partition; |
---|
67 | private JobTracker jobtracker; |
---|
68 | private TaskID id; |
---|
69 | private JobInProgress job; |
---|
70 | |
---|
71 | // Status of the TIP |
---|
72 | private int successEventNumber = -1; |
---|
73 | private int numTaskFailures = 0; |
---|
74 | private int numKilledTasks = 0; |
---|
75 | private double progress = 0; |
---|
76 | private String state = ""; |
---|
77 | private long startTime = 0; |
---|
78 | private long execStartTime = 0; |
---|
79 | private long execFinishTime = 0; |
---|
80 | private int completes = 0; |
---|
81 | private boolean failed = false; |
---|
82 | private boolean killed = false; |
---|
83 | private long maxSkipRecords = 0; |
---|
84 | private FailedRanges failedRanges = new FailedRanges(); |
---|
85 | private volatile boolean skipping = false; |
---|
86 | private boolean jobCleanup = false; |
---|
87 | private boolean jobSetup = false; |
---|
88 | |
---|
89 | // The 'next' usable taskid of this tip |
---|
90 | int nextTaskId = 0; |
---|
91 | |
---|
92 | // The taskid that took this TIP to SUCCESS |
---|
93 | private TaskAttemptID successfulTaskId; |
---|
94 | |
---|
95 | // The first taskid of this tip |
---|
96 | private TaskAttemptID firstTaskId; |
---|
97 | |
---|
98 | // Map from task Id -> TaskTracker Id, contains tasks that are |
---|
99 | // currently runnings |
---|
100 | private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>(); |
---|
101 | // All attempt Ids of this TIP |
---|
102 | private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>(); |
---|
103 | private JobConf conf; |
---|
104 | private Map<TaskAttemptID,List<String>> taskDiagnosticData = |
---|
105 | new TreeMap<TaskAttemptID,List<String>>(); |
---|
106 | /** |
---|
107 | * Map from taskId -> TaskStatus |
---|
108 | */ |
---|
109 | private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = |
---|
110 | new TreeMap<TaskAttemptID,TaskStatus>(); |
---|
111 | |
---|
112 | // Map from taskId -> TaskTracker Id, |
---|
113 | // contains cleanup attempts and where they ran, if any |
---|
114 | private TreeMap<TaskAttemptID, String> cleanupTasks = |
---|
115 | new TreeMap<TaskAttemptID, String>(); |
---|
116 | |
---|
117 | private TreeSet<String> machinesWhereFailed = new TreeSet<String>(); |
---|
118 | private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>(); |
---|
119 | |
---|
120 | //list of tasks to kill, <taskid> -> <shouldFail> |
---|
121 | private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>(); |
---|
122 | |
---|
123 | //task to commit, <taskattemptid> |
---|
124 | private TaskAttemptID taskToCommit; |
---|
125 | |
---|
126 | private Counters counters = new Counters(); |
---|
127 | |
---|
128 | |
---|
/**
 * Creates the TIP for a map task.
 *
 * @param jobid id of the owning job
 * @param jobFile the job file
 * @param rawSplit the input split of this map; a non-null split is what
 *        makes {@link #isMapTask()} return true
 * @param jobtracker the owning JobTracker
 * @param conf the job configuration
 * @param job the owning job
 * @param partition index of this TIP within the job
 */
public TaskInProgress(JobID jobid, String jobFile,
                      RawSplit rawSplit,
                      JobTracker jobtracker, JobConf conf,
                      JobInProgress job, int partition) {
  this.conf = conf;
  this.job = job;
  this.jobtracker = jobtracker;
  this.jobFile = jobFile;
  this.partition = partition;
  this.rawSplit = rawSplit;
  // Maps use the record-based skip limit.
  this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
  setMaxTaskAttempts();
  init(jobid);
}
---|
146 | |
---|
/**
 * Creates the TIP for a reduce task.
 *
 * @param jobid id of the owning job
 * @param jobFile the job file
 * @param numMaps value stored in the numMaps field
 * @param partition index of this TIP within the job
 * @param jobtracker the owning JobTracker
 * @param conf the job configuration
 * @param job the owning job
 */
public TaskInProgress(JobID jobid, String jobFile,
                      int numMaps,
                      int partition, JobTracker jobtracker, JobConf conf,
                      JobInProgress job) {
  this.conf = conf;
  this.job = job;
  this.jobtracker = jobtracker;
  this.jobFile = jobFile;
  this.partition = partition;
  this.numMaps = numMaps;
  // Reduces use the group-based skip limit.
  this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
  setMaxTaskAttempts();
  init(jobid);
}
---|
164 | |
---|
165 | /** |
---|
166 | * Set the max number of attempts before we declare a TIP as "failed" |
---|
167 | */ |
---|
168 | private void setMaxTaskAttempts() { |
---|
169 | if (isMapTask()) { |
---|
170 | this.maxTaskAttempts = conf.getMaxMapAttempts(); |
---|
171 | } else { |
---|
172 | this.maxTaskAttempts = conf.getMaxReduceAttempts(); |
---|
173 | } |
---|
174 | } |
---|
175 | |
---|
176 | /** |
---|
177 | * Return the index of the tip within the job, so |
---|
178 | * "task_200707121733_1313_0002_m_012345" would return 12345; |
---|
179 | * @return int the tip index |
---|
180 | */ |
---|
181 | public int idWithinJob() { |
---|
182 | return partition; |
---|
183 | } |
---|
184 | |
---|
185 | public boolean isJobCleanupTask() { |
---|
186 | return jobCleanup; |
---|
187 | } |
---|
188 | |
---|
189 | public void setJobCleanupTask() { |
---|
190 | jobCleanup = true; |
---|
191 | } |
---|
192 | |
---|
193 | public boolean isJobSetupTask() { |
---|
194 | return jobSetup; |
---|
195 | } |
---|
196 | |
---|
197 | public void setJobSetupTask() { |
---|
198 | jobSetup = true; |
---|
199 | } |
---|
200 | |
---|
201 | public boolean isOnlyCommitPending() { |
---|
202 | for (TaskStatus t : taskStatuses.values()) { |
---|
203 | if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) { |
---|
204 | return true; |
---|
205 | } |
---|
206 | } |
---|
207 | return false; |
---|
208 | } |
---|
209 | |
---|
210 | public boolean isCommitPending(TaskAttemptID taskId) { |
---|
211 | TaskStatus t = taskStatuses.get(taskId); |
---|
212 | if (t == null) { |
---|
213 | return false; |
---|
214 | } |
---|
215 | return t.getRunState() == TaskStatus.State.COMMIT_PENDING; |
---|
216 | } |
---|
217 | |
---|
218 | /** |
---|
219 | * Initialization common to Map and Reduce |
---|
220 | */ |
---|
221 | void init(JobID jobId) { |
---|
222 | this.startTime = System.currentTimeMillis(); |
---|
223 | this.id = new TaskID(jobId, isMapTask(), partition); |
---|
224 | this.skipping = startSkipping(); |
---|
225 | } |
---|
226 | |
---|
227 | //////////////////////////////////// |
---|
228 | // Accessors, info, profiles, etc. |
---|
229 | //////////////////////////////////// |
---|
230 | |
---|
231 | /** |
---|
232 | * Return the start time |
---|
233 | */ |
---|
234 | public long getStartTime() { |
---|
235 | return startTime; |
---|
236 | } |
---|
237 | |
---|
238 | /** |
---|
239 | * Return the exec start time |
---|
240 | */ |
---|
241 | public long getExecStartTime() { |
---|
242 | return execStartTime; |
---|
243 | } |
---|
244 | |
---|
245 | /** |
---|
246 | * Set the exec start time |
---|
247 | */ |
---|
248 | public void setExecStartTime(long startTime) { |
---|
249 | execStartTime = startTime; |
---|
250 | } |
---|
251 | |
---|
252 | /** |
---|
253 | * Return the exec finish time |
---|
254 | */ |
---|
255 | public long getExecFinishTime() { |
---|
256 | return execFinishTime; |
---|
257 | } |
---|
258 | |
---|
259 | /** |
---|
260 | * Set the exec finish time |
---|
261 | */ |
---|
262 | public void setExecFinishTime(long finishTime) { |
---|
263 | execFinishTime = finishTime; |
---|
264 | JobHistory.Task.logUpdates(id, execFinishTime); // log the update |
---|
265 | } |
---|
266 | |
---|
267 | /** |
---|
268 | * Return the parent job |
---|
269 | */ |
---|
270 | public JobInProgress getJob() { |
---|
271 | return job; |
---|
272 | } |
---|
273 | /** |
---|
274 | * Return an ID for this task, not its component taskid-threads |
---|
275 | */ |
---|
276 | public TaskID getTIPId() { |
---|
277 | return this.id; |
---|
278 | } |
---|
279 | /** |
---|
280 | * Whether this is a map task |
---|
281 | */ |
---|
282 | public boolean isMapTask() { |
---|
283 | return rawSplit != null; |
---|
284 | } |
---|
285 | |
---|
286 | /** |
---|
287 | * Is the Task associated with taskid is the first attempt of the tip? |
---|
288 | * @param taskId |
---|
289 | * @return Returns true if the Task is the first attempt of the tip |
---|
290 | */ |
---|
291 | public boolean isFirstAttempt(TaskAttemptID taskId) { |
---|
292 | return firstTaskId == null ? false : firstTaskId.equals(taskId); |
---|
293 | } |
---|
294 | |
---|
295 | /** |
---|
296 | * Is this tip currently running any tasks? |
---|
297 | * @return true if any tasks are running |
---|
298 | */ |
---|
299 | public boolean isRunning() { |
---|
300 | return !activeTasks.isEmpty(); |
---|
301 | } |
---|
302 | |
---|
303 | TaskAttemptID getSuccessfulTaskid() { |
---|
304 | return successfulTaskId; |
---|
305 | } |
---|
306 | |
---|
307 | private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) { |
---|
308 | this.successfulTaskId = successfulTaskId; |
---|
309 | } |
---|
310 | |
---|
311 | private void resetSuccessfulTaskid() { |
---|
312 | this.successfulTaskId = null; |
---|
313 | } |
---|
314 | |
---|
315 | /** |
---|
316 | * Is this tip complete? |
---|
317 | * |
---|
318 | * @return <code>true</code> if the tip is complete, else <code>false</code> |
---|
319 | */ |
---|
320 | public synchronized boolean isComplete() { |
---|
321 | return (completes > 0); |
---|
322 | } |
---|
323 | |
---|
324 | /** |
---|
325 | * Is the given taskid the one that took this tip to completion? |
---|
326 | * |
---|
327 | * @param taskid taskid of attempt to check for completion |
---|
328 | * @return <code>true</code> if taskid is complete, else <code>false</code> |
---|
329 | */ |
---|
330 | public boolean isComplete(TaskAttemptID taskid) { |
---|
331 | return ((completes > 0) |
---|
332 | && taskid.equals(getSuccessfulTaskid())); |
---|
333 | } |
---|
334 | |
---|
335 | /** |
---|
336 | * Is the tip a failure? |
---|
337 | * |
---|
338 | * @return <code>true</code> if tip has failed, else <code>false</code> |
---|
339 | */ |
---|
340 | public boolean isFailed() { |
---|
341 | return failed; |
---|
342 | } |
---|
343 | |
---|
344 | /** |
---|
345 | * Number of times the TaskInProgress has failed. |
---|
346 | */ |
---|
347 | public int numTaskFailures() { |
---|
348 | return numTaskFailures; |
---|
349 | } |
---|
350 | |
---|
351 | /** |
---|
352 | * Number of times the TaskInProgress has been killed by the framework. |
---|
353 | */ |
---|
354 | public int numKilledTasks() { |
---|
355 | return numKilledTasks; |
---|
356 | } |
---|
357 | |
---|
358 | /** |
---|
359 | * Get the overall progress (from 0 to 1.0) for this TIP |
---|
360 | */ |
---|
361 | public double getProgress() { |
---|
362 | return progress; |
---|
363 | } |
---|
364 | |
---|
365 | /** |
---|
366 | * Get the task's counters |
---|
367 | */ |
---|
368 | public Counters getCounters() { |
---|
369 | return counters; |
---|
370 | } |
---|
371 | |
---|
372 | /** |
---|
373 | * Returns whether a component task-thread should be |
---|
374 | * closed because the containing JobInProgress has completed |
---|
375 | * or the task is killed by the user |
---|
376 | */ |
---|
377 | public boolean shouldClose(TaskAttemptID taskid) { |
---|
378 | /** |
---|
379 | * If the task hasn't been closed yet, and it belongs to a completed |
---|
380 | * TaskInProgress close it. |
---|
381 | * |
---|
382 | * However, for completed map tasks we do not close the task which |
---|
383 | * actually was the one responsible for _completing_ the TaskInProgress. |
---|
384 | */ |
---|
385 | boolean close = false; |
---|
386 | TaskStatus ts = taskStatuses.get(taskid); |
---|
387 | if ((ts != null) && |
---|
388 | (!tasksReportedClosed.contains(taskid)) && |
---|
389 | ((this.failed) || |
---|
390 | ((job.getStatus().getRunState() != JobStatus.RUNNING && |
---|
391 | (job.getStatus().getRunState() != JobStatus.PREP))))) { |
---|
392 | tasksReportedClosed.add(taskid); |
---|
393 | close = true; |
---|
394 | } else if (isComplete() && |
---|
395 | !(isMapTask() && !jobSetup && |
---|
396 | !jobCleanup && isComplete(taskid)) && |
---|
397 | !tasksReportedClosed.contains(taskid)) { |
---|
398 | tasksReportedClosed.add(taskid); |
---|
399 | close = true; |
---|
400 | } else if (isCommitPending(taskid) && !shouldCommit(taskid) && |
---|
401 | !tasksReportedClosed.contains(taskid)) { |
---|
402 | tasksReportedClosed.add(taskid); |
---|
403 | close = true; |
---|
404 | } else { |
---|
405 | close = tasksToKill.keySet().contains(taskid); |
---|
406 | } |
---|
407 | return close; |
---|
408 | } |
---|
409 | |
---|
410 | /** |
---|
411 | * Commit this task attempt for the tip. |
---|
412 | * @param taskid |
---|
413 | */ |
---|
414 | public void doCommit(TaskAttemptID taskid) { |
---|
415 | taskToCommit = taskid; |
---|
416 | } |
---|
417 | |
---|
418 | /** |
---|
419 | * Returns whether the task attempt should be committed or not |
---|
420 | */ |
---|
421 | public boolean shouldCommit(TaskAttemptID taskid) { |
---|
422 | return !isComplete() && isCommitPending(taskid) && |
---|
423 | taskToCommit.equals(taskid); |
---|
424 | } |
---|
425 | |
---|
426 | /** |
---|
427 | * Creates a "status report" for this task. Includes the |
---|
428 | * task ID and overall status, plus reports for all the |
---|
429 | * component task-threads that have ever been started. |
---|
430 | */ |
---|
431 | synchronized TaskReport generateSingleReport() { |
---|
432 | ArrayList<String> diagnostics = new ArrayList<String>(); |
---|
433 | for (List<String> l : taskDiagnosticData.values()) { |
---|
434 | diagnostics.addAll(l); |
---|
435 | } |
---|
436 | TIPStatus currentStatus = null; |
---|
437 | if (isRunning() && !isComplete()) { |
---|
438 | currentStatus = TIPStatus.RUNNING; |
---|
439 | } else if (isComplete()) { |
---|
440 | currentStatus = TIPStatus.COMPLETE; |
---|
441 | } else if (wasKilled()) { |
---|
442 | currentStatus = TIPStatus.KILLED; |
---|
443 | } else if (isFailed()) { |
---|
444 | currentStatus = TIPStatus.FAILED; |
---|
445 | } else if (!(isComplete() || isRunning() || wasKilled())) { |
---|
446 | currentStatus = TIPStatus.PENDING; |
---|
447 | } |
---|
448 | |
---|
449 | TaskReport report = new TaskReport |
---|
450 | (getTIPId(), (float)progress, state, |
---|
451 | diagnostics.toArray(new String[diagnostics.size()]), |
---|
452 | currentStatus, execStartTime, execFinishTime, counters); |
---|
453 | if (currentStatus == TIPStatus.RUNNING) { |
---|
454 | report.setRunningTaskAttempts(activeTasks.keySet()); |
---|
455 | } else if (currentStatus == TIPStatus.COMPLETE) { |
---|
456 | report.setSuccessfulAttempt(getSuccessfulTaskid()); |
---|
457 | } |
---|
458 | return report; |
---|
459 | } |
---|
460 | |
---|
461 | /** |
---|
462 | * Get the diagnostic messages for a given task within this tip. |
---|
463 | * |
---|
464 | * @param taskId the id of the required task |
---|
465 | * @return the list of diagnostics for that task |
---|
466 | */ |
---|
467 | synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) { |
---|
468 | return taskDiagnosticData.get(taskId); |
---|
469 | } |
---|
470 | |
---|
471 | //////////////////////////////////////////////// |
---|
472 | // Update methods, usually invoked by the owning |
---|
473 | // job. |
---|
474 | //////////////////////////////////////////////// |
---|
475 | |
---|
476 | /** |
---|
477 | * Save diagnostic information for a given task. |
---|
478 | * |
---|
479 | * @param taskId id of the task |
---|
480 | * @param diagInfo diagnostic information for the task |
---|
481 | */ |
---|
482 | public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) { |
---|
483 | List<String> diagHistory = taskDiagnosticData.get(taskId); |
---|
484 | if (diagHistory == null) { |
---|
485 | diagHistory = new ArrayList<String>(); |
---|
486 | taskDiagnosticData.put(taskId, diagHistory); |
---|
487 | } |
---|
488 | diagHistory.add(diagInfo); |
---|
489 | } |
---|
490 | |
---|
/**
 * A status message from a client has arrived.
 * It updates the status of a single component-thread-task,
 * which might result in an overall TaskInProgress status update.
 *
 * <p>For an attempt that already has a recorded status, the update is
 * rejected (returns false, nothing stored) when it:
 * <ul>
 *   <li>repeats the previous state and that state is not one of RUNNING,
 *       COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, UNASSIGNED;</li>
 *   <li>moves to RUNNING/UNASSIGNED from FAILED, KILLED, FAILED_UNCLEAN,
 *       KILLED_UNCLEAN, SUCCEEDED or COMMIT_PENDING;</li>
 *   <li>arrives after the attempt was marked FAILED/KILLED (the attempt is
 *       then also put into tasksToKill with shouldFail=true).</li>
 * </ul>
 * Diagnostics and skip-range bookkeeping happen before these checks.
 *
 * @param status the latest status reported for one attempt of this TIP
 * @return has the task changed its state noticeably? (true for the first
 *         status seen for an attempt)
 */
synchronized boolean updateStatus(TaskStatus status) {
  TaskAttemptID taskid = status.getTaskID();
  String diagInfo = status.getDiagnosticInfo();
  TaskStatus oldStatus = taskStatuses.get(taskid);
  // A report for an attempt with no previous status counts as a change.
  boolean changed = true;
  if (diagInfo != null && diagInfo.length() > 0) {
    LOG.info("Error from "+taskid+": "+diagInfo);
    addDiagnosticInfo(taskid, diagInfo);
  }

  // In skipping mode every report is passed to the failed-ranges tracker.
  if(skipping) {
    failedRanges.updateState(status);
  }

  if (oldStatus != null) {
    TaskStatus.State oldState = oldStatus.getRunState();
    TaskStatus.State newState = status.getRunState();

    // We should never receive a duplicate success/failure/killed
    // status update for the same taskid! This is a safety check,
    // and is addressed better at the TaskTracker to ensure this.
    // @see {@link TaskTracker.transmitHeartbeat()}
    if ((newState != TaskStatus.State.RUNNING &&
         newState != TaskStatus.State.COMMIT_PENDING &&
         newState != TaskStatus.State.FAILED_UNCLEAN &&
         newState != TaskStatus.State.KILLED_UNCLEAN &&
         newState != TaskStatus.State.UNASSIGNED) &&
        (oldState == newState)) {
      LOG.warn("Recieved duplicate status update of '" + newState +
               "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
      return false;
    }

    // The task is not allowed to move from completed back to running.
    // We have seen out of order status messages moving tasks from complete
    // to running. This is a spot fix, but it should be addressed more
    // globally.
    if ((newState == TaskStatus.State.RUNNING ||
         newState == TaskStatus.State.UNASSIGNED) &&
        (oldState == TaskStatus.State.FAILED ||
         oldState == TaskStatus.State.KILLED ||
         oldState == TaskStatus.State.FAILED_UNCLEAN ||
         oldState == TaskStatus.State.KILLED_UNCLEAN ||
         oldState == TaskStatus.State.SUCCEEDED ||
         oldState == TaskStatus.State.COMMIT_PENDING)) {
      return false;
    }

    //Do not accept any status once the task is marked FAILED/KILLED
    //This is to handle the case of the JobTracker timing out a task
    //due to launch delay, but the TT comes back with any state or
    //TT got expired
    // The attempt is (re)queued for killing with shouldFail=true.
    if (oldState == TaskStatus.State.FAILED ||
        oldState == TaskStatus.State.KILLED) {
      tasksToKill.put(taskid, true);
      return false;
    }

    changed = oldState != newState;
  }
  // if task is a cleanup attempt, do not replace the complete status,
  // update only specific fields.
  // For example, startTime should not be updated,
  // but finishTime has to be updated.
  if (!isCleanupAttempt(taskid)) {
    taskStatuses.put(taskid, status);
  } else {
    // NOTE(review): assumes a status is already recorded for a cleanup
    // attempt; if not, get() returns null and this throws NPE -- confirm.
    taskStatuses.get(taskid).statusUpdate(status.getRunState(),
      status.getProgress(), status.getStateString(), status.getPhase(),
      status.getFinishTime());
  }

  // Recompute progress
  recomputeProgress();
  return changed;
}
---|
573 | |
---|
574 | /** |
---|
575 | * Indicate that one of the taskids in this TaskInProgress |
---|
576 | * has failed. |
---|
577 | */ |
---|
578 | public void incompleteSubTask(TaskAttemptID taskid, |
---|
579 | JobStatus jobStatus) { |
---|
580 | // |
---|
581 | // Note the failure and its location |
---|
582 | // |
---|
583 | TaskStatus status = taskStatuses.get(taskid); |
---|
584 | String trackerName; |
---|
585 | String trackerHostName = null; |
---|
586 | TaskStatus.State taskState = TaskStatus.State.FAILED; |
---|
587 | if (status != null) { |
---|
588 | trackerName = status.getTaskTracker(); |
---|
589 | trackerHostName = |
---|
590 | JobInProgress.convertTrackerNameToHostName(trackerName); |
---|
591 | // Check if the user manually KILLED/FAILED this task-attempt... |
---|
592 | Boolean shouldFail = tasksToKill.remove(taskid); |
---|
593 | if (shouldFail != null) { |
---|
594 | if (status.getRunState() == TaskStatus.State.FAILED || |
---|
595 | status.getRunState() == TaskStatus.State.KILLED) { |
---|
596 | taskState = (shouldFail) ? TaskStatus.State.FAILED : |
---|
597 | TaskStatus.State.KILLED; |
---|
598 | } else { |
---|
599 | taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN : |
---|
600 | TaskStatus.State.KILLED_UNCLEAN; |
---|
601 | |
---|
602 | } |
---|
603 | status.setRunState(taskState); |
---|
604 | addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" ); |
---|
605 | } |
---|
606 | |
---|
607 | taskState = status.getRunState(); |
---|
608 | if (taskState != TaskStatus.State.FAILED && |
---|
609 | taskState != TaskStatus.State.KILLED && |
---|
610 | taskState != TaskStatus.State.FAILED_UNCLEAN && |
---|
611 | taskState != TaskStatus.State.KILLED_UNCLEAN) { |
---|
612 | LOG.info("Task '" + taskid + "' running on '" + trackerName + |
---|
613 | "' in state: '" + taskState + "' being failed!"); |
---|
614 | status.setRunState(TaskStatus.State.FAILED); |
---|
615 | taskState = TaskStatus.State.FAILED; |
---|
616 | } |
---|
617 | |
---|
618 | // tasktracker went down and failed time was not reported. |
---|
619 | if (0 == status.getFinishTime()){ |
---|
620 | status.setFinishTime(System.currentTimeMillis()); |
---|
621 | } |
---|
622 | } |
---|
623 | |
---|
624 | this.activeTasks.remove(taskid); |
---|
625 | |
---|
626 | // Since we do not fail completed reduces (whose outputs go to hdfs), we |
---|
627 | // should note this failure only for completed maps, only if this taskid; |
---|
628 | // completed this map. however if the job is done, there is no need to |
---|
629 | // manipulate completed maps |
---|
630 | if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && |
---|
631 | jobStatus.getRunState() != JobStatus.SUCCEEDED) { |
---|
632 | this.completes--; |
---|
633 | |
---|
634 | // Reset the successfulTaskId since we don't have a SUCCESSFUL task now |
---|
635 | resetSuccessfulTaskid(); |
---|
636 | } |
---|
637 | |
---|
638 | // Note that there can be failures of tasks that are hosted on a machine |
---|
639 | // that has not yet registered with restarted jobtracker |
---|
640 | // recalculate the counts only if its a genuine failure |
---|
641 | if (tasks.contains(taskid)) { |
---|
642 | if (taskState == TaskStatus.State.FAILED) { |
---|
643 | numTaskFailures++; |
---|
644 | machinesWhereFailed.add(trackerHostName); |
---|
645 | if(maxSkipRecords>0) { |
---|
646 | //skipping feature enabled |
---|
647 | LOG.debug("TaskInProgress adding" + status.getNextRecordRange()); |
---|
648 | failedRanges.add(status.getNextRecordRange()); |
---|
649 | skipping = startSkipping(); |
---|
650 | } |
---|
651 | |
---|
652 | } else if (taskState == TaskStatus.State.KILLED) { |
---|
653 | numKilledTasks++; |
---|
654 | } |
---|
655 | } |
---|
656 | |
---|
657 | if (numTaskFailures >= maxTaskAttempts) { |
---|
658 | LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times."); |
---|
659 | kill(); |
---|
660 | } |
---|
661 | } |
---|
662 | |
---|
663 | /** |
---|
664 | * Get whether to start skipping mode. |
---|
665 | */ |
---|
666 | private boolean startSkipping() { |
---|
667 | if(maxSkipRecords>0 && |
---|
668 | numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) { |
---|
669 | return true; |
---|
670 | } |
---|
671 | return false; |
---|
672 | } |
---|
673 | |
---|
674 | /** |
---|
675 | * Finalize the <b>completed</b> task; note that this might not be the first |
---|
676 | * task-attempt of the {@link TaskInProgress} and hence might be declared |
---|
677 | * {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED} |
---|
678 | * |
---|
679 | * @param taskId id of the completed task-attempt |
---|
680 | * @param finalTaskState final {@link TaskStatus.State} of the task-attempt |
---|
681 | */ |
---|
682 | private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) { |
---|
683 | TaskStatus status = taskStatuses.get(taskId); |
---|
684 | status.setRunState(finalTaskState); |
---|
685 | activeTasks.remove(taskId); |
---|
686 | } |
---|
687 | |
---|
688 | /** |
---|
689 | * Indicate that one of the taskids in this already-completed |
---|
690 | * TaskInProgress has successfully completed; hence we mark this |
---|
691 | * taskid as {@link TaskStatus.State.KILLED}. |
---|
692 | */ |
---|
693 | void alreadyCompletedTask(TaskAttemptID taskid) { |
---|
694 | // 'KILL' the task |
---|
695 | completedTask(taskid, TaskStatus.State.KILLED); |
---|
696 | |
---|
697 | // Note the reason for the task being 'KILLED' |
---|
698 | addDiagnosticInfo(taskid, "Already completed TIP"); |
---|
699 | |
---|
700 | LOG.info("Already complete TIP " + getTIPId() + |
---|
701 | " has completed task " + taskid); |
---|
702 | } |
---|
703 | |
---|
704 | /** |
---|
705 | * Indicate that one of the taskids in this TaskInProgress |
---|
706 | * has successfully completed! |
---|
707 | */ |
---|
708 | public void completed(TaskAttemptID taskid) { |
---|
709 | // |
---|
710 | // Record that this taskid is complete |
---|
711 | // |
---|
712 | completedTask(taskid, TaskStatus.State.SUCCEEDED); |
---|
713 | |
---|
714 | // Note the successful taskid |
---|
715 | setSuccessfulTaskid(taskid); |
---|
716 | |
---|
717 | // |
---|
718 | // Now that the TIP is complete, the other speculative |
---|
719 | // subtasks will be closed when the owning tasktracker |
---|
720 | // reports in and calls shouldClose() on this object. |
---|
721 | // |
---|
722 | |
---|
723 | this.completes++; |
---|
724 | this.execFinishTime = System.currentTimeMillis(); |
---|
725 | recomputeProgress(); |
---|
726 | |
---|
727 | } |
---|
728 | |
---|
729 | /** |
---|
730 | * Get the split locations |
---|
731 | */ |
---|
732 | public String[] getSplitLocations() { |
---|
733 | if (isMapTask() && !jobSetup && !jobCleanup) { |
---|
734 | return rawSplit.getLocations(); |
---|
735 | } |
---|
736 | return new String[0]; |
---|
737 | } |
---|
738 | |
---|
739 | /** |
---|
740 | * Get the Status of the tasks managed by this TIP |
---|
741 | */ |
---|
742 | public TaskStatus[] getTaskStatuses() { |
---|
743 | return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]); |
---|
744 | } |
---|
745 | |
---|
746 | /** |
---|
747 | * Get the status of the specified task |
---|
748 | * @param taskid |
---|
749 | * @return |
---|
750 | */ |
---|
751 | public TaskStatus getTaskStatus(TaskAttemptID taskid) { |
---|
752 | return taskStatuses.get(taskid); |
---|
753 | } |
---|
754 | /** |
---|
755 | * The TIP's been ordered kill()ed. |
---|
756 | */ |
---|
757 | public void kill() { |
---|
758 | if (isComplete() || failed) { |
---|
759 | return; |
---|
760 | } |
---|
761 | this.failed = true; |
---|
762 | killed = true; |
---|
763 | this.execFinishTime = System.currentTimeMillis(); |
---|
764 | recomputeProgress(); |
---|
765 | } |
---|
766 | |
---|
767 | /** |
---|
768 | * Was the task killed? |
---|
769 | * @return true if the task killed |
---|
770 | */ |
---|
771 | public boolean wasKilled() { |
---|
772 | return killed; |
---|
773 | } |
---|
774 | |
---|
775 | /** |
---|
776 | * Kill the given task |
---|
777 | */ |
---|
778 | boolean killTask(TaskAttemptID taskId, boolean shouldFail) { |
---|
779 | TaskStatus st = taskStatuses.get(taskId); |
---|
780 | if(st != null && (st.getRunState() == TaskStatus.State.RUNNING |
---|
781 | || st.getRunState() == TaskStatus.State.COMMIT_PENDING || |
---|
782 | st.inTaskCleanupPhase() || |
---|
783 | st.getRunState() == TaskStatus.State.UNASSIGNED) |
---|
784 | && tasksToKill.put(taskId, shouldFail) == null ) { |
---|
785 | String logStr = "Request received to " + (shouldFail ? "fail" : "kill") |
---|
786 | + " task '" + taskId + "' by user"; |
---|
787 | addDiagnosticInfo(taskId, logStr); |
---|
788 | LOG.info(logStr); |
---|
789 | return true; |
---|
790 | } |
---|
791 | return false; |
---|
792 | } |
---|
793 | |
---|
794 | /** |
---|
795 | * This method is called whenever there's a status change |
---|
796 | * for one of the TIP's sub-tasks. It recomputes the overall |
---|
797 | * progress for the TIP. We examine all sub-tasks and find |
---|
798 | * the one that's most advanced (and non-failed). |
---|
799 | */ |
---|
800 | void recomputeProgress() { |
---|
801 | if (isComplete()) { |
---|
802 | this.progress = 1; |
---|
803 | // update the counters and the state |
---|
804 | TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid()); |
---|
805 | this.counters = completedStatus.getCounters(); |
---|
806 | this.state = completedStatus.getStateString(); |
---|
807 | } else if (failed) { |
---|
808 | this.progress = 0; |
---|
809 | // reset the counters and the state |
---|
810 | this.state = ""; |
---|
811 | this.counters = new Counters(); |
---|
812 | } else { |
---|
813 | double bestProgress = 0; |
---|
814 | String bestState = ""; |
---|
815 | Counters bestCounters = new Counters(); |
---|
816 | for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) { |
---|
817 | TaskAttemptID taskid = it.next(); |
---|
818 | TaskStatus status = taskStatuses.get(taskid); |
---|
819 | if (status.getRunState() == TaskStatus.State.SUCCEEDED) { |
---|
820 | bestProgress = 1; |
---|
821 | bestState = status.getStateString(); |
---|
822 | bestCounters = status.getCounters(); |
---|
823 | break; |
---|
824 | } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) { |
---|
825 | //for COMMIT_PENDING, we take the last state that we recorded |
---|
826 | //when the task was RUNNING |
---|
827 | bestProgress = this.progress; |
---|
828 | bestState = this.state; |
---|
829 | bestCounters = this.counters; |
---|
830 | } else if (status.getRunState() == TaskStatus.State.RUNNING) { |
---|
831 | if (status.getProgress() >= bestProgress) { |
---|
832 | bestProgress = status.getProgress(); |
---|
833 | bestState = status.getStateString(); |
---|
834 | if (status.getIncludeCounters()) { |
---|
835 | bestCounters = status.getCounters(); |
---|
836 | } else { |
---|
837 | bestCounters = this.counters; |
---|
838 | } |
---|
839 | } |
---|
840 | } |
---|
841 | } |
---|
842 | this.progress = bestProgress; |
---|
843 | this.state = bestState; |
---|
844 | this.counters = bestCounters; |
---|
845 | } |
---|
846 | } |
---|
847 | |
---|
848 | ///////////////////////////////////////////////// |
---|
849 | // "Action" methods that actually require the TIP |
---|
850 | // to do something. |
---|
851 | ///////////////////////////////////////////////// |
---|
852 | |
---|
853 | /** |
---|
854 | * Return whether this TIP still needs to run |
---|
855 | */ |
---|
856 | boolean isRunnable() { |
---|
857 | return !failed && (completes == 0); |
---|
858 | } |
---|
859 | |
---|
860 | /** |
---|
861 | * Return whether the TIP has a speculative task to run. We |
---|
862 | * only launch a speculative task if the current TIP is really |
---|
863 | * far behind, and has been behind for a non-trivial amount of |
---|
864 | * time. |
---|
865 | */ |
---|
866 | boolean hasSpeculativeTask(long currentTime, double averageProgress) { |
---|
867 | // |
---|
868 | // REMIND - mjc - these constants should be examined |
---|
869 | // in more depth eventually... |
---|
870 | // |
---|
871 | |
---|
872 | if (!skipping && activeTasks.size() <= MAX_TASK_EXECS && |
---|
873 | (averageProgress - progress >= SPECULATIVE_GAP) && |
---|
874 | (currentTime - startTime >= SPECULATIVE_LAG) |
---|
875 | && completes == 0 && !isOnlyCommitPending()) { |
---|
876 | return true; |
---|
877 | } |
---|
878 | return false; |
---|
879 | } |
---|
880 | |
---|
881 | /** |
---|
882 | * Return a Task that can be sent to a TaskTracker for execution. |
---|
883 | */ |
---|
884 | public Task getTaskToRun(String taskTracker) throws IOException { |
---|
885 | if (0 == execStartTime){ |
---|
886 | // assume task starts running now |
---|
887 | execStartTime = System.currentTimeMillis(); |
---|
888 | } |
---|
889 | |
---|
890 | // Create the 'taskid'; do not count the 'killed' tasks against the job! |
---|
891 | TaskAttemptID taskid = null; |
---|
892 | if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) { |
---|
893 | // Make sure that the attempts are unqiue across restarts |
---|
894 | int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId; |
---|
895 | taskid = new TaskAttemptID( id, attemptId); |
---|
896 | ++nextTaskId; |
---|
897 | } else { |
---|
898 | LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) + |
---|
899 | " (plus " + numKilledTasks + " killed)" + |
---|
900 | " attempts for the tip '" + getTIPId() + "'"); |
---|
901 | return null; |
---|
902 | } |
---|
903 | |
---|
904 | return addRunningTask(taskid, taskTracker); |
---|
905 | } |
---|
906 | |
---|
907 | public Task addRunningTask(TaskAttemptID taskid, String taskTracker) { |
---|
908 | return addRunningTask(taskid, taskTracker, false); |
---|
909 | } |
---|
910 | |
---|
911 | /** |
---|
912 | * Adds a previously running task to this tip. This is used in case of |
---|
913 | * jobtracker restarts. |
---|
914 | */ |
---|
915 | public Task addRunningTask(TaskAttemptID taskid, |
---|
916 | String taskTracker, |
---|
917 | boolean taskCleanup) { |
---|
918 | // create the task |
---|
919 | Task t = null; |
---|
920 | if (isMapTask()) { |
---|
921 | LOG.debug("attempt " + numTaskFailures + " sending skippedRecords " |
---|
922 | + failedRanges.getIndicesCount()); |
---|
923 | String splitClass = null; |
---|
924 | BytesWritable split; |
---|
925 | if (!jobSetup && !jobCleanup) { |
---|
926 | splitClass = rawSplit.getClassName(); |
---|
927 | split = rawSplit.getBytes(); |
---|
928 | } else { |
---|
929 | split = new BytesWritable(); |
---|
930 | } |
---|
931 | t = new MapTask(jobFile, taskid, partition, splitClass, split); |
---|
932 | } else { |
---|
933 | t = new ReduceTask(jobFile, taskid, partition, numMaps); |
---|
934 | } |
---|
935 | if (jobCleanup) { |
---|
936 | t.setJobCleanupTask(); |
---|
937 | } |
---|
938 | if (jobSetup) { |
---|
939 | t.setJobSetupTask(); |
---|
940 | } |
---|
941 | if (taskCleanup) { |
---|
942 | t.setTaskCleanupTask(); |
---|
943 | t.setState(taskStatuses.get(taskid).getRunState()); |
---|
944 | cleanupTasks.put(taskid, taskTracker); |
---|
945 | } |
---|
946 | t.setConf(conf); |
---|
947 | LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges()); |
---|
948 | t.setSkipRanges(failedRanges.getSkipRanges()); |
---|
949 | t.setSkipping(skipping); |
---|
950 | if(failedRanges.isTestAttempt()) { |
---|
951 | t.setWriteSkipRecs(false); |
---|
952 | } |
---|
953 | |
---|
954 | activeTasks.put(taskid, taskTracker); |
---|
955 | tasks.add(taskid); |
---|
956 | |
---|
957 | // Ask JobTracker to note that the task exists |
---|
958 | jobtracker.createTaskEntry(taskid, taskTracker, this); |
---|
959 | |
---|
960 | // check and set the first attempt |
---|
961 | if (firstTaskId == null) { |
---|
962 | firstTaskId = taskid; |
---|
963 | } |
---|
964 | return t; |
---|
965 | } |
---|
966 | |
---|
967 | boolean isRunningTask(TaskAttemptID taskid) { |
---|
968 | TaskStatus status = taskStatuses.get(taskid); |
---|
969 | return status != null && status.getRunState() == TaskStatus.State.RUNNING; |
---|
970 | } |
---|
971 | |
---|
972 | boolean isCleanupAttempt(TaskAttemptID taskid) { |
---|
973 | return cleanupTasks.containsKey(taskid); |
---|
974 | } |
---|
975 | |
---|
976 | String machineWhereCleanupRan(TaskAttemptID taskid) { |
---|
977 | return cleanupTasks.get(taskid); |
---|
978 | } |
---|
979 | |
---|
980 | String machineWhereTaskRan(TaskAttemptID taskid) { |
---|
981 | return taskStatuses.get(taskid).getTaskTracker(); |
---|
982 | } |
---|
983 | |
---|
984 | boolean wasKilled(TaskAttemptID taskid) { |
---|
985 | return tasksToKill.containsKey(taskid); |
---|
986 | } |
---|
987 | |
---|
988 | /** |
---|
989 | * Has this task already failed on this machine? |
---|
990 | * @param trackerHost The task tracker hostname |
---|
991 | * @return Has it failed? |
---|
992 | */ |
---|
993 | public boolean hasFailedOnMachine(String trackerHost) { |
---|
994 | return machinesWhereFailed.contains(trackerHost); |
---|
995 | } |
---|
996 | |
---|
997 | /** |
---|
998 | * Was this task ever scheduled to run on this machine? |
---|
999 | * @param trackerHost The task tracker hostname |
---|
1000 | * @param trackerName The tracker name |
---|
1001 | * @return Was task scheduled on the tracker? |
---|
1002 | */ |
---|
1003 | public boolean hasRunOnMachine(String trackerHost, String trackerName) { |
---|
1004 | return this.activeTasks.values().contains(trackerName) || |
---|
1005 | hasFailedOnMachine(trackerHost); |
---|
1006 | } |
---|
1007 | /** |
---|
1008 | * Get the number of machines where this task has failed. |
---|
1009 | * @return the size of the failed machine set |
---|
1010 | */ |
---|
1011 | public int getNumberOfFailedMachines() { |
---|
1012 | return machinesWhereFailed.size(); |
---|
1013 | } |
---|
1014 | |
---|
1015 | /** |
---|
1016 | * Get the id of this map or reduce task. |
---|
1017 | * @return The index of this tip in the maps/reduces lists. |
---|
1018 | */ |
---|
1019 | public int getIdWithinJob() { |
---|
1020 | return partition; |
---|
1021 | } |
---|
1022 | |
---|
1023 | /** |
---|
1024 | * Set the event number that was raised for this tip |
---|
1025 | */ |
---|
1026 | public void setSuccessEventNumber(int eventNumber) { |
---|
1027 | successEventNumber = eventNumber; |
---|
1028 | } |
---|
1029 | |
---|
1030 | /** |
---|
1031 | * Get the event number that was raised for this tip |
---|
1032 | */ |
---|
1033 | public int getSuccessEventNumber() { |
---|
1034 | return successEventNumber; |
---|
1035 | } |
---|
1036 | |
---|
1037 | /** |
---|
1038 | * Gets the Node list of input split locations sorted in rack order. |
---|
1039 | */ |
---|
1040 | public String getSplitNodes() { |
---|
1041 | if (!isMapTask() || jobSetup || jobCleanup) { |
---|
1042 | return ""; |
---|
1043 | } |
---|
1044 | String[] splits = rawSplit.getLocations(); |
---|
1045 | Node[] nodes = new Node[splits.length]; |
---|
1046 | for (int i = 0; i < splits.length; i++) { |
---|
1047 | nodes[i] = jobtracker.getNode(splits[i]); |
---|
1048 | } |
---|
1049 | // sort nodes on rack location |
---|
1050 | Arrays.sort(nodes, new Comparator<Node>() { |
---|
1051 | public int compare(Node a, Node b) { |
---|
1052 | String left = a.getNetworkLocation(); |
---|
1053 | String right = b.getNetworkLocation(); |
---|
1054 | return left.compareTo(right); |
---|
1055 | } |
---|
1056 | }); |
---|
1057 | return nodeToString(nodes); |
---|
1058 | } |
---|
1059 | |
---|
1060 | private static String nodeToString(Node[] nodes) { |
---|
1061 | if (nodes == null || nodes.length == 0) { |
---|
1062 | return ""; |
---|
1063 | } |
---|
1064 | StringBuffer ret = new StringBuffer(nodes[0].toString()); |
---|
1065 | for(int i = 1; i < nodes.length;i++) { |
---|
1066 | ret.append(","); |
---|
1067 | ret.append(nodes[i].toString()); |
---|
1068 | } |
---|
1069 | return ret.toString(); |
---|
1070 | } |
---|
1071 | |
---|
1072 | public long getMapInputSize() { |
---|
1073 | if(isMapTask() && !jobSetup && !jobCleanup) { |
---|
1074 | return rawSplit.getDataLength(); |
---|
1075 | } else { |
---|
1076 | return 0; |
---|
1077 | } |
---|
1078 | } |
---|
1079 | |
---|
1080 | public void clearSplit() { |
---|
1081 | rawSplit.clearBytes(); |
---|
1082 | } |
---|
1083 | |
---|
1084 | /** |
---|
1085 | * This class keeps the records to be skipped during further executions |
---|
1086 | * based on failed records from all the previous attempts. |
---|
1087 | * It also narrow down the skip records if it is more than the |
---|
1088 | * acceptable value by dividing the failed range into half. In this case one |
---|
1089 | * half is executed in the next attempt (test attempt). |
---|
1090 | * In the test attempt, only the test range gets executed, others get skipped. |
---|
1091 | * Based on the success/failure of the test attempt, the range is divided |
---|
1092 | * further. |
---|
1093 | */ |
---|
1094 | private class FailedRanges { |
---|
1095 | private SortedRanges skipRanges = new SortedRanges(); |
---|
1096 | private Divide divide; |
---|
1097 | |
---|
1098 | synchronized SortedRanges getSkipRanges() { |
---|
1099 | if(divide!=null) { |
---|
1100 | return divide.skipRange; |
---|
1101 | } |
---|
1102 | return skipRanges; |
---|
1103 | } |
---|
1104 | |
---|
1105 | synchronized boolean isTestAttempt() { |
---|
1106 | return divide!=null; |
---|
1107 | } |
---|
1108 | |
---|
1109 | synchronized long getIndicesCount() { |
---|
1110 | if(isTestAttempt()) { |
---|
1111 | return divide.skipRange.getIndicesCount(); |
---|
1112 | } |
---|
1113 | return skipRanges.getIndicesCount(); |
---|
1114 | } |
---|
1115 | |
---|
1116 | synchronized void updateState(TaskStatus status){ |
---|
1117 | if (isTestAttempt() && |
---|
1118 | (status.getRunState() == TaskStatus.State.SUCCEEDED)) { |
---|
1119 | divide.testPassed = true; |
---|
1120 | //since it was the test attempt we need to set it to failed |
---|
1121 | //as it worked only on the test range |
---|
1122 | status.setRunState(TaskStatus.State.FAILED); |
---|
1123 | |
---|
1124 | } |
---|
1125 | } |
---|
1126 | |
---|
1127 | synchronized void add(Range failedRange) { |
---|
1128 | LOG.warn("FailedRange:"+ failedRange); |
---|
1129 | if(divide!=null) { |
---|
1130 | LOG.warn("FailedRange:"+ failedRange +" test:"+divide.test + |
---|
1131 | " pass:"+divide.testPassed); |
---|
1132 | if(divide.testPassed) { |
---|
1133 | //test range passed |
---|
1134 | //other range would be bad. test it |
---|
1135 | failedRange = divide.other; |
---|
1136 | } |
---|
1137 | else { |
---|
1138 | //test range failed |
---|
1139 | //other range would be good. |
---|
1140 | failedRange = divide.test; |
---|
1141 | } |
---|
1142 | //reset |
---|
1143 | divide = null; |
---|
1144 | } |
---|
1145 | |
---|
1146 | if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) { |
---|
1147 | skipRanges.add(failedRange); |
---|
1148 | } else { |
---|
1149 | //start dividing the range to narrow down the skipped |
---|
1150 | //records until maxSkipRecords are met OR all attempts |
---|
1151 | //get exhausted |
---|
1152 | divide = new Divide(failedRange); |
---|
1153 | } |
---|
1154 | } |
---|
1155 | |
---|
1156 | class Divide { |
---|
1157 | private final SortedRanges skipRange; |
---|
1158 | private final Range test; |
---|
1159 | private final Range other; |
---|
1160 | private boolean testPassed; |
---|
1161 | Divide(Range range){ |
---|
1162 | long half = range.getLength()/2; |
---|
1163 | test = new Range(range.getStartIndex(), half); |
---|
1164 | other = new Range(test.getEndIndex(), range.getLength()-half); |
---|
1165 | //construct the skip range from the skipRanges |
---|
1166 | skipRange = new SortedRanges(); |
---|
1167 | for(Range r : skipRanges.getRanges()) { |
---|
1168 | skipRange.add(r); |
---|
1169 | } |
---|
1170 | skipRange.add(new Range(0,test.getStartIndex())); |
---|
1171 | skipRange.add(new Range(test.getEndIndex(), |
---|
1172 | (Long.MAX_VALUE-test.getEndIndex()))); |
---|
1173 | } |
---|
1174 | } |
---|
1175 | |
---|
1176 | } |
---|
1177 | |
---|
1178 | TreeMap<TaskAttemptID, String> getActiveTasks() { |
---|
1179 | return activeTasks; |
---|
1180 | } |
---|
1181 | } |
---|