source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/ControlledMapReduceJob.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 18.2 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A controlled Map/Reduce job. The tasks are controlled by the presence of
 * specially named files in the directory signalFileDir on the file system
 * that the job is configured to work with. Tasks get scheduled by the
 * scheduler, occupy slots on the TaskTrackers and keep running until the
 * user gives a signal via files whose names are of the form MAPS_[0-9]* and
 * REDUCES_[0-9]*. For example, when the map tasks see that a file named
 * MAPS_5 has been created in the signalFileDir, all the maps whose task IDs
 * are less than 5 finish. At any time, there should be only one MAPS_[0-9]*
 * file and only one REDUCES_[0-9]* file in the signalFileDir. In the
 * beginning, MAPS_0 and REDUCES_0 files are present, and further signals are
 * given by renaming these files.
 */
class ControlledMapReduceJob extends Configured implements Tool,
    Mapper<NullWritable, NullWritable, IntWritable, NullWritable>,
    Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
    Partitioner<IntWritable, NullWritable>,
    InputFormat<NullWritable, NullWritable> {

  static final Log LOG = LogFactory.getLog(ControlledMapReduceJob.class);

  private FileSystem fs = null;
  private int taskNumber;

  private static ArrayList<Path> signalFileDirCache = new ArrayList<Path>();

  private Path signalFileDir;
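  // Pick a per-instance signal directory name that is unique within this
  // JVM, so that concurrently constructed jobs never race on each other's
  // signal files.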
  {
    Random random = new Random();
    signalFileDir = new Path("signalFileDir-" + random.nextLong());
    while (signalFileDirCache.contains(signalFileDir)) {
      signalFileDir = new Path("signalFileDir-" + random.nextLong());
    }
    signalFileDirCache.add(signalFileDir);
  }

  private long mapsFinished = 0;
  private long reducesFinished = 0;

  private RunningJob rJob = null;

  private int numMappers;
  private int numReducers;

  private final String MAP_SIGFILE_PREFIX = "MAPS_";
  private final String REDUCE_SIGFILE_PREFIX = "REDUCES_";

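  // Creates the signal directory and the initial MAPS_0 / REDUCES_0 signal
  // files; all later signals are sent by renaming these files (see
  // finishNTasks below).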
  private void initialize()
      throws IOException {
    fs = FileSystem.get(getConf());
    fs.mkdirs(signalFileDir);
    writeFile(new Path(signalFileDir, MAP_SIGFILE_PREFIX + mapsFinished));
    writeFile(new Path(signalFileDir, REDUCE_SIGFILE_PREFIX + reducesFinished));
  }

  /**
   * Signal the next noOfTasksToFinish map or reduce tasks to finish.
   *
   * @param isMap whether the signal is for maps (true) or reduces (false)
   * @param noOfTasksToFinish number of additional tasks to let finish
   * @throws IOException
   */
  public void finishNTasks(boolean isMap, int noOfTasksToFinish)
      throws IOException {
    if (noOfTasksToFinish < 0) {
      throw new IOException(
          "Negative values for noOfTasksToFinish not acceptable");
    }

    if (noOfTasksToFinish == 0) {
      return;
    }

    LOG.info("Going to finish off " + noOfTasksToFinish);
    String PREFIX = isMap ? MAP_SIGFILE_PREFIX : REDUCE_SIGFILE_PREFIX;
    long tasksFinished = isMap ? mapsFinished : reducesFinished;
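    // The rename below is the signal itself: running tasks poll the signal
    // directory and finish once the number in the signal file's name covers
    // their task id.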
    Path oldSignalFile =
        new Path(signalFileDir, PREFIX + String.valueOf(tasksFinished));
    Path newSignalFile =
        new Path(signalFileDir, PREFIX
            + String.valueOf(tasksFinished + noOfTasksToFinish));
    fs.rename(oldSignalFile, newSignalFile);
    if (isMap) {
      mapsFinished += noOfTasksToFinish;
    } else {
      reducesFinished += noOfTasksToFinish;
    }
    LOG.info("Successfully sent signal to finish off " + noOfTasksToFinish);
  }

  /**
   * Finish all tasks of the type determined by isMap.
   *
   * @param isMap whether to finish all maps (true) or all reduces (false)
   * @throws IOException
   */
  public void finishAllTasks(boolean isMap)
      throws IOException {
    finishNTasks(isMap, (isMap ? numMappers : numReducers));
  }

  /**
   * Finish the job.
   *
   * @throws IOException
   */
  public void finishJob()
      throws IOException {
    finishAllTasks(true);
    finishAllTasks(false);
  }

  /**
   * Wait till noOfTasksToBeRunning tasks of the type specified by isMap have
   * started running. This currently takes a JobInProgress object and uses its
   * API directly to determine the number of running tasks.
   *
   * <p>
   *
   * TODO: It should eventually use a JobID and get the information from the
   * JT to check the number of running tasks.
   *
   * @param jip
   * @param isMap
   * @param noOfTasksToBeRunning
   */
  static void waitTillNTasksStartRunning(JobInProgress jip, boolean isMap,
      int noOfTasksToBeRunning)
      throws InterruptedException {
    int numTasks = 0;
    while (numTasks != noOfTasksToBeRunning) {
      Thread.sleep(1000);
      numTasks = isMap ? jip.runningMaps() : jip.runningReduces();
      LOG.info("Waiting till " + noOfTasksToBeRunning
          + (isMap ? " map" : " reduce") + " tasks of the job "
          + jip.getJobID() + " start running. " + numTasks
          + " tasks have already started running.");
    }
  }

  /**
   * Make sure that the number of tasks of the type specified by isMap running
   * in the given job is the same as noOfTasksToBeRunning.
   *
   * <p>
   *
   * TODO: It should eventually use a JobID and get the information from the
   * JT to check the number of running tasks.
   *
   * @param jip
   * @param isMap
   * @param noOfTasksToBeRunning
   */
  static void assertNumTasksRunning(JobInProgress jip, boolean isMap,
      int noOfTasksToBeRunning)
      throws Exception {
    if ((isMap ? jip.runningMaps() : jip.runningReduces())
        != noOfTasksToBeRunning) {
      throw new Exception("Number of tasks running is not "
          + noOfTasksToBeRunning);
    }
  }

  /**
   * Wait till noOfTasksToFinish tasks of the type specified by isMap have
   * finished. This currently takes a JobInProgress object and uses its API
   * directly to determine the number of finished tasks.
   *
   * <p>
   *
   * TODO: It should eventually use a JobID and get the information from the
   * JT to check the number of finished tasks.
   *
   * @param jip
   * @param isMap
   * @param noOfTasksToFinish
   * @throws InterruptedException
   */
  static void waitTillNTotalTasksFinish(JobInProgress jip, boolean isMap,
      int noOfTasksToFinish)
      throws InterruptedException {
    int noOfTasksAlreadyFinished = 0;
    while (noOfTasksAlreadyFinished < noOfTasksToFinish) {
      Thread.sleep(1000);
      noOfTasksAlreadyFinished =
          (isMap ? jip.finishedMaps() : jip.finishedReduces());
      LOG.info("Waiting till " + noOfTasksToFinish
          + (isMap ? " map" : " reduce") + " tasks of the job "
          + jip.getJobID() + " finish. " + noOfTasksAlreadyFinished
          + " tasks have already finished.");
    }
  }

  /**
   * Have all the tasks of type specified by isMap finished in this job?
   *
   * @param jip
   * @param isMap
   * @return true if finished, false otherwise
   */
  static boolean haveAllTasksFinished(JobInProgress jip, boolean isMap) {
    return ((isMap ? jip.runningMaps() : jip.runningReduces()) == 0);
  }

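  // Signal files are empty SequenceFiles: only the file name carries
  // information, the contents are a single empty record.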
  private void writeFile(Path name)
      throws IOException {
    Configuration conf = new Configuration(false);
    SequenceFile.Writer writer =
        SequenceFile.createWriter(fs, conf, name, BytesWritable.class,
            BytesWritable.class, CompressionType.NONE);
    writer.append(new BytesWritable(), new BytesWritable());
    writer.close();
  }

  @Override
  public void configure(JobConf conf) {
    try {
      signalFileDir = new Path(conf.get("signal.dir.path"));
      numReducers = conf.getNumReduceTasks();
      fs = FileSystem.get(conf);
      String taskAttemptId = conf.get("mapred.task.id");
      if (taskAttemptId != null) {
        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptId);
        taskNumber = taskAttemptID.getTaskID().getId();
      }
    } catch (IOException ioe) {
      LOG.warn("Caught exception " + ioe);
    }
  }

  private FileStatus[] listSignalFiles(FileSystem fileSys, final boolean isMap)
      throws IOException {
    return fileSys.globStatus(new Path(signalFileDir.toString() + "/*"),
        new PathFilter() {
          @Override
          public boolean accept(Path path) {
            if (isMap && path.getName().startsWith(MAP_SIGFILE_PREFIX)) {
              LOG.debug("Found signal file : " + path.getName());
              return true;
            } else if (!isMap
                && path.getName().startsWith(REDUCE_SIGFILE_PREFIX)) {
              LOG.debug("Found signal file : " + path.getName());
              return true;
            }
            LOG.info("Didn't find any relevant signal files.");
            return false;
          }
        });
  }

  @Override
  public void map(NullWritable key, NullWritable value,
      OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
      throws IOException {
    LOG.info(taskNumber + " has started.");
    FileStatus[] files = listSignalFiles(fs, true);
    String[] sigFileComps = files[0].getPath().getName().split("_");
    String signalType = sigFileComps[0];
    int noOfTasks = Integer.parseInt(sigFileComps[1]);

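    // Spin until the current MAPS_<n> signal covers this task's id, calling
    // reporter.progress() on every iteration so the framework does not kill
    // the task for inactivity.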
    while (!signalType.equals("MAPS") || taskNumber + 1 > noOfTasks) {
      LOG.info("Signal type found: " + signalType
          + ". Number of tasks to be finished by this signal: " + noOfTasks
          + ". My id: " + taskNumber);
      LOG.info(taskNumber + " is still alive.");
      try {
        reporter.progress();
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        LOG.info(taskNumber + " is still alive.");
        break;
      }
      files = listSignalFiles(fs, true);
      sigFileComps = files[0].getPath().getName().split("_");
      signalType = sigFileComps[0];
      noOfTasks = Integer.parseInt(sigFileComps[1]);
    }
    LOG.info("Signal type found: " + signalType
        + ". Number of tasks to be finished by this signal: " + noOfTasks
        + ". My id: " + taskNumber);
    // Emit one key per reducer (keys 0 to numReducers - 1), so that each
    // reducer gets exactly one key.
    for (int i = 0; i < numReducers; i++) {
      output.collect(new IntWritable(i), NullWritable.get());
    }

    LOG.info(taskNumber + " is finished.");
  }

  @Override
  public void reduce(IntWritable key, Iterator<NullWritable> values,
      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
      throws IOException {
    LOG.info(taskNumber + " has started.");
    FileStatus[] files = listSignalFiles(fs, false);
    String[] sigFileComps = files[0].getPath().getName().split("_");
    String signalType = sigFileComps[0];
    int noOfTasks = Integer.parseInt(sigFileComps[1]);

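    // Same polling loop as in map(), but keyed on the REDUCES_<n> signal
    // file.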
    while (!signalType.equals("REDUCES") || taskNumber + 1 > noOfTasks) {
      LOG.info("Signal type found: " + signalType
          + ". Number of tasks to be finished by this signal: " + noOfTasks
          + ". My id: " + taskNumber);
      LOG.info(taskNumber + " is still alive.");
      try {
        reporter.progress();
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        LOG.info(taskNumber + " is still alive.");
        break;
      }
      files = listSignalFiles(fs, false);
      sigFileComps = files[0].getPath().getName().split("_");
      signalType = sigFileComps[0];
      noOfTasks = Integer.parseInt(sigFileComps[1]);
    }
    LOG.info("Signal type found: " + signalType
        + ". Number of tasks to be finished by this signal: " + noOfTasks
        + ". My id: " + taskNumber);
    LOG.info(taskNumber + " is finished.");
  }

  @Override
  public void close()
      throws IOException {
    // nothing
  }

  public JobID getJobId() {
    if (rJob == null) {
      return null;
    }
    return rJob.getID();
  }

  public int run(int numMapper, int numReducer)
      throws IOException {
    JobConf conf =
        getControlledMapReduceJobConf(getConf(), numMapper, numReducer);
    JobClient client = new JobClient(conf);
    rJob = client.submitJob(conf);
    while (!rJob.isComplete()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        break;
      }
    }
    if (rJob.isSuccessful()) {
      return 0;
    }
    return 1;
  }

  private JobConf getControlledMapReduceJobConf(Configuration clusterConf,
      int numMapper, int numReducer)
      throws IOException {
    setConf(clusterConf);
    initialize();
    JobConf conf = new JobConf(getConf(), ControlledMapReduceJob.class);
    conf.setJobName("ControlledJob");
    conf.set("signal.dir.path", signalFileDir.toString());
    conf.setNumMapTasks(numMapper);
    conf.setNumReduceTasks(numReducer);
    conf.setMapperClass(ControlledMapReduceJob.class);
    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(NullWritable.class);
    conf.setReducerClass(ControlledMapReduceJob.class);
    conf.setOutputKeyClass(NullWritable.class);
    conf.setOutputValueClass(NullWritable.class);
    conf.setInputFormat(ControlledMapReduceJob.class);
    FileInputFormat.addInputPath(conf, new Path("ignored"));
    conf.setOutputFormat(NullOutputFormat.class);
    conf.setMapSpeculativeExecution(false);
    conf.setReduceSpeculativeExecution(false);

    // Set the following so that reduce tasks are able to start running
    // immediately alongside the maps.
    conf.set("mapred.reduce.slowstart.completed.maps", String.valueOf(0));

    return conf;
  }

  @Override
  public int run(String[] args)
      throws Exception {
    numMappers = Integer.parseInt(args[0]);
    numReducers = Integer.parseInt(args[1]);
    return run(numMappers, numReducers);
  }

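  // Modulo partitioning: map() emits keys 0 to numReducers - 1, so this
  // routes exactly one key to each reducer.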
  @Override
  public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
    return k.get() % numPartitions;
  }

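  // Acting as its own InputFormat, the job feeds each map exactly one empty
  // record: getSplits below returns numSplits EmptySplits, and the reader
  // returns a single NullWritable pair before reporting end-of-input.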
  @Override
  public RecordReader<NullWritable, NullWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) {
    LOG.debug("Inside RecordReader.getRecordReader");
    return new RecordReader<NullWritable, NullWritable>() {
      private int pos = 0;

      public void close() {
        // nothing
      }

      public NullWritable createKey() {
        return NullWritable.get();
      }

      public NullWritable createValue() {
        return NullWritable.get();
      }

      public long getPos() {
        return pos;
      }

      public float getProgress() {
        // Progress is a fraction in [0, 1]: 0 before the single record is
        // read, 1 afterwards.
        return pos >= 1 ? 1.0f : 0.0f;
      }

      public boolean next(NullWritable key, NullWritable value) {
        if (pos++ == 0) {
          LOG.debug("Returning the next record");
          return true;
        }
        LOG.debug("No more records. Returning none.");
        return false;
      }
    };
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) {
    LOG.debug("Inside InputSplit.getSplits");
    InputSplit[] ret = new InputSplit[numSplits];
    for (int i = 0; i < numSplits; ++i) {
      ret[i] = new EmptySplit();
    }
    return ret;
  }

  public static class EmptySplit implements InputSplit {
    public void write(DataOutput out)
        throws IOException {
    }

    public void readFields(DataInput in)
        throws IOException {
    }

    public long getLength() {
      return 0L;
    }

    public String[] getLocations() {
      return new String[0];
    }
  }

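  // Helper thread that submits a ControlledMapReduceJob via ToolRunner, so a
  // test can drive the job asynchronously; getJob() and getJobID() block
  // until the submission has actually happened.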
  static class ControlledMapReduceJobRunner extends Thread {
    private JobConf conf;
    private ControlledMapReduceJob job;
    private JobID jobID;

    private int numMappers;
    private int numReducers;

    public ControlledMapReduceJobRunner() {
      this(new JobConf(), 5, 5);
    }

    public ControlledMapReduceJobRunner(JobConf cnf, int numMap, int numRed) {
      this.conf = cnf;
      this.numMappers = numMap;
      this.numReducers = numRed;
    }

    public ControlledMapReduceJob getJob() {
      while (job == null) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          LOG.info(ControlledMapReduceJobRunner.class.getName()
              + " is interrupted.");
          break;
        }
      }
      return job;
    }

    public JobID getJobID()
        throws IOException {
      ControlledMapReduceJob job = getJob();
      JobID id = job.getJobId();
      while (id == null) {
        id = job.getJobId();
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          LOG.info(ControlledMapReduceJobRunner.class.getName()
              + " is interrupted.");
          break;
        }
      }
      return id;
    }

    @Override
    public void run() {
      if (job != null) {
        LOG.warn("Job is already running.");
        return;
      }
      try {
        job = new ControlledMapReduceJob();
        int ret =
            ToolRunner.run(this.conf, job, new String[] {
                String.valueOf(numMappers), String.valueOf(numReducers) });
        LOG.info("Return value for the job : " + ret);
      } catch (Exception e) {
        LOG.warn("Caught exception : " + StringUtils.stringifyException(e));
      }
    }

    static ControlledMapReduceJobRunner getControlledMapReduceJobRunner(
        JobConf conf, int numMappers, int numReducers) {
      return new ControlledMapReduceJobRunner(conf, numMappers, numReducers);
    }
  }
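
  // A minimal usage sketch (assumed test-driver code, not part of this
  // file), using only methods defined above:
  //
  //   ControlledMapReduceJobRunner runner =
  //       ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  //           new JobConf(), 4, 2);           // 4 maps, 2 reduces
  //   runner.start();                          // submits the job
  //   JobID id = runner.getJobID();            // blocks until submitted
  //   runner.getJob().finishNTasks(true, 2);   // let two maps finish
  //   runner.getJob().finishJob();             // release all remaining tasks
  //   runner.join();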
}