source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 10.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.DataOutputStream;
21import java.io.IOException;
22
23import junit.framework.TestCase;
24
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.hdfs.MiniDFSCluster;
29import org.apache.hadoop.mapred.lib.IdentityMapper;
30import org.apache.hadoop.mapred.lib.IdentityReducer;
31
32/**
33 * Tests various failures in setup/cleanup of job, like
34 * throwing exception, command line kill and lost tracker
35 */
36public class TestSetupAndCleanupFailure extends TestCase {
37
38  final Path inDir = new Path("./input");
39  final Path outDir = new Path("./output");
40  static Path setupSignalFile = new Path("/setup-signal");
41  static Path cleanupSignalFile = new Path("/cleanup-signal");
42  String input = "The quick brown fox\nhas many silly\nred fox sox\n";
43 
44  // Commiter with setupJob throwing exception
45  static class CommitterWithFailSetup extends FileOutputCommitter {
46    @Override
47    public void setupJob(JobContext context) throws IOException {
48      throw new IOException();
49    }
50  }
51
52  // Commiter with cleanupJob throwing exception
53  static class CommitterWithFailCleanup extends FileOutputCommitter {
54    @Override
55    public void cleanupJob(JobContext context) throws IOException {
56      throw new IOException();
57    }
58  }
59
60  // Committer waits for a file to be created on dfs.
61  static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
62   
63    private void waitForSignalFile(FileSystem fs, Path signalFile) 
64    throws IOException {
65      while (!fs.exists(signalFile)) {
66        try {
67          Thread.sleep(100);
68        } catch (InterruptedException ie) {
69         break;
70        }
71      }
72    }
73   
74    @Override
75    public void setupJob(JobContext context) throws IOException {
76      waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile);
77      super.setupJob(context);
78    }
79   
80    @Override
81    public void cleanupJob(JobContext context) throws IOException {
82      waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
83      super.cleanupJob(context);
84    }
85  }
86 
87  public RunningJob launchJob(JobConf conf) 
88  throws IOException {
89    // set up the input file system and write input text.
90    FileSystem inFs = inDir.getFileSystem(conf);
91    FileSystem outFs = outDir.getFileSystem(conf);
92    outFs.delete(outDir, true);
93    if (!inFs.mkdirs(inDir)) {
94      throw new IOException("Mkdirs failed to create " + inDir.toString());
95    }
96    {
97      // write input into input file
98      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
99      file.writeBytes(input);
100      file.close();
101    }
102
103    // configure the mapred Job
104    conf.setMapperClass(IdentityMapper.class);       
105    conf.setReducerClass(IdentityReducer.class);
106    FileInputFormat.setInputPaths(conf, inDir);
107    FileOutputFormat.setOutputPath(conf, outDir);
108    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
109                                    "/tmp")).toString().replace(' ', '+');
110    conf.set("test.build.data", TEST_ROOT_DIR);
111
112    // return the RunningJob handle.
113    return new JobClient(conf).submitJob(conf);
114  }
115
116  // Among these tips only one of the tasks will be running,
117  // get the taskid for that task
118  private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) {
119    TaskAttemptID taskid = null;
120    while (taskid == null) {
121      for (TaskInProgress tip :tips) {
122        TaskStatus[] statuses = tip.getTaskStatuses();
123        for (TaskStatus status : statuses) {
124          if (status.getRunState() == TaskStatus.State.RUNNING) {
125            taskid = status.getTaskID();
126            break;
127          }
128        }
129        if (taskid != null) break;
130      }
131      try {
132        Thread.sleep(10);
133      } catch (InterruptedException ie) {}
134    }
135    return taskid;
136  }
137 
138  // Tests the failures in setup/cleanup job. Job should cleanly fail.
139  private void testFailCommitter(Class<? extends OutputCommitter> theClass,
140                                 JobConf jobConf) 
141  throws IOException {
142    jobConf.setOutputCommitter(theClass);
143    RunningJob job = launchJob(jobConf);
144    // wait for the job to finish.
145    job.waitForCompletion();
146    assertEquals(JobStatus.FAILED, job.getJobState());
147  }
148 
149  // launch job with CommitterWithLongSetupAndCleanup as committer
150  // and wait till the job is inited.
151  private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr) 
152  throws IOException {
153    // launch job with waiting setup/cleanup
154    JobConf jobConf = mr.createJobConf();
155    jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
156    RunningJob job = launchJob(jobConf);
157    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
158    JobInProgress jip = jt.getJob(job.getID());
159    while (!jip.inited()) {
160      try {
161        Thread.sleep(10);
162      } catch (InterruptedException ie) {}
163    }
164    return job;
165  }
166 
167  /**
168   * Tests setup and cleanup attempts getting killed from command-line
169   * and lost tracker
170   *
171   * @param mr
172   * @param dfs
173   * @param commandLineKill if true, test with command-line kill
174   *                        else, test with lost tracker
175   * @throws IOException
176   */
177  private void testSetupAndCleanupKill(MiniMRCluster mr, 
178                                       MiniDFSCluster dfs, 
179                                       boolean commandLineKill) 
180  throws IOException {
181    // launch job with waiting setup/cleanup
182    RunningJob job = launchJobWithWaitingSetupAndCleanup(mr);
183   
184    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
185    JobInProgress jip = jt.getJob(job.getID());
186    // get the running setup task id
187    TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
188    if (commandLineKill) {
189      killTaskFromCommandLine(job, setupID, jt);
190    } else {
191      killTaskWithLostTracker(mr, setupID);
192    }
193    // signal the setup to complete
194    UtilsForTests.writeFile(dfs.getNameNode(), 
195                            dfs.getFileSystem().getConf(), 
196                            setupSignalFile, (short)3);
197    // wait for maps and reduces to complete
198    while (job.reduceProgress() != 1.0f) {
199      try {
200        Thread.sleep(100);
201      } catch (InterruptedException ie) {}
202    }
203    // get the running cleanup task id
204    TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
205    if (commandLineKill) {
206      killTaskFromCommandLine(job, cleanupID, jt);
207    } else {
208      killTaskWithLostTracker(mr, cleanupID);
209    }
210    // signal the cleanup to complete
211    UtilsForTests.writeFile(dfs.getNameNode(), 
212                            dfs.getFileSystem().getConf(), 
213                            cleanupSignalFile, (short)3);
214    // wait for the job to finish.
215    job.waitForCompletion();
216    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
217    assertEquals(TaskStatus.State.KILLED, 
218                 jt.getTaskStatus(setupID).getRunState());
219    assertEquals(TaskStatus.State.KILLED, 
220                 jt.getTaskStatus(cleanupID).getRunState());
221  }
222 
223  // kill the task from command-line
224  // wait till it kill is reported back
225  private void killTaskFromCommandLine(RunningJob job, 
226                                       TaskAttemptID taskid,
227                                       JobTracker jt) 
228  throws IOException {
229    job.killTask(taskid, false);
230    // wait till the kill happens
231    while (jt.getTaskStatus(taskid).getRunState() != 
232           TaskStatus.State.KILLED) {
233      try {
234        Thread.sleep(10);
235      } catch (InterruptedException ie) {}
236    }
237
238  }
239  // kill the task by losing the tracker
240  private void killTaskWithLostTracker(MiniMRCluster mr, 
241                                       TaskAttemptID taskid) {
242    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
243    String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
244    int trackerID = mr.getTaskTrackerID(trackerName);
245    assertTrue(trackerID != -1);
246    mr.stopTaskTracker(trackerID);
247  }
248 
249  // Tests the failures in setup/cleanup job. Job should cleanly fail.
250  // Also Tests the command-line kill for setup/cleanup attempts.
251  // tests the setup/cleanup attempts getting killed if
252  // they were running on a lost tracker
253  public void testWithDFS() throws IOException {
254    MiniDFSCluster dfs = null;
255    MiniMRCluster mr = null;
256    FileSystem fileSys = null;
257    try {
258      final int taskTrackers = 4;
259      Configuration conf = new Configuration();
260      dfs = new MiniDFSCluster(conf, 4, true, null);
261      fileSys = dfs.getFileSystem();
262      JobConf jtConf = new JobConf();
263      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
264      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
265      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
266      jtConf.setInt("mapred.reduce.copy.backoff", 4);
267      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
268                             null, null, jtConf);
269      // test setup/cleanup throwing exceptions
270      testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
271      testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
272      // test the command-line kill for setup/cleanup attempts.
273      testSetupAndCleanupKill(mr, dfs, true);
274      // remove setup/cleanup signal files.
275      fileSys.delete(setupSignalFile , true);
276      fileSys.delete(cleanupSignalFile , true);
277      // test the setup/cleanup attempts getting killed if
278      // they were running on a lost tracker
279      testSetupAndCleanupKill(mr, dfs, false);
280    } finally {
281      if (dfs != null) { dfs.shutdown(); }
282      if (mr != null) { mr.shutdown();
283      }
284    }
285  }
286
287  public static void main(String[] argv) throws Exception {
288    TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure();
289    td.testWithDFS();
290  }
291}
Note: See TracBrowser for help on using the repository browser.