1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.io.File; |
---|
22 | import java.io.IOException; |
---|
23 | import java.text.ParseException; |
---|
24 | import java.util.ArrayList; |
---|
25 | import java.util.List; |
---|
26 | import java.util.HashMap; |
---|
27 | import java.util.Map; |
---|
28 | import java.util.Iterator; |
---|
29 | import java.util.regex.Matcher; |
---|
30 | import java.util.regex.Pattern; |
---|
31 | |
---|
32 | import junit.framework.TestCase; |
---|
33 | |
---|
34 | import org.apache.hadoop.conf.Configuration; |
---|
35 | import org.apache.hadoop.fs.FileSystem; |
---|
36 | import org.apache.hadoop.fs.Path; |
---|
37 | import org.apache.hadoop.fs.permission.FsPermission; |
---|
38 | import org.apache.hadoop.mapred.JobHistory.*; |
---|
39 | import org.apache.commons.logging.Log; |
---|
40 | import org.apache.commons.logging.LogFactory; |
---|
41 | |
---|
42 | /** |
---|
43 | * Tests the JobHistory files - to catch any changes to JobHistory that can |
---|
44 | * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer. |
---|
45 | * |
---|
46 | * testJobHistoryFile |
---|
47 | * Run a job that will be succeeded and validate its history file format and |
---|
48 | * content. |
---|
49 | * |
---|
50 | * testJobHistoryUserLogLocation |
---|
51 | * Run jobs with the given values of hadoop.job.history.user.location as |
---|
52 | * (1)null(default case), (2)"none", and (3)some dir like "/tmp". |
---|
53 | * Validate user history file location in each case. |
---|
54 | * |
---|
55 | * testJobHistoryJobStatus |
---|
56 | * Run jobs that will be (1) succeeded (2) failed (3) killed. |
---|
57 | * Validate job status read from history file in each case. |
---|
58 | * |
---|
59 | * Future changes to job history are to be reflected here in this file. |
---|
60 | */ |
---|
public class TestJobHistory extends TestCase {
  private static final Log LOG = LogFactory.getLog(TestJobHistory.class);

  // Root dir for test data; '+' substitution keeps the URI free of spaces
  private static String TEST_ROOT_DIR = new File(System.getProperty(
      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');

  // Matches the digit runs JobHistory writes for times/ports
  private static final Pattern digitsPattern =
      Pattern.compile(JobHistory.DIGITS);

  // hostname like /default-rack/host1.foo.com OR host1.foo.com
  private static final Pattern hostNamePattern = Pattern.compile(
      "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");

  private static final String IP_ADDR =
      "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";

  // tracker name like tracker_<hostname>:<name>/<ip-address>:<port>
  private static final Pattern trackerNamePattern = Pattern.compile(
      "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
      IP_ADDR + ":" + JobHistory.DIGITS);

  // comma separated list of hostnames (map input split locations)
  private static final Pattern splitsPattern = Pattern.compile(
      hostNamePattern + "(," + hostNamePattern + ")*");

  // taskID -> attemptIDs seen so far (plus a "firstAttemptIsSeen" marker)
  private static Map<String, List<String>> taskIDsToAttemptIDs =
      new HashMap<String, List<String>>();

  //Each Task End seen from history file is added here
  private static List<String> taskEnds = new ArrayList<String>();

  // List of tasks that appear in history file after JT restart. This is to
  // allow START_TIME=0 for these tasks.
  private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();

  // List of potential tasks whose start time can be 0 because of JT restart
  private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
---|
97 | |
---|
98 | /** |
---|
99 | * Listener for history log file, it populates JobHistory.JobInfo |
---|
100 | * object with data from log file and validates the data. |
---|
101 | */ |
---|
  static class TestListener
      extends DefaultJobHistoryParser.JobTasksParseListener {
    int lineNum;//line number of history log file
    boolean isJobLaunched;   // set when the first Job LAUNCH_TIME is seen
    boolean isJTRestarted;   // set when a second LAUNCH_TIME is seen (JT restart)

    TestListener(JobInfo job) {
      super(job);
      lineNum = 0;
      isJobLaunched = false;
      isJTRestarted = false;
    }

    // TestListener implementation
    // Validates ordering/format of each history record, then delegates to the
    // parent parser so JobInfo is still populated.
    public void handle(RecordTypes recType, Map<Keys, String> values)
        throws IOException {

      lineNum++;

      // Check if the record is of type Meta
      if (recType == JobHistory.RecordTypes.Meta) {
        long version = Long.parseLong(values.get(Keys.VERSION));
        assertTrue("Unexpected job history version ",
                   (version >= 0 && version <= JobHistory.VERSION));
      }
      else if (recType.equals(RecordTypes.Job)) {
        String jobid = values.get(Keys.JOBID);
        assertTrue("record type 'Job' is seen without JOBID key" +
                   " in history file at line " + lineNum, jobid != null);
        JobID id = JobID.forName(jobid);
        assertTrue("JobID in history file is in unexpected format " +
                   "at line " + lineNum, id != null);
        String time = values.get(Keys.LAUNCH_TIME);
        if (time != null) {
          if (isJobLaunched) {
            // We assume that if we see LAUNCH_TIME again, it is because of JT restart
            isJTRestarted = true;
          }
          else {// job launched first time
            isJobLaunched = true;
          }
        }
        time = values.get(Keys.FINISH_TIME);
        if (time != null) {
          assertTrue ("Job FINISH_TIME is seen in history file at line " +
                      lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
        }
      }
      else if (recType.equals(RecordTypes.Task)) {
        String taskid = values.get(Keys.TASKID);
        assertTrue("record type 'Task' is seen without TASKID key" +
                   " in history file at line " + lineNum, taskid != null);
        TaskID id = TaskID.forName(taskid);
        assertTrue("TaskID in history file is in unexpected format " +
                   "at line " + lineNum, id != null);

        String time = values.get(Keys.START_TIME);
        if (time != null) {
          // Task START: register an empty attempt list; its presence also
          // records that the task start has been seen.
          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
          assertTrue("Duplicate START_TIME seen for task " + taskid +
                     " in history file at line " + lineNum, attemptIDs == null);
          attemptIDs = new ArrayList<String>();
          taskIDsToAttemptIDs.put(taskid, attemptIDs);

          if (isJTRestarted) {
            // This maintains a potential ignoreStartTimeTasks list
            tempIgnoreStartTimeOfTasks.add(taskid);
          }
        }

        time = values.get(Keys.FINISH_TIME);
        if (time != null) {
          String s = values.get(Keys.TASK_STATUS);
          if (s != null) {
            List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
            assertTrue ("Task FINISH_TIME is seen in history file at line " +
                        lineNum + " before START_TIME is seen", attemptIDs != null);

            // Check if all the attemptIDs of this task are finished
            assertTrue("TaskId " + taskid + " is finished at line " +
                       lineNum + " but its attemptID is not finished.",
                       (attemptIDs.size() <= 1));

            // Check if at least 1 attempt of this task is seen
            assertTrue("TaskId " + taskid + " is finished at line " +
                       lineNum + " but no attemptID is seen before this.",
                       attemptIDs.size() == 1);

            if (s.equals("KILLED") || s.equals("FAILED")) {
              // Task End with KILLED/FAILED status in history file is
              // considered as TaskEnd, TaskStart. This is useful in checking
              // the order of history lines.
              attemptIDs = new ArrayList<String>();
              taskIDsToAttemptIDs.put(taskid, attemptIDs);
            }
            else {
              taskEnds.add(taskid);
            }
          }
          else {
            // This line of history file could be just an update to finish time
          }
        }
      }
      else if (recType.equals(RecordTypes.MapAttempt) ||
               recType.equals(RecordTypes.ReduceAttempt)) {
        String taskid = values.get(Keys.TASKID);
        assertTrue("record type " + recType + " is seen without TASKID key" +
                   " in history file at line " + lineNum, taskid != null);

        String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
        TaskAttemptID id = TaskAttemptID.forName(attemptId);
        assertTrue("AttemptID in history file is in unexpected format " +
                   "at line " + lineNum, id != null);

        String time = values.get(Keys.START_TIME);
        if (time != null) {
          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
          assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
                      " before Task is seen", attemptIDs != null);
          // remove() returning true would mean this attempt was already listed
          assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
                       "file at line " + lineNum, attemptIDs.remove(attemptId));

          if (attemptIDs.isEmpty()) {
            //just a boolean whether any attempt is seen or not
            attemptIDs.add("firstAttemptIsSeen");
          }
          attemptIDs.add(attemptId);

          if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
              (id.getId() < 1000)) {
            // If Task line of this attempt is seen in history file after
            // JT restart and if this attempt is < 1000(i.e. attempt is not
            // started after JT restart) - assuming single JT restart happened
            ignoreStartTimeOfTasks.add(taskid);
          }
        }

        time = values.get(Keys.FINISH_TIME);
        if (time != null) {
          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
                      + lineNum + " before Task is seen", attemptIDs != null);

          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
                      + lineNum + " before TaskAttempt START_TIME is seen",
                      attemptIDs.remove(attemptId));
        }
      }
      super.handle(recType, values);
    }
  }
