[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | package org.apache.hadoop.mapred; |
---|
| 19 | |
---|
| 20 | import org.apache.hadoop.conf.Configuration; |
---|
| 21 | import org.apache.hadoop.fs.*; |
---|
| 22 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
---|
| 23 | import org.apache.hadoop.mapred.UtilsForTests; |
---|
| 24 | import org.apache.hadoop.security.UserGroupInformation; |
---|
| 25 | |
---|
| 26 | import junit.framework.TestCase; |
---|
| 27 | import java.io.*; |
---|
| 28 | import java.util.ArrayList; |
---|
| 29 | import java.util.List; |
---|
| 30 | |
---|
| 31 | /** |
---|
| 32 | * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker |
---|
| 33 | * should be able to continue running the previously running jobs and also |
---|
| 34 | * recover previously submitted jobs. |
---|
| 35 | */ |
---|
| 36 | public class TestJobTrackerRestart extends TestCase { |
---|
| 37 | static final Path testDir = |
---|
| 38 | new Path(System.getProperty("test.build.data","/tmp"), |
---|
| 39 | "jt-restart-testing"); |
---|
| 40 | final Path inDir = new Path(testDir, "input"); |
---|
| 41 | static final Path shareDir = new Path(testDir, "share"); |
---|
| 42 | final Path outputDir = new Path(testDir, "output"); |
---|
| 43 | private static int numJobsSubmitted = 0; |
---|
| 44 | |
---|
| 45 | /** |
---|
| 46 | * Return the job conf configured with the priorities and mappers as passed. |
---|
| 47 | * @param conf The default conf |
---|
| 48 | * @param priorities priorities for the jobs |
---|
| 49 | * @param numMaps number of maps for the jobs |
---|
| 50 | * @param numReds number of reducers for the jobs |
---|
| 51 | * @param outputDir output dir |
---|
| 52 | * @param inDir input dir |
---|
| 53 | * @param mapSignalFile filename thats acts as a signal for maps |
---|
| 54 | * @param reduceSignalFile filename thats acts as a signal for reducers |
---|
| 55 | * @return a array of jobconfs configured as needed |
---|
| 56 | * @throws IOException |
---|
| 57 | */ |
---|
| 58 | private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, |
---|
| 59 | int[] numMaps, int[] numReds, |
---|
| 60 | Path outputDir, Path inDir, |
---|
| 61 | String mapSignalFile, String reduceSignalFile) |
---|
| 62 | throws IOException { |
---|
| 63 | JobConf[] jobs = new JobConf[priorities.length]; |
---|
| 64 | for (int i = 0; i < jobs.length; ++i) { |
---|
| 65 | jobs[i] = new JobConf(conf); |
---|
| 66 | Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++)); |
---|
| 67 | UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir, |
---|
| 68 | numMaps[i], numReds[i], "jt restart test job", mapSignalFile, |
---|
| 69 | reduceSignalFile); |
---|
| 70 | jobs[i].setJobPriority(priorities[i]); |
---|
| 71 | } |
---|
| 72 | return jobs; |
---|
| 73 | } |
---|
| 74 | |
---|
| 75 | /** |
---|
| 76 | * Clean up the signals. |
---|
| 77 | */ |
---|
| 78 | private static void cleanUp(FileSystem fileSys, Path dir) throws IOException { |
---|
| 79 | // Delete the map signal file |
---|
| 80 | fileSys.delete(new Path(getMapSignalFile(dir)), false); |
---|
| 81 | // Delete the reduce signal file |
---|
| 82 | fileSys.delete(new Path(getReduceSignalFile(dir)), false); |
---|
| 83 | } |
---|
| 84 | |
---|
| 85 | /** |
---|
| 86 | * Tests the jobtracker with restart-recovery turned off. |
---|
| 87 | * Submit a job with normal priority, maps = 2, reducers = 0} |
---|
| 88 | * |
---|
| 89 | * Wait for the job to complete 50% |
---|
| 90 | * |
---|
| 91 | * Restart the jobtracker with recovery turned off |
---|
| 92 | * |
---|
| 93 | * Check if the job is missing |
---|
| 94 | */ |
---|
| 95 | public void testRestartWithoutRecovery(MiniDFSCluster dfs, |
---|
| 96 | MiniMRCluster mr) |
---|
| 97 | throws IOException { |
---|
| 98 | // III. Test a job with waiting mapper and recovery turned off |
---|
| 99 | |
---|
| 100 | FileSystem fileSys = dfs.getFileSystem(); |
---|
| 101 | |
---|
| 102 | cleanUp(fileSys, shareDir); |
---|
| 103 | |
---|
| 104 | JobConf newConf = getJobs(mr.createJobConf(), |
---|
| 105 | new JobPriority[] {JobPriority.NORMAL}, |
---|
| 106 | new int[] {2}, new int[] {0}, |
---|
| 107 | outputDir, inDir, |
---|
| 108 | getMapSignalFile(shareDir), |
---|
| 109 | getReduceSignalFile(shareDir))[0]; |
---|
| 110 | |
---|
| 111 | JobClient jobClient = new JobClient(newConf); |
---|
| 112 | RunningJob job = jobClient.submitJob(newConf); |
---|
| 113 | JobID id = job.getID(); |
---|
| 114 | |
---|
| 115 | // make sure that the job is 50% completed |
---|
| 116 | while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) { |
---|
| 117 | UtilsForTests.waitFor(100); |
---|
| 118 | } |
---|
| 119 | |
---|
| 120 | mr.stopJobTracker(); |
---|
| 121 | |
---|
| 122 | // Turn off the recovery |
---|
| 123 | mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", |
---|
| 124 | false); |
---|
| 125 | |
---|
| 126 | // Wait for a minute before submitting a job |
---|
| 127 | UtilsForTests.waitFor(60 * 1000); |
---|
| 128 | |
---|
| 129 | mr.startJobTracker(); |
---|
| 130 | |
---|
| 131 | // Signal the tasks |
---|
| 132 | UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), |
---|
| 133 | getReduceSignalFile(shareDir)); |
---|
| 134 | |
---|
| 135 | // Wait for the JT to be ready |
---|
| 136 | UtilsForTests.waitForJobTracker(jobClient); |
---|
| 137 | |
---|
| 138 | UtilsForTests.waitTillDone(jobClient); |
---|
| 139 | |
---|
| 140 | // The submitted job should not exist |
---|
| 141 | assertTrue("Submitted job was detected with recovery disabled", |
---|
| 142 | UtilsForTests.getJobStatus(jobClient, id) == null); |
---|
| 143 | } |
---|
| 144 | |
---|
| 145 | /** Tests a job on jobtracker with restart-recovery turned on. |
---|
| 146 | * Preparation : |
---|
| 147 | * - Configure a job with |
---|
| 148 | * - num-maps : 50 |
---|
| 149 | * - num-reducers : 1 |
---|
| 150 | * - Configure the cluster to run 1 reducer |
---|
| 151 | * - Lower the history file block size and buffer |
---|
| 152 | * |
---|
| 153 | * Wait for the job to complete 50%. Note that all the job is configured to |
---|
| 154 | * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So job will |
---|
| 155 | * eventually wait on 50% |
---|
| 156 | * |
---|
| 157 | * Make a note of the following things |
---|
| 158 | * - Task completion events |
---|
| 159 | * - Cluster status |
---|
| 160 | * - Task Reports |
---|
| 161 | * - Job start time |
---|
| 162 | * |
---|
| 163 | * Restart the jobtracker |
---|
| 164 | * |
---|
| 165 | * Wait for job to finish all the maps and note the TaskCompletion events at |
---|
| 166 | * the tracker. |
---|
| 167 | * |
---|
| 168 | * Wait for all the jobs to finish and note the following |
---|
| 169 | * - New task completion events at the jobtracker |
---|
| 170 | * - Task reports |
---|
| 171 | * - Cluster status |
---|
| 172 | * |
---|
| 173 | * Check for the following |
---|
| 174 | * - Task completion events for recovered tasks should match |
---|
| 175 | * - Task completion events at the tasktracker and the restarted |
---|
| 176 | * jobtracker should be same |
---|
| 177 | * - Cluster status should be fine. |
---|
| 178 | * - Task Reports for recovered tasks should match |
---|
| 179 | * Checks |
---|
| 180 | * - start time |
---|
| 181 | * - finish time |
---|
| 182 | * - counters |
---|
| 183 | * - http-location |
---|
| 184 | * - task-id |
---|
| 185 | * - Job start time should match |
---|
| 186 | * - Check if the counters can be accessed |
---|
| 187 | * - Check if the history files are (re)named properly |
---|
| 188 | */ |
---|
| 189 | public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs, |
---|
| 190 | MiniMRCluster mr) |
---|
| 191 | throws IOException { |
---|
| 192 | // II. Test a tasktracker with waiting mapper and recovery turned on. |
---|
| 193 | // Ideally the tracker should SYNC with the new/restarted jobtracker |
---|
| 194 | |
---|
| 195 | FileSystem fileSys = dfs.getFileSystem(); |
---|
| 196 | final int numMaps = 50; |
---|
| 197 | final int numReducers = 1; |
---|
| 198 | |
---|
| 199 | |
---|
| 200 | cleanUp(fileSys, shareDir); |
---|
| 201 | |
---|
| 202 | JobConf newConf = getJobs(mr.createJobConf(), |
---|
| 203 | new JobPriority[] {JobPriority.NORMAL}, |
---|
| 204 | new int[] {numMaps}, new int[] {numReducers}, |
---|
| 205 | outputDir, inDir, |
---|
| 206 | getMapSignalFile(shareDir), |
---|
| 207 | getReduceSignalFile(shareDir))[0]; |
---|
| 208 | |
---|
| 209 | JobClient jobClient = new JobClient(newConf); |
---|
| 210 | RunningJob job = jobClient.submitJob(newConf); |
---|
| 211 | JobID id = job.getID(); |
---|
| 212 | |
---|
| 213 | // change the job priority |
---|
| 214 | mr.setJobPriority(id, JobPriority.HIGH); |
---|
| 215 | |
---|
| 216 | mr.initializeJob(id); |
---|
| 217 | |
---|
| 218 | // make sure that atleast on reducer is spawned |
---|
| 219 | while (jobClient.getClusterStatus().getReduceTasks() == 0) { |
---|
| 220 | UtilsForTests.waitFor(100); |
---|
| 221 | } |
---|
| 222 | |
---|
| 223 | while(true) { |
---|
| 224 | // Since we are using a half waiting mapper, maps should be stuck at 50% |
---|
| 225 | TaskCompletionEvent[] trackerEvents = |
---|
| 226 | mr.getMapTaskCompletionEventsUpdates(0, id, numMaps) |
---|
| 227 | .getMapTaskCompletionEvents(); |
---|
| 228 | if (trackerEvents.length < numMaps / 2) { |
---|
| 229 | UtilsForTests.waitFor(1000); |
---|
| 230 | } else { |
---|
| 231 | break; |
---|
| 232 | } |
---|
| 233 | } |
---|
| 234 | |
---|
| 235 | TaskCompletionEvent[] prevEvents = |
---|
| 236 | mr.getTaskCompletionEvents(id, 0, numMaps); |
---|
| 237 | TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id); |
---|
| 238 | TaskReport[] prevMapReports = jobClient.getMapTaskReports(id); |
---|
| 239 | ClusterStatus prevStatus = jobClient.getClusterStatus(); |
---|
| 240 | |
---|
| 241 | mr.stopJobTracker(); |
---|
| 242 | |
---|
| 243 | // Turn off the recovery |
---|
| 244 | mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", |
---|
| 245 | true); |
---|
| 246 | |
---|
| 247 | // Wait for a minute before submitting a job |
---|
| 248 | UtilsForTests.waitFor(60 * 1000); |
---|
| 249 | |
---|
| 250 | mr.startJobTracker(); |
---|
| 251 | |
---|
| 252 | // Signal the map tasks |
---|
| 253 | UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), |
---|
| 254 | getReduceSignalFile(shareDir)); |
---|
| 255 | |
---|
| 256 | // Wait for the JT to be ready |
---|
| 257 | UtilsForTests.waitForJobTracker(jobClient); |
---|
| 258 | |
---|
| 259 | int numToMatch = mr.getNumEventsRecovered() / 2; |
---|
| 260 | |
---|
| 261 | // make sure that the maps are completed |
---|
| 262 | while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) { |
---|
| 263 | UtilsForTests.waitFor(100); |
---|
| 264 | } |
---|
| 265 | |
---|
| 266 | // Get the new jobtrackers events |
---|
| 267 | TaskCompletionEvent[] jtEvents = |
---|
| 268 | mr.getTaskCompletionEvents(id, 0, 2 * numMaps); |
---|
| 269 | |
---|
| 270 | // Test if all the events that were recovered match exactly |
---|
| 271 | testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch); |
---|
| 272 | |
---|
| 273 | // Check the task reports |
---|
| 274 | // The reports should match exactly if the attempts are same |
---|
| 275 | TaskReport[] afterMapReports = jobClient.getMapTaskReports(id); |
---|
| 276 | TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id); |
---|
| 277 | testTaskReports(prevMapReports, afterMapReports, numToMatch - 1); |
---|
| 278 | testTaskReports(prevSetupReports, afterSetupReports, 1); |
---|
| 279 | |
---|
| 280 | // check the job priority |
---|
| 281 | assertEquals("Job priority change is not reflected", |
---|
| 282 | JobPriority.HIGH, mr.getJobPriority(id)); |
---|
| 283 | |
---|
| 284 | List<TaskCompletionEvent> jtMapEvents = |
---|
| 285 | new ArrayList<TaskCompletionEvent>(); |
---|
| 286 | for (TaskCompletionEvent tce : jtEvents) { |
---|
| 287 | if (tce.isMapTask()) { |
---|
| 288 | jtMapEvents.add(tce); |
---|
| 289 | } |
---|
| 290 | } |
---|
| 291 | |
---|
| 292 | TaskCompletionEvent[] trackerEvents; |
---|
| 293 | while(true) { |
---|
| 294 | // Wait for the tracker to pull all the map events |
---|
| 295 | trackerEvents = |
---|
| 296 | mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size()) |
---|
| 297 | .getMapTaskCompletionEvents(); |
---|
| 298 | if (trackerEvents.length < jtMapEvents.size()) { |
---|
| 299 | UtilsForTests.waitFor(1000); |
---|
| 300 | } else { |
---|
| 301 | break; |
---|
| 302 | } |
---|
| 303 | } |
---|
| 304 | |
---|
| 305 | // Signal the reduce tasks |
---|
| 306 | UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), |
---|
| 307 | getReduceSignalFile(shareDir)); |
---|
| 308 | |
---|
| 309 | UtilsForTests.waitTillDone(jobClient); |
---|
| 310 | |
---|
| 311 | testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]), |
---|
| 312 | trackerEvents, true, -1); |
---|
| 313 | |
---|
| 314 | // validate the history file |
---|
| 315 | TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true); |
---|
| 316 | TestJobHistory.validateJobHistoryFileContent(mr, job, newConf); |
---|
| 317 | |
---|
| 318 | // check if the cluster status is insane |
---|
| 319 | ClusterStatus status = jobClient.getClusterStatus(); |
---|
| 320 | assertTrue("Cluster status is insane", |
---|
| 321 | checkClusterStatusOnCompletion(status, prevStatus)); |
---|
| 322 | } |
---|
| 323 | |
---|
| 324 | /** |
---|
| 325 | * Checks if the history files are as expected |
---|
| 326 | * @param id job id |
---|
| 327 | * @param conf job conf |
---|
| 328 | */ |
---|
| 329 | private void testJobHistoryFiles(JobID id, JobConf conf) |
---|
| 330 | throws IOException { |
---|
| 331 | // Get the history files for users |
---|
| 332 | String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); |
---|
| 333 | String tempLogFileName = |
---|
| 334 | JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName); |
---|
| 335 | |
---|
| 336 | // I. User files |
---|
| 337 | Path logFile = |
---|
| 338 | JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf); |
---|
| 339 | FileSystem fileSys = logFile.getFileSystem(conf); |
---|
| 340 | |
---|
| 341 | // Check if the history file exists |
---|
| 342 | assertTrue("User log file does not exist", fileSys.exists(logFile)); |
---|
| 343 | |
---|
| 344 | // Check if the temporary file is deleted |
---|
| 345 | Path tempLogFile = |
---|
| 346 | JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, |
---|
| 347 | conf); |
---|
| 348 | assertFalse("User temporary log file exists", fileSys.exists(tempLogFile)); |
---|
| 349 | |
---|
| 350 | // II. Framework files |
---|
| 351 | // Get the history file |
---|
| 352 | logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); |
---|
| 353 | fileSys = logFile.getFileSystem(conf); |
---|
| 354 | |
---|
| 355 | // Check if the history file exists |
---|
| 356 | assertTrue("Log file does not exist", fileSys.exists(logFile)); |
---|
| 357 | |
---|
| 358 | // Check if the temporary file is deleted |
---|
| 359 | tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName); |
---|
| 360 | assertFalse("Temporary log file exists", fileSys.exists(tempLogFile)); |
---|
| 361 | } |
---|
| 362 | |
---|
| 363 | /** |
---|
| 364 | * Matches specified number of task reports. |
---|
| 365 | * @param source the reports to be matched |
---|
| 366 | * @param target reports to match with |
---|
| 367 | * @param numToMatch num reports to match |
---|
| 368 | * @param mismatchSet reports that should not match |
---|
| 369 | */ |
---|
| 370 | private void testTaskReports(TaskReport[] source, TaskReport[] target, |
---|
| 371 | int numToMatch) { |
---|
| 372 | for (int i = 0; i < numToMatch; ++i) { |
---|
| 373 | // Check if the task reports was recovered correctly |
---|
| 374 | assertTrue("Task reports for same attempt has changed", |
---|
| 375 | source[i].equals(target[i])); |
---|
| 376 | } |
---|
| 377 | } |
---|
| 378 | |
---|
| 379 | /** |
---|
| 380 | * Matches the task completion events. |
---|
| 381 | * @param source the events to be matched |
---|
| 382 | * @param target events to match with |
---|
| 383 | * @param fullMatch whether to match the events completely or partially |
---|
| 384 | * @param numToMatch number of events to match in case full match is not |
---|
| 385 | * desired |
---|
| 386 | * @param ignoreSet a set of taskids to ignore |
---|
| 387 | */ |
---|
| 388 | private void testTaskCompletionEvents(TaskCompletionEvent[] source, |
---|
| 389 | TaskCompletionEvent[] target, |
---|
| 390 | boolean fullMatch, |
---|
| 391 | int numToMatch) { |
---|
| 392 | // Check if the event list size matches |
---|
| 393 | // The lengths should match only incase of full match |
---|
| 394 | if (fullMatch) { |
---|
| 395 | assertEquals("Map task completion events mismatch", |
---|
| 396 | source.length, target.length); |
---|
| 397 | numToMatch = source.length; |
---|
| 398 | } |
---|
| 399 | // Check if the events match |
---|
| 400 | for (int i = 0; i < numToMatch; ++i) { |
---|
| 401 | if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){ |
---|
| 402 | assertTrue("Map task completion events ordering mismatch", |
---|
| 403 | source[i].equals(target[i])); |
---|
| 404 | } |
---|
| 405 | } |
---|
| 406 | } |
---|
| 407 | |
---|
| 408 | private boolean checkClusterStatusOnCompletion(ClusterStatus status, |
---|
| 409 | ClusterStatus prevStatus) { |
---|
| 410 | return status.getJobTrackerState() == prevStatus.getJobTrackerState() |
---|
| 411 | && status.getMapTasks() == 0 |
---|
| 412 | && status.getReduceTasks() == 0; |
---|
| 413 | } |
---|
| 414 | |
---|
| 415 | /** Committer with setup waiting |
---|
| 416 | */ |
---|
| 417 | static class CommitterWithDelaySetup extends FileOutputCommitter { |
---|
| 418 | @Override |
---|
| 419 | public void setupJob(JobContext context) throws IOException { |
---|
| 420 | FileSystem fs = FileSystem.get(context.getConfiguration()); |
---|
| 421 | while (true) { |
---|
| 422 | if (fs.exists(shareDir)) { |
---|
| 423 | break; |
---|
| 424 | } |
---|
| 425 | UtilsForTests.waitFor(100); |
---|
| 426 | } |
---|
| 427 | super.cleanupJob(context); |
---|
| 428 | } |
---|
| 429 | } |
---|
| 430 | |
---|
| 431 | /** Tests a job on jobtracker with restart-recovery turned on and empty |
---|
| 432 | * jobhistory file. |
---|
| 433 | * Preparation : |
---|
| 434 | * - Configure a job with |
---|
| 435 | * - num-maps : 0 (long waiting setup) |
---|
| 436 | * - num-reducers : 0 |
---|
| 437 | * |
---|
| 438 | * Check if the job succeedes after restart. |
---|
| 439 | * |
---|
| 440 | * Assumption that map slots are given first for setup. |
---|
| 441 | */ |
---|
| 442 | public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, |
---|
| 443 | MiniMRCluster mr) |
---|
| 444 | throws IOException { |
---|
| 445 | mr.startTaskTracker(null, null, 1, 1); |
---|
| 446 | FileSystem fileSys = dfs.getFileSystem(); |
---|
| 447 | |
---|
| 448 | cleanUp(fileSys, shareDir); |
---|
| 449 | cleanUp(fileSys, inDir); |
---|
| 450 | cleanUp(fileSys, outputDir); |
---|
| 451 | |
---|
| 452 | JobConf conf = mr.createJobConf(); |
---|
| 453 | conf.setNumReduceTasks(0); |
---|
| 454 | conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class); |
---|
| 455 | fileSys.delete(outputDir, false); |
---|
| 456 | RunningJob job1 = |
---|
| 457 | UtilsForTests.runJob(conf, inDir, outputDir, 30, 0); |
---|
| 458 | |
---|
| 459 | conf.setNumReduceTasks(0); |
---|
| 460 | conf.setOutputCommitter(CommitterWithDelaySetup.class); |
---|
| 461 | Path inDir2 = new Path(testDir, "input2"); |
---|
| 462 | fileSys.mkdirs(inDir2); |
---|
| 463 | Path outDir2 = new Path(testDir, "output2"); |
---|
| 464 | fileSys.delete(outDir2, false); |
---|
| 465 | JobConf newConf = getJobs(mr.createJobConf(), |
---|
| 466 | new JobPriority[] {JobPriority.NORMAL}, |
---|
| 467 | new int[] {10}, new int[] {0}, |
---|
| 468 | outDir2, inDir2, |
---|
| 469 | getMapSignalFile(shareDir), |
---|
| 470 | getReduceSignalFile(shareDir))[0]; |
---|
| 471 | |
---|
| 472 | JobClient jobClient = new JobClient(newConf); |
---|
| 473 | RunningJob job2 = jobClient.submitJob(newConf); |
---|
| 474 | JobID id = job2.getID(); |
---|
| 475 | |
---|
| 476 | /*RunningJob job2 = |
---|
| 477 | UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0); |
---|
| 478 | |
---|
| 479 | JobID id = job2.getID();*/ |
---|
| 480 | JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id); |
---|
| 481 | |
---|
| 482 | mr.getJobTrackerRunner().getJobTracker().initJob(jip); |
---|
| 483 | |
---|
| 484 | // find out the history filename |
---|
| 485 | String history = |
---|
| 486 | JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id); |
---|
| 487 | Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history); |
---|
| 488 | |
---|
| 489 | // make sure that setup is launched |
---|
| 490 | while (jip.runningMaps() == 0) { |
---|
| 491 | UtilsForTests.waitFor(100); |
---|
| 492 | } |
---|
| 493 | |
---|
| 494 | id = job1.getID(); |
---|
| 495 | jip = mr.getJobTrackerRunner().getJobTracker().getJob(id); |
---|
| 496 | |
---|
| 497 | mr.getJobTrackerRunner().getJobTracker().initJob(jip); |
---|
| 498 | |
---|
| 499 | // make sure that cleanup is launched and is waiting |
---|
| 500 | while (!jip.isCleanupLaunched()) { |
---|
| 501 | UtilsForTests.waitFor(100); |
---|
| 502 | } |
---|
| 503 | |
---|
| 504 | mr.stopJobTracker(); |
---|
| 505 | |
---|
| 506 | // delete the history file .. just to be safe. |
---|
| 507 | FileSystem historyFS = historyPath.getFileSystem(conf); |
---|
| 508 | historyFS.delete(historyPath, false); |
---|
| 509 | historyFS.create(historyPath).close(); // create an empty file |
---|
| 510 | |
---|
| 511 | |
---|
| 512 | UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1); |
---|
| 513 | |
---|
| 514 | // Turn on the recovery |
---|
| 515 | mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", |
---|
| 516 | true); |
---|
| 517 | |
---|
| 518 | mr.startJobTracker(); |
---|
| 519 | |
---|
| 520 | job1.waitForCompletion(); |
---|
| 521 | job2.waitForCompletion(); |
---|
| 522 | } |
---|
| 523 | |
---|
| 524 | public void testJobTrackerRestart() throws IOException { |
---|
| 525 | String namenode = null; |
---|
| 526 | MiniDFSCluster dfs = null; |
---|
| 527 | MiniMRCluster mr = null; |
---|
| 528 | FileSystem fileSys = null; |
---|
| 529 | |
---|
| 530 | try { |
---|
| 531 | Configuration conf = new Configuration(); |
---|
| 532 | conf.setBoolean("dfs.replication.considerLoad", false); |
---|
| 533 | dfs = new MiniDFSCluster(conf, 1, true, null, null); |
---|
| 534 | dfs.waitActive(); |
---|
| 535 | fileSys = dfs.getFileSystem(); |
---|
| 536 | |
---|
| 537 | // clean up |
---|
| 538 | fileSys.delete(testDir, true); |
---|
| 539 | |
---|
| 540 | if (!fileSys.mkdirs(inDir)) { |
---|
| 541 | throw new IOException("Mkdirs failed to create " + inDir.toString()); |
---|
| 542 | } |
---|
| 543 | |
---|
| 544 | // Write the input file |
---|
| 545 | UtilsForTests.writeFile(dfs.getNameNode(), conf, |
---|
| 546 | new Path(inDir + "/file"), (short)1); |
---|
| 547 | |
---|
| 548 | dfs.startDataNodes(conf, 1, true, null, null, null, null); |
---|
| 549 | dfs.waitActive(); |
---|
| 550 | |
---|
| 551 | namenode = (dfs.getFileSystem()).getUri().getHost() + ":" |
---|
| 552 | + (dfs.getFileSystem()).getUri().getPort(); |
---|
| 553 | |
---|
| 554 | // Make sure that jobhistory leads to a proper job restart |
---|
| 555 | // So keep the blocksize and the buffer size small |
---|
| 556 | JobConf jtConf = new JobConf(); |
---|
| 557 | jtConf.set("mapred.jobtracker.job.history.block.size", "1024"); |
---|
| 558 | jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024"); |
---|
| 559 | jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); |
---|
| 560 | jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000); |
---|
| 561 | jtConf.setBoolean("mapred.acls.enabled", true); |
---|
| 562 | // get the user group info |
---|
| 563 | UserGroupInformation ugi = UserGroupInformation.getCurrentUGI(); |
---|
| 564 | jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName()); |
---|
| 565 | |
---|
| 566 | mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf); |
---|
| 567 | |
---|
| 568 | // Test the tasktracker SYNC |
---|
| 569 | testTaskEventsAndReportsWithRecovery(dfs, mr); |
---|
| 570 | |
---|
| 571 | // Test jobtracker with restart-recovery turned off |
---|
| 572 | testRestartWithoutRecovery(dfs, mr); |
---|
| 573 | |
---|
| 574 | // test recovery with empty file |
---|
| 575 | testJobRecoveryWithEmptyHistory(dfs, mr); |
---|
| 576 | } finally { |
---|
| 577 | if (mr != null) { |
---|
| 578 | try { |
---|
| 579 | mr.shutdown(); |
---|
| 580 | } catch (Exception e) {} |
---|
| 581 | } |
---|
| 582 | if (dfs != null) { |
---|
| 583 | try { |
---|
| 584 | dfs.shutdown(); |
---|
| 585 | } catch (Exception e) {} |
---|
| 586 | } |
---|
| 587 | } |
---|
| 588 | } |
---|
| 589 | |
---|
| 590 | private static String getMapSignalFile(Path dir) { |
---|
| 591 | return (new Path(dir, "jt-restart-map-signal")).toString(); |
---|
| 592 | } |
---|
| 593 | |
---|
| 594 | private static String getReduceSignalFile(Path dir) { |
---|
| 595 | return (new Path(dir, "jt-restart-reduce-signal")).toString(); |
---|
| 596 | } |
---|
| 597 | |
---|
| 598 | public static void main(String[] args) throws IOException { |
---|
| 599 | new TestJobTrackerRestart().testJobTrackerRestart(); |
---|
| 600 | } |
---|
| 601 | } |
---|