source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 7.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapreduce.lib.output;
20
21import java.io.IOException;
22import java.net.URI;
23
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.apache.hadoop.fs.FileStatus;
27import org.apache.hadoop.fs.FileSystem;
28import org.apache.hadoop.fs.Path;
29import org.apache.hadoop.mapreduce.JobContext;
30import org.apache.hadoop.mapreduce.OutputCommitter;
31import org.apache.hadoop.mapreduce.TaskAttemptContext;
32import org.apache.hadoop.mapreduce.TaskAttemptID;
33import org.apache.hadoop.util.StringUtils;
34
35/** An {@link OutputCommitter} that commits files specified
36 * in job output directory i.e. ${mapred.output.dir}.
37 **/
38public class FileOutputCommitter extends OutputCommitter {
39
40  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
41
42  /**
43   * Temporary directory name
44   */
45  protected static final String TEMP_DIR_NAME = "_temporary";
46  private FileSystem outputFileSystem = null;
47  private Path outputPath = null;
48  private Path workPath = null;
49
50  /**
51   * Create a file output committer
52   * @param outputPath the job's output path
53   * @param context the task's context
54   * @throws IOException
55   */
56  public FileOutputCommitter(Path outputPath, 
57                             TaskAttemptContext context) throws IOException {
58    if (outputPath != null) {
59      this.outputPath = outputPath;
60      outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
61      workPath = new Path(outputPath,
62                          (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
63                           "_" + context.getTaskAttemptID().toString()
64                           )).makeQualified(outputFileSystem);
65    }
66  }
67
68  /**
69   * Create the temporary directory that is the root of all of the task
70   * work directories.
71   * @param context the job's context
72   */
73  public void setupJob(JobContext context) throws IOException {
74    if (outputPath != null) {
75      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
76      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
77      if (!fileSys.mkdirs(tmpDir)) {
78        LOG.error("Mkdirs failed to create " + tmpDir.toString());
79      }
80    }
81  }
82
83  /**
84   * Delete the temporary directory, including all of the work directories.
85   * @param context the job's context
86   */
87  public void cleanupJob(JobContext context) throws IOException {
88    if (outputPath != null) {
89      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
90      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
91      if (fileSys.exists(tmpDir)) {
92        fileSys.delete(tmpDir, true);
93      }
94    }
95  }
96
97  /**
98   * No task setup required.
99   */
100  @Override
101  public void setupTask(TaskAttemptContext context) throws IOException {
102    // FileOutputCommitter's setupTask doesn't do anything. Because the
103    // temporary task directory is created on demand when the
104    // task is writing.
105  }
106
107  /**
108   * Move the files from the work directory to the job output directory
109   * @param context the task context
110   */
111  public void commitTask(TaskAttemptContext context) 
112  throws IOException {
113    TaskAttemptID attemptId = context.getTaskAttemptID();
114    if (workPath != null) {
115      context.progress();
116      if (outputFileSystem.exists(workPath)) {
117        // Move the task outputs to their final place
118        moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
119        // Delete the temporary task-specific output directory
120        if (!outputFileSystem.delete(workPath, true)) {
121          LOG.warn("Failed to delete the temporary output" + 
122          " directory of task: " + attemptId + " - " + workPath);
123        }
124        LOG.info("Saved output of task '" + attemptId + "' to " + 
125                 outputPath);
126      }
127    }
128  }
129
130  /**
131   * Move all of the files from the work directory to the final output
132   * @param context the task context
133   * @param fs the output file system
134   * @param jobOutputDir the final output direcotry
135   * @param taskOutput the work path
136   * @throws IOException
137   */
138  private void moveTaskOutputs(TaskAttemptContext context,
139                               FileSystem fs,
140                               Path jobOutputDir,
141                               Path taskOutput) 
142  throws IOException {
143    TaskAttemptID attemptId = context.getTaskAttemptID();
144    context.progress();
145    if (fs.isFile(taskOutput)) {
146      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
147                                          workPath);
148      if (!fs.rename(taskOutput, finalOutputPath)) {
149        if (!fs.delete(finalOutputPath, true)) {
150          throw new IOException("Failed to delete earlier output of task: " + 
151                                 attemptId);
152        }
153        if (!fs.rename(taskOutput, finalOutputPath)) {
154          throw new IOException("Failed to save output of task: " + 
155                          attemptId);
156        }
157      }
158      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
159    } else if(fs.getFileStatus(taskOutput).isDir()) {
160      FileStatus[] paths = fs.listStatus(taskOutput);
161      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
162      fs.mkdirs(finalOutputPath);
163      if (paths != null) {
164        for (FileStatus path : paths) {
165          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
166        }
167      }
168    }
169  }
170
171  /**
172   * Delete the work directory
173   */
174  @Override
175  public void abortTask(TaskAttemptContext context) {
176    try {
177      if (workPath != null) { 
178        context.progress();
179        outputFileSystem.delete(workPath, true);
180      }
181    } catch (IOException ie) {
182      LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
183    }
184  }
185
186  /**
187   * Find the final name of a given output file, given the job output directory
188   * and the work directory.
189   * @param jobOutputDir the job's output directory
190   * @param taskOutput the specific task output file
191   * @param taskOutputPath the job's work directory
192   * @return the final path for the specific output file
193   * @throws IOException
194   */
195  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
196                            Path taskOutputPath) throws IOException {
197    URI taskOutputUri = taskOutput.toUri();
198    URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
199    if (taskOutputUri == relativePath) {
200      throw new IOException("Can not get the relative path: base = " + 
201          taskOutputPath + " child = " + taskOutput);
202    }
203    if (relativePath.getPath().length() > 0) {
204      return new Path(jobOutputDir, relativePath.getPath());
205    } else {
206      return jobOutputDir;
207    }
208  }
209
210  /**
211   * Did this task write any files in the work directory?
212   * @param context the task's context
213   */
214  @Override
215  public boolean needsTaskCommit(TaskAttemptContext context
216                                 ) throws IOException {
217    return workPath != null && outputFileSystem.exists(workPath);
218  }
219
220  /**
221   * Get the directory that the task should write results into
222   * @return the work directory
223   * @throws IOException
224   */
225  public Path getWorkPath() throws IOException {
226    return workPath;
227  }
228}
Note: See TracBrowser for help on using the repository browser.