source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/ReliabilityTest.java @ 120

Last change on this file was in revision 120, checked in by (none), 14 years ago:

Added the main files for the Hadoop JUnit project

  • Property svn:executable set to *
File size: 17.9 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This class tests reliability of the framework in the face of failures of
 * both tasks and tasktrackers. Steps:
 * 1) Get the cluster status
 * 2) Get the number of slots in the cluster
 * 3) Spawn a sleepjob that occupies the entire cluster (with two waves of maps)
 * 4) Get the list of running attempts for the job
 * 5) Fail a few of them
 * 6) Now fail a few trackers (ssh)
 * 7) Job should run to completion
 * 8) The above is repeated for the Sort suite of jobs (randomwriter, sort,
 *    validator). All jobs must complete, and finally, the sort validation
 *    should succeed.
 * To run the test:
 * ./bin/hadoop --config <config> jar
 *   build/hadoop-<version>-test.jar MRReliabilityTest -libjars
 *   build/hadoop-<version>-examples.jar [-scratchdir <dir>]
 *
 * The scratchdir is optional and by default the current directory on the
 * client will be used as the scratch space. Note that password-less SSH must
 * be set up between the client machine from where the test is submitted, and
 * the cluster nodes where the test runs.
 */
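// Example invocation (paths are hypothetical; assumes a 0.20.1 build with a
// cluster config under conf/):
//   ./bin/hadoop --config conf jar build/hadoop-0.20.1-test.jar \
//       MRReliabilityTest -libjars build/hadoop-0.20.1-examples.jar \
//       -scratchdir /tmp/reliability_scratch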

public class ReliabilityTest extends Configured implements Tool {

  private String dir;
  private static final Log LOG = LogFactory.getLog(ReliabilityTest.class);

  private void displayUsage() {
    LOG.info("This must be run in only the distributed mode " +
                "(LocalJobRunner not supported).\n\tUsage: MRReliabilityTest " +
                "-libjars <path to hadoop-examples.jar> [-scratchdir <dir>]" +
                "\n[-scratchdir] points to a scratch space on this host where temp" +
                " files for this test will be created. Defaults to current working" +
                " dir. \nPasswordless SSH must be set up between this host and the" +
                " nodes which the test is going to use");
    System.exit(-1);
  }

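  /**
   * Parses the generic options plus the optional -scratchdir argument,
   * raises the per-task attempt limits so the injected failures cannot
   * exhaust them, and then runs the sleep job and the sort suite.
   */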
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    if ("local".equals(conf.get("mapred.job.tracker", "local"))) {
      displayUsage();
    }
    String[] otherArgs =
      new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length == 2) {
      if (otherArgs[0].equals("-scratchdir")) {
        dir = otherArgs[1];
      } else {
        displayUsage();
      }
    } else if (otherArgs.length == 0) {
      dir = System.getProperty("user.dir");
    } else {
      displayUsage();
    }

    //the test deliberately fails task attempts, so raise the per-task
    //attempt limits to keep the jobs themselves from failing
    conf.setInt("mapred.map.max.attempts", 10);
    conf.setInt("mapred.reduce.max.attempts", 10);
    runSleepJobTest(new JobClient(new JobConf(conf)), conf);
    runSortJobTests(new JobClient(new JobConf(conf)), conf);
    return 0;
  }

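  /**
   * Runs a SleepJob sized to occupy the whole cluster: twice as many maps
   * as there are map slots (two waves), with per-task sleep times equal to
   * the tasktracker expiry interval so tasks are still running while
   * trackers are being suspended.
   */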
  private void runSleepJobTest(final JobClient jc, final Configuration conf)
  throws Exception {
    ClusterStatus c = jc.getClusterStatus();
    int maxMaps = c.getMaxMapTasks() * 2;
    int maxReduces = maxMaps;
    int mapSleepTime = (int)c.getTTExpiryInterval();
    int reduceSleepTime = mapSleepTime;
    String[] sleepJobArgs = new String[] {
        "-m", Integer.toString(maxMaps),
        "-r", Integer.toString(maxReduces),
        "-mt", Integer.toString(mapSleepTime),
        "-rt", Integer.toString(reduceSleepTime)};
    runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs,
        new KillTaskThread(jc, 2, 0.2f, false, 2),
        new KillTrackerThread(jc, 2, 0.4f, false, 1));
    LOG.info("SleepJob done");
  }

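  /**
   * Runs the sort suite against a fresh scratch area on the cluster's
   * FileSystem: randomwriter to generate input, then sort, then the
   * sort validator.
   */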
  private void runSortJobTests(final JobClient jc, final Configuration conf)
  throws Exception {
    String inputPath = "my_reliability_test_input";
    String outputPath = "my_reliability_test_output";
    FileSystem fs = jc.getFs();
    fs.delete(new Path(inputPath), true);
    fs.delete(new Path(outputPath), true);
    runRandomWriterTest(jc, conf, inputPath);
    runSortTest(jc, conf, inputPath, outputPath);
    runSortValidatorTest(jc, conf, inputPath, outputPath);
  }

  private void runRandomWriterTest(final JobClient jc,
      final Configuration conf, final String inputPath)
  throws Exception {
    runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter",
        new String[]{inputPath},
        null, new KillTrackerThread(jc, 0, 0.4f, false, 1));
    LOG.info("RandomWriter job done");
  }

  private void runSortTest(final JobClient jc, final Configuration conf,
      final String inputPath, final String outputPath)
  throws Exception {
    runTest(jc, conf, "org.apache.hadoop.examples.Sort",
        new String[]{inputPath, outputPath},
        new KillTaskThread(jc, 2, 0.2f, false, 2),
        new KillTrackerThread(jc, 2, 0.8f, false, 1));
    LOG.info("Sort job done");
  }

  private void runSortValidatorTest(final JobClient jc,
      final Configuration conf, final String inputPath, final String outputPath)
  throws Exception {
    runTest(jc, conf, "org.apache.hadoop.mapred.SortValidator", new String[] {
        "-sortInput", inputPath, "-sortOutput", outputPath},
        new KillTaskThread(jc, 2, 0.2f, false, 1),
        new KillTrackerThread(jc, 2, 0.8f, false, 1));
    LOG.info("SortValidator job done");
  }

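  /**
   * Prefixes the command with $HADOOP_HOME when that environment variable
   * is set; otherwise the command is returned unchanged.
   */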
  private String normalizeCommandPath(String command) {
    final String hadoopHome;
    if ((hadoopHome = System.getenv("HADOOP_HOME")) != null) {
      command = hadoopHome + "/" + command;
    }
    return command;
  }

  private void checkJobExitStatus(int status, String jobName) {
    if (status != 0) {
      LOG.info(jobName + " job failed with status: " + status);
      System.exit(status);
    } else {
      LOG.info(jobName + " done.");
    }
  }

  //Starts the job in a daemon thread, waits for it to be submitted and to
  //leave the PREP state, then runs the taskKill/tasktrackerKill threads
  //against it and waits for everything to finish.
  private void runTest(final JobClient jc, final Configuration conf,
      final String jobClass, final String[] args, KillTaskThread killTaskThread,
      KillTrackerThread killTrackerThread) throws Exception {
    int prevJobsNum = jc.getAllJobs().length;
    Thread t = new Thread("Job Test") {
      public void run() {
        try {
          Class<?> jobClassObj = conf.getClassByName(jobClass);
          int status = ToolRunner.run(conf, (Tool)(jobClassObj.newInstance()),
              args);
          checkJobExitStatus(status, jobClass);
        } catch (Exception e) {
          LOG.fatal("JOB " + jobClass + " failed to run", e);
          System.exit(-1);
        }
      }
    };
    t.setDaemon(true);
    t.start();
    JobStatus[] jobs;
    //get the job ID. This is the job that we just submitted
    while ((jobs = jc.getAllJobs()).length - prevJobsNum == 0) {
      LOG.info("Waiting for the job " + jobClass + " to start");
      Thread.sleep(1000);
    }
    JobID jobId = jobs[jobs.length - 1].getJobID();
    RunningJob rJob = jc.getJob(jobId);
    while (rJob.getJobState() == JobStatus.PREP) {
      LOG.info("JobID : " + jobId + " not started RUNNING yet");
      Thread.sleep(1000);
      rJob = jc.getJob(jobId);
    }
    if (killTaskThread != null) {
      killTaskThread.setRunningJob(rJob);
      killTaskThread.start();
      killTaskThread.join();
      LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
    }
    if (killTrackerThread != null) {
      killTrackerThread.setRunningJob(rJob);
      killTrackerThread.start();
      killTrackerThread.join();
      LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
    }
    t.join();
  }

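  /**
   * Each time the job's map (and, if configured, reduce) progress crosses
   * the current threshold, suspends roughly half of the active tasktrackers
   * with SIGSTOP, sleeps past the tracker expiry interval so the JobTracker
   * declares them lost, and then resumes them with SIGCONT. The threshold
   * is multiplied by thresholdMultiplier after every iteration.
   */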
  private class KillTrackerThread extends Thread {
    private volatile boolean killed = false;
    private JobClient jc;
    private RunningJob rJob;
    final private int thresholdMultiplier;
    private float threshold = 0.2f;
    private boolean onlyMapsProgress;
    private int numIterations;
    final private String slavesFile = dir + "/_reliability_test_slaves_file_";
    final String shellCommand = normalizeCommandPath("bin/slaves.sh");
    final private String STOP_COMMAND = "ps uwwx | grep java | grep " +
    "org.apache.hadoop.mapred.TaskTracker" + " |" +
    " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
    final private String RESUME_COMMAND = "ps uwwx | grep java | grep " +
    "org.apache.hadoop.mapred.TaskTracker" + " |" +
    " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";
    //Only one instance must be active at any point
    public KillTrackerThread(JobClient jc, int thresholdMultiplier,
        float threshold, boolean onlyMapsProgress, int numIterations) {
      this.jc = jc;
      this.thresholdMultiplier = thresholdMultiplier;
      this.threshold = threshold;
      this.onlyMapsProgress = onlyMapsProgress;
      this.numIterations = numIterations;
      setDaemon(true);
    }
    public void setRunningJob(RunningJob rJob) {
      this.rJob = rJob;
    }
    public void kill() {
      killed = true;
    }
    public void run() {
      stopStartTrackers(true);
      if (!onlyMapsProgress) {
        stopStartTrackers(false);
      }
    }
    private void stopStartTrackers(boolean considerMaps) {
      if (considerMaps) {
        LOG.info("Will STOP/RESUME tasktrackers based on Maps'" +
                " progress");
      } else {
        LOG.info("Will STOP/RESUME tasktrackers based on " +
                "Reduces' progress");
      }
      LOG.info("Initial progress threshold: " + threshold +
          ". Threshold Multiplier: " + thresholdMultiplier +
          ". Number of iterations: " + numIterations);
      float thresholdVal = threshold;
      int numIterationsDone = 0;
      while (!killed) {
        try {
          float progress;
          if (jc.getJob(rJob.getID()).isComplete() ||
              numIterationsDone == numIterations) {
            break;
          }

          if (considerMaps) {
            progress = jc.getJob(rJob.getID()).mapProgress();
          } else {
            progress = jc.getJob(rJob.getID()).reduceProgress();
          }
          if (progress >= thresholdVal) {
            numIterationsDone++;
            ClusterStatus c;
            stopTaskTrackers((c = jc.getClusterStatus(true)));
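            //sleep for 1.5x the expiry interval so the JobTracker marks
            //the stopped trackers as lost before they are resumed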
            Thread.sleep((int)Math.ceil(1.5 * c.getTTExpiryInterval()));
            startTaskTrackers();
            thresholdVal = thresholdVal * thresholdMultiplier;
          }
          Thread.sleep(5000);
        } catch (InterruptedException ie) {
          killed = true;
          return;
        } catch (Exception e) {
          LOG.fatal(StringUtils.stringifyException(e));
        }
      }
    }
    private void stopTaskTrackers(ClusterStatus c) throws Exception {

      Collection<String> trackerNames = c.getActiveTrackerNames();
      ArrayList<String> trackerNamesList = new ArrayList<String>(trackerNames);
      Collections.shuffle(trackerNamesList);

      int count = 0;

      FileOutputStream fos = new FileOutputStream(new File(slavesFile));
      LOG.info(new Date() + " Stopping a few trackers");

      for (String tracker : trackerNamesList) {
        String host = convertTrackerNameToHostName(tracker);
        LOG.info(new Date() + " Marking tracker on host: " + host);
        fos.write((host + "\n").getBytes());
        if (count++ >= trackerNamesList.size()/2) {
          break;
        }
      }
      fos.close();

      runOperationOnTT("suspend");
    }

    private void startTaskTrackers() throws Exception {
      LOG.info(new Date() + " Resuming the stopped trackers");
      runOperationOnTT("resume");
      new File(slavesFile).delete();
    }

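    /**
     * Runs the suspend or resume pipeline on every host listed in
     * slavesFile by invoking bin/slaves.sh with HADOOP_SLAVES pointing
     * at that file.
     */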
    private void runOperationOnTT(String operation) throws IOException {
      Map<String,String> hMap = new HashMap<String,String>();
      hMap.put("HADOOP_SLAVES", slavesFile);
      StringTokenizer strToken;
      if (operation.equals("suspend")) {
        strToken = new StringTokenizer(STOP_COMMAND, " ");
      } else {
        strToken = new StringTokenizer(RESUME_COMMAND, " ");
      }
      String commandArgs[] = new String[strToken.countTokens() + 1];
      int i = 0;
      commandArgs[i++] = shellCommand;
      while (strToken.hasMoreTokens()) {
        commandArgs[i++] = strToken.nextToken();
      }
      String output = Shell.execCommand(hMap, commandArgs);
      if (output != null && !output.equals("")) {
        LOG.info(output);
      }
    }

    private String convertTrackerNameToHostName(String trackerName) {
      // Convert the trackerName to its host name
      int indexOfColon = trackerName.indexOf(":");
      String trackerHostName = (indexOfColon == -1) ?
          trackerName :
            trackerName.substring(0, indexOfColon);
      return trackerHostName.substring("tracker_".length());
    }

  }

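  /**
   * Each time the job's map (and, if configured, reduce) progress crosses
   * the current threshold, kills (or, on every second iteration, fails)
   * about half of the job's running task attempts. The threshold is
   * multiplied by thresholdMultiplier after every iteration.
   */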
  private class KillTaskThread extends Thread {

    private volatile boolean killed = false;
    private RunningJob rJob;
    private JobClient jc;
    final private int thresholdMultiplier;
    private float threshold = 0.2f;
    private boolean onlyMapsProgress;
    private int numIterations;
    public KillTaskThread(JobClient jc, int thresholdMultiplier,
        float threshold, boolean onlyMapsProgress, int numIterations) {
      this.jc = jc;
      this.thresholdMultiplier = thresholdMultiplier;
      this.threshold = threshold;
      this.onlyMapsProgress = onlyMapsProgress;
      this.numIterations = numIterations;
      setDaemon(true);
    }
    public void setRunningJob(RunningJob rJob) {
      this.rJob = rJob;
    }
    public void kill() {
      killed = true;
    }
    public void run() {
      killBasedOnProgress(true);
      if (!onlyMapsProgress) {
        killBasedOnProgress(false);
      }
    }
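    //Kills/fails attempts only when more attempts are running than half
    //the number of tasktrackers, and stops after touching about 50% of them.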
    private void killBasedOnProgress(boolean considerMaps) {
      boolean fail = false;
      if (considerMaps) {
        LOG.info("Will kill tasks based on Maps' progress");
      } else {
        LOG.info("Will kill tasks based on Reduces' progress");
      }
      LOG.info("Initial progress threshold: " + threshold +
          ". Threshold Multiplier: " + thresholdMultiplier +
          ". Number of iterations: " + numIterations);
      float thresholdVal = threshold;
      int numIterationsDone = 0;
      while (!killed) {
        try {
          float progress;
          if (jc.getJob(rJob.getID()).isComplete() ||
              numIterationsDone == numIterations) {
            break;
          }
          if (considerMaps) {
            progress = jc.getJob(rJob.getID()).mapProgress();
          } else {
            progress = jc.getJob(rJob.getID()).reduceProgress();
          }
          if (progress >= thresholdVal) {
            numIterationsDone++;
            if (numIterationsDone > 0 && numIterationsDone % 2 == 0) {
              fail = true; //fail tasks instead of kill
            }
            ClusterStatus c = jc.getClusterStatus();

            LOG.info(new Date() + " Killing a few tasks");

            Collection<TaskAttemptID> runningTasks =
              new ArrayList<TaskAttemptID>();
            TaskReport mapReports[] = jc.getMapTaskReports(rJob.getID());
            for (TaskReport mapReport : mapReports) {
              if (mapReport.getCurrentStatus() == TIPStatus.RUNNING) {
                runningTasks.addAll(mapReport.getRunningTaskAttempts());
              }
            }
            if (runningTasks.size() > c.getTaskTrackers()/2) {
              int count = 0;
              for (TaskAttemptID t : runningTasks) {
                LOG.info(new Date() +
                    (fail ? " Failed task : " : " Killed task : ") + t);
                rJob.killTask(t, fail);
                if (count++ > runningTasks.size()/2) { //kill 50%
                  break;
                }
              }
            }
            runningTasks.clear();
            TaskReport reduceReports[] = jc.getReduceTaskReports(rJob.getID());
            for (TaskReport reduceReport : reduceReports) {
              if (reduceReport.getCurrentStatus() == TIPStatus.RUNNING) {
                runningTasks.addAll(reduceReport.getRunningTaskAttempts());
              }
            }
            if (runningTasks.size() > c.getTaskTrackers()/2) {
              int count = 0;
              for (TaskAttemptID t : runningTasks) {
                LOG.info(new Date() +
                    (fail ? " Failed task : " : " Killed task : ") + t);
                rJob.killTask(t, fail);
                if (count++ > runningTasks.size()/2) { //kill 50%
                  break;
                }
              }
            }
            thresholdVal = thresholdVal * thresholdMultiplier;
          }
          Thread.sleep(5000);
        } catch (InterruptedException ie) {
          killed = true;
        } catch (Exception e) {
          LOG.fatal(StringUtils.stringifyException(e));
        }
      }
    }
  }


  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
    System.exit(res);
  }
}