source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 9.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22import org.apache.hadoop.conf.Configuration;
23import org.apache.hadoop.fs.*;
24import org.apache.hadoop.hdfs.MiniDFSCluster;
25
26import junit.framework.TestCase;
27import java.io.*;
28import java.util.HashSet;
29import java.util.Set;
30
31/**
32 * This test checks jobtracker in safe mode. In safe mode the jobtracker upon
33 * restart doesn't schedule any new tasks and waits for the (old) trackers to
34 * join back.
35 */
36public class TestJobTrackerSafeMode extends TestCase {
37  final Path testDir = 
38    new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
39  final Path inDir = new Path(testDir, "input");
40  final Path shareDir = new Path(testDir, "share");
41  final Path outputDir = new Path(testDir, "output");
42  final int numDir = 1;
43  final int numTrackers = 2;
44 
45  private static final Log LOG = 
46    LogFactory.getLog(TestJobTrackerSafeMode.class);
47 
48  private JobConf configureJob(JobConf conf, int maps, int reduces,
49                               String mapSignal, String redSignal) 
50  throws IOException {
51    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
52        maps, reduces, "test-jobtracker-safemode", 
53        mapSignal, redSignal);
54    return conf;
55  }
56 
57  /**
58   * Tests the jobtracker's safemode. The test is as follows :
59   *   - starts a cluster with 2 trackers
60   *   - submits a job with large (40) maps to make sure that all the trackers
61   *     are logged to the job history
62   *   - wait for the job to be 50% done
63   *   - stop the jobtracker
64   *   - wait for the trackers to be done with all the tasks
65   *   - kill a task tracker
66   *   - start the jobtracker
67   *   - start 2 more trackers
68   *   - now check that while all the tracker are detected (or lost) the
69   *     scheduling window is closed
70   *   - check that after all the trackers are recovered, scheduling is opened
71   */
72  private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr) 
73  throws IOException {
74    FileSystem fileSys = dfs.getFileSystem();
75    JobConf jobConf = mr.createJobConf();
76    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
77    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
78    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
79    int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();
80   
81    // Configure the jobs
82    JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);
83     
84    fileSys.delete(shareDir, true);
85   
86    // Submit a master job   
87    JobClient jobClient = new JobClient(job);
88    RunningJob rJob = jobClient.submitJob(job);
89    JobID id = rJob.getID();
90   
91    // wait for the job to be inited
92    mr.initializeJob(id);
93   
94    // Make sure that the master job is 50% completed
95    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
96           < 0.5f) {
97      LOG.info("Waiting for the job to be 50% done");
98      UtilsForTests.waitFor(100);
99    }
100
101    // Kill the jobtracker
102    mr.stopJobTracker();
103
104    // Enable recovery on restart
105    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
106                                      true);
107   
108    // Signal the maps to complete
109    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
110   
111    // Signal the reducers to complete
112    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
113                              redSignalFile);
114   
115    // wait for the tasks to complete at the tracker
116    Set<String> trackers = new HashSet<String>();
117    for (int i = 0 ; i < numTracker; ++i) {
118      TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
119      trackers.add(t.getName());
120      int runningCount = t.getRunningTaskStatuses().size();
121      while (runningCount != 0) {
122        LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
123        UtilsForTests.waitFor(100);
124        runningCount = 0;
125        for (TaskStatus status : t.getRunningTaskStatuses()) {
126          if (status.getIsMap() 
127              && (status.getRunState() == TaskStatus.State.UNASSIGNED 
128                  || status.getRunState() == TaskStatus.State.RUNNING)) {
129            ++runningCount;
130          }
131        }
132      }
133    }
134
135    LOG.info("Trackers have stabilized");
136   
137    // Kill a tasktracker
138    int trackerToKill = --numTracker;
139    TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();
140   
141    trackers.remove(t.getName()); // remove this from the set to check
142   
143    Set<String> lostTrackers = new HashSet<String>();
144    lostTrackers.add(t.getName());
145   
146    // get the attempt-id's to ignore
147    // stop the tracker
148    LOG.info("Stopping tracker : " + t.getName());
149    mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
150    mr.stopTaskTracker(trackerToKill);
151
152    // Restart the jobtracker
153    mr.startJobTracker();
154
155    // Wait for the JT to be ready
156    UtilsForTests.waitForJobTracker(jobClient);
157
158    jobtracker = mr.getJobTrackerRunner().getJobTracker();
159
160    // Start a tracker
161    LOG.info("Start a new tracker");
162    mr.startTaskTracker(null, null, ++numTracker, numDir);
163   
164    // Start a tracker
165    LOG.info("Start a new tracker");
166    mr.startTaskTracker(null, null, ++numTracker, numDir);
167
168    // Check if the jobs are still running
169   
170    // Wait for the tracker to be lost
171    boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
172    while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
173      assertFalse("JobTracker has opened up scheduling before all the" 
174                  + " trackers were recovered", shouldSchedule);
175      UtilsForTests.waitFor(100);
176     
177      // snapshot jobtracker's scheduling status
178      shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
179    }
180
181    assertTrue("JobTracker hasnt opened up scheduling even all the" 
182               + " trackers were recovered", 
183               jobtracker.recoveryManager.shouldSchedule());
184   
185    assertEquals("Recovery manager is in inconsistent state", 
186                 0, jobtracker.recoveryManager.recoveredTrackers.size());
187   
188    // wait for the job to be complete
189    UtilsForTests.waitTillDone(jobClient);
190  }
191
192  private boolean checkTrackers(JobTracker jobtracker, Set<String> present, 
193                                Set<String> absent) {
194    long jobtrackerRecoveryFinishTime = 
195      jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
196    for (String trackerName : present) {
197      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
198      // check if the status is present and also the tracker has contacted back
199      // after restart
200      if (status == null 
201          || status.getLastSeen() < jobtrackerRecoveryFinishTime) {
202        return false;
203      }
204    }
205    for (String trackerName : absent) {
206      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
207      // check if the status is still present
208      if ( status != null) {
209        return false;
210      }
211    }
212    return true;
213  }
214
215  /**
216   * Test {@link JobTracker}'s safe mode.
217   */
218  public void testJobTrackerSafeMode() throws IOException {
219    String namenode = null;
220    MiniDFSCluster dfs = null;
221    MiniMRCluster mr = null;
222    FileSystem fileSys = null;
223
224    try {
225      Configuration conf = new Configuration();
226      conf.setBoolean("dfs.replication.considerLoad", false);
227      dfs = new MiniDFSCluster(conf, 1, true, null, null);
228      dfs.waitActive();
229      fileSys = dfs.getFileSystem();
230     
231      // clean up
232      fileSys.delete(testDir, true);
233     
234      if (!fileSys.mkdirs(inDir)) {
235        throw new IOException("Mkdirs failed to create " + inDir.toString());
236      }
237
238      // Write the input file
239      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
240                              new Path(inDir + "/file"), (short)1);
241
242      dfs.startDataNodes(conf, 1, true, null, null, null, null);
243      dfs.waitActive();
244
245      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
246                 + (dfs.getFileSystem()).getUri().getPort();
247
248      // Make sure that jobhistory leads to a proper job restart
249      // So keep the blocksize and the buffer size small
250      JobConf jtConf = new JobConf();
251      jtConf.set("mapred.jobtracker.job.history.block.size", "512");
252      jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
253      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
254      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
255      jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
256      jtConf.setInt("mapred.reduce.copy.backoff", 4);
257      jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
258     
259      mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);
260     
261      // Test Lost tracker case
262      testSafeMode(dfs, mr);
263    } finally {
264      if (mr != null) {
265        try {
266          mr.shutdown();
267        } catch (Exception e) {}
268      }
269      if (dfs != null) {
270        try {
271          dfs.shutdown();
272        } catch (Exception e) {}
273      }
274    }
275  }
276
277  public static void main(String[] args) throws IOException {
278    new TestJobTrackerSafeMode().testJobTrackerSafeMode();
279  }
280}
Note: See TracBrowser for help on using the repository browser.