source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/Job.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 14.6 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapreduce;
20
21import java.io.IOException;
22
23import org.apache.hadoop.conf.Configuration;
24import org.apache.hadoop.fs.Path;
25import org.apache.hadoop.io.RawComparator;
26import org.apache.hadoop.mapreduce.TaskAttemptID;
27import org.apache.hadoop.mapred.JobClient;
28import org.apache.hadoop.mapred.JobConf;
29import org.apache.hadoop.mapred.RunningJob;
30import org.apache.hadoop.mapred.TaskCompletionEvent;
31
32/**
33 * The job submitter's view of the Job. It allows the user to configure the
34 * job, submit it, control its execution, and query the state. The set methods
35 * only work until the job is submitted, afterwards they will throw an
36 * IllegalStateException.
37 */
38public class Job extends JobContext { 
39  public static enum JobState {DEFINE, RUNNING};
40  private JobState state = JobState.DEFINE;
41  private JobClient jobClient;
42  private RunningJob info;
43
44  public Job() throws IOException {
45    this(new Configuration());
46  }
47
48  public Job(Configuration conf) throws IOException {
49    super(conf, null);
50    jobClient = new JobClient((JobConf) getConfiguration());
51  }
52
53  public Job(Configuration conf, String jobName) throws IOException {
54    this(conf);
55    setJobName(jobName);
56  }
57
58  private void ensureState(JobState state) throws IllegalStateException {
59    if (state != this.state) {
60      throw new IllegalStateException("Job in state "+ this.state + 
61                                      " instead of " + state);
62    }
63  }
64
65  /**
66   * Set the number of reduce tasks for the job.
67   * @param tasks the number of reduce tasks
68   * @throws IllegalStateException if the job is submitted
69   */
70  public void setNumReduceTasks(int tasks) throws IllegalStateException {
71    ensureState(JobState.DEFINE);
72    conf.setNumReduceTasks(tasks);
73  }
74
75  /**
76   * Set the current working directory for the default file system.
77   *
78   * @param dir the new current working directory.
79   * @throws IllegalStateException if the job is submitted
80   */
81  public void setWorkingDirectory(Path dir) throws IOException {
82    ensureState(JobState.DEFINE);
83    conf.setWorkingDirectory(dir);
84  }
85
86  /**
87   * Set the {@link InputFormat} for the job.
88   * @param cls the <code>InputFormat</code> to use
89   * @throws IllegalStateException if the job is submitted
90   */
91  public void setInputFormatClass(Class<? extends InputFormat> cls
92                                  ) throws IllegalStateException {
93    ensureState(JobState.DEFINE);
94    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
95  }
96
97  /**
98   * Set the {@link OutputFormat} for the job.
99   * @param cls the <code>OutputFormat</code> to use
100   * @throws IllegalStateException if the job is submitted
101   */
102  public void setOutputFormatClass(Class<? extends OutputFormat> cls
103                                   ) throws IllegalStateException {
104    ensureState(JobState.DEFINE);
105    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
106  }
107
108  /**
109   * Set the {@link Mapper} for the job.
110   * @param cls the <code>Mapper</code> to use
111   * @throws IllegalStateException if the job is submitted
112   */
113  public void setMapperClass(Class<? extends Mapper> cls
114                             ) throws IllegalStateException {
115    ensureState(JobState.DEFINE);
116    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
117  }
118
119  /**
120   * Set the Jar by finding where a given class came from.
121   * @param cls the example class
122   */
123  public void setJarByClass(Class<?> cls) {
124    conf.setJarByClass(cls);
125  }
126 
127  /**
128   * Get the pathname of the job's jar.
129   * @return the pathname
130   */
131  public String getJar() {
132    return conf.getJar();
133  }
134
135  /**
136   * Set the combiner class for the job.
137   * @param cls the combiner to use
138   * @throws IllegalStateException if the job is submitted
139   */
140  public void setCombinerClass(Class<? extends Reducer> cls
141                               ) throws IllegalStateException {
142    ensureState(JobState.DEFINE);
143    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
144  }
145
146  /**
147   * Set the {@link Reducer} for the job.
148   * @param cls the <code>Reducer</code> to use
149   * @throws IllegalStateException if the job is submitted
150   */
151  public void setReducerClass(Class<? extends Reducer> cls
152                              ) throws IllegalStateException {
153    ensureState(JobState.DEFINE);
154    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
155  }
156
157  /**
158   * Set the {@link Partitioner} for the job.
159   * @param cls the <code>Partitioner</code> to use
160   * @throws IllegalStateException if the job is submitted
161   */
162  public void setPartitionerClass(Class<? extends Partitioner> cls
163                                  ) throws IllegalStateException {
164    ensureState(JobState.DEFINE);
165    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
166  }
167
168  /**
169   * Set the key class for the map output data. This allows the user to
170   * specify the map output key class to be different than the final output
171   * value class.
172   *
173   * @param theClass the map output key class.
174   * @throws IllegalStateException if the job is submitted
175   */
176  public void setMapOutputKeyClass(Class<?> theClass
177                                   ) throws IllegalStateException {
178    ensureState(JobState.DEFINE);
179    conf.setMapOutputKeyClass(theClass);
180  }
181
182  /**
183   * Set the value class for the map output data. This allows the user to
184   * specify the map output value class to be different than the final output
185   * value class.
186   *
187   * @param theClass the map output value class.
188   * @throws IllegalStateException if the job is submitted
189   */
190  public void setMapOutputValueClass(Class<?> theClass
191                                     ) throws IllegalStateException {
192    ensureState(JobState.DEFINE);
193    conf.setMapOutputValueClass(theClass);
194  }
195
196  /**
197   * Set the key class for the job output data.
198   *
199   * @param theClass the key class for the job output data.
200   * @throws IllegalStateException if the job is submitted
201   */
202  public void setOutputKeyClass(Class<?> theClass
203                                ) throws IllegalStateException {
204    ensureState(JobState.DEFINE);
205    conf.setOutputKeyClass(theClass);
206  }
207
208  /**
209   * Set the value class for job outputs.
210   *
211   * @param theClass the value class for job outputs.
212   * @throws IllegalStateException if the job is submitted
213   */
214  public void setOutputValueClass(Class<?> theClass
215                                  ) throws IllegalStateException {
216    ensureState(JobState.DEFINE);
217    conf.setOutputValueClass(theClass);
218  }
219
220  /**
221   * Define the comparator that controls how the keys are sorted before they
222   * are passed to the {@link Reducer}.
223   * @param cls the raw comparator
224   * @throws IllegalStateException if the job is submitted
225   */
226  public void setSortComparatorClass(Class<? extends RawComparator> cls
227                                     ) throws IllegalStateException {
228    ensureState(JobState.DEFINE);
229    conf.setOutputKeyComparatorClass(cls);
230  }
231
232  /**
233   * Define the comparator that controls which keys are grouped together
234   * for a single call to
235   * {@link Reducer#reduce(Object, Iterable,
236   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
237   * @param cls the raw comparator to use
238   * @throws IllegalStateException if the job is submitted
239   */
240  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
241                                         ) throws IllegalStateException {
242    ensureState(JobState.DEFINE);
243    conf.setOutputValueGroupingComparator(cls);
244  }
245
246  /**
247   * Set the user-specified job name.
248   *
249   * @param name the job's new name.
250   * @throws IllegalStateException if the job is submitted
251   */
252  public void setJobName(String name) throws IllegalStateException {
253    ensureState(JobState.DEFINE);
254    conf.setJobName(name);
255  }
256
257  /**
258   * Get the URL where some job progress information will be displayed.
259   *
260   * @return the URL where some job progress information will be displayed.
261   */
262  public String getTrackingURL() {
263    ensureState(JobState.RUNNING);
264    return info.getTrackingURL();
265  }
266
267  /**
268   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
269   * and 1.0.  When all map tasks have completed, the function returns 1.0.
270   *
271   * @return the progress of the job's map-tasks.
272   * @throws IOException
273   */
274  public float mapProgress() throws IOException {
275    ensureState(JobState.RUNNING);
276    return info.mapProgress();
277  }
278
279  /**
280   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
281   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
282   *
283   * @return the progress of the job's reduce-tasks.
284   * @throws IOException
285   */
286  public float reduceProgress() throws IOException {
287    ensureState(JobState.RUNNING);
288    return info.reduceProgress();
289  }
290
291  /**
292   * Check if the job is finished or not.
293   * This is a non-blocking call.
294   *
295   * @return <code>true</code> if the job is complete, else <code>false</code>.
296   * @throws IOException
297   */
298  public boolean isComplete() throws IOException {
299    ensureState(JobState.RUNNING);
300    return info.isComplete();
301  }
302
303  /**
304   * Check if the job completed successfully.
305   *
306   * @return <code>true</code> if the job succeeded, else <code>false</code>.
307   * @throws IOException
308   */
309  public boolean isSuccessful() throws IOException {
310    ensureState(JobState.RUNNING);
311    return info.isSuccessful();
312  }
313
314  /**
315   * Kill the running job.  Blocks until all job tasks have been
316   * killed as well.  If the job is no longer running, it simply returns.
317   *
318   * @throws IOException
319   */
320  public void killJob() throws IOException {
321    ensureState(JobState.RUNNING);
322    info.killJob();
323  }
324   
325  /**
326   * Get events indicating completion (success/failure) of component tasks.
327   * 
328   * @param startFrom index to start fetching events from
329   * @return an array of {@link TaskCompletionEvent}s
330   * @throws IOException
331   */
332  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
333                                                       ) throws IOException {
334    ensureState(JobState.RUNNING);
335    return info.getTaskCompletionEvents(startFrom);
336  }
337 
338  /**
339   * Kill indicated task attempt.
340   *
341   * @param taskId the id of the task to be terminated.
342   * @throws IOException
343   */
344  public void killTask(TaskAttemptID taskId) throws IOException {
345    ensureState(JobState.RUNNING);
346    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
347                  false);
348  }
349
350  /**
351   * Fail indicated task attempt.
352   *
353   * @param taskId the id of the task to be terminated.
354   * @throws IOException
355   */
356  public void failTask(TaskAttemptID taskId) throws IOException {
357    ensureState(JobState.RUNNING);
358    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
359                  true);
360  }
361
362  /**
363   * Gets the counters for this job.
364   *
365   * @return the counters for this job.
366   * @throws IOException
367   */
368  public Counters getCounters() throws IOException {
369    ensureState(JobState.RUNNING);
370    return new Counters(info.getCounters());
371  }
372
373  private void ensureNotSet(String attr, String msg) throws IOException {
374    if (conf.get(attr) != null) {
375      throw new IOException(attr + " is incompatible with " + msg + " mode.");
376    }   
377  }
378
379  /**
380   * Default to the new APIs unless they are explicitly set or the old mapper or
381   * reduce attributes are used.
382   * @throws IOException if the configuration is inconsistant
383   */
384  private void setUseNewAPI() throws IOException {
385    int numReduces = conf.getNumReduceTasks();
386    String oldMapperClass = "mapred.mapper.class";
387    String oldReduceClass = "mapred.reducer.class";
388    conf.setBooleanIfUnset("mapred.mapper.new-api",
389                           conf.get(oldMapperClass) == null);
390    if (conf.getUseNewMapper()) {
391      String mode = "new map API";
392      ensureNotSet("mapred.input.format.class", mode);
393      ensureNotSet(oldMapperClass, mode);
394      if (numReduces != 0) {
395        ensureNotSet("mapred.partitioner.class", mode);
396       } else {
397        ensureNotSet("mapred.output.format.class", mode);
398      }     
399    } else {
400      String mode = "map compatability";
401      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
402      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
403      if (numReduces != 0) {
404        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
405       } else {
406        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
407      }
408    }
409    if (numReduces != 0) {
410      conf.setBooleanIfUnset("mapred.reducer.new-api",
411                             conf.get(oldReduceClass) == null);
412      if (conf.getUseNewReducer()) {
413        String mode = "new reduce API";
414        ensureNotSet("mapred.output.format.class", mode);
415        ensureNotSet(oldReduceClass, mode);   
416      } else {
417        String mode = "reduce compatability";
418        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
419        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);   
420      }
421    }   
422  }
423
424  /**
425   * Submit the job to the cluster and return immediately.
426   * @throws IOException
427   */
428  public void submit() throws IOException, InterruptedException, 
429                              ClassNotFoundException {
430    ensureState(JobState.DEFINE);
431    setUseNewAPI();
432    info = jobClient.submitJobInternal(conf);
433    state = JobState.RUNNING;
434   }
435 
436  /**
437   * Submit the job to the cluster and wait for it to finish.
438   * @param verbose print the progress to the user
439   * @return true if the job succeeded
440   * @throws IOException thrown if the communication with the
441   *         <code>JobTracker</code> is lost
442   */
443  public boolean waitForCompletion(boolean verbose
444                                   ) throws IOException, InterruptedException,
445                                            ClassNotFoundException {
446    if (state == JobState.DEFINE) {
447      submit();
448    }
449    if (verbose) {
450      jobClient.monitorAndPrintJob(conf, info);
451    } else {
452      info.waitForCompletion();
453    }
454    return isSuccessful();
455  }
456 
457}
Note: See TracBrowser for help on using the repository browser.