source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 6.0 KB
Line 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18package org.apache.hadoop.mapred;
19
20import org.apache.hadoop.conf.Configuration;
21import org.apache.hadoop.fs.*;
22import org.apache.hadoop.hdfs.MiniDFSCluster;
23import org.apache.hadoop.mapred.TestJobTrackerRestart;
24
25import junit.framework.TestCase;
26import java.io.*;
27
28/**
29 * This test checks if the jobtracker can detect and recover a tracker that was
30 * lost while the jobtracker was down.
31 */
32public class TestJobTrackerRestartWithLostTracker extends TestCase {
33  final Path testDir = new Path("/jt-restart-lost-tt-testing");
34  final Path inDir = new Path(testDir, "input");
35  final Path shareDir = new Path(testDir, "share");
36  final Path outputDir = new Path(testDir, "output");
37 
38  private JobConf configureJob(JobConf conf, int maps, int reduces,
39                               String mapSignal, String redSignal) 
40  throws IOException {
41    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
42        maps, reduces, "test-jobtracker-restart-with-lost-tt", 
43        mapSignal, redSignal);
44    return conf;
45  }
46 
47  public void testRecoveryWithLostTracker(MiniDFSCluster dfs,
48                                          MiniMRCluster mr) 
49  throws IOException {
50    FileSystem fileSys = dfs.getFileSystem();
51    JobConf jobConf = mr.createJobConf();
52    int numMaps = 50;
53    int numReds = 1;
54    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
55    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
56   
57    // Configure the jobs
58    JobConf job = configureJob(jobConf, numMaps, numReds, 
59                               mapSignalFile, redSignalFile);
60     
61    fileSys.delete(shareDir, true);
62   
63    // Submit a master job   
64    JobClient jobClient = new JobClient(job);
65    RunningJob rJob = jobClient.submitJob(job);
66    JobID id = rJob.getID();
67   
68    // wait for the job to be inited
69    mr.initializeJob(id);
70   
71    // Make sure that the master job is 50% completed
72    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
73           < 0.5f) {
74      UtilsForTests.waitFor(100);
75    }
76
77    // Kill the jobtracker
78    mr.stopJobTracker();
79
80    // Signal the maps to complete
81    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
82   
83    // Enable recovery on restart
84    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
85                                      true);
86   
87    // Kill the 2nd tasktracker
88    mr.stopTaskTracker(1);
89   
90    // Wait for a minute before submitting a job
91    UtilsForTests.waitFor(60 * 1000);
92   
93    // Restart the jobtracker
94    mr.startJobTracker();
95
96    // Check if the jobs are still running
97   
98    // Wait for the JT to be ready
99    UtilsForTests.waitForJobTracker(jobClient);
100
101    // Signal the reducers to complete
102    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
103                              redSignalFile);
104   
105    UtilsForTests.waitTillDone(jobClient);
106
107    // Check if the tasks on the lost tracker got re-executed
108    assertEquals("Tracker killed while the jobtracker was down did not get lost "
109                 + "upon restart", 
110                 jobClient.getClusterStatus().getTaskTrackers(), 1);
111
112    // validate the history file
113    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
114    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
115  }
116 
117  public void testRestartWithLostTracker() throws IOException {
118    String namenode = null;
119    MiniDFSCluster dfs = null;
120    MiniMRCluster mr = null;
121    FileSystem fileSys = null;
122
123    try {
124      Configuration conf = new Configuration();
125      conf.setBoolean("dfs.replication.considerLoad", false);
126      dfs = new MiniDFSCluster(conf, 1, true, null, null);
127      dfs.waitActive();
128      fileSys = dfs.getFileSystem();
129     
130      // clean up
131      fileSys.delete(testDir, true);
132     
133      if (!fileSys.mkdirs(inDir)) {
134        throw new IOException("Mkdirs failed to create " + inDir.toString());
135      }
136
137      // Write the input file
138      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
139                              new Path(inDir + "/file"), (short)1);
140
141      dfs.startDataNodes(conf, 1, true, null, null, null, null);
142      dfs.waitActive();
143
144      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
145                 + (dfs.getFileSystem()).getUri().getPort();
146
147      // Make sure that jobhistory leads to a proper job restart
148      // So keep the blocksize and the buffer size small
149      JobConf jtConf = new JobConf();
150      jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
151      jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
152      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
153      jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
154      jtConf.setInt("mapred.reduce.copy.backoff", 4);
155     
156      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
157     
158      // Test Lost tracker case
159      testRecoveryWithLostTracker(dfs, mr);
160    } finally {
161      if (mr != null) {
162        try {
163          mr.shutdown();
164        } catch (Exception e) {}
165      }
166      if (dfs != null) {
167        try {
168          dfs.shutdown();
169        } catch (Exception e) {}
170      }
171    }
172  }
173
174  public static void main(String[] args) throws IOException {
175    new TestJobTrackerRestartWithLostTracker().testRestartWithLostTracker();
176  }
177}
Note: See TracBrowser for help on using the repository browser.