source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.DataOutputStream;
21import java.io.IOException;
22import java.io.InputStream;
23import java.net.URI;
24
25import junit.framework.TestCase;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.hdfs.MiniDFSCluster;
31import org.apache.hadoop.filecache.DistributedCache;
32import org.apache.hadoop.fs.FileSystem;
33import org.apache.hadoop.fs.Path;
34import org.apache.hadoop.io.IntWritable;
35import org.apache.hadoop.io.LongWritable;
36import org.apache.hadoop.io.Text;
37import org.apache.hadoop.mapred.lib.IdentityReducer;
38
39/**
40 * Class to test mapred debug Script
41 */
42public class TestMiniMRMapRedDebugScript extends TestCase {
43  private static final Log LOG =
44    LogFactory.getLog(TestMiniMRMapRedDebugScript.class.getName());
45
46  private MiniMRCluster mr;
47  private MiniDFSCluster dfs;
48  private FileSystem fileSys;
49 
50  /**
51   * Fail map class
52   */
53  public static class MapClass extends MapReduceBase
54  implements Mapper<LongWritable, Text, Text, IntWritable> {
55     public void map (LongWritable key, Text value, 
56                     OutputCollector<Text, IntWritable> output, 
57                     Reporter reporter) throws IOException {
58       System.err.println("Bailing out");
59       throw new IOException();
60     }
61  }
62
63  /**
64   * Reads tasklog and returns it as string after trimming it.
65   * @param filter Task log filter; can be STDOUT, STDERR,
66   *                SYSLOG, DEBUGOUT, DEBUGERR
67   * @param taskId The task id for which the log has to collected
68   * @param isCleanup whether the task is a cleanup attempt or not.
69   * @return task log as string
70   * @throws IOException
71   */
72  public static String readTaskLog(TaskLog.LogName  filter, 
73                                   TaskAttemptID taskId, 
74                                   boolean isCleanup)
75  throws IOException {
76    // string buffer to store task log
77    StringBuffer result = new StringBuffer();
78    int res;
79
80    // reads the whole tasklog into inputstream
81    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
82    // construct string log from inputstream.
83    byte[] b = new byte[65536];
84    while (true) {
85      res = taskLogReader.read(b);
86      if (res > 0) {
87        result.append(new String(b));
88      } else {
89        break;
90      }
91    }
92    taskLogReader.close();
93   
94    // trim the string and return it
95    String str = result.toString();
96    str = str.trim();
97    return str;
98  }
99
100  /**
101   * Launches failed map task and debugs the failed task
102   * @param conf configuration for the mapred job
103   * @param inDir input path
104   * @param outDir output path
105   * @param debugDir debug directory where script is present
106   * @param debugCommand The command to execute script
107   * @param input Input text
108   * @return the output of debug script
109   * @throws IOException
110   */
111  public String launchFailMapAndDebug(JobConf conf,
112                                      Path inDir,
113                                      Path outDir,
114                                      Path debugDir,
115                                      String debugScript,
116                                      String input)
117  throws IOException {
118
119    // set up the input file system and write input text.
120    FileSystem inFs = inDir.getFileSystem(conf);
121    FileSystem outFs = outDir.getFileSystem(conf);
122    outFs.delete(outDir, true);
123    if (!inFs.mkdirs(inDir)) {
124      throw new IOException("Mkdirs failed to create " + inDir.toString());
125    }
126    {
127      // write input into input file
128      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
129      file.writeBytes(input);
130      file.close();
131    }
132
133    // configure the mapred Job for failing map task.
134    conf.setJobName("failmap");
135    conf.setMapperClass(MapClass.class);       
136    conf.setReducerClass(IdentityReducer.class);
137    conf.setNumMapTasks(1);
138    conf.setNumReduceTasks(0);
139    conf.setMapDebugScript(debugScript);
140    FileInputFormat.setInputPaths(conf, inDir);
141    FileOutputFormat.setOutputPath(conf, outDir);
142    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
143                                      "/tmp")).toString().replace(' ', '+');
144    conf.set("test.build.data", TEST_ROOT_DIR);
145
146    // copy debug script to cache from local file system.
147    FileSystem debugFs = debugDir.getFileSystem(conf);
148    Path scriptPath = new Path(debugDir,"testscript.txt");
149    Path cachePath = new Path("/cacheDir");
150    if (!debugFs.mkdirs(cachePath)) {
151      throw new IOException("Mkdirs failed to create " + cachePath.toString());
152    }
153    debugFs.copyFromLocalFile(scriptPath,cachePath);
154   
155    URI uri = debugFs.getUri().resolve(cachePath+"/testscript.txt#testscript");
156    DistributedCache.createSymlink(conf);
157    DistributedCache.addCacheFile(uri, conf);
158
159    RunningJob job =null;
160    // run the job. It will fail with IOException.
161    try {
162      job = new JobClient(conf).submitJob(conf);
163    } catch (IOException e) {
164        LOG.info("Running Job failed", e);
165    }
166
167    JobID jobId = job.getID();
168    // construct the task id of first map task of failmap
169    TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
170    // wait for the job to finish.
171    while (!job.isComplete()) ;
172   
173    // return the output of debugout log.
174    return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId, false);
175  }
176
177  /**
178   * Tests Map task's debug script
179   *
180   * In this test, we launch a mapreduce program which
181   * writes 'Bailing out' to stderr and throws an exception.
182   * We will run the script when tsk fails and validate
183   * the output of debug out log.
184   *
185   */
186  public void testMapDebugScript() throws Exception {
187    try {
188     
189      // create configuration, dfs, file system and mapred cluster
190      Configuration cnf = new Configuration();
191      dfs = new MiniDFSCluster(cnf, 1, true, null);
192      fileSys = dfs.getFileSystem();
193      mr = new MiniMRCluster(2, fileSys.getName(), 1);
194      JobConf conf = mr.createJobConf();
195     
196      // intialize input, output and debug directories
197      final Path debugDir = new Path("build/test/debug");
198      Path inDir = new Path("testing/wc/input");
199      Path outDir = new Path("testing/wc/output");
200     
201      // initialize debug command and input text
202      String debugScript = "./testscript";
203      String input = "The input";
204     
205      // Launch failed map task and run debug script
206      String result = launchFailMapAndDebug(conf,inDir, 
207                               outDir,debugDir, debugScript, input);
208     
209      // Assert the output of debug script.
210      assertTrue(result.contains("Test Script\nBailing out"));
211
212    } finally { 
213      // close file system and shut down dfs and mapred cluster
214      try {
215        if (fileSys != null) {
216          fileSys.close();
217        }
218        if (dfs != null) {
219          dfs.shutdown();
220        }
221        if (mr != null) {
222          mr.shutdown();
223        }
224      } catch (IOException ioe) {
225        LOG.info("IO exception in closing file system:"+ioe.getMessage(), ioe);
226      }
227    }
228  }
229
230  public static void main(String args[]){
231    TestMiniMRMapRedDebugScript tmds = new TestMiniMRMapRedDebugScript();
232    try {
233      tmds.testMapDebugScript();
234    } catch (Exception e) {
235      LOG.error("Exception in test: "+e.getMessage(), e);
236    }
237  }
238 
239}
240
Note: See TracBrowser for help on using the repository browser.