1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapreduce.lib.output; |
---|
20 | |
---|
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
---|
38 | |
---|
39 | /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/ |
---|
40 | public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> { |
---|
41 | |
---|
42 | /** Construct output file names so that, when an output directory listing is |
---|
43 | * sorted lexicographically, positions correspond to output partitions.*/ |
---|
44 | private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); |
---|
45 | static { |
---|
46 | NUMBER_FORMAT.setMinimumIntegerDigits(5); |
---|
47 | NUMBER_FORMAT.setGroupingUsed(false); |
---|
48 | } |
---|
49 | private FileOutputCommitter committer = null; |
---|
50 | |
---|
51 | /** |
---|
52 | * Set whether the output of the job is compressed. |
---|
53 | * @param job the job to modify |
---|
54 | * @param compress should the output of the job be compressed? |
---|
55 | */ |
---|
56 | public static void setCompressOutput(Job job, boolean compress) { |
---|
57 | job.getConfiguration().setBoolean("mapred.output.compress", compress); |
---|
58 | } |
---|
59 | |
---|
60 | /** |
---|
61 | * Is the job output compressed? |
---|
62 | * @param job the Job to look in |
---|
63 | * @return <code>true</code> if the job output should be compressed, |
---|
64 | * <code>false</code> otherwise |
---|
65 | */ |
---|
66 | public static boolean getCompressOutput(JobContext job) { |
---|
67 | return job.getConfiguration().getBoolean("mapred.output.compress", false); |
---|
68 | } |
---|
69 | |
---|
70 | /** |
---|
71 | * Set the {@link CompressionCodec} to be used to compress job outputs. |
---|
72 | * @param job the job to modify |
---|
73 | * @param codecClass the {@link CompressionCodec} to be used to |
---|
74 | * compress the job outputs |
---|
75 | */ |
---|
76 | public static void |
---|
77 | setOutputCompressorClass(Job job, |
---|
78 | Class<? extends CompressionCodec> codecClass) { |
---|
79 | setCompressOutput(job, true); |
---|
80 | job.getConfiguration().setClass("mapred.output.compression.codec", |
---|
81 | codecClass, |
---|
82 | CompressionCodec.class); |
---|
83 | } |
---|
84 | |
---|
85 | /** |
---|
86 | * Get the {@link CompressionCodec} for compressing the job outputs. |
---|
87 | * @param job the {@link Job} to look in |
---|
88 | * @param defaultValue the {@link CompressionCodec} to return if not set |
---|
89 | * @return the {@link CompressionCodec} to be used to compress the |
---|
90 | * job outputs |
---|
91 | * @throws IllegalArgumentException if the class was specified, but not found |
---|
92 | */ |
---|
93 | public static Class<? extends CompressionCodec> |
---|
94 | getOutputCompressorClass(JobContext job, |
---|
95 | Class<? extends CompressionCodec> defaultValue) { |
---|
96 | Class<? extends CompressionCodec> codecClass = defaultValue; |
---|
97 | Configuration conf = job.getConfiguration(); |
---|
98 | String name = conf.get("mapred.output.compression.codec"); |
---|
99 | if (name != null) { |
---|
100 | try { |
---|
101 | codecClass = |
---|
102 | conf.getClassByName(name).asSubclass(CompressionCodec.class); |
---|
103 | } catch (ClassNotFoundException e) { |
---|
104 | throw new IllegalArgumentException("Compression codec " + name + |
---|
105 | " was not found.", e); |
---|
106 | } |
---|
107 | } |
---|
108 | return codecClass; |
---|
109 | } |
---|
110 | |
---|
111 | public abstract RecordWriter<K, V> |
---|
112 | getRecordWriter(TaskAttemptContext job |
---|
113 | ) throws IOException, InterruptedException; |
---|
114 | |
---|
115 | public void checkOutputSpecs(JobContext job |
---|
116 | ) throws FileAlreadyExistsException, IOException{ |
---|
117 | // Ensure that the output directory is set and not already there |
---|
118 | Path outDir = getOutputPath(job); |
---|
119 | if (outDir == null) { |
---|
120 | throw new InvalidJobConfException("Output directory not set."); |
---|
121 | } |
---|
122 | if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) { |
---|
123 | throw new FileAlreadyExistsException("Output directory " + outDir + |
---|
124 | " already exists"); |
---|
125 | } |
---|
126 | } |
---|
127 | |
---|
128 | /** |
---|
129 | * Set the {@link Path} of the output directory for the map-reduce job. |
---|
130 | * |
---|
131 | * @param job The job to modify |
---|
132 | * @param outputDir the {@link Path} of the output directory for |
---|
133 | * the map-reduce job. |
---|
134 | */ |
---|
135 | public static void setOutputPath(Job job, Path outputDir) { |
---|
136 | job.getConfiguration().set("mapred.output.dir", outputDir.toString()); |
---|
137 | } |
---|
138 | |
---|
139 | /** |
---|
140 | * Get the {@link Path} to the output directory for the map-reduce job. |
---|
141 | * |
---|
142 | * @return the {@link Path} to the output directory for the map-reduce job. |
---|
143 | * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext) |
---|
144 | */ |
---|
145 | public static Path getOutputPath(JobContext job) { |
---|
146 | String name = job.getConfiguration().get("mapred.output.dir"); |
---|
147 | return name == null ? null: new Path(name); |
---|
148 | } |
---|
149 | |
---|
150 | /** |
---|
151 | * Get the {@link Path} to the task's temporary output directory |
---|
152 | * for the map-reduce job |
---|
153 | * |
---|
154 | * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4> |
---|
155 | * |
---|
156 | * <p>Some applications need to create/write-to side-files, which differ from |
---|
157 | * the actual job-outputs. |
---|
158 | * |
---|
159 | * <p>In such cases there could be issues with 2 instances of the same TIP |
---|
160 | * (running simultaneously e.g. speculative tasks) trying to open/write-to the |
---|
161 | * same file (path) on HDFS. Hence the application-writer will have to pick |
---|
162 | * unique names per task-attempt (e.g. using the attemptid, say |
---|
163 | * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> |
---|
164 | * |
---|
165 | * <p>To get around this the Map-Reduce framework helps the application-writer |
---|
166 | * out by maintaining a special |
---|
167 | * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> |
---|
168 | * sub-directory for each task-attempt on HDFS where the output of the |
---|
169 | * task-attempt goes. On successful completion of the task-attempt the files |
---|
170 | * in the <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> (only) |
---|
171 | * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the |
---|
172 | * framework discards the sub-directory of unsuccessful task-attempts. This |
---|
173 | * is completely transparent to the application.</p> |
---|
174 | * |
---|
175 | * <p>The application-writer can take advantage of this by creating any |
---|
176 | * side-files required in a work directory during execution |
---|
177 | * of his task i.e. via |
---|
178 | * {@link #getWorkOutputPath(TaskInputOutputContext)}, and |
---|
179 | * the framework will move them out similarly - thus she doesn't have to pick |
---|
180 | * unique paths per task-attempt.</p> |
---|
181 | * |
---|
182 | * <p>The entire discussion holds true for maps of jobs with |
---|
183 | * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, |
---|
184 | * goes directly to HDFS.</p> |
---|
185 | * |
---|
186 | * @return the {@link Path} to the task's temporary output directory |
---|
187 | * for the map-reduce job. |
---|
188 | */ |
---|
189 | public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context |
---|
190 | ) throws IOException, |
---|
191 | InterruptedException { |
---|
192 | FileOutputCommitter committer = (FileOutputCommitter) |
---|
193 | context.getOutputCommitter(); |
---|
194 | return committer.getWorkPath(); |
---|
195 | } |
---|
196 | |
---|
197 | /** |
---|
198 | * Helper function to generate a {@link Path} for a file that is unique for |
---|
199 | * the task within the job output directory. |
---|
200 | * |
---|
201 | * <p>The path can be used to create custom files from within the map and |
---|
202 | * reduce tasks. The path name will be unique for each task. The path parent |
---|
203 | * will be the job output directory.</p>ls |
---|
204 | * |
---|
205 | * <p>This method uses the {@link #getUniqueFile} method to make the file name |
---|
206 | * unique for the task.</p> |
---|
207 | * |
---|
208 | * @param context the context for the task. |
---|
209 | * @param name the name for the file. |
---|
210 | * @param extension the extension for the file |
---|
211 | * @return a unique path accross all tasks of the job. |
---|
212 | */ |
---|
213 | public |
---|
214 | static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context, |
---|
215 | String name, |
---|
216 | String extension |
---|
217 | ) throws IOException, InterruptedException { |
---|
218 | return new Path(getWorkOutputPath(context), |
---|
219 | getUniqueFile(context, name, extension)); |
---|
220 | } |
---|
221 | |
---|
222 | /** |
---|
223 | * Generate a unique filename, based on the task id, name, and extension |
---|
224 | * @param context the task that is calling this |
---|
225 | * @param name the base filename |
---|
226 | * @param extension the filename extension |
---|
227 | * @return a string like $name-[mr]-$id$extension |
---|
228 | */ |
---|
229 | public synchronized static String getUniqueFile(TaskAttemptContext context, |
---|
230 | String name, |
---|
231 | String extension) { |
---|
232 | TaskID taskId = context.getTaskAttemptID().getTaskID(); |
---|
233 | int partition = taskId.getId(); |
---|
234 | StringBuilder result = new StringBuilder(); |
---|
235 | result.append(name); |
---|
236 | result.append('-'); |
---|
237 | result.append(taskId.isMap() ? 'm' : 'r'); |
---|
238 | result.append('-'); |
---|
239 | result.append(NUMBER_FORMAT.format(partition)); |
---|
240 | result.append(extension); |
---|
241 | return result.toString(); |
---|
242 | } |
---|
243 | |
---|
244 | /** |
---|
245 | * Get the default path and filename for the output format. |
---|
246 | * @param context the task context |
---|
247 | * @param extension an extension to add to the filename |
---|
248 | * @return a full path $output/_temporary/$taskid/part-[mr]-$id |
---|
249 | * @throws IOException |
---|
250 | */ |
---|
251 | public Path getDefaultWorkFile(TaskAttemptContext context, |
---|
252 | String extension) throws IOException{ |
---|
253 | FileOutputCommitter committer = |
---|
254 | (FileOutputCommitter) getOutputCommitter(context); |
---|
255 | return new Path(committer.getWorkPath(), getUniqueFile(context, "part", |
---|
256 | extension)); |
---|
257 | } |
---|
258 | |
---|
259 | public synchronized |
---|
260 | OutputCommitter getOutputCommitter(TaskAttemptContext context |
---|
261 | ) throws IOException { |
---|
262 | if (committer == null) { |
---|
263 | Path output = getOutputPath(context); |
---|
264 | committer = new FileOutputCommitter(output, context); |
---|
265 | } |
---|
266 | return committer; |
---|
267 | } |
---|
268 | } |
---|
269 | |
---|