---|
254 | |
---|
255 | // Check if the time is in the expected format |
---|
256 | private static boolean isTimeValid(String time) { |
---|
257 | Matcher m = digitsPattern.matcher(time); |
---|
258 | return m.matches() && (Long.parseLong(time) > 0); |
---|
259 | } |
---|
260 | |
---|
261 | private static boolean areTimesInOrder(String time1, String time2) { |
---|
262 | return (Long.parseLong(time1) <= Long.parseLong(time2)); |
---|
263 | } |
---|
264 | |
---|
265 | // Validate Format of Job Level Keys, Values read from history file |
---|
266 | private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values, |
---|
267 | String status) { |
---|
268 | String time = values.get(Keys.SUBMIT_TIME); |
---|
269 | assertTrue("Job SUBMIT_TIME is in unexpected format:" + time + |
---|
270 | " in history file", isTimeValid(time)); |
---|
271 | |
---|
272 | time = values.get(Keys.LAUNCH_TIME); |
---|
273 | assertTrue("Job LAUNCH_TIME is in unexpected format:" + time + |
---|
274 | " in history file", isTimeValid(time)); |
---|
275 | |
---|
276 | String time1 = values.get(Keys.FINISH_TIME); |
---|
277 | assertTrue("Job FINISH_TIME is in unexpected format:" + time1 + |
---|
278 | " in history file", isTimeValid(time1)); |
---|
279 | assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file", |
---|
280 | areTimesInOrder(time, time1)); |
---|
281 | |
---|
282 | String stat = values.get(Keys.JOB_STATUS); |
---|
283 | assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" + |
---|
284 | " history file", (status.equals(stat))); |
---|
285 | |
---|
286 | String priority = values.get(Keys.JOB_PRIORITY); |
---|
287 | assertTrue("Unknown priority for the job in history file", |
---|
288 | (priority.equals("HIGH") || |
---|
289 | priority.equals("LOW") || priority.equals("NORMAL") || |
---|
290 | priority.equals("VERY_HIGH") || priority.equals("VERY_LOW"))); |
---|
291 | } |
---|
292 | |
---|
293 | // Validate Format of Task Level Keys, Values read from history file |
---|
294 | private static void validateTaskLevelKeyValuesFormat(JobInfo job, |
---|
295 | boolean splitsCanBeEmpty) { |
---|
296 | Map<String, JobHistory.Task> tasks = job.getAllTasks(); |
---|
297 | |
---|
298 | // validate info of each task |
---|
299 | for (JobHistory.Task task : tasks.values()) { |
---|
300 | |
---|
301 | String tid = task.get(Keys.TASKID); |
---|
302 | String time = task.get(Keys.START_TIME); |
---|
303 | // We allow START_TIME=0 for tasks seen in history after JT restart |
---|
304 | if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) { |
---|
305 | assertTrue("Task START_TIME of " + tid + " is in unexpected format:" + |
---|
306 | time + " in history file", isTimeValid(time)); |
---|
307 | } |
---|
308 | |
---|
309 | String time1 = task.get(Keys.FINISH_TIME); |
---|
310 | assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" + |
---|
311 | time1 + " in history file", isTimeValid(time1)); |
---|
312 | assertTrue("Task FINISH_TIME is < START_TIME in history file", |
---|
313 | areTimesInOrder(time, time1)); |
---|
314 | |
---|
315 | // Make sure that the Task type exists and it is valid |
---|
316 | String type = task.get(Keys.TASK_TYPE); |
---|
317 | assertTrue("Unknown Task type \"" + type + "\" is seen in " + |
---|
318 | "history file for task " + tid, |
---|
319 | (type.equals("MAP") || type.equals("REDUCE") || |
---|
320 | type.equals("SETUP") || type.equals("CLEANUP"))); |
---|
321 | |
---|
322 | if (type.equals("MAP")) { |
---|
323 | String splits = task.get(Keys.SPLITS); |
---|
324 | //order in the condition OR check is important here |
---|
325 | if (!splitsCanBeEmpty || splits.length() != 0) { |
---|
326 | Matcher m = splitsPattern.matcher(splits); |
---|
327 | assertTrue("Unexpected format of SPLITS \"" + splits + "\" is seen" + |
---|
328 | " in history file for task " + tid, m.matches()); |
---|
329 | } |
---|
330 | } |
---|
331 | |
---|
332 | // Validate task status |
---|
333 | String status = task.get(Keys.TASK_STATUS); |
---|
334 | assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" + |
---|
335 | " history file for task " + tid, (status.equals("SUCCESS") || |
---|
336 | status.equals("FAILED") || status.equals("KILLED"))); |
---|
337 | } |
---|
338 | } |
---|
339 | |
---|
340 | // Validate foramt of Task Attempt Level Keys, Values read from history file |
---|
341 | private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) { |
---|
342 | Map<String, JobHistory.Task> tasks = job.getAllTasks(); |
---|
343 | |
---|
344 | // For each task |
---|
345 | for (JobHistory.Task task : tasks.values()) { |
---|
346 | // validate info of each attempt |
---|
347 | for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) { |
---|
348 | |
---|
349 | String id = attempt.get(Keys.TASK_ATTEMPT_ID); |
---|
350 | String time = attempt.get(Keys.START_TIME); |
---|
351 | assertTrue("START_TIME of task attempt " + id + |
---|
352 | " is in unexpected format:" + time + |
---|
353 | " in history file", isTimeValid(time)); |
---|
354 | |
---|
355 | String time1 = attempt.get(Keys.FINISH_TIME); |
---|
356 | assertTrue("FINISH_TIME of task attempt " + id + |
---|
357 | " is in unexpected format:" + time1 + |
---|
358 | " in history file", isTimeValid(time1)); |
---|
359 | assertTrue("Task FINISH_TIME is < START_TIME in history file", |
---|
360 | areTimesInOrder(time, time1)); |
---|
361 | |
---|
362 | // Make sure that the Task type exists and it is valid |
---|
363 | String type = attempt.get(Keys.TASK_TYPE); |
---|
364 | assertTrue("Unknown Task type \"" + type + "\" is seen in " + |
---|
365 | "history file for task attempt " + id, |
---|
366 | (type.equals("MAP") || type.equals("REDUCE") || |
---|
367 | type.equals("SETUP") || type.equals("CLEANUP"))); |
---|
368 | |
---|
369 | // Validate task status |
---|
370 | String status = attempt.get(Keys.TASK_STATUS); |
---|
371 | assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" + |
---|
372 | " history file for task attempt " + id, |
---|
373 | (status.equals("SUCCESS") || status.equals("FAILED") || |
---|
374 | status.equals("KILLED"))); |
---|
375 | |
---|
376 | // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and |
---|
377 | // SORT_FINISHED time |
---|
378 | if (type.equals("REDUCE") && status.equals("SUCCESS")) { |
---|
379 | time1 = attempt.get(Keys.SHUFFLE_FINISHED); |
---|
380 | assertTrue("SHUFFLE_FINISHED time of task attempt " + id + |
---|
381 | " is in unexpected format:" + time1 + |
---|
382 | " in history file", isTimeValid(time1)); |
---|
383 | assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " + |
---|
384 | "in history file", areTimesInOrder(time, time1)); |
---|
385 | time = attempt.get(Keys.SORT_FINISHED); |
---|
386 | assertTrue("SORT_FINISHED of task attempt " + id + |
---|
387 | " is in unexpected format:" + time + |
---|
388 | " in history file", isTimeValid(time)); |
---|
389 | assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" + |
---|
390 | " in history file", areTimesInOrder(time1, time)); |
---|
391 | } |
---|
392 | |
---|
393 | // check if hostname is valid |
---|
394 | String hostname = attempt.get(Keys.HOSTNAME); |
---|
395 | Matcher m = hostNamePattern.matcher(hostname); |
---|
396 | assertTrue("Unexpected Host name of task attempt " + id, m.matches()); |
---|
397 | |
---|
398 | // check if trackername is valid |
---|
399 | String trackerName = attempt.get(Keys.TRACKER_NAME); |
---|
400 | m = trackerNamePattern.matcher(trackerName); |
---|
401 | assertTrue("Unexpected tracker name of task attempt " + id, |
---|
402 | m.matches()); |
---|
403 | |
---|
404 | if (!status.equals("KILLED")) { |
---|
405 | // check if http port is valid |
---|
406 | String httpPort = attempt.get(Keys.HTTP_PORT); |
---|
407 | m = digitsPattern.matcher(httpPort); |
---|
408 | assertTrue("Unexpected http port of task attempt " + id, m.matches()); |
---|
409 | } |
---|
410 | |
---|
411 | // check if counters are parsable |
---|
412 | String counters = attempt.get(Keys.COUNTERS); |
---|
413 | try { |
---|
414 | Counters readCounters = Counters.fromEscapedCompactString(counters); |
---|
415 | assertTrue("Counters of task attempt " + id + " are not parsable", |
---|
416 | readCounters != null); |
---|
417 | } catch (ParseException pe) { |
---|
418 | LOG.warn("While trying to parse counters of task attempt " + id + |
---|
419 | ", " + pe); |
---|
420 | } |
---|
421 | } |
---|
422 | } |
---|
423 | } |
---|
424 | |
---|
425 | /** |
---|
426 | * Validates the format of contents of history file |
---|
427 | * (1) history file exists and in correct location |
---|
428 | * (2) Verify if the history file is parsable |
---|
429 | * (3) Validate the contents of history file |
---|
430 | * (a) Format of all TIMEs are checked against a regex |
---|
431 | * (b) validate legality/format of job level key, values |
---|
432 | * (c) validate legality/format of task level key, values |
---|
433 | * (d) validate legality/format of attempt level key, values |
---|
434 | * (e) check if all the TaskAttempts, Tasks started are finished. |
---|
435 | * Check finish of each TaskAttemptID against its start to make sure |
---|
436 | * that all TaskAttempts, Tasks started are indeed finished and the |
---|
437 | * history log lines are in the proper order. |
---|
438 | * We want to catch ordering of history lines like |
---|
439 | * Task START |
---|
440 | * Attempt START |
---|
441 | * Task FINISH |
---|
442 | * Attempt FINISH |
---|
443 | * (speculative execution is turned off for this). |
---|
444 | * @param id job id |
---|
445 | * @param conf job conf |
---|
446 | */ |
---|
447 | static void validateJobHistoryFileFormat(JobID id, JobConf conf, |
---|
448 | String status, boolean splitsCanBeEmpty) throws IOException { |
---|
449 | |
---|
450 | // Get the history file name |
---|
451 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
452 | |
---|
453 | // Framework history log file location |
---|
454 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
455 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
456 | |
---|
457 | // Check if the history file exists |
---|
458 | assertTrue("History file does not exist", fileSys.exists(logFile)); |
---|
459 | |
---|
460 | |
---|
461 | // check if the history file is parsable |
---|
462 | String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( |
---|
463 | logFileName).split("_"); |
---|
464 | |
---|
465 | String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; |
---|
466 | JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); |
---|
467 | |
---|
468 | TestListener l = new TestListener(jobInfo); |
---|
469 | JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); |
---|
470 | |
---|
471 | |
---|
472 | // validate format of job level key, values |
---|
473 | validateJobLevelKeyValuesFormat(jobInfo.getValues(), status); |
---|
474 | |
---|
475 | // validate format of task level key, values |
---|
476 | validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty); |
---|
477 | |
---|
478 | // validate format of attempt level key, values |
---|
479 | validateTaskAttemptLevelKeyValuesFormat(jobInfo); |
---|
480 | |
---|
481 | // check if all the TaskAttempts, Tasks started are finished for |
---|
482 | // successful jobs |
---|
483 | if (status.equals("SUCCESS")) { |
---|
484 | // Make sure that the lists in taskIDsToAttemptIDs are empty. |
---|
485 | for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) { |
---|
486 | String taskid = it.next(); |
---|
487 | assertTrue("There are some Tasks which are not finished in history " + |
---|
488 | "file.", taskEnds.contains(taskid)); |
---|
489 | List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); |
---|
490 | if(attemptIDs != null) { |
---|
491 | assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" + |
---|
492 | " that are not finished.", (attemptIDs.size() == 1)); |
---|
493 | } |
---|
494 | } |
---|
495 | } |
---|
496 | } |
---|
497 | |
---|
498 | // Validate Job Level Keys, Values read from history file by |
---|
499 | // comparing them with the actual values from JT. |
---|
500 | private static void validateJobLevelKeyValues(MiniMRCluster mr, |
---|
501 | RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException { |
---|
502 | |
---|
503 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
504 | JobInProgress jip = jt.getJob(job.getID()); |
---|
505 | |
---|
506 | Map<Keys, String> values = jobInfo.getValues(); |
---|
507 | |
---|
508 | assertTrue("SUBMIT_TIME of job obtained from history file did not " + |
---|
509 | "match the expected value", jip.getStartTime() == |
---|
510 | Long.parseLong(values.get(Keys.SUBMIT_TIME))); |
---|
511 | |
---|
512 | assertTrue("LAUNCH_TIME of job obtained from history file did not " + |
---|
513 | "match the expected value", jip.getLaunchTime() == |
---|
514 | Long.parseLong(values.get(Keys.LAUNCH_TIME))); |
---|
515 | |
---|
516 | assertTrue("FINISH_TIME of job obtained from history file did not " + |
---|
517 | "match the expected value", jip.getFinishTime() == |
---|
518 | Long.parseLong(values.get(Keys.FINISH_TIME))); |
---|
519 | |
---|
520 | assertTrue("Job Status of job obtained from history file did not " + |
---|
521 | "match the expected value", |
---|
522 | values.get(Keys.JOB_STATUS).equals("SUCCESS")); |
---|
523 | |
---|
524 | assertTrue("Job Priority of job obtained from history file did not " + |
---|
525 | "match the expected value", jip.getPriority().toString().equals( |
---|
526 | values.get(Keys.JOB_PRIORITY))); |
---|
527 | |
---|
528 | assertTrue("Job Name of job obtained from history file did not " + |
---|
529 | "match the expected value", JobInfo.getJobName(conf).equals( |
---|
530 | values.get(Keys.JOBNAME))); |
---|
531 | |
---|
532 | assertTrue("User Name of job obtained from history file did not " + |
---|
533 | "match the expected value", JobInfo.getUserName(conf).equals( |
---|
534 | values.get(Keys.USER))); |
---|
535 | |
---|
536 | // Validate job counters |
---|
537 | Counters c = jip.getCounters(); |
---|
538 | assertTrue("Counters of job obtained from history file did not " + |
---|
539 | "match the expected value", |
---|
540 | c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS))); |
---|
541 | |
---|
542 | // Validate number of total maps, total reduces, finished maps, |
---|
543 | // finished reduces, failed maps, failed recudes |
---|
544 | String totalMaps = values.get(Keys.TOTAL_MAPS); |
---|
545 | assertTrue("Unexpected number of total maps in history file", |
---|
546 | Integer.parseInt(totalMaps) == jip.desiredMaps()); |
---|
547 | |
---|
548 | String totalReduces = values.get(Keys.TOTAL_REDUCES); |
---|
549 | assertTrue("Unexpected number of total reduces in history file", |
---|
550 | Integer.parseInt(totalReduces) == jip.desiredReduces()); |
---|
551 | |
---|
552 | String finMaps = values.get(Keys.FINISHED_MAPS); |
---|
553 | assertTrue("Unexpected number of finished maps in history file", |
---|
554 | Integer.parseInt(finMaps) == jip.finishedMaps()); |
---|
555 | |
---|
556 | String finReduces = values.get(Keys.FINISHED_REDUCES); |
---|
557 | assertTrue("Unexpected number of finished reduces in history file", |
---|
558 | Integer.parseInt(finReduces) == jip.finishedReduces()); |
---|
559 | |
---|
560 | String failedMaps = values.get(Keys.FAILED_MAPS); |
---|
561 | assertTrue("Unexpected number of failed maps in history file", |
---|
562 | Integer.parseInt(failedMaps) == jip.failedMapTasks); |
---|
563 | |
---|
564 | String failedReduces = values.get(Keys.FAILED_REDUCES); |
---|
565 | assertTrue("Unexpected number of failed reduces in history file", |
---|
566 | Integer.parseInt(failedReduces) == jip.failedReduceTasks); |
---|
567 | } |
---|
568 | |
---|
569 | // Validate Task Level Keys, Values read from history file by |
---|
570 | // comparing them with the actual values from JT. |
---|
571 | private static void validateTaskLevelKeyValues(MiniMRCluster mr, |
---|
572 | RunningJob job, JobInfo jobInfo) throws IOException { |
---|
573 | |
---|
574 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
575 | JobInProgress jip = jt.getJob(job.getID()); |
---|
576 | |
---|
577 | // Get the 1st map, 1st reduce, cleanup & setup taskIDs and |
---|
578 | // validate their history info |
---|
579 | TaskID mapTaskId = new TaskID(job.getID(), true, 0); |
---|
580 | TaskID reduceTaskId = new TaskID(job.getID(), false, 0); |
---|
581 | |
---|
582 | TaskInProgress cleanups[] = jip.getCleanupTasks(); |
---|
583 | TaskID cleanupTaskId; |
---|
584 | if (cleanups[0].isComplete()) { |
---|
585 | cleanupTaskId = cleanups[0].getTIPId(); |
---|
586 | } |
---|
587 | else { |
---|
588 | cleanupTaskId = cleanups[1].getTIPId(); |
---|
589 | } |
---|
590 | |
---|
591 | TaskInProgress setups[] = jip.getSetupTasks(); |
---|
592 | TaskID setupTaskId; |
---|
593 | if (setups[0].isComplete()) { |
---|
594 | setupTaskId = setups[0].getTIPId(); |
---|
595 | } |
---|
596 | else { |
---|
597 | setupTaskId = setups[1].getTIPId(); |
---|
598 | } |
---|
599 | |
---|
600 | Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks(); |
---|
601 | |
---|
602 | // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce) |
---|
603 | for (JobHistory.Task task : tasks.values()) { |
---|
604 | |
---|
605 | String tid = task.get(Keys.TASKID); |
---|
606 | if (tid.equals(mapTaskId.toString()) || |
---|
607 | tid.equals(reduceTaskId.toString()) || |
---|
608 | tid.equals(cleanupTaskId.toString()) || |
---|
609 | tid.equals(setupTaskId.toString())) { |
---|
610 | |
---|
611 | TaskID taskId = null; |
---|
612 | if (tid.equals(mapTaskId.toString())) { |
---|
613 | taskId = mapTaskId; |
---|
614 | } |
---|
615 | else if (tid.equals(reduceTaskId.toString())) { |
---|
616 | taskId = reduceTaskId; |
---|
617 | } |
---|
618 | else if (tid.equals(cleanupTaskId.toString())) { |
---|
619 | taskId = cleanupTaskId; |
---|
620 | } |
---|
621 | else if (tid.equals(setupTaskId.toString())) { |
---|
622 | taskId = setupTaskId; |
---|
623 | } |
---|
624 | TaskInProgress tip = jip.getTaskInProgress(taskId); |
---|
625 | assertTrue("START_TIME of Task " + tid + " obtained from history " + |
---|
626 | "file did not match the expected value", tip.getExecStartTime() == |
---|
627 | Long.parseLong(task.get(Keys.START_TIME))); |
---|
628 | |
---|
629 | assertTrue("FINISH_TIME of Task " + tid + " obtained from history " + |
---|
630 | "file did not match the expected value", tip.getExecFinishTime() == |
---|
631 | Long.parseLong(task.get(Keys.FINISH_TIME))); |
---|
632 | |
---|
633 | if (taskId == mapTaskId) {//check splits only for map task |
---|
634 | assertTrue("Splits of Task " + tid + " obtained from history file " + |
---|
635 | " did not match the expected value", |
---|
636 | tip.getSplitNodes().equals(task.get(Keys.SPLITS))); |
---|
637 | } |
---|
638 | |
---|
639 | TaskAttemptID attemptId = tip.getSuccessfulTaskid(); |
---|
640 | TaskStatus ts = tip.getTaskStatus(attemptId); |
---|
641 | |
---|
642 | // Validate task counters |
---|
643 | Counters c = ts.getCounters(); |
---|
644 | assertTrue("Counters of Task " + tid + " obtained from history file " + |
---|
645 | " did not match the expected value", |
---|
646 | c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS))); |
---|
647 | } |
---|
648 | } |
---|
649 | } |
---|
650 | |
---|
  // Validate Task Attempt Level Keys, Values read from history file by
  // comparing them with the actual values from JT.
  //
  // For every attempt recorded in the parsed history (jobInfo), looks up the
  // live TaskStatus on the JobTracker and asserts that the history's task id,
  // start/finish times, tracker http port, hostname (successful attempts on a
  // known tracker only), shuffle/sort finish times (successful REDUCE attempts
  // only), counters (successful attempts only) and tracker name all match.
  private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
      RunningJob job, JobInfo jobInfo) throws IOException {

    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
    JobInProgress jip = jt.getJob(job.getID());

    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();

    // For each task
    for (JobHistory.Task task : tasks.values()) {
      // validate info of each attempt
      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {

        String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
        TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
        TaskID tid = attemptId.getTaskID();

        // Validate task id
        assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
                   "history file did not match the expected value",
                   tid.toString().equals(attempt.get(Keys.TASKID)));

        TaskInProgress tip = jip.getTaskInProgress(tid);
        TaskStatus ts = tip.getTaskStatus(attemptId);

        // Validate task attempt start time
        assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
                   "history file did not match the expected value",
                   ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));

        // Validate task attempt finish time
        assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
                   "history file did not match the expected value",
                   ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));


        // ttStatus may be null if the tracker is no longer known to the JT;
        // port/hostname checks are only possible while it is still registered.
        TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker());

        if (ttStatus != null) {
          assertTrue("http port of task attempt " + idStr + " obtained from " +
                     "history file did not match the expected value",
                     ttStatus.getHttpPort() ==
                     Integer.parseInt(attempt.get(Keys.HTTP_PORT)));

          if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
            String ttHostname = jt.getNode(ttStatus.getHost()).toString();

            // check if hostname is valid
            assertTrue("Host name of task attempt " + idStr + " obtained from" +
                       " history file did not match the expected value",
                       ttHostname.equals(attempt.get(Keys.HOSTNAME)));
          }
        }
        // The following checks apply only to successful attempts: shuffle and
        // sort finish times are recorded only for reduces, and counters are
        // only logged to history on success.
        if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
          // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
          // Reduce Task Attempts
          if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
            assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
                       " obtained from history file did not match the expected" +
                       " value", ts.getShuffleFinishTime() ==
                       Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
            assertTrue("SORT_FINISHED time of task attempt " + idStr +
                       " obtained from history file did not match the expected" +
                       " value", ts.getSortFinishTime() ==
                       Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
          }

          //Validate task counters
          Counters c = ts.getCounters();
          assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
                     "history file did not match the expected value",
                     c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
        }

        // check if tracker name is valid
        assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
                   "history file did not match the expected value",
                   ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
      }
    }
  }
