source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestTaskFail.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 7.5 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.IOException;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityReducer;

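/**
 * Tests that failed task attempts are handled and logged correctly.
 * Each map attempt fails in a different way (IOException, System.exit,
 * Error), and the job's OutputCommitter is varied across runs to
 * exercise both inline task cleanup and separate cleanup attempts.
 */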
public class TestTaskFail extends TestCase {
  private static String taskLog = "Task attempt log";
  private static String cleanupLog = "cleanup attempt log";

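  /**
   * A mapper that fails deterministically based on the attempt number
   * at the end of "mapred.task.id": attempt _0 throws an IOException,
   * _1 kills the JVM via System.exit, and _2 throws an Error. Later
   * attempts run the empty map body and succeed, so with the default
   * limit of four map attempts the job as a whole should still finish
   * successfully.
   */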
  public static class MapperClass extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {
    String taskid;
    public void configure(JobConf job) {
      taskid = job.get("mapred.task.id");
    }
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      System.err.println(taskLog);
      if (taskid.endsWith("_0")) {
        throw new IOException();   // first attempt: ordinary task failure
      } else if (taskid.endsWith("_1")) {
        System.exit(-1);           // second attempt: JVM exits abruptly
      } else if (taskid.endsWith("_2")) {
        throw new Error();         // third attempt: fatal error
      }
    }
  }

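  // The framework invokes OutputCommitter.abortTask() to clean up a failed
  // attempt. The three committers below all log a marker line and then
  // either clean up normally, kill the JVM, or throw, letting the test
  // check how failed cleanups are themselves retried as separate cleanup
  // attempts.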
  static class CommitterWithLogs extends FileOutputCommitter {
    public void abortTask(TaskAttemptContext context) throws IOException {
      System.err.println(cleanupLog);
      super.abortTask(context);
    }
  }

  static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
    public void abortTask(TaskAttemptContext context) throws IOException {
      System.err.println(cleanupLog);
      System.exit(-1);
    }
  }

  static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
    public void abortTask(TaskAttemptContext context) throws IOException {
      System.err.println(cleanupLog);
      throw new IOException();
    }
  }

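  /**
   * Writes the input text into inDir and submits a map-only job (zero
   * reduces, speculative execution disabled) using MapperClass, returning
   * the RunningJob handle without waiting for completion.
   */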
  public RunningJob launchJob(JobConf conf,
                              Path inDir,
                              Path outDir,
                              String input)
  throws IOException {
    // set up the input file system and write input text.
    FileSystem inFs = inDir.getFileSystem(conf);
    FileSystem outFs = outDir.getFileSystem(conf);
    outFs.delete(outDir, true);
    if (!inFs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      // write input into input file
      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }

    // configure the mapred Job
    conf.setMapperClass(MapperClass.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setNumReduceTasks(0);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setSpeculativeExecution(false);
    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
                                    "/tmp")).toString().replace(' ', '+');
    conf.set("test.build.data", TEST_ROOT_DIR);
    // return the RunningJob handle.
    return new JobClient(conf).submitJob(conf);
  }

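  /**
   * Asserts that the given attempt failed, that it is (or is not) a
   * cleanup attempt as expected, and that its stderr task log contains
   * the marker lines written by the mapper and by abortTask().
   */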
  private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
                               TaskStatus ts, boolean isCleanup)
  throws IOException {
    assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
    assertNotNull(ts);
    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
    // validate the task log for the task attempt
    String log = TestMiniMRMapRedDebugScript.readTaskLog(
        TaskLog.LogName.STDERR, attemptId, false);
    assertTrue(log.contains(taskLog));
    if (!isCleanup) {
      // the task log should contain both the task output
      // and the cleanup output
      assertTrue(log.contains(cleanupLog));
    } else {
      // validate the task log of the separate cleanup attempt
      log = TestMiniMRMapRedDebugScript.readTaskLog(
          TaskLog.LogName.STDERR, attemptId, true);
      assertTrue(log.contains(cleanupLog));
    }
  }

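  /**
   * Checks that the job succeeded overall, then inspects the three failed
   * attempts of the first map task: attempt 0 fails with an exception, so
   * cleanup runs inline in the same attempt, while attempts 1 and 2 die
   * abruptly and should each get a separate cleanup attempt.
   */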
  private void validateJob(RunningJob job, MiniMRCluster mr)
  throws IOException {
    assertEquals(JobStatus.SUCCEEDED, job.getJobState());

    JobID jobId = job.getID();
    // construct the task id of the first map task
    // this should not be a cleanup attempt since the first attempt
    // fails with an exception
    TaskAttemptID attemptId =
      new TaskAttemptID(new TaskID(jobId, true, 0), 0);
    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
                            getTip(attemptId.getTaskID());
    TaskStatus ts =
      mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
    validateAttempt(tip, attemptId, ts, false);

    attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
    // this should be a cleanup attempt since the second attempt fails
    // with System.exit
    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
    validateAttempt(tip, attemptId, ts, true);

    attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 2);
    // this should be a cleanup attempt since the third attempt fails
    // with an Error
    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
    validateAttempt(tip, attemptId, ts, true);
  }

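  /**
   * End-to-end check on a MiniDFSCluster/MiniMRCluster pair: runs the
   * failing job three times, once per committer variant, validating the
   * attempt and cleanup bookkeeping after each run.
   */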
  public void testWithDFS() throws IOException {
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      final int taskTrackers = 4;

      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster(conf, 4, true, null);
      fileSys = dfs.getFileSystem();
      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
      final Path inDir = new Path("./input");
      final Path outDir = new Path("./output");
      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
      // launch a job whose task attempts fail but whose cleanups succeed
      JobConf jobConf = mr.createJobConf();
      jobConf.setOutputCommitter(CommitterWithLogs.class);
      RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
      rJob.waitForCompletion();
      validateJob(rJob, mr);
      // launch jobs whose task attempts and task cleanups both fail
      fileSys.delete(outDir, true);
      jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
      rJob = launchJob(jobConf, inDir, outDir, input);
      rJob.waitForCompletion();
      validateJob(rJob, mr);
      fileSys.delete(outDir, true);
      jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
      rJob = launchJob(jobConf, inDir, outDir, input);
      rJob.waitForCompletion();
      validateJob(rJob, mr);
    } finally {
      if (dfs != null) { dfs.shutdown(); }
      if (mr != null) { mr.shutdown(); }
    }
  }

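  // Allows running the test standalone once the Hadoop test classpath is
  // set up; under the build it would normally run through the JUnit test
  // target (presumably something like "ant test -Dtestcase=TestTaskFail"
  // in this release).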
  public static void main(String[] argv) throws Exception {
    TestTaskFail td = new TestTaskFail();
    td.testWithDFS();
  }
}