source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestLostTracker.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 6.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import org.apache.hadoop.conf.Configuration;
21import org.apache.hadoop.fs.*;
22import org.apache.hadoop.hdfs.MiniDFSCluster;
23
24import junit.framework.TestCase;
25import java.io.*;
26
27public class TestLostTracker extends TestCase {
28  final Path testDir = new Path("/jt-lost-tt");
29  final Path inDir = new Path(testDir, "input");
30  final Path shareDir = new Path(testDir, "share");
31  final Path outputDir = new Path(testDir, "output");
32 
33  private JobConf configureJob(JobConf conf, int maps, int reduces,
34                               String mapSignal, String redSignal) 
35  throws IOException {
36    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
37        maps, reduces, "test-lost-tt", 
38        mapSignal, redSignal);
39    return conf;
40  }
41 
42  public void testLostTracker(MiniDFSCluster dfs,
43                              MiniMRCluster mr) 
44  throws IOException {
45    FileSystem fileSys = dfs.getFileSystem();
46    JobConf jobConf = mr.createJobConf();
47    int numMaps = 10;
48    int numReds = 1;
49    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
50    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
51   
52    // Configure the job
53    JobConf job = configureJob(jobConf, numMaps, numReds, 
54                               mapSignalFile, redSignalFile);
55     
56    fileSys.delete(shareDir, true);
57   
58    // Submit the job   
59    JobClient jobClient = new JobClient(job);
60    RunningJob rJob = jobClient.submitJob(job);
61    JobID id = rJob.getID();
62   
63    // wait for the job to be inited
64    mr.initializeJob(id);
65   
66    // Make sure that the master job is 50% completed
67    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
68           < 0.5f) {
69      UtilsForTests.waitFor(10);
70    }
71
72    // get a completed task on 1st tracker
73    TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
74                              getNonRunningTasks().get(0).getTaskID();
75
76    // Kill the 1st tasktracker
77    mr.stopTaskTracker(0);
78
79    // Signal all the maps to complete
80    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
81   
82    // Signal the reducers to complete
83    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
84                              redSignalFile);
85    // wait till the job is done
86    UtilsForTests.waitTillDone(jobClient);
87
88    // Check if the tasks on the lost tracker got killed and re-executed
89    assertEquals(jobClient.getClusterStatus().getTaskTrackers(), 1);
90    assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
91    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
92                         getTip(taskid.getTaskID());
93    assertTrue(tip.isComplete());
94    assertEquals(tip.numKilledTasks(), 1);
95   
96    // check if the task statuses for the tasks are sane
97    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
98    for (TaskInProgress taskInProgress : jt.getJob(id).getMapTasks()) {
99      testTaskStatuses(taskInProgress.getTaskStatuses());
100    }
101   
102    // validate the history file
103    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
104    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
105  }
106 
107  private void testTaskStatuses(TaskStatus[] tasks) {
108    for (TaskStatus status : tasks) {
109      assertTrue("Invalid start time " + status.getStartTime(), 
110                 status.getStartTime() > 0);
111      assertTrue("Invalid finish time " + status.getFinishTime(), 
112                 status.getFinishTime() > 0);
113      assertTrue("Start time (" + status.getStartTime() + ") is greater than " 
114                 + "the finish time (" + status.getFinishTime() + ")", 
115                 status.getStartTime() <= status.getFinishTime());
116      assertNotNull("Task phase information is null", status.getPhase());
117      assertNotNull("Task run-state information is null", status.getRunState());
118      assertNotNull("TaskTracker information is null", status.getTaskTracker());
119    }
120  }
121
122  public void testLostTracker() throws IOException {
123    String namenode = null;
124    MiniDFSCluster dfs = null;
125    MiniMRCluster mr = null;
126    FileSystem fileSys = null;
127
128    try {
129      Configuration conf = new Configuration();
130      conf.setBoolean("dfs.replication.considerLoad", false);
131      dfs = new MiniDFSCluster(conf, 1, true, null, null);
132      dfs.waitActive();
133      fileSys = dfs.getFileSystem();
134     
135      // clean up
136      fileSys.delete(testDir, true);
137     
138      if (!fileSys.mkdirs(inDir)) {
139        throw new IOException("Mkdirs failed to create " + inDir.toString());
140      }
141
142      // Write the input file
143      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
144                              new Path(inDir + "/file"), (short)1);
145
146      dfs.startDataNodes(conf, 1, true, null, null, null, null);
147      dfs.waitActive();
148
149      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
150                 + (dfs.getFileSystem()).getUri().getPort();
151
152      JobConf jtConf = new JobConf();
153      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
154      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
155      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
156      jtConf.setInt("mapred.reduce.copy.backoff", 4);
157     
158      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
159     
160      // Test Lost tracker case
161      testLostTracker(dfs, mr);
162    } finally {
163      if (mr != null) {
164        try {
165          mr.shutdown();
166        } catch (Exception e) {}
167      }
168      if (dfs != null) {
169        try {
170          dfs.shutdown();
171        } catch (Exception e) {}
172      }
173    }
174  }
175
176  public static void main(String[] args) throws IOException {
177    new TestLostTracker().testLostTracker();
178  }
179}
Note: See TracBrowser for help on using the repository browser.