1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import org.apache.hadoop.conf.Configuration; |
---|
21 | import org.apache.hadoop.fs.*; |
---|
22 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
---|
23 | import org.apache.hadoop.mapred.UtilsForTests; |
---|
24 | import org.apache.hadoop.security.UserGroupInformation; |
---|
25 | |
---|
26 | import junit.framework.TestCase; |
---|
27 | import java.io.*; |
---|
28 | import java.util.ArrayList; |
---|
29 | import java.util.List; |
---|
30 | |
---|
31 | /** |
---|
32 | * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker |
---|
33 | * should be able to continue running the previously running jobs and also |
---|
 34 | * recover previously submitted jobs. |
---|
35 | */ |
---|
public class TestJobTrackerRestart extends TestCase {
  // Shared scratch area on the test filesystem; overridable via test.build.data.
  static final Path testDir =
    new Path(System.getProperty("test.build.data","/tmp"),
             "jt-restart-testing");
  final Path inDir = new Path(testDir, "input");
  // Directory holding the map/reduce signal files shared by all the sub-tests.
  static final Path shareDir = new Path(testDir, "share");
  final Path outputDir = new Path(testDir, "output");
  // Monotonic counter used to give every submitted job a distinct output dir.
  private static int numJobsSubmitted = 0;

