source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 16.7 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Test whether the {@link RecoveryManager} is able to tolerate job-recovery
 * failures and the jobtracker is able to tolerate {@link RecoveryManager}
 * failure.
 */
public class TestRecoveryManager extends TestCase {
  private static final Log LOG =
    LogFactory.getLog(TestRecoveryManager.class);
  private static final Path TEST_DIR =
    new Path(System.getProperty("test.build.data", "/tmp"),
             "test-recovery-manager");

  /**
   * Tests the {@link JobTracker} against the exceptions thrown in
   * {@link JobTracker.RecoveryManager}. It does the following:
   *  - submits 2 jobs
   *  - kills the jobtracker
   *  - garbles job.xml for one job, causing it to fail in the constructor,
   *    and job.split for the other, causing it to fail in init
   *  - restarts the jobtracker
   *  - checks if the jobtracker starts normally
   */
  public void testJobTracker() throws Exception {
    LOG.info("Testing jobtracker restart with faulty job");
    String signalFile = new Path(TEST_DIR, "signal").toString();
    JobConf conf = new JobConf();

    FileSystem fs = FileSystem.get(new Configuration());
    fs.delete(TEST_DIR, true); // cleanup

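    // Keep the job-history block/buffer sizes small so that history events
    // are flushed to disk quickly; the recovery manager replays the job
    // history after a restart, so it needs those events on disk.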
    conf.set("mapred.jobtracker.job.history.block.size", "1024");
    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");

    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);

    JobConf job1 = mr.createJobConf();

    UtilsForTests.configureWaitingJobConf(job1,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0,
        "test-recovery-manager", signalFile, signalFile);

    // submit job #1; its job.xml will be garbled once the jobtracker is down
    RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
    LOG.info("Submitted job " + rJob1.getID());

    while (rJob1.mapProgress() < 0.5f) {
      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
      UtilsForTests.waitFor(100);
    }

    JobConf job2 = mr.createJobConf();

    UtilsForTests.configureWaitingJobConf(job2,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0,
        "test-recovery-manager", signalFile, signalFile);

    // submit job #2; its job.split will be garbled once the jobtracker is down
    RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
    LOG.info("Submitted job " + rJob2.getID());

    while (rJob2.mapProgress() < 0.5f) {
      LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
      UtilsForTests.waitFor(100);
    }

    // kill the jobtracker
    LOG.info("Stopping jobtracker");
    String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
    mr.stopJobTracker();

    // delete the job.xml of job #1, causing the job to fail in the constructor
    Path jobFile =
      new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
    LOG.info("Deleting job.xml file : " + jobFile.toString());
    fs.delete(jobFile, false); // delete the job.xml file

    // replace job.xml with a garbled 1-byte file
    FSDataOutputStream out = fs.create(jobFile);
    out.write(1);
    out.close();

    // delete the job.split of job #2, causing the job to fail in initTasks
    Path jobSplitFile =
      new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
    LOG.info("Deleting job.split file : " + jobSplitFile.toString());
    fs.delete(jobSplitFile, false); // delete the job.split file

    // replace job.split with a garbled 1-byte file
    out = fs.create(jobSplitFile);
    out.write(1);
    out.close();

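    // Both jobs' files in the system directory are now corrupt, so recovering
    // either job must fail; the test only checks that the jobtracker itself
    // survives the restart.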
    // make sure that the jobtracker is in recovery mode
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      true);
    // start the jobtracker
    LOG.info("Starting jobtracker");
    mr.startJobTracker();
    ClusterStatus status =
      mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);

    // check if the jobtracker came up or not
    assertEquals("JobTracker crashed!",
                 JobTracker.State.RUNNING, status.getJobTrackerState());

    mr.shutdown();
  }

  /**
   * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
   * during recovery. It does the following:
   *  - submits a job with HIGH priority and x tasks
   *  - allows it to complete 50%
   *  - submits another job with normal priority and y tasks
   *  - submits a third job as a different user (one that the queue ACLs
   *    enabled before the restart will reject)
   *  - kills the jobtracker
   *  - restarts the jobtracker with max-tasks-per-job such that
   *        y < max-tasks-per-job < x
   *  - checks if the jobtracker starts normally, job#2 is recovered,
   *    job#1 is failed and job#3 is not recovered.
   */
  public void testRecoveryManager() throws Exception {
    LOG.info("Testing recovery-manager");
    String signalFile = new Path(TEST_DIR, "signal").toString();

    // clean up
    FileSystem fs = FileSystem.get(new Configuration());
    fs.delete(TEST_DIR, true);

    JobConf conf = new JobConf();
    conf.set("mapred.jobtracker.job.history.block.size", "1024");
    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");

    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();

    JobConf job1 = mr.createJobConf();
    // set the high priority
    job1.setJobPriority(JobPriority.HIGH);

    UtilsForTests.configureWaitingJobConf(job1,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0,
        "test-recovery-manager", signalFile, signalFile);

    // submit job #1 with 30 maps; it will exceed max-tasks-per-job on restart
    JobClient jc = new JobClient(job1);
    RunningJob rJob1 = jc.submitJob(job1);
    LOG.info("Submitted first job " + rJob1.getID());

    while (rJob1.mapProgress() < 0.5f) {
      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
      UtilsForTests.waitFor(100);
    }

    // now submit job2
    JobConf job2 = mr.createJobConf();

    String signalFile1 = new Path(TEST_DIR, "signal1").toString();
    UtilsForTests.configureWaitingJobConf(job2,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0,
        "test-recovery-manager", signalFile1, signalFile1);

    // submit the job
    RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
    LOG.info("Submitted job " + rJob2.getID());

    // wait for it to init
    JobInProgress jip = jobtracker.getJob(rJob2.getID());

    while (!jip.inited()) {
      LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
      UtilsForTests.waitFor(100);
    }

    // now submit job3 as a different user; the queue ACLs enabled before the
    // restart will not allow this user, so the job should not be recovered
    JobConf job3 = mr.createJobConf();
    job3.set("hadoop.job.ugi", "abc,users");

    UtilsForTests.configureWaitingJobConf(job3,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0,
        "test-recovery-manager", signalFile, signalFile);

    // submit the job
    RunningJob rJob3 = (new JobClient(job3)).submitJob(job3);
    LOG.info("Submitted job " + rJob3.getID() + " with different user");

    jip = jobtracker.getJob(rJob3.getID());

    while (!jip.inited()) {
      LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
      UtilsForTests.waitFor(100);
    }

    // kill the jobtracker
    LOG.info("Stopping jobtracker");
    mr.stopJobTracker();

    // make sure that the jobtracker is in recovery mode
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      true);
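    // cap tasks per job at 25: job #1 (30 maps) should fail during recovery,
    // while job #2 (20 maps) stays under the limit and should be recovered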
    mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);

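    // enable queue ACLs and allow only job #1's submitter on the default
    // queue, so job #3 (submitted as user "abc") should be dropped during
    // recovery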
    mr.getJobTrackerConf().setBoolean("mapred.acls.enabled", true);
    UserGroupInformation ugi = UserGroupInformation.readFrom(job1);
    mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job",
                               ugi.getUserName());

    // start the jobtracker
    LOG.info("Starting jobtracker");
    mr.startJobTracker();
    UtilsForTests.waitForJobTracker(jc);

    jobtracker = mr.getJobTrackerRunner().getJobTracker();

    // assert that job2 is recovered by the jobtracker as job1 would fail
    assertEquals("Recovery manager failed to tolerate job failures",
                 2, jobtracker.getAllJobs().length);

    // check if the job#1 has failed
    JobStatus status = jobtracker.getJobStatus(rJob1.getID());
    assertEquals("Faulty job not failed",
                 JobStatus.FAILED, status.getRunState());

    jip = jobtracker.getJob(rJob2.getID());
    assertFalse("Job should be running", jip.isComplete());

    status = jobtracker.getJobStatus(rJob3.getID());
    assertNull("Job should be missing", status);

    mr.shutdown();
  }

  /**
   * Test if the restart count of the jobtracker is correctly managed.
   * Steps are as follows:
   *   - start the jobtracker and check if the info file gets created.
   *   - stop the jobtracker, delete the jobtracker.info file and check if,
   *     upon restart, recovery is 'off'.
   *   - submit a job to the jobtracker.
   *   - restart the jobtracker k times and check if the restart count on the
   *     ith iteration is i.
   *   - submit a new job and check if its restart count is 0.
   *   - garble the jobtracker.info file and restart the jobtracker; the
   *     jobtracker should crash.
   */
  public void testRestartCount() throws Exception {
    LOG.info("Testing restart-count");
    String signalFile = new Path(TEST_DIR, "signal").toString();

    // clean up
    FileSystem fs = FileSystem.get(new Configuration());
    fs.delete(TEST_DIR, true);

    JobConf conf = new JobConf();
    conf.set("mapred.jobtracker.job.history.block.size", "1024");
    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
    conf.setBoolean("mapred.jobtracker.restart.recover", true);
    // use MyScheduler since there is no need for initializing jobs here
    conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
                  TaskScheduler.class);

    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
    JobClient jc = new JobClient(mr.createJobConf());

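    // The recovery manager persists the jobtracker restart count in an info
    // file (jobtracker.info); if that file is missing at startup, recovery is
    // disabled for that restart, which is what the next steps verify.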
    // check if the jobtracker info file exists
    Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
    assertTrue("Jobtracker information is missing", fs.exists(infoFile));

    // check if deleting the system files disables the recovery process
    LOG.info("Stopping jobtracker for testing with system files deleted");
    mr.stopJobTracker();

    // delete the info file
    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
    fs.delete(rFile, false);

    // start the jobtracker
    LOG.info("Starting jobtracker with system files deleted");
    mr.startJobTracker();

    UtilsForTests.waitForJobTracker(jc);
    jobtracker = mr.getJobTrackerRunner().getJobTracker();

    // check if the recovery is disabled
    assertFalse("Recovery is not disabled upon missing system files",
                jobtracker.recoveryManager.shouldRecover());

    // check if the system dir is sane
    assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
    Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
    assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));

    // submit a job
    JobConf job = mr.createJobConf();

    UtilsForTests.configureWaitingJobConf(job,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0,
        "test-recovery-manager", signalFile, signalFile);

    // submit a waiting job that will stay alive across the restarts
    RunningJob rJob = jc.submitJob(job);
    LOG.info("Submitted first job " + rJob.getID());

    // kill the jobtracker multiple times and check if the count is correct
    for (int i = 1; i <= 5; ++i) {
      LOG.info("Stopping jobtracker (iteration " + i + ")");
      mr.stopJobTracker();

      // start the jobtracker
      LOG.info("Starting jobtracker (iteration " + i + ")");
      mr.startJobTracker();

      UtilsForTests.waitForJobTracker(jc);

      // check if the system dir is sane
      assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
      assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));

      jobtracker = mr.getJobTrackerRunner().getJobTracker();
      JobInProgress jip = jobtracker.getJob(rJob.getID());

      // assert that the restart count is correct
      assertEquals("Recovery manager failed to recover restart count",
                   i, jip.getNumRestarts());
    }

    // kill the old job
    rJob.killJob();

    // II. Submit a new job and check if the restart count is 0
    JobConf job1 = mr.createJobConf();

    UtilsForTests.configureWaitingJobConf(job1,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0,
        "test-recovery-manager", signalFile, signalFile);

    // make sure that the job IDs don't clash
    jobtracker.getNewJobId();

    // submit a new job
    rJob = jc.submitJob(job1);
    LOG.info("Submitted first job after restart " + rJob.getID());

    // assert that the restart count is correct
    JobInProgress jip = jobtracker.getJob(rJob.getID());
    assertEquals("Restart count for new job is incorrect",
                 0, jip.getNumRestarts());

    LOG.info("Stopping jobtracker for testing the fs errors");
    mr.stopJobTracker();

    // check if system.dir problems in recovery kill the jobtracker
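    // garble the restart-count file: write a lone boolean where the restart
    // count is expected, so reading the file on startup should fail and the
    // jobtracker should not come up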
    fs.delete(rFile, false);
    FSDataOutputStream out = fs.create(rFile);
    out.writeBoolean(true);
    out.close();

    // start the jobtracker
    LOG.info("Starting jobtracker with fs errors");
    mr.startJobTracker();
    JobTrackerRunner runner = mr.getJobTrackerRunner();
    assertFalse("JobTracker is still alive", runner.isActive());

    mr.shutdown();
  }

  /**
   * Test if the jobtracker waits for the info file to be created before
   * starting.
   */
  public void testJobTrackerInfoCreation() throws Exception {
    LOG.info("Testing jobtracker.info file");
    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
                      + (dfs.getFileSystem()).getUri().getPort();
    // shut down the data nodes
    dfs.shutdownDataNodes();

    // start the jobtracker
    JobConf conf = new JobConf();
    FileSystem.setDefaultUri(conf, namenode);
    conf.set("mapred.job.tracker", "localhost:0");
    conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");

    JobTracker jobtracker = new JobTracker(conf);

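    // With every datanode down, HDFS writes cannot complete, so creating the
    // jobtracker.info file via updateRestartCount() is expected to fail.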
    // now check if the update restart count works fine or not
    boolean failed = false;
    try {
      jobtracker.recoveryManager.updateRestartCount();
    } catch (IOException ioe) {
      failed = true;
    }
    assertTrue("JobTracker created info files without datanodes!!!", failed);

    Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
    Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
    FileSystem fs = dfs.getFileSystem();
    assertFalse("Info file exists after update failure",
                fs.exists(restartFile));
    assertFalse("Temporary restart-file exists after update failure",
                fs.exists(tmpRestartFile));

    // start 1 data node
    dfs.startDataNodes(conf, 1, true, null, null, null, null);
    dfs.waitActive();

    failed = false;
    try {
      jobtracker.recoveryManager.updateRestartCount();
    } catch (IOException ioe) {
      failed = true;
    }
    assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
  }
}