---|
734 | |
---|
735 | /** |
---|
736 | * Checks if the history file content is as expected comparing with the |
---|
737 | * actual values obtained from JT. |
---|
738 | * Job Level, Task Level and Task Attempt Level Keys, Values are validated. |
---|
739 | * @param job RunningJob object of the job whose history is to be validated |
---|
740 | * @param conf job conf |
---|
741 | */ |
---|
742 | static void validateJobHistoryFileContent(MiniMRCluster mr, |
---|
743 | RunningJob job, JobConf conf) throws IOException { |
---|
744 | |
---|
745 | JobID id = job.getID(); |
---|
746 | // Get the history file name |
---|
747 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
748 | |
---|
749 | // Framework history log file location |
---|
750 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
751 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
752 | |
---|
753 | // Check if the history file exists |
---|
754 | assertTrue("History file does not exist", fileSys.exists(logFile)); |
---|
755 | |
---|
756 | |
---|
757 | // check if the history file is parsable |
---|
758 | String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( |
---|
759 | logFileName).split("_"); |
---|
760 | |
---|
761 | String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; |
---|
762 | JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); |
---|
763 | |
---|
764 | DefaultJobHistoryParser.JobTasksParseListener l = |
---|
765 | new DefaultJobHistoryParser.JobTasksParseListener(jobInfo); |
---|
766 | JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); |
---|
767 | |
---|
768 | // Now the history file contents are available in jobInfo. Let us compare |
---|
769 | // them with the actual values from JT. |
---|
770 | validateJobLevelKeyValues(mr, job, jobInfo, conf); |
---|
771 | validateTaskLevelKeyValues(mr, job, jobInfo); |
---|
772 | validateTaskAttemptLevelKeyValues(mr, job, jobInfo); |
---|
773 | } |
---|
774 | |
---|
775 | /** Run a job that will be succeeded and validate its history file format |
---|
776 | * and its content. |
---|
777 | */ |
---|
778 | public void testJobHistoryFile() throws IOException { |
---|
779 | MiniMRCluster mr = null; |
---|
780 | try { |
---|
781 | JobConf conf = new JobConf(); |
---|
782 | // keep for less time |
---|
783 | conf.setLong("mapred.jobtracker.retirejob.check", 1000); |
---|
784 | conf.setLong("mapred.jobtracker.retirejob.interval", 1000); |
---|
785 | mr = new MiniMRCluster(2, "file:///", 3, null, null, conf); |
---|
786 | |
---|
787 | // run the TCs |
---|
788 | conf = mr.createJobConf(); |
---|
789 | |
---|
790 | FileSystem fs = FileSystem.get(conf); |
---|
791 | // clean up |
---|
792 | fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true); |
---|
793 | |
---|
794 | Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input"); |
---|
795 | Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output"); |
---|
796 | |
---|
797 | //Disable speculative execution |
---|
798 | conf.setSpeculativeExecution(false); |
---|
799 | |
---|
800 | // Make sure that the job is not removed from memory until we do finish |
---|
801 | // the validation of history file content |
---|
802 | conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10); |
---|
803 | |
---|
804 | // Run a job that will be succeeded and validate its history file |
---|
805 | RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
806 | validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false); |
---|
807 | validateJobHistoryFileContent(mr, job, conf); |
---|
808 | |
---|
809 | // get the job conf filename |
---|
810 | String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID()); |
---|
811 | File file = new File(name); |
---|
812 | |
---|
813 | // check if the file get deleted |
---|
814 | while (file.exists()) { |
---|
815 | LOG.info("Waiting for " + file + " to be deleted"); |
---|
816 | UtilsForTests.waitFor(100); |
---|
817 | } |
---|
818 | } finally { |
---|
819 | if (mr != null) { |
---|
820 | cleanupLocalFiles(mr); |
---|
821 | mr.shutdown(); |
---|
822 | } |
---|
823 | } |
---|
824 | } |
---|
825 | |
---|
826 | // Returns the output path where user history log file is written to with |
---|
827 | // default configuration setting for hadoop.job.history.user.location |
---|
828 | private static Path getLogLocationInOutputPath(String logFileName, |
---|
829 | JobConf conf) { |
---|
830 | JobConf jobConf = new JobConf(true);//default JobConf |
---|
831 | FileOutputFormat.setOutputPath(jobConf, |
---|
832 | FileOutputFormat.getOutputPath(conf)); |
---|
833 | return JobHistory.JobInfo.getJobHistoryLogLocationForUser( |
---|
834 | logFileName, jobConf); |
---|
835 | } |
---|
836 | |
---|
837 | /** |
---|
838 | * Checks if the user history file exists in the correct dir |
---|
839 | * @param id job id |
---|
840 | * @param conf job conf |
---|
841 | */ |
---|
842 | private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) |
---|
843 | throws IOException { |
---|
844 | // Get the history file name |
---|
845 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
846 | |
---|
847 | // User history log file location |
---|
848 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser( |
---|
849 | logFileName, conf); |
---|
850 | if(logFile == null) { |
---|
851 | // get the output path where history file is written to when |
---|
852 | // hadoop.job.history.user.location is not set |
---|
853 | logFile = getLogLocationInOutputPath(logFileName, conf); |
---|
854 | } |
---|
855 | FileSystem fileSys = null; |
---|
856 | fileSys = logFile.getFileSystem(conf); |
---|
857 | |
---|
858 | // Check if the user history file exists in the correct dir |
---|
859 | if (conf.get("hadoop.job.history.user.location") == null) { |
---|
860 | assertTrue("User log file " + logFile + " does not exist", |
---|
861 | fileSys.exists(logFile)); |
---|
862 | } |
---|
863 | else if ("none".equals(conf.get("hadoop.job.history.user.location"))) { |
---|
864 | // history file should not exist in the output path |
---|
865 | assertFalse("Unexpected. User log file exists in output dir when " + |
---|
866 | "hadoop.job.history.user.location is set to \"none\"", |
---|
867 | fileSys.exists(logFile)); |
---|
868 | } |
---|
869 | else { |
---|
870 | //hadoop.job.history.user.location is set to a specific location. |
---|
871 | // User log file should exist in that location |
---|
872 | assertTrue("User log file " + logFile + " does not exist", |
---|
873 | fileSys.exists(logFile)); |
---|
874 | |
---|
875 | // User log file should not exist in output path. |
---|
876 | |
---|
877 | // get the output path where history file is written to when |
---|
878 | // hadoop.job.history.user.location is not set |
---|
879 | Path logFile1 = getLogLocationInOutputPath(logFileName, conf); |
---|
880 | |
---|
881 | if (logFile != logFile1) { |
---|
882 | fileSys = logFile1.getFileSystem(conf); |
---|
883 | assertFalse("Unexpected. User log file exists in output dir when " + |
---|
884 | "hadoop.job.history.user.location is set to a different location", |
---|
885 | fileSys.exists(logFile1)); |
---|
886 | } |
---|
887 | } |
---|
888 | } |
---|
889 | |
---|
890 | // Validate user history file location for the given values of |
---|
891 | // hadoop.job.history.user.location as |
---|
892 | // (1)null(default case), (2)"none", and (3)some dir "/tmp" |
---|
893 | public void testJobHistoryUserLogLocation() throws IOException { |
---|
894 | MiniMRCluster mr = null; |
---|
895 | try { |
---|
896 | mr = new MiniMRCluster(2, "file:///", 3); |
---|
897 | |
---|
898 | // run the TCs |
---|
899 | JobConf conf = mr.createJobConf(); |
---|
900 | |
---|
901 | FileSystem fs = FileSystem.get(conf); |
---|
902 | // clean up |
---|
903 | fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true); |
---|
904 | |
---|
905 | Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1"); |
---|
906 | Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1"); |
---|
907 | |
---|
908 | // validate for the case of null(default) |
---|
909 | RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
910 | validateJobHistoryUserLogLocation(job.getID(), conf); |
---|
911 | |
---|
912 | inDir = new Path(TEST_ROOT_DIR + "/succeed/input2"); |
---|
913 | outDir = new Path(TEST_ROOT_DIR + "/succeed/output2"); |
---|
914 | // validate for the case of "none" |
---|
915 | conf.set("hadoop.job.history.user.location", "none"); |
---|
916 | job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
917 | validateJobHistoryUserLogLocation(job.getID(), conf); |
---|
918 | |
---|
919 | inDir = new Path(TEST_ROOT_DIR + "/succeed/input3"); |
---|
920 | outDir = new Path(TEST_ROOT_DIR + "/succeed/output3"); |
---|
921 | // validate for the case of any dir |
---|
922 | conf.set("hadoop.job.history.user.location", "/tmp"); |
---|
923 | job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
924 | validateJobHistoryUserLogLocation(job.getID(), conf); |
---|
925 | |
---|
926 | } finally { |
---|
927 | if (mr != null) { |
---|
928 | cleanupLocalFiles(mr); |
---|
929 | mr.shutdown(); |
---|
930 | } |
---|
931 | } |
---|
932 | } |
---|
933 | |
---|
934 | private void cleanupLocalFiles(MiniMRCluster mr) |
---|
935 | throws IOException { |
---|
936 | Configuration conf = mr.createJobConf(); |
---|
937 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
938 | Path sysDir = new Path(jt.getSystemDir()); |
---|
939 | FileSystem fs = sysDir.getFileSystem(conf); |
---|
940 | fs.delete(sysDir, true); |
---|
941 | Path jobHistoryDir = JobHistory.getJobHistoryLocation(); |
---|
942 | fs = jobHistoryDir.getFileSystem(conf); |
---|
943 | fs.delete(jobHistoryDir, true); |
---|
944 | } |
---|
945 | |
---|
946 | /** |
---|
947 | * Checks if the history file has expected job status |
---|
948 | * @param id job id |
---|
949 | * @param conf job conf |
---|
950 | */ |
---|
951 | private static void validateJobHistoryJobStatus(JobID id, JobConf conf, |
---|
952 | String status) throws IOException { |
---|
953 | |
---|
954 | // Get the history file name |
---|
955 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
956 | |
---|
957 | // Framework history log file location |
---|
958 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
959 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
960 | |
---|
961 | // Check if the history file exists |
---|
962 | assertTrue("History file does not exist", fileSys.exists(logFile)); |
---|
963 | |
---|
964 | // check history file permission |
---|
965 | assertTrue("History file permissions does not match", |
---|
966 | fileSys.getFileStatus(logFile).getPermission().equals( |
---|
967 | new FsPermission(JobHistory.HISTORY_FILE_PERMISSION))); |
---|
968 | |
---|
969 | // check if the history file is parsable |
---|
970 | String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( |
---|
971 | logFileName).split("_"); |
---|
972 | |
---|
973 | String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; |
---|
974 | JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); |
---|
975 | |
---|
976 | DefaultJobHistoryParser.JobTasksParseListener l = |
---|
977 | new DefaultJobHistoryParser.JobTasksParseListener(jobInfo); |
---|
978 | JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); |
---|
979 | |
---|
980 | assertTrue("Job Status read from job history file is not the expected" + |
---|
981 | " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS))); |
---|
982 | } |
---|
983 | |
---|
984 | // run jobs that will be (1) succeeded (2) failed (3) killed |
---|
985 | // and validate job status read from history file in each case |
---|
986 | public void testJobHistoryJobStatus() throws IOException { |
---|
987 | MiniMRCluster mr = null; |
---|
988 | try { |
---|
989 | mr = new MiniMRCluster(2, "file:///", 3); |
---|
990 | |
---|
991 | // run the TCs |
---|
992 | JobConf conf = mr.createJobConf(); |
---|
993 | |
---|
994 | FileSystem fs = FileSystem.get(conf); |
---|
995 | // clean up |
---|
996 | fs.delete(new Path(TEST_ROOT_DIR + "/succeedfailkilljob"), true); |
---|
997 | |
---|
998 | Path inDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/input"); |
---|
999 | Path outDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/output"); |
---|
1000 | |
---|
1001 | // Run a job that will be succeeded and validate its job status |
---|
1002 | // existing in history file |
---|
1003 | RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
1004 | validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS"); |
---|
1005 | long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan(); |
---|
1006 | assertTrue(historyCleanerRanAt != 0); |
---|
1007 | |
---|
1008 | // Run a job that will be failed and validate its job status |
---|
1009 | // existing in history file |
---|
1010 | job = UtilsForTests.runJobFail(conf, inDir, outDir); |
---|
1011 | validateJobHistoryJobStatus(job.getID(), conf, "FAILED"); |
---|
1012 | assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan()); |
---|
1013 | |
---|
1014 | // Run a job that will be killed and validate its job status |
---|
1015 | // existing in history file |
---|
1016 | job = UtilsForTests.runJobKill(conf, inDir, outDir); |
---|
1017 | validateJobHistoryJobStatus(job.getID(), conf, "KILLED"); |
---|
1018 | assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan()); |
---|
1019 | |
---|
1020 | } finally { |
---|
1021 | if (mr != null) { |
---|
1022 | cleanupLocalFiles(mr); |
---|
1023 | mr.shutdown(); |
---|
1024 | } |
---|
1025 | } |
---|
1026 | } |
---|
1027 | } |
---|