  /**
   * Return the job confs configured with the priorities and mappers as passed.
   * Each job gets a unique output directory derived from
   * {@code numJobsSubmitted}, and is set up as a "waiting" job that blocks
   * until the signal files appear (see
   * {@link UtilsForTests#configureWaitingJobConf}).
   * @param conf The default conf
   * @param priorities priorities for the jobs
   * @param numMaps number of maps for the jobs
   * @param numReds number of reducers for the jobs
   * @param outputDir output dir
   * @param inDir input dir
   * @param mapSignalFile filename that acts as a signal for maps
   * @param reduceSignalFile filename that acts as a signal for reducers
   * @return an array of jobconfs configured as needed
   * @throws IOException if configuring a waiting job fails
   */
  private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
                                   int[] numMaps, int[] numReds,
                                   Path outputDir, Path inDir,
                                   String mapSignalFile, String reduceSignalFile)
  throws IOException {
    JobConf[] jobs = new JobConf[priorities.length];
    for (int i = 0; i < jobs.length; ++i) {
      jobs[i] = new JobConf(conf);
      Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
      UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir,
          numMaps[i], numReds[i], "jt restart test job", mapSignalFile,
          reduceSignalFile);
      jobs[i].setJobPriority(priorities[i]);
    }
    return jobs;
  }

  /**
   * Clean up the signal files so a fresh sub-test starts with no
   * pre-existing map/reduce signals under {@code dir}.
   */
  private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
    // Delete the map signal file
    fileSys.delete(new Path(getMapSignalFile(dir)), false);
    // Delete the reduce signal file
    fileSys.delete(new Path(getReduceSignalFile(dir)), false);
  }

  /**
   * Tests the jobtracker with restart-recovery turned off.
   * Submit a job with normal priority, maps = 2, reducers = 0
   *
   * Wait for the job to complete 50%
   *
   * Restart the jobtracker with recovery turned off
   *
   * Check if the job is missing
   */
  public void testRestartWithoutRecovery(MiniDFSCluster dfs,
                                         MiniMRCluster mr)
  throws IOException {
    // III. Test a job with waiting mapper and recovery turned off

    FileSystem fileSys = dfs.getFileSystem();

    cleanUp(fileSys, shareDir);

    JobConf newConf = getJobs(mr.createJobConf(),
                              new JobPriority[] {JobPriority.NORMAL},
                              new int[] {2}, new int[] {0},
                              outputDir, inDir,
                              getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir))[0];

    JobClient jobClient = new JobClient(newConf);
    RunningJob job = jobClient.submitJob(newConf);
    JobID id = job.getID();

    // make sure that the job is 50% completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
      UtilsForTests.waitFor(100);
    }

    mr.stopJobTracker();

    // Turn off the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      false);

    // Wait for a minute before restarting the jobtracker
    UtilsForTests.waitFor(60 * 1000);

    mr.startJobTracker();

    // Signal the tasks so the (unrecovered) waiting job could finish if it
    // were still known to the jobtracker
    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir));

    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);

    UtilsForTests.waitTillDone(jobClient);

    // The submitted job should not exist after a restart without recovery
    assertTrue("Submitted job was detected with recovery disabled",
               UtilsForTests.getJobStatus(jobClient, id) == null);
  }

  /** Tests a job on jobtracker with restart-recovery turned on.
   * Preparation :
   *    - Configure a job with
   *       - num-maps : 50
   *       - num-reducers : 1
   *    - Configure the cluster to run 1 reducer
   *    - Lower the history file block size and buffer
   *
   * Wait for the job to complete 50%. Note that all the job is configured to
   * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So job will
   * eventually wait on 50%
   *
   * Make a note of the following things
   *    - Task completion events
   *    - Cluster status
   *    - Task Reports
   *    - Job start time
   *
   * Restart the jobtracker
   *
   * Wait for job to finish all the maps and note the TaskCompletion events at
   * the tracker.
   *
   * Wait for all the jobs to finish and note the following
   *    - New task completion events at the jobtracker
   *    - Task reports
   *    - Cluster status
   *
   * Check for the following
   *    - Task completion events for recovered tasks should match
   *    - Task completion events at the tasktracker and the restarted
   *      jobtracker should be same
   *    - Cluster status should be fine.
   *    - Task Reports for recovered tasks should match
   *      Checks
   *        - start time
   *        - finish time
   *        - counters
   *        - http-location
   *        - task-id
   *    - Job start time should match
   *    - Check if the counters can be accessed
   *    - Check if the history files are (re)named properly
   */
  public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs,
                                                   MiniMRCluster mr)
  throws IOException {
    // II. Test a tasktracker with waiting mapper and recovery turned on.
    //     Ideally the tracker should SYNC with the new/restarted jobtracker

    FileSystem fileSys = dfs.getFileSystem();
    final int numMaps = 50;
    final int numReducers = 1;


    cleanUp(fileSys, shareDir);

    JobConf newConf = getJobs(mr.createJobConf(),
                              new JobPriority[] {JobPriority.NORMAL},
                              new int[] {numMaps}, new int[] {numReducers},
                              outputDir, inDir,
                              getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir))[0];

    JobClient jobClient = new JobClient(newConf);
    RunningJob job = jobClient.submitJob(newConf);
    JobID id = job.getID();

    // change the job priority; verified again after the restart below
    mr.setJobPriority(id, JobPriority.HIGH);

    mr.initializeJob(id);

    // make sure that at least one reducer is spawned
    while (jobClient.getClusterStatus().getReduceTasks() == 0) {
      UtilsForTests.waitFor(100);
    }

    while(true) {
      // Since we are using a half waiting mapper, maps should be stuck at 50%
      TaskCompletionEvent[] trackerEvents =
        mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
          .getMapTaskCompletionEvents();
      if (trackerEvents.length < numMaps / 2) {
        UtilsForTests.waitFor(1000);
      } else {
        break;
      }
    }

    // Snapshot the pre-restart state to compare against after recovery
    TaskCompletionEvent[] prevEvents =
      mr.getTaskCompletionEvents(id, 0, numMaps);
    TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
    TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
    ClusterStatus prevStatus = jobClient.getClusterStatus();

    mr.stopJobTracker();

    // Turn on the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      true);

    // Wait for a minute before restarting the jobtracker
    UtilsForTests.waitFor(60 * 1000);

    mr.startJobTracker();

    // Signal the map tasks
    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir));

    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);

    // Only half of the recovered events are expected to match exactly;
    // see the comparisons below
    int numToMatch = mr.getNumEventsRecovered() / 2;

    // make sure that the maps are completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
      UtilsForTests.waitFor(100);
    }

    // Get the new jobtracker's events
    TaskCompletionEvent[] jtEvents =
      mr.getTaskCompletionEvents(id, 0, 2 * numMaps);

    // Test if all the events that were recovered match exactly
    testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);

    // Check the task reports
    // The reports should match exactly if the attempts are same
    TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
    TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
    testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
    testTaskReports(prevSetupReports, afterSetupReports, 1);

    // check that the pre-restart priority change survived recovery
    assertEquals("Job priority change is not reflected",
                 JobPriority.HIGH, mr.getJobPriority(id));

    // Collect only the map events from the jobtracker's event list
    List<TaskCompletionEvent> jtMapEvents =
      new ArrayList<TaskCompletionEvent>();
    for (TaskCompletionEvent tce : jtEvents) {
      if (tce.isMapTask()) {
        jtMapEvents.add(tce);
      }
    }

    TaskCompletionEvent[] trackerEvents;
    while(true) {
     // Wait for the tracker to pull all the map events
     trackerEvents =
       mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
         .getMapTaskCompletionEvents();
     if (trackerEvents.length < jtMapEvents.size()) {
       UtilsForTests.waitFor(1000);
     } else {
       break;
     }
    }

    // Signal the reduce tasks
    UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir));

    UtilsForTests.waitTillDone(jobClient);

    // Tracker-side and jobtracker-side map events must match completely
    testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]),
                             trackerEvents, true, -1);

    // validate the history file
    TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
    TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);

    // check if the cluster status is sane after completion
    ClusterStatus status = jobClient.getClusterStatus();
    assertTrue("Cluster status is insane",
               checkClusterStatusOnCompletion(status, prevStatus));
  }

  /**
   * Checks if the history files are as expected: the permanent history file
   * exists and the temporary (secondary) file has been cleaned up, for both
   * the user-side and the framework-side locations.
   * NOTE(review): not currently invoked by any test in this file.
   * @param id job id
   * @param conf job conf
   * @throws IOException on filesystem access failure
   */
  private void testJobHistoryFiles(JobID id, JobConf conf)
  throws IOException  {
    // Get the history files for users
    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
    String tempLogFileName =
      JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);

    // I. User files
    Path logFile =
      JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
    FileSystem fileSys = logFile.getFileSystem(conf);

    // Check if the history file exists
    assertTrue("User log file does not exist", fileSys.exists(logFile));

    // Check if the temporary file is deleted
    Path tempLogFile =
      JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName,
                                                         conf);
    assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));

    // II. Framework files
    // Get the history file
    logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
    fileSys = logFile.getFileSystem(conf);

    // Check if the history file exists
    assertTrue("Log file does not exist", fileSys.exists(logFile));

    // Check if the temporary file is deleted
    tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
    assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
  }

  /**
   * Matches specified number of task reports. Each of the first
   * {@code numToMatch} entries of {@code source} must equal the
   * corresponding entry of {@code target}.
   * @param source the reports to be matched
   * @param target reports to match with
   * @param numToMatch num reports to match
   */
  private void testTaskReports(TaskReport[] source, TaskReport[] target,
                               int numToMatch) {
    for (int i = 0; i < numToMatch; ++i) {
      // Check if the task reports were recovered correctly
      assertTrue("Task reports for same attempt has changed",
                 source[i].equals(target[i]));
    }
  }

  /**
   * Matches the task completion events. On a full match the lengths must be
   * equal and every event is compared; otherwise only the first
   * {@code numToMatch} events are compared, and only when the attempt ids
   * line up at the same index.
   * @param source the events to be matched
   * @param target events to match with
   * @param fullMatch whether to match the events completely or partially
   * @param numToMatch number of events to match in case full match is not
   *        desired (ignored when {@code fullMatch} is true)
   */
  private void testTaskCompletionEvents(TaskCompletionEvent[] source,
                                        TaskCompletionEvent[] target,
                                        boolean fullMatch,
                                        int numToMatch) {
    //  Check if the event list size matches
    // The lengths should match only incase of full match
    if (fullMatch) {
      assertEquals("Map task completion events mismatch",
                   source.length, target.length);
      numToMatch = source.length;
    }
    // Check if the events match
    for (int i = 0; i < numToMatch; ++i) {
      if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){
        assertTrue("Map task completion events ordering mismatch",
                   source[i].equals(target[i]));
      }
    }
  }

  /**
   * Sanity check on the cluster status once all jobs are done: the
   * jobtracker state is unchanged and no map/reduce tasks remain running.
   */
  private boolean checkClusterStatusOnCompletion(ClusterStatus status,
                                                 ClusterStatus prevStatus) {
    return status.getJobTrackerState() == prevStatus.getJobTrackerState()
           && status.getMapTasks() == 0
           && status.getReduceTasks() == 0;
  }

  /** Committer with setup waiting: blocks job setup until the shared
   *  signal directory appears on the filesystem.
   */
  static class CommitterWithDelaySetup extends FileOutputCommitter {
    @Override
    public void setupJob(JobContext context) throws IOException {
      FileSystem fs = FileSystem.get(context.getConfiguration());
      // Poll until the shared dir shows up before proceeding
      while (true) {
        if (fs.exists(shareDir)) {
          break;
        }
        UtilsForTests.waitFor(100);
      }
      // NOTE(review): calls cleanupJob from setupJob — looks intentional for
      // this test scenario, but confirm against the test's design.
      super.cleanupJob(context);
    }
  }

  /** Tests a job on jobtracker with restart-recovery turned on and empty
   *  jobhistory file.
   * Preparation :
   *    - Submit one job (30 maps, 0 reducers) with a delayed-cleanup
   *      committer, and a second waiting job (10 maps, 0 reducers) with a
   *      delayed-setup committer
   *    - Truncate the second job's history file to empty while the
   *      jobtracker is down
   *
   * Check if both jobs succeed after restart.
   *
   * Assumption that map slots are given first for setup.
   */
  public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs,
                                              MiniMRCluster mr)
  throws IOException {
    mr.startTaskTracker(null, null, 1, 1);
    FileSystem fileSys = dfs.getFileSystem();

    cleanUp(fileSys, shareDir);
    cleanUp(fileSys, inDir);
    cleanUp(fileSys, outputDir);

    JobConf conf = mr.createJobConf();
    conf.setNumReduceTasks(0);
    conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
    fileSys.delete(outputDir, false);
    RunningJob job1 =
      UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);

    conf.setNumReduceTasks(0);
    conf.setOutputCommitter(CommitterWithDelaySetup.class);
    Path inDir2 = new Path(testDir, "input2");
    fileSys.mkdirs(inDir2);
    Path outDir2 = new Path(testDir, "output2");
    fileSys.delete(outDir2, false);
    JobConf newConf = getJobs(mr.createJobConf(),
                              new JobPriority[] {JobPriority.NORMAL},
                              new int[] {10}, new int[] {0},
                              outDir2, inDir2,
                              getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir))[0];

    JobClient jobClient = new JobClient(newConf);
    RunningJob job2 = jobClient.submitJob(newConf);
    JobID id = job2.getID();

    JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);

    mr.getJobTrackerRunner().getJobTracker().initJob(jip);

    // find out the history filename
    String history =
      JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
    Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);

    // make sure that setup is launched
    while (jip.runningMaps() == 0) {
      UtilsForTests.waitFor(100);
    }

    id = job1.getID();
    jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);

    mr.getJobTrackerRunner().getJobTracker().initJob(jip);

    // make sure that cleanup is launched and is waiting
    while (!jip.isCleanupLaunched()) {
      UtilsForTests.waitFor(100);
    }

    mr.stopJobTracker();

    // delete the history file and replace it with an empty one, so recovery
    // must cope with an empty jobhistory file
    FileSystem historyFS = historyPath.getFileSystem(conf);
    historyFS.delete(historyPath, false);
    historyFS.create(historyPath).close(); // create an empty file


    UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);

    // Turn on the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      true);

    mr.startJobTracker();

    // both jobs must finish despite the empty history file
    job1.waitForCompletion();
    job2.waitForCompletion();
  }

  /**
   * Entry point that sets up a 1-node DFS + MR mini cluster and runs the
   * three restart scenarios in sequence. The history block/buffer sizes are
   * kept small so a restart actually has to replay history.
   */
  public void testJobTrackerRestart() throws IOException {
    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;

    try {
      Configuration conf = new Configuration();
      conf.setBoolean("dfs.replication.considerLoad", false);
      dfs = new MiniDFSCluster(conf, 1, true, null, null);
      dfs.waitActive();
      fileSys = dfs.getFileSystem();

      // clean up
      fileSys.delete(testDir, true);

      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }

      // Write the input file
      UtilsForTests.writeFile(dfs.getNameNode(), conf,
                              new Path(inDir + "/file"), (short)1);

      dfs.startDataNodes(conf, 1, true, null, null, null, null);
      dfs.waitActive();

      namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
                 + (dfs.getFileSystem()).getUri().getPort();

      // Make sure that jobhistory leads to a proper job restart
      // So keep the blocksize and the buffer size small
      JobConf jtConf = new JobConf();
      jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
      jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
      jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
      jtConf.setBoolean("mapred.acls.enabled", true);
      // get the user group info
      UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
      jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName());

      mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);

      // Test the tasktracker SYNC
      testTaskEventsAndReportsWithRecovery(dfs, mr);

      // Test jobtracker with restart-recovery turned off
      testRestartWithoutRecovery(dfs, mr);

      // test recovery with empty file
      testJobRecoveryWithEmptyHistory(dfs, mr);
    } finally {
      // best-effort shutdown; failures here must not mask the test result
      if (mr != null) {
        try {
          mr.shutdown();
        } catch (Exception e) {}
      }
      if (dfs != null) {
        try {
          dfs.shutdown();
        } catch (Exception e) {}
      }
    }
  }

  // Full path (as a string) of the map signal file under dir.
  private static String getMapSignalFile(Path dir) {
    return (new Path(dir, "jt-restart-map-signal")).toString();
  }

  // Full path (as a string) of the reduce signal file under dir.
  private static String getReduceSignalFile(Path dir) {
    return (new Path(dir, "jt-restart-reduce-signal")).toString();
  }

  // Allows running the whole scenario outside a test harness.
  public static void main(String[] args) throws IOException {
    new TestJobTrackerRestart().testJobTrackerRestart();
  }
}
---|