[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.mapred; |
---|
| 20 | |
---|
| 21 | import java.io.File; |
---|
| 22 | import java.io.IOException; |
---|
| 23 | import java.text.ParseException; |
---|
| 24 | import java.util.ArrayList; |
---|
| 25 | import java.util.List; |
---|
| 26 | import java.util.HashMap; |
---|
| 27 | import java.util.Map; |
---|
| 28 | import java.util.Iterator; |
---|
| 29 | import java.util.regex.Matcher; |
---|
| 30 | import java.util.regex.Pattern; |
---|
| 31 | |
---|
| 32 | import junit.framework.TestCase; |
---|
| 33 | |
---|
| 34 | import org.apache.hadoop.conf.Configuration; |
---|
| 35 | import org.apache.hadoop.fs.FileSystem; |
---|
| 36 | import org.apache.hadoop.fs.Path; |
---|
| 37 | import org.apache.hadoop.fs.permission.FsPermission; |
---|
| 38 | import org.apache.hadoop.mapred.JobHistory.*; |
---|
| 39 | import org.apache.commons.logging.Log; |
---|
| 40 | import org.apache.commons.logging.LogFactory; |
---|
| 41 | |
---|
| 42 | /** |
---|
| 43 | * Tests the JobHistory files - to catch any changes to JobHistory that can |
---|
| 44 | * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer. |
---|
| 45 | * |
---|
| 46 | * testJobHistoryFile |
---|
| 47 | * Run a job that will be succeeded and validate its history file format and |
---|
| 48 | * content. |
---|
| 49 | * |
---|
| 50 | * testJobHistoryUserLogLocation |
---|
| 51 | * Run jobs with the given values of hadoop.job.history.user.location as |
---|
| 52 | * (1)null(default case), (2)"none", and (3)some dir like "/tmp". |
---|
| 53 | * Validate user history file location in each case. |
---|
| 54 | * |
---|
| 55 | * testJobHistoryJobStatus |
---|
| 56 | * Run jobs that will be (1) succeeded (2) failed (3) killed. |
---|
| 57 | * Validate job status read from history file in each case. |
---|
| 58 | * |
---|
| 59 | * Future changes to job history are to be reflected here in this file. |
---|
| 60 | */ |
---|
| 61 | public class TestJobHistory extends TestCase { |
---|
| 62 | private static final Log LOG = LogFactory.getLog(TestJobHistory.class); |
---|
| 63 | |
---|
| 64 | private static String TEST_ROOT_DIR = new File(System.getProperty( |
---|
| 65 | "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); |
---|
| 66 | |
---|
| 67 | private static final Pattern digitsPattern = |
---|
| 68 | Pattern.compile(JobHistory.DIGITS); |
---|
| 69 | |
---|
| 70 | // hostname like /default-rack/host1.foo.com OR host1.foo.com |
---|
| 71 | private static final Pattern hostNamePattern = Pattern.compile( |
---|
| 72 | "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)"); |
---|
| 73 | |
---|
| 74 | private static final String IP_ADDR = |
---|
| 75 | "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?"; |
---|
| 76 | |
---|
| 77 | // hostname like /default-rack/host1.foo.com OR host1.foo.com |
---|
| 78 | private static final Pattern trackerNamePattern = Pattern.compile( |
---|
| 79 | "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" + |
---|
| 80 | IP_ADDR + ":" + JobHistory.DIGITS); |
---|
| 81 | |
---|
| 82 | private static final Pattern splitsPattern = Pattern.compile( |
---|
| 83 | hostNamePattern + "(," + hostNamePattern + ")*"); |
---|
| 84 | |
---|
| 85 | private static Map<String, List<String>> taskIDsToAttemptIDs = |
---|
| 86 | new HashMap<String, List<String>>(); |
---|
| 87 | |
---|
| 88 | //Each Task End seen from history file is added here |
---|
| 89 | private static List<String> taskEnds = new ArrayList<String>(); |
---|
| 90 | |
---|
| 91 | // List of tasks that appear in history file after JT reatart. This is to |
---|
| 92 | // allow START_TIME=0 for these tasks. |
---|
| 93 | private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>(); |
---|
| 94 | |
---|
| 95 | // List of potential tasks whose start time can be 0 because of JT restart |
---|
| 96 | private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>(); |
---|
| 97 | |
---|
| 98 | /** |
---|
| 99 | * Listener for history log file, it populates JobHistory.JobInfo |
---|
| 100 | * object with data from log file and validates the data. |
---|
| 101 | */ |
---|
| 102 | static class TestListener |
---|
| 103 | extends DefaultJobHistoryParser.JobTasksParseListener { |
---|
| 104 | int lineNum;//line number of history log file |
---|
| 105 | boolean isJobLaunched; |
---|
| 106 | boolean isJTRestarted; |
---|
| 107 | |
---|
| 108 | TestListener(JobInfo job) { |
---|
| 109 | super(job); |
---|
| 110 | lineNum = 0; |
---|
| 111 | isJobLaunched = false; |
---|
| 112 | isJTRestarted = false; |
---|
| 113 | } |
---|
| 114 | |
---|
| 115 | // TestListener implementation |
---|
| 116 | public void handle(RecordTypes recType, Map<Keys, String> values) |
---|
| 117 | throws IOException { |
---|
| 118 | |
---|
| 119 | lineNum++; |
---|
| 120 | |
---|
| 121 | // Check if the record is of type Meta |
---|
| 122 | if (recType == JobHistory.RecordTypes.Meta) { |
---|
| 123 | long version = Long.parseLong(values.get(Keys.VERSION)); |
---|
| 124 | assertTrue("Unexpected job history version ", |
---|
| 125 | (version >= 0 && version <= JobHistory.VERSION)); |
---|
| 126 | } |
---|
| 127 | else if (recType.equals(RecordTypes.Job)) { |
---|
| 128 | String jobid = values.get(Keys.JOBID); |
---|
| 129 | assertTrue("record type 'Job' is seen without JOBID key" + |
---|
| 130 | " in history file at line " + lineNum, jobid != null); |
---|
| 131 | JobID id = JobID.forName(jobid); |
---|
| 132 | assertTrue("JobID in history file is in unexpected format " + |
---|
| 133 | "at line " + lineNum, id != null); |
---|
| 134 | String time = values.get(Keys.LAUNCH_TIME); |
---|
| 135 | if (time != null) { |
---|
| 136 | if (isJobLaunched) { |
---|
| 137 | // We assume that if we see LAUNCH_TIME again, it is because of JT restart |
---|
| 138 | isJTRestarted = true; |
---|
| 139 | } |
---|
| 140 | else {// job launched first time |
---|
| 141 | isJobLaunched = true; |
---|
| 142 | } |
---|
| 143 | } |
---|
| 144 | time = values.get(Keys.FINISH_TIME); |
---|
| 145 | if (time != null) { |
---|
| 146 | assertTrue ("Job FINISH_TIME is seen in history file at line " + |
---|
| 147 | lineNum + " before LAUNCH_TIME is seen", isJobLaunched); |
---|
| 148 | } |
---|
| 149 | } |
---|
| 150 | else if (recType.equals(RecordTypes.Task)) { |
---|
| 151 | String taskid = values.get(Keys.TASKID); |
---|
| 152 | assertTrue("record type 'Task' is seen without TASKID key" + |
---|
| 153 | " in history file at line " + lineNum, taskid != null); |
---|
| 154 | TaskID id = TaskID.forName(taskid); |
---|
| 155 | assertTrue("TaskID in history file is in unexpected format " + |
---|
| 156 | "at line " + lineNum, id != null); |
---|
| 157 | |
---|
| 158 | String time = values.get(Keys.START_TIME); |
---|
| 159 | if (time != null) { |
---|
| 160 | List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); |
---|
| 161 | assertTrue("Duplicate START_TIME seen for task " + taskid + |
---|
| 162 | " in history file at line " + lineNum, attemptIDs == null); |
---|
| 163 | attemptIDs = new ArrayList<String>(); |
---|
| 164 | taskIDsToAttemptIDs.put(taskid, attemptIDs); |
---|
| 165 | |
---|
| 166 | if (isJTRestarted) { |
---|
| 167 | // This maintains a potential ignoreStartTimeTasks list |
---|
| 168 | tempIgnoreStartTimeOfTasks.add(taskid); |
---|
| 169 | } |
---|
| 170 | } |
---|
| 171 | |
---|
| 172 | time = values.get(Keys.FINISH_TIME); |
---|
| 173 | if (time != null) { |
---|
| 174 | String s = values.get(Keys.TASK_STATUS); |
---|
| 175 | if (s != null) { |
---|
| 176 | List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); |
---|
| 177 | assertTrue ("Task FINISH_TIME is seen in history file at line " + |
---|
| 178 | lineNum + " before START_TIME is seen", attemptIDs != null); |
---|
| 179 | |
---|
| 180 | // Check if all the attemptIDs of this task are finished |
---|
| 181 | assertTrue("TaskId " + taskid + " is finished at line " + |
---|
| 182 | lineNum + " but its attemptID is not finished.", |
---|
| 183 | (attemptIDs.size() <= 1)); |
---|
| 184 | |
---|
| 185 | // Check if at least 1 attempt of this task is seen |
---|
| 186 | assertTrue("TaskId " + taskid + " is finished at line " + |
---|
| 187 | lineNum + " but no attemptID is seen before this.", |
---|
| 188 | attemptIDs.size() == 1); |
---|
| 189 | |
---|
| 190 | if (s.equals("KILLED") || s.equals("FAILED")) { |
---|
| 191 | // Task End with KILLED/FAILED status in history file is |
---|
| 192 | // considered as TaskEnd, TaskStart. This is useful in checking |
---|
| 193 | // the order of history lines. |
---|
| 194 | attemptIDs = new ArrayList<String>(); |
---|
| 195 | taskIDsToAttemptIDs.put(taskid, attemptIDs); |
---|
| 196 | } |
---|
| 197 | else { |
---|
| 198 | taskEnds.add(taskid); |
---|
| 199 | } |
---|
| 200 | } |
---|
| 201 | else { |
---|
| 202 | // This line of history file could be just an update to finish time |
---|
| 203 | } |
---|
| 204 | } |
---|
| 205 | } |
---|
| 206 | else if (recType.equals(RecordTypes.MapAttempt) || |
---|
| 207 | recType.equals(RecordTypes.ReduceAttempt)) { |
---|
| 208 | String taskid = values.get(Keys.TASKID); |
---|
| 209 | assertTrue("record type " + recType + " is seen without TASKID key" + |
---|
| 210 | " in history file at line " + lineNum, taskid != null); |
---|
| 211 | |
---|
| 212 | String attemptId = values.get(Keys.TASK_ATTEMPT_ID); |
---|
| 213 | TaskAttemptID id = TaskAttemptID.forName(attemptId); |
---|
| 214 | assertTrue("AttemptID in history file is in unexpected format " + |
---|
| 215 | "at line " + lineNum, id != null); |
---|
| 216 | |
---|
| 217 | String time = values.get(Keys.START_TIME); |
---|
| 218 | if (time != null) { |
---|
| 219 | List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); |
---|
| 220 | assertTrue ("TaskAttempt is seen in history file at line " + lineNum + |
---|
| 221 | " before Task is seen", attemptIDs != null); |
---|
| 222 | assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " + |
---|
| 223 | "file at line " + lineNum, attemptIDs.remove(attemptId)); |
---|
| 224 | |
---|
| 225 | if (attemptIDs.isEmpty()) { |
---|
| 226 | //just a boolean whether any attempt is seen or not |
---|
| 227 | attemptIDs.add("firstAttemptIsSeen"); |
---|
| 228 | } |
---|
| 229 | attemptIDs.add(attemptId); |
---|
| 230 | |
---|
| 231 | if (tempIgnoreStartTimeOfTasks.contains(taskid) && |
---|
| 232 | (id.getId() < 1000)) { |
---|
| 233 | // If Task line of this attempt is seen in history file after |
---|
| 234 | // JT restart and if this attempt is < 1000(i.e. attempt is noti |
---|
| 235 | // started after JT restart) - assuming single JT restart happened |
---|
| 236 | ignoreStartTimeOfTasks.add(taskid); |
---|
| 237 | } |
---|
| 238 | } |
---|
| 239 | |
---|
| 240 | time = values.get(Keys.FINISH_TIME); |
---|
| 241 | if (time != null) { |
---|
| 242 | List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); |
---|
| 243 | assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line " |
---|
| 244 | + lineNum + " before Task is seen", attemptIDs != null); |
---|
| 245 | |
---|
| 246 | assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line " |
---|
| 247 | + lineNum + " before TaskAttempt START_TIME is seen", |
---|
| 248 | attemptIDs.remove(attemptId)); |
---|
| 249 | } |
---|
| 250 | } |
---|
| 251 | super.handle(recType, values); |
---|
| 252 | } |
---|
| 253 | } |
---|
| 254 | |
---|
| 255 | // Check if the time is in the expected format |
---|
| 256 | private static boolean isTimeValid(String time) { |
---|
| 257 | Matcher m = digitsPattern.matcher(time); |
---|
| 258 | return m.matches() && (Long.parseLong(time) > 0); |
---|
| 259 | } |
---|
| 260 | |
---|
| 261 | private static boolean areTimesInOrder(String time1, String time2) { |
---|
| 262 | return (Long.parseLong(time1) <= Long.parseLong(time2)); |
---|
| 263 | } |
---|
| 264 | |
---|
| 265 | // Validate Format of Job Level Keys, Values read from history file |
---|
| 266 | private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values, |
---|
| 267 | String status) { |
---|
| 268 | String time = values.get(Keys.SUBMIT_TIME); |
---|
| 269 | assertTrue("Job SUBMIT_TIME is in unexpected format:" + time + |
---|
| 270 | " in history file", isTimeValid(time)); |
---|
| 271 | |
---|
| 272 | time = values.get(Keys.LAUNCH_TIME); |
---|
| 273 | assertTrue("Job LAUNCH_TIME is in unexpected format:" + time + |
---|
| 274 | " in history file", isTimeValid(time)); |
---|
| 275 | |
---|
| 276 | String time1 = values.get(Keys.FINISH_TIME); |
---|
| 277 | assertTrue("Job FINISH_TIME is in unexpected format:" + time1 + |
---|
| 278 | " in history file", isTimeValid(time1)); |
---|
| 279 | assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file", |
---|
| 280 | areTimesInOrder(time, time1)); |
---|
| 281 | |
---|
| 282 | String stat = values.get(Keys.JOB_STATUS); |
---|
| 283 | assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" + |
---|
| 284 | " history file", (status.equals(stat))); |
---|
| 285 | |
---|
| 286 | String priority = values.get(Keys.JOB_PRIORITY); |
---|
| 287 | assertTrue("Unknown priority for the job in history file", |
---|
| 288 | (priority.equals("HIGH") || |
---|
| 289 | priority.equals("LOW") || priority.equals("NORMAL") || |
---|
| 290 | priority.equals("VERY_HIGH") || priority.equals("VERY_LOW"))); |
---|
| 291 | } |
---|
| 292 | |
---|
| 293 | // Validate Format of Task Level Keys, Values read from history file |
---|
| 294 | private static void validateTaskLevelKeyValuesFormat(JobInfo job, |
---|
| 295 | boolean splitsCanBeEmpty) { |
---|
| 296 | Map<String, JobHistory.Task> tasks = job.getAllTasks(); |
---|
| 297 | |
---|
| 298 | // validate info of each task |
---|
| 299 | for (JobHistory.Task task : tasks.values()) { |
---|
| 300 | |
---|
| 301 | String tid = task.get(Keys.TASKID); |
---|
| 302 | String time = task.get(Keys.START_TIME); |
---|
| 303 | // We allow START_TIME=0 for tasks seen in history after JT restart |
---|
| 304 | if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) { |
---|
| 305 | assertTrue("Task START_TIME of " + tid + " is in unexpected format:" + |
---|
| 306 | time + " in history file", isTimeValid(time)); |
---|
| 307 | } |
---|
| 308 | |
---|
| 309 | String time1 = task.get(Keys.FINISH_TIME); |
---|
| 310 | assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" + |
---|
| 311 | time1 + " in history file", isTimeValid(time1)); |
---|
| 312 | assertTrue("Task FINISH_TIME is < START_TIME in history file", |
---|
| 313 | areTimesInOrder(time, time1)); |
---|
| 314 | |
---|
| 315 | // Make sure that the Task type exists and it is valid |
---|
| 316 | String type = task.get(Keys.TASK_TYPE); |
---|
| 317 | assertTrue("Unknown Task type \"" + type + "\" is seen in " + |
---|
| 318 | "history file for task " + tid, |
---|
| 319 | (type.equals("MAP") || type.equals("REDUCE") || |
---|
| 320 | type.equals("SETUP") || type.equals("CLEANUP"))); |
---|
| 321 | |
---|
| 322 | if (type.equals("MAP")) { |
---|
| 323 | String splits = task.get(Keys.SPLITS); |
---|
| 324 | //order in the condition OR check is important here |
---|
| 325 | if (!splitsCanBeEmpty || splits.length() != 0) { |
---|
| 326 | Matcher m = splitsPattern.matcher(splits); |
---|
| 327 | assertTrue("Unexpected format of SPLITS \"" + splits + "\" is seen" + |
---|
| 328 | " in history file for task " + tid, m.matches()); |
---|
| 329 | } |
---|
| 330 | } |
---|
| 331 | |
---|
| 332 | // Validate task status |
---|
| 333 | String status = task.get(Keys.TASK_STATUS); |
---|
| 334 | assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" + |
---|
| 335 | " history file for task " + tid, (status.equals("SUCCESS") || |
---|
| 336 | status.equals("FAILED") || status.equals("KILLED"))); |
---|
| 337 | } |
---|
| 338 | } |
---|
| 339 | |
---|
| 340 | // Validate foramt of Task Attempt Level Keys, Values read from history file |
---|
| 341 | private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) { |
---|
| 342 | Map<String, JobHistory.Task> tasks = job.getAllTasks(); |
---|
| 343 | |
---|
| 344 | // For each task |
---|
| 345 | for (JobHistory.Task task : tasks.values()) { |
---|
| 346 | // validate info of each attempt |
---|
| 347 | for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) { |
---|
| 348 | |
---|
| 349 | String id = attempt.get(Keys.TASK_ATTEMPT_ID); |
---|
| 350 | String time = attempt.get(Keys.START_TIME); |
---|
| 351 | assertTrue("START_TIME of task attempt " + id + |
---|
| 352 | " is in unexpected format:" + time + |
---|
| 353 | " in history file", isTimeValid(time)); |
---|
| 354 | |
---|
| 355 | String time1 = attempt.get(Keys.FINISH_TIME); |
---|
| 356 | assertTrue("FINISH_TIME of task attempt " + id + |
---|
| 357 | " is in unexpected format:" + time1 + |
---|
| 358 | " in history file", isTimeValid(time1)); |
---|
| 359 | assertTrue("Task FINISH_TIME is < START_TIME in history file", |
---|
| 360 | areTimesInOrder(time, time1)); |
---|
| 361 | |
---|
| 362 | // Make sure that the Task type exists and it is valid |
---|
| 363 | String type = attempt.get(Keys.TASK_TYPE); |
---|
| 364 | assertTrue("Unknown Task type \"" + type + "\" is seen in " + |
---|
| 365 | "history file for task attempt " + id, |
---|
| 366 | (type.equals("MAP") || type.equals("REDUCE") || |
---|
| 367 | type.equals("SETUP") || type.equals("CLEANUP"))); |
---|
| 368 | |
---|
| 369 | // Validate task status |
---|
| 370 | String status = attempt.get(Keys.TASK_STATUS); |
---|
| 371 | assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" + |
---|
| 372 | " history file for task attempt " + id, |
---|
| 373 | (status.equals("SUCCESS") || status.equals("FAILED") || |
---|
| 374 | status.equals("KILLED"))); |
---|
| 375 | |
---|
| 376 | // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and |
---|
| 377 | // SORT_FINISHED time |
---|
| 378 | if (type.equals("REDUCE") && status.equals("SUCCESS")) { |
---|
| 379 | time1 = attempt.get(Keys.SHUFFLE_FINISHED); |
---|
| 380 | assertTrue("SHUFFLE_FINISHED time of task attempt " + id + |
---|
| 381 | " is in unexpected format:" + time1 + |
---|
| 382 | " in history file", isTimeValid(time1)); |
---|
| 383 | assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " + |
---|
| 384 | "in history file", areTimesInOrder(time, time1)); |
---|
| 385 | time = attempt.get(Keys.SORT_FINISHED); |
---|
| 386 | assertTrue("SORT_FINISHED of task attempt " + id + |
---|
| 387 | " is in unexpected format:" + time + |
---|
| 388 | " in history file", isTimeValid(time)); |
---|
| 389 | assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" + |
---|
| 390 | " in history file", areTimesInOrder(time1, time)); |
---|
| 391 | } |
---|
| 392 | |
---|
| 393 | // check if hostname is valid |
---|
| 394 | String hostname = attempt.get(Keys.HOSTNAME); |
---|
| 395 | Matcher m = hostNamePattern.matcher(hostname); |
---|
| 396 | assertTrue("Unexpected Host name of task attempt " + id, m.matches()); |
---|
| 397 | |
---|
| 398 | // check if trackername is valid |
---|
| 399 | String trackerName = attempt.get(Keys.TRACKER_NAME); |
---|
| 400 | m = trackerNamePattern.matcher(trackerName); |
---|
| 401 | assertTrue("Unexpected tracker name of task attempt " + id, |
---|
| 402 | m.matches()); |
---|
| 403 | |
---|
| 404 | if (!status.equals("KILLED")) { |
---|
| 405 | // check if http port is valid |
---|
| 406 | String httpPort = attempt.get(Keys.HTTP_PORT); |
---|
| 407 | m = digitsPattern.matcher(httpPort); |
---|
| 408 | assertTrue("Unexpected http port of task attempt " + id, m.matches()); |
---|
| 409 | } |
---|
| 410 | |
---|
| 411 | // check if counters are parsable |
---|
| 412 | String counters = attempt.get(Keys.COUNTERS); |
---|
| 413 | try { |
---|
| 414 | Counters readCounters = Counters.fromEscapedCompactString(counters); |
---|
| 415 | assertTrue("Counters of task attempt " + id + " are not parsable", |
---|
| 416 | readCounters != null); |
---|
| 417 | } catch (ParseException pe) { |
---|
| 418 | LOG.warn("While trying to parse counters of task attempt " + id + |
---|
| 419 | ", " + pe); |
---|
| 420 | } |
---|
| 421 | } |
---|
| 422 | } |
---|
| 423 | } |
---|
| 424 | |
---|
| 425 | /** |
---|
| 426 | * Validates the format of contents of history file |
---|
| 427 | * (1) history file exists and in correct location |
---|
| 428 | * (2) Verify if the history file is parsable |
---|
| 429 | * (3) Validate the contents of history file |
---|
| 430 | * (a) Format of all TIMEs are checked against a regex |
---|
| 431 | * (b) validate legality/format of job level key, values |
---|
| 432 | * (c) validate legality/format of task level key, values |
---|
| 433 | * (d) validate legality/format of attempt level key, values |
---|
| 434 | * (e) check if all the TaskAttempts, Tasks started are finished. |
---|
| 435 | * Check finish of each TaskAttemptID against its start to make sure |
---|
| 436 | * that all TaskAttempts, Tasks started are indeed finished and the |
---|
| 437 | * history log lines are in the proper order. |
---|
| 438 | * We want to catch ordering of history lines like |
---|
| 439 | * Task START |
---|
| 440 | * Attempt START |
---|
| 441 | * Task FINISH |
---|
| 442 | * Attempt FINISH |
---|
| 443 | * (speculative execution is turned off for this). |
---|
| 444 | * @param id job id |
---|
| 445 | * @param conf job conf |
---|
| 446 | */ |
---|
| 447 | static void validateJobHistoryFileFormat(JobID id, JobConf conf, |
---|
| 448 | String status, boolean splitsCanBeEmpty) throws IOException { |
---|
| 449 | |
---|
| 450 | // Get the history file name |
---|
| 451 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
| 452 | |
---|
| 453 | // Framework history log file location |
---|
| 454 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
| 455 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
| 456 | |
---|
| 457 | // Check if the history file exists |
---|
| 458 | assertTrue("History file does not exist", fileSys.exists(logFile)); |
---|
| 459 | |
---|
| 460 | |
---|
| 461 | // check if the history file is parsable |
---|
| 462 | String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( |
---|
| 463 | logFileName).split("_"); |
---|
| 464 | |
---|
| 465 | String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; |
---|
| 466 | JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); |
---|
| 467 | |
---|
| 468 | TestListener l = new TestListener(jobInfo); |
---|
| 469 | JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); |
---|
| 470 | |
---|
| 471 | |
---|
| 472 | // validate format of job level key, values |
---|
| 473 | validateJobLevelKeyValuesFormat(jobInfo.getValues(), status); |
---|
| 474 | |
---|
| 475 | // validate format of task level key, values |
---|
| 476 | validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty); |
---|
| 477 | |
---|
| 478 | // validate format of attempt level key, values |
---|
| 479 | validateTaskAttemptLevelKeyValuesFormat(jobInfo); |
---|
| 480 | |
---|
| 481 | // check if all the TaskAttempts, Tasks started are finished for |
---|
| 482 | // successful jobs |
---|
| 483 | if (status.equals("SUCCESS")) { |
---|
| 484 | // Make sure that the lists in taskIDsToAttemptIDs are empty. |
---|
| 485 | for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) { |
---|
| 486 | String taskid = it.next(); |
---|
| 487 | assertTrue("There are some Tasks which are not finished in history " + |
---|
| 488 | "file.", taskEnds.contains(taskid)); |
---|
| 489 | List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); |
---|
| 490 | if(attemptIDs != null) { |
---|
| 491 | assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" + |
---|
| 492 | " that are not finished.", (attemptIDs.size() == 1)); |
---|
| 493 | } |
---|
| 494 | } |
---|
| 495 | } |
---|
| 496 | } |
---|
| 497 | |
---|
| 498 | // Validate Job Level Keys, Values read from history file by |
---|
| 499 | // comparing them with the actual values from JT. |
---|
| 500 | private static void validateJobLevelKeyValues(MiniMRCluster mr, |
---|
| 501 | RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException { |
---|
| 502 | |
---|
| 503 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
| 504 | JobInProgress jip = jt.getJob(job.getID()); |
---|
| 505 | |
---|
| 506 | Map<Keys, String> values = jobInfo.getValues(); |
---|
| 507 | |
---|
| 508 | assertTrue("SUBMIT_TIME of job obtained from history file did not " + |
---|
| 509 | "match the expected value", jip.getStartTime() == |
---|
| 510 | Long.parseLong(values.get(Keys.SUBMIT_TIME))); |
---|
| 511 | |
---|
| 512 | assertTrue("LAUNCH_TIME of job obtained from history file did not " + |
---|
| 513 | "match the expected value", jip.getLaunchTime() == |
---|
| 514 | Long.parseLong(values.get(Keys.LAUNCH_TIME))); |
---|
| 515 | |
---|
| 516 | assertTrue("FINISH_TIME of job obtained from history file did not " + |
---|
| 517 | "match the expected value", jip.getFinishTime() == |
---|
| 518 | Long.parseLong(values.get(Keys.FINISH_TIME))); |
---|
| 519 | |
---|
| 520 | assertTrue("Job Status of job obtained from history file did not " + |
---|
| 521 | "match the expected value", |
---|
| 522 | values.get(Keys.JOB_STATUS).equals("SUCCESS")); |
---|
| 523 | |
---|
| 524 | assertTrue("Job Priority of job obtained from history file did not " + |
---|
| 525 | "match the expected value", jip.getPriority().toString().equals( |
---|
| 526 | values.get(Keys.JOB_PRIORITY))); |
---|
| 527 | |
---|
| 528 | assertTrue("Job Name of job obtained from history file did not " + |
---|
| 529 | "match the expected value", JobInfo.getJobName(conf).equals( |
---|
| 530 | values.get(Keys.JOBNAME))); |
---|
| 531 | |
---|
| 532 | assertTrue("User Name of job obtained from history file did not " + |
---|
| 533 | "match the expected value", JobInfo.getUserName(conf).equals( |
---|
| 534 | values.get(Keys.USER))); |
---|
| 535 | |
---|
| 536 | // Validate job counters |
---|
| 537 | Counters c = jip.getCounters(); |
---|
| 538 | assertTrue("Counters of job obtained from history file did not " + |
---|
| 539 | "match the expected value", |
---|
| 540 | c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS))); |
---|
| 541 | |
---|
| 542 | // Validate number of total maps, total reduces, finished maps, |
---|
| 543 | // finished reduces, failed maps, failed recudes |
---|
| 544 | String totalMaps = values.get(Keys.TOTAL_MAPS); |
---|
| 545 | assertTrue("Unexpected number of total maps in history file", |
---|
| 546 | Integer.parseInt(totalMaps) == jip.desiredMaps()); |
---|
| 547 | |
---|
| 548 | String totalReduces = values.get(Keys.TOTAL_REDUCES); |
---|
| 549 | assertTrue("Unexpected number of total reduces in history file", |
---|
| 550 | Integer.parseInt(totalReduces) == jip.desiredReduces()); |
---|
| 551 | |
---|
| 552 | String finMaps = values.get(Keys.FINISHED_MAPS); |
---|
| 553 | assertTrue("Unexpected number of finished maps in history file", |
---|
| 554 | Integer.parseInt(finMaps) == jip.finishedMaps()); |
---|
| 555 | |
---|
| 556 | String finReduces = values.get(Keys.FINISHED_REDUCES); |
---|
| 557 | assertTrue("Unexpected number of finished reduces in history file", |
---|
| 558 | Integer.parseInt(finReduces) == jip.finishedReduces()); |
---|
| 559 | |
---|
| 560 | String failedMaps = values.get(Keys.FAILED_MAPS); |
---|
| 561 | assertTrue("Unexpected number of failed maps in history file", |
---|
| 562 | Integer.parseInt(failedMaps) == jip.failedMapTasks); |
---|
| 563 | |
---|
| 564 | String failedReduces = values.get(Keys.FAILED_REDUCES); |
---|
| 565 | assertTrue("Unexpected number of failed reduces in history file", |
---|
| 566 | Integer.parseInt(failedReduces) == jip.failedReduceTasks); |
---|
| 567 | } |
---|
| 568 | |
---|
| 569 | // Validate Task Level Keys, Values read from history file by |
---|
| 570 | // comparing them with the actual values from JT. |
---|
| 571 | private static void validateTaskLevelKeyValues(MiniMRCluster mr, |
---|
| 572 | RunningJob job, JobInfo jobInfo) throws IOException { |
---|
| 573 | |
---|
| 574 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
| 575 | JobInProgress jip = jt.getJob(job.getID()); |
---|
| 576 | |
---|
| 577 | // Get the 1st map, 1st reduce, cleanup & setup taskIDs and |
---|
| 578 | // validate their history info |
---|
| 579 | TaskID mapTaskId = new TaskID(job.getID(), true, 0); |
---|
| 580 | TaskID reduceTaskId = new TaskID(job.getID(), false, 0); |
---|
| 581 | |
---|
| 582 | TaskInProgress cleanups[] = jip.getCleanupTasks(); |
---|
| 583 | TaskID cleanupTaskId; |
---|
| 584 | if (cleanups[0].isComplete()) { |
---|
| 585 | cleanupTaskId = cleanups[0].getTIPId(); |
---|
| 586 | } |
---|
| 587 | else { |
---|
| 588 | cleanupTaskId = cleanups[1].getTIPId(); |
---|
| 589 | } |
---|
| 590 | |
---|
| 591 | TaskInProgress setups[] = jip.getSetupTasks(); |
---|
| 592 | TaskID setupTaskId; |
---|
| 593 | if (setups[0].isComplete()) { |
---|
| 594 | setupTaskId = setups[0].getTIPId(); |
---|
| 595 | } |
---|
| 596 | else { |
---|
| 597 | setupTaskId = setups[1].getTIPId(); |
---|
| 598 | } |
---|
| 599 | |
---|
| 600 | Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks(); |
---|
| 601 | |
---|
| 602 | // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce) |
---|
| 603 | for (JobHistory.Task task : tasks.values()) { |
---|
| 604 | |
---|
| 605 | String tid = task.get(Keys.TASKID); |
---|
| 606 | if (tid.equals(mapTaskId.toString()) || |
---|
| 607 | tid.equals(reduceTaskId.toString()) || |
---|
| 608 | tid.equals(cleanupTaskId.toString()) || |
---|
| 609 | tid.equals(setupTaskId.toString())) { |
---|
| 610 | |
---|
| 611 | TaskID taskId = null; |
---|
| 612 | if (tid.equals(mapTaskId.toString())) { |
---|
| 613 | taskId = mapTaskId; |
---|
| 614 | } |
---|
| 615 | else if (tid.equals(reduceTaskId.toString())) { |
---|
| 616 | taskId = reduceTaskId; |
---|
| 617 | } |
---|
| 618 | else if (tid.equals(cleanupTaskId.toString())) { |
---|
| 619 | taskId = cleanupTaskId; |
---|
| 620 | } |
---|
| 621 | else if (tid.equals(setupTaskId.toString())) { |
---|
| 622 | taskId = setupTaskId; |
---|
| 623 | } |
---|
| 624 | TaskInProgress tip = jip.getTaskInProgress(taskId); |
---|
| 625 | assertTrue("START_TIME of Task " + tid + " obtained from history " + |
---|
| 626 | "file did not match the expected value", tip.getExecStartTime() == |
---|
| 627 | Long.parseLong(task.get(Keys.START_TIME))); |
---|
| 628 | |
---|
| 629 | assertTrue("FINISH_TIME of Task " + tid + " obtained from history " + |
---|
| 630 | "file did not match the expected value", tip.getExecFinishTime() == |
---|
| 631 | Long.parseLong(task.get(Keys.FINISH_TIME))); |
---|
| 632 | |
---|
| 633 | if (taskId == mapTaskId) {//check splits only for map task |
---|
| 634 | assertTrue("Splits of Task " + tid + " obtained from history file " + |
---|
| 635 | " did not match the expected value", |
---|
| 636 | tip.getSplitNodes().equals(task.get(Keys.SPLITS))); |
---|
| 637 | } |
---|
| 638 | |
---|
| 639 | TaskAttemptID attemptId = tip.getSuccessfulTaskid(); |
---|
| 640 | TaskStatus ts = tip.getTaskStatus(attemptId); |
---|
| 641 | |
---|
| 642 | // Validate task counters |
---|
| 643 | Counters c = ts.getCounters(); |
---|
| 644 | assertTrue("Counters of Task " + tid + " obtained from history file " + |
---|
| 645 | " did not match the expected value", |
---|
| 646 | c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS))); |
---|
| 647 | } |
---|
| 648 | } |
---|
| 649 | } |
---|
| 650 | |
---|
| 651 | // Validate Task Attempt Level Keys, Values read from history file by |
---|
| 652 | // comparing them with the actual values from JT. |
---|
| 653 | private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr, |
---|
| 654 | RunningJob job, JobInfo jobInfo) throws IOException { |
---|
| 655 | |
---|
| 656 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
| 657 | JobInProgress jip = jt.getJob(job.getID()); |
---|
| 658 | |
---|
| 659 | Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks(); |
---|
| 660 | |
---|
| 661 | // For each task |
---|
| 662 | for (JobHistory.Task task : tasks.values()) { |
---|
| 663 | // validate info of each attempt |
---|
| 664 | for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) { |
---|
| 665 | |
---|
| 666 | String idStr = attempt.get(Keys.TASK_ATTEMPT_ID); |
---|
| 667 | TaskAttemptID attemptId = TaskAttemptID.forName(idStr); |
---|
| 668 | TaskID tid = attemptId.getTaskID(); |
---|
| 669 | |
---|
| 670 | // Validate task id |
---|
| 671 | assertTrue("Task id of Task Attempt " + idStr + " obtained from " + |
---|
| 672 | "history file did not match the expected value", |
---|
| 673 | tid.toString().equals(attempt.get(Keys.TASKID))); |
---|
| 674 | |
---|
| 675 | TaskInProgress tip = jip.getTaskInProgress(tid); |
---|
| 676 | TaskStatus ts = tip.getTaskStatus(attemptId); |
---|
| 677 | |
---|
| 678 | // Validate task attempt start time |
---|
| 679 | assertTrue("START_TIME of Task attempt " + idStr + " obtained from " + |
---|
| 680 | "history file did not match the expected value", |
---|
| 681 | ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME))); |
---|
| 682 | |
---|
| 683 | // Validate task attempt finish time |
---|
| 684 | assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " + |
---|
| 685 | "history file did not match the expected value", |
---|
| 686 | ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME))); |
---|
| 687 | |
---|
| 688 | |
---|
| 689 | TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker()); |
---|
| 690 | |
---|
| 691 | if (ttStatus != null) { |
---|
| 692 | assertTrue("http port of task attempt " + idStr + " obtained from " + |
---|
| 693 | "history file did not match the expected value", |
---|
| 694 | ttStatus.getHttpPort() == |
---|
| 695 | Integer.parseInt(attempt.get(Keys.HTTP_PORT))); |
---|
| 696 | |
---|
| 697 | if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) { |
---|
| 698 | String ttHostname = jt.getNode(ttStatus.getHost()).toString(); |
---|
| 699 | |
---|
| 700 | // check if hostname is valid |
---|
| 701 | assertTrue("Host name of task attempt " + idStr + " obtained from" + |
---|
| 702 | " history file did not match the expected value", |
---|
| 703 | ttHostname.equals(attempt.get(Keys.HOSTNAME))); |
---|
| 704 | } |
---|
| 705 | } |
---|
| 706 | if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) { |
---|
| 707 | // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of |
---|
| 708 | // Reduce Task Attempts |
---|
| 709 | if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) { |
---|
| 710 | assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr + |
---|
| 711 | " obtained from history file did not match the expected" + |
---|
| 712 | " value", ts.getShuffleFinishTime() == |
---|
| 713 | Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED))); |
---|
| 714 | assertTrue("SORT_FINISHED time of task attempt " + idStr + |
---|
| 715 | " obtained from history file did not match the expected" + |
---|
| 716 | " value", ts.getSortFinishTime() == |
---|
| 717 | Long.parseLong(attempt.get(Keys.SORT_FINISHED))); |
---|
| 718 | } |
---|
| 719 | |
---|
| 720 | //Validate task counters |
---|
| 721 | Counters c = ts.getCounters(); |
---|
| 722 | assertTrue("Counters of Task Attempt " + idStr + " obtained from " + |
---|
| 723 | "history file did not match the expected value", |
---|
| 724 | c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS))); |
---|
| 725 | } |
---|
| 726 | |
---|
| 727 | // check if tracker name is valid |
---|
| 728 | assertTrue("Tracker name of task attempt " + idStr + " obtained from " + |
---|
| 729 | "history file did not match the expected value", |
---|
| 730 | ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME))); |
---|
| 731 | } |
---|
| 732 | } |
---|
| 733 | } |
---|
| 734 | |
---|
| 735 | /** |
---|
| 736 | * Checks if the history file content is as expected comparing with the |
---|
| 737 | * actual values obtained from JT. |
---|
| 738 | * Job Level, Task Level and Task Attempt Level Keys, Values are validated. |
---|
| 739 | * @param job RunningJob object of the job whose history is to be validated |
---|
| 740 | * @param conf job conf |
---|
| 741 | */ |
---|
| 742 | static void validateJobHistoryFileContent(MiniMRCluster mr, |
---|
| 743 | RunningJob job, JobConf conf) throws IOException { |
---|
| 744 | |
---|
| 745 | JobID id = job.getID(); |
---|
| 746 | // Get the history file name |
---|
| 747 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
| 748 | |
---|
| 749 | // Framework history log file location |
---|
| 750 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
| 751 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
| 752 | |
---|
| 753 | // Check if the history file exists |
---|
| 754 | assertTrue("History file does not exist", fileSys.exists(logFile)); |
---|
| 755 | |
---|
| 756 | |
---|
| 757 | // check if the history file is parsable |
---|
| 758 | String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( |
---|
| 759 | logFileName).split("_"); |
---|
| 760 | |
---|
| 761 | String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; |
---|
| 762 | JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); |
---|
| 763 | |
---|
| 764 | DefaultJobHistoryParser.JobTasksParseListener l = |
---|
| 765 | new DefaultJobHistoryParser.JobTasksParseListener(jobInfo); |
---|
| 766 | JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); |
---|
| 767 | |
---|
| 768 | // Now the history file contents are available in jobInfo. Let us compare |
---|
| 769 | // them with the actual values from JT. |
---|
| 770 | validateJobLevelKeyValues(mr, job, jobInfo, conf); |
---|
| 771 | validateTaskLevelKeyValues(mr, job, jobInfo); |
---|
| 772 | validateTaskAttemptLevelKeyValues(mr, job, jobInfo); |
---|
| 773 | } |
---|
| 774 | |
---|
| 775 | /** Run a job that will be succeeded and validate its history file format |
---|
| 776 | * and its content. |
---|
| 777 | */ |
---|
| 778 | public void testJobHistoryFile() throws IOException { |
---|
| 779 | MiniMRCluster mr = null; |
---|
| 780 | try { |
---|
| 781 | JobConf conf = new JobConf(); |
---|
| 782 | // keep for less time |
---|
| 783 | conf.setLong("mapred.jobtracker.retirejob.check", 1000); |
---|
| 784 | conf.setLong("mapred.jobtracker.retirejob.interval", 1000); |
---|
| 785 | mr = new MiniMRCluster(2, "file:///", 3, null, null, conf); |
---|
| 786 | |
---|
| 787 | // run the TCs |
---|
| 788 | conf = mr.createJobConf(); |
---|
| 789 | |
---|
| 790 | FileSystem fs = FileSystem.get(conf); |
---|
| 791 | // clean up |
---|
| 792 | fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true); |
---|
| 793 | |
---|
| 794 | Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input"); |
---|
| 795 | Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output"); |
---|
| 796 | |
---|
| 797 | //Disable speculative execution |
---|
| 798 | conf.setSpeculativeExecution(false); |
---|
| 799 | |
---|
| 800 | // Make sure that the job is not removed from memory until we do finish |
---|
| 801 | // the validation of history file content |
---|
| 802 | conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10); |
---|
| 803 | |
---|
| 804 | // Run a job that will be succeeded and validate its history file |
---|
| 805 | RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
| 806 | validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false); |
---|
| 807 | validateJobHistoryFileContent(mr, job, conf); |
---|
| 808 | |
---|
| 809 | // get the job conf filename |
---|
| 810 | String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID()); |
---|
| 811 | File file = new File(name); |
---|
| 812 | |
---|
| 813 | // check if the file get deleted |
---|
| 814 | while (file.exists()) { |
---|
| 815 | LOG.info("Waiting for " + file + " to be deleted"); |
---|
| 816 | UtilsForTests.waitFor(100); |
---|
| 817 | } |
---|
| 818 | } finally { |
---|
| 819 | if (mr != null) { |
---|
| 820 | cleanupLocalFiles(mr); |
---|
| 821 | mr.shutdown(); |
---|
| 822 | } |
---|
| 823 | } |
---|
| 824 | } |
---|
| 825 | |
---|
| 826 | // Returns the output path where user history log file is written to with |
---|
| 827 | // default configuration setting for hadoop.job.history.user.location |
---|
| 828 | private static Path getLogLocationInOutputPath(String logFileName, |
---|
| 829 | JobConf conf) { |
---|
| 830 | JobConf jobConf = new JobConf(true);//default JobConf |
---|
| 831 | FileOutputFormat.setOutputPath(jobConf, |
---|
| 832 | FileOutputFormat.getOutputPath(conf)); |
---|
| 833 | return JobHistory.JobInfo.getJobHistoryLogLocationForUser( |
---|
| 834 | logFileName, jobConf); |
---|
| 835 | } |
---|
| 836 | |
---|
| 837 | /** |
---|
| 838 | * Checks if the user history file exists in the correct dir |
---|
| 839 | * @param id job id |
---|
| 840 | * @param conf job conf |
---|
| 841 | */ |
---|
| 842 | private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) |
---|
| 843 | throws IOException { |
---|
| 844 | // Get the history file name |
---|
| 845 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
| 846 | |
---|
| 847 | // User history log file location |
---|
| 848 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser( |
---|
| 849 | logFileName, conf); |
---|
| 850 | if(logFile == null) { |
---|
| 851 | // get the output path where history file is written to when |
---|
| 852 | // hadoop.job.history.user.location is not set |
---|
| 853 | logFile = getLogLocationInOutputPath(logFileName, conf); |
---|
| 854 | } |
---|
| 855 | FileSystem fileSys = null; |
---|
| 856 | fileSys = logFile.getFileSystem(conf); |
---|
| 857 | |
---|
| 858 | // Check if the user history file exists in the correct dir |
---|
| 859 | if (conf.get("hadoop.job.history.user.location") == null) { |
---|
| 860 | assertTrue("User log file " + logFile + " does not exist", |
---|
| 861 | fileSys.exists(logFile)); |
---|
| 862 | } |
---|
| 863 | else if ("none".equals(conf.get("hadoop.job.history.user.location"))) { |
---|
| 864 | // history file should not exist in the output path |
---|
| 865 | assertFalse("Unexpected. User log file exists in output dir when " + |
---|
| 866 | "hadoop.job.history.user.location is set to \"none\"", |
---|
| 867 | fileSys.exists(logFile)); |
---|
| 868 | } |
---|
| 869 | else { |
---|
| 870 | //hadoop.job.history.user.location is set to a specific location. |
---|
| 871 | // User log file should exist in that location |
---|
| 872 | assertTrue("User log file " + logFile + " does not exist", |
---|
| 873 | fileSys.exists(logFile)); |
---|
| 874 | |
---|
| 875 | // User log file should not exist in output path. |
---|
| 876 | |
---|
| 877 | // get the output path where history file is written to when |
---|
| 878 | // hadoop.job.history.user.location is not set |
---|
| 879 | Path logFile1 = getLogLocationInOutputPath(logFileName, conf); |
---|
| 880 | |
---|
| 881 | if (logFile != logFile1) { |
---|
| 882 | fileSys = logFile1.getFileSystem(conf); |
---|
| 883 | assertFalse("Unexpected. User log file exists in output dir when " + |
---|
| 884 | "hadoop.job.history.user.location is set to a different location", |
---|
| 885 | fileSys.exists(logFile1)); |
---|
| 886 | } |
---|
| 887 | } |
---|
| 888 | } |
---|
| 889 | |
---|
| 890 | // Validate user history file location for the given values of |
---|
| 891 | // hadoop.job.history.user.location as |
---|
| 892 | // (1)null(default case), (2)"none", and (3)some dir "/tmp" |
---|
| 893 | public void testJobHistoryUserLogLocation() throws IOException { |
---|
| 894 | MiniMRCluster mr = null; |
---|
| 895 | try { |
---|
| 896 | mr = new MiniMRCluster(2, "file:///", 3); |
---|
| 897 | |
---|
| 898 | // run the TCs |
---|
| 899 | JobConf conf = mr.createJobConf(); |
---|
| 900 | |
---|
| 901 | FileSystem fs = FileSystem.get(conf); |
---|
| 902 | // clean up |
---|
| 903 | fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true); |
---|
| 904 | |
---|
| 905 | Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1"); |
---|
| 906 | Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1"); |
---|
| 907 | |
---|
| 908 | // validate for the case of null(default) |
---|
| 909 | RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
| 910 | validateJobHistoryUserLogLocation(job.getID(), conf); |
---|
| 911 | |
---|
| 912 | inDir = new Path(TEST_ROOT_DIR + "/succeed/input2"); |
---|
| 913 | outDir = new Path(TEST_ROOT_DIR + "/succeed/output2"); |
---|
| 914 | // validate for the case of "none" |
---|
| 915 | conf.set("hadoop.job.history.user.location", "none"); |
---|
| 916 | job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
| 917 | validateJobHistoryUserLogLocation(job.getID(), conf); |
---|
| 918 | |
---|
| 919 | inDir = new Path(TEST_ROOT_DIR + "/succeed/input3"); |
---|
| 920 | outDir = new Path(TEST_ROOT_DIR + "/succeed/output3"); |
---|
| 921 | // validate for the case of any dir |
---|
| 922 | conf.set("hadoop.job.history.user.location", "/tmp"); |
---|
| 923 | job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
| 924 | validateJobHistoryUserLogLocation(job.getID(), conf); |
---|
| 925 | |
---|
| 926 | } finally { |
---|
| 927 | if (mr != null) { |
---|
| 928 | cleanupLocalFiles(mr); |
---|
| 929 | mr.shutdown(); |
---|
| 930 | } |
---|
| 931 | } |
---|
| 932 | } |
---|
| 933 | |
---|
| 934 | private void cleanupLocalFiles(MiniMRCluster mr) |
---|
| 935 | throws IOException { |
---|
| 936 | Configuration conf = mr.createJobConf(); |
---|
| 937 | JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
---|
| 938 | Path sysDir = new Path(jt.getSystemDir()); |
---|
| 939 | FileSystem fs = sysDir.getFileSystem(conf); |
---|
| 940 | fs.delete(sysDir, true); |
---|
| 941 | Path jobHistoryDir = JobHistory.getJobHistoryLocation(); |
---|
| 942 | fs = jobHistoryDir.getFileSystem(conf); |
---|
| 943 | fs.delete(jobHistoryDir, true); |
---|
| 944 | } |
---|
| 945 | |
---|
| 946 | /** |
---|
| 947 | * Checks if the history file has expected job status |
---|
| 948 | * @param id job id |
---|
| 949 | * @param conf job conf |
---|
| 950 | */ |
---|
| 951 | private static void validateJobHistoryJobStatus(JobID id, JobConf conf, |
---|
| 952 | String status) throws IOException { |
---|
| 953 | |
---|
| 954 | // Get the history file name |
---|
| 955 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
| 956 | |
---|
| 957 | // Framework history log file location |
---|
| 958 | Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
| 959 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
| 960 | |
---|
| 961 | // Check if the history file exists |
---|
| 962 | assertTrue("History file does not exist", fileSys.exists(logFile)); |
---|
| 963 | |
---|
| 964 | // check history file permission |
---|
| 965 | assertTrue("History file permissions does not match", |
---|
| 966 | fileSys.getFileStatus(logFile).getPermission().equals( |
---|
| 967 | new FsPermission(JobHistory.HISTORY_FILE_PERMISSION))); |
---|
| 968 | |
---|
| 969 | // check if the history file is parsable |
---|
| 970 | String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( |
---|
| 971 | logFileName).split("_"); |
---|
| 972 | |
---|
| 973 | String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; |
---|
| 974 | JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); |
---|
| 975 | |
---|
| 976 | DefaultJobHistoryParser.JobTasksParseListener l = |
---|
| 977 | new DefaultJobHistoryParser.JobTasksParseListener(jobInfo); |
---|
| 978 | JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); |
---|
| 979 | |
---|
| 980 | assertTrue("Job Status read from job history file is not the expected" + |
---|
| 981 | " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS))); |
---|
| 982 | } |
---|
| 983 | |
---|
| 984 | // run jobs that will be (1) succeeded (2) failed (3) killed |
---|
| 985 | // and validate job status read from history file in each case |
---|
| 986 | public void testJobHistoryJobStatus() throws IOException { |
---|
| 987 | MiniMRCluster mr = null; |
---|
| 988 | try { |
---|
| 989 | mr = new MiniMRCluster(2, "file:///", 3); |
---|
| 990 | |
---|
| 991 | // run the TCs |
---|
| 992 | JobConf conf = mr.createJobConf(); |
---|
| 993 | |
---|
| 994 | FileSystem fs = FileSystem.get(conf); |
---|
| 995 | // clean up |
---|
| 996 | fs.delete(new Path(TEST_ROOT_DIR + "/succeedfailkilljob"), true); |
---|
| 997 | |
---|
| 998 | Path inDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/input"); |
---|
| 999 | Path outDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/output"); |
---|
| 1000 | |
---|
| 1001 | // Run a job that will be succeeded and validate its job status |
---|
| 1002 | // existing in history file |
---|
| 1003 | RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); |
---|
| 1004 | validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS"); |
---|
| 1005 | long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan(); |
---|
| 1006 | assertTrue(historyCleanerRanAt != 0); |
---|
| 1007 | |
---|
| 1008 | // Run a job that will be failed and validate its job status |
---|
| 1009 | // existing in history file |
---|
| 1010 | job = UtilsForTests.runJobFail(conf, inDir, outDir); |
---|
| 1011 | validateJobHistoryJobStatus(job.getID(), conf, "FAILED"); |
---|
| 1012 | assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan()); |
---|
| 1013 | |
---|
| 1014 | // Run a job that will be killed and validate its job status |
---|
| 1015 | // existing in history file |
---|
| 1016 | job = UtilsForTests.runJobKill(conf, inDir, outDir); |
---|
| 1017 | validateJobHistoryJobStatus(job.getID(), conf, "KILLED"); |
---|
| 1018 | assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan()); |
---|
| 1019 | |
---|
| 1020 | } finally { |
---|
| 1021 | if (mr != null) { |
---|
| 1022 | cleanupLocalFiles(mr); |
---|
| 1023 | mr.shutdown(); |
---|
| 1024 | } |
---|
| 1025 | } |
---|
| 1026 | } |
---|
| 1027 | } |
---|