1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.text.NumberFormat; |
---|
23 | |
---|
24 | import org.apache.hadoop.fs.FileSystem; |
---|
25 | import org.apache.hadoop.fs.Path; |
---|
26 | import org.apache.hadoop.io.compress.CompressionCodec; |
---|
27 | import org.apache.hadoop.util.Progressable; |
---|
28 | |
---|
/**
 * A base class for {@link OutputFormat} implementations that write their
 * output to files. Provides static helpers for configuring job-output
 * compression, the job output directory, and per-task (temporary) output
 * paths, all stored as keys in the {@link JobConf}.
 */
public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {

  /**
   * Set whether the output of the job is compressed.
   * @param conf the {@link JobConf} to modify
   * @param compress should the output of the job be compressed?
   */
  public static void setCompressOutput(JobConf conf, boolean compress) {
    conf.setBoolean("mapred.output.compress", compress);
  }

  /**
   * Is the job output compressed?
   * @param conf the {@link JobConf} to look in
   * @return <code>true</code> if the job output should be compressed,
   *         <code>false</code> otherwise (also the default when unset)
   */
  public static boolean getCompressOutput(JobConf conf) {
    return conf.getBoolean("mapred.output.compress", false);
  }

  /**
   * Set the {@link CompressionCodec} to be used to compress job outputs.
   * Also turns output compression on (equivalent to calling
   * {@link #setCompressOutput(JobConf, boolean)} with <code>true</code>).
   * @param conf the {@link JobConf} to modify
   * @param codecClass the {@link CompressionCodec} to be used to
   *                   compress the job outputs
   */
  public static void
  setOutputCompressorClass(JobConf conf,
                           Class<? extends CompressionCodec> codecClass) {
    setCompressOutput(conf, true);
    conf.setClass("mapred.output.compression.codec", codecClass,
                  CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the job outputs.
   * @param conf the {@link JobConf} to look in
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} to be used to compress the
   *         job outputs
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public static Class<? extends CompressionCodec>
  getOutputCompressorClass(JobConf conf,
		                       Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    
    String name = conf.get("mapred.output.compression.codec");
    if (name != null) {
      try {
        codecClass = 
        	conf.getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        // Re-wrap with the codec name so the misconfiguration is obvious;
        // the original ClassNotFoundException is preserved as the cause.
        throw new IllegalArgumentException("Compression codec " + name + 
                                          " was not found.", e);
      }
    }
    return codecClass;
  }
  
  /**
   * Get the {@link RecordWriter} for the given job.
   * @param ignored the {@link FileSystem} (may be ignored by
   *                implementations, as the parameter name suggests)
   * @param job the configuration for the job
   * @param name the unique name for this task's output file
   * @param progress mechanism for reporting progress while writing
   * @return a {@link RecordWriter} to write the job output
   * @throws IOException if the writer cannot be created
   */
  public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                               JobConf job, String name,
                                               Progressable progress)
    throws IOException;

  /**
   * Check that the output specification is valid: the output directory must
   * be set (unless the job has zero reduces) and must not already exist.
   * As a side effect, the output path stored in the job is replaced by its
   * fully-qualified form.
   * @param ignored the {@link FileSystem} (unused; the filesystem is
   *                resolved from the output path itself)
   * @param job the configuration for the job
   * @throws InvalidJobConfException if no output directory is set and the
   *         job has reduce tasks
   * @throws FileAlreadyExistsException if the output directory already exists
   * @throws IOException on other filesystem errors
   */
  public void checkOutputSpecs(FileSystem ignored, JobConf job) 
    throws FileAlreadyExistsException, 
           InvalidJobConfException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    // A missing output dir is only an error when there is reduce output to
    // write; map-only jobs (0 reduces) are allowed to leave it unset.
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);
      // check its existence
      if (fs.exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir + 
                                             " already exists");
      }
    }
  }

  /**
   * Set the {@link Path} of the output directory for the map-reduce job.
   *
   * <p>A relative path is resolved against the job's working directory.</p>
   *
   * @param conf The configuration of the job.
   * @param outputDir the {@link Path} of the output directory for 
   * the map-reduce job.
   */
  public static void setOutputPath(JobConf conf, Path outputDir) {
    outputDir = new Path(conf.getWorkingDirectory(), outputDir);
    conf.set("mapred.output.dir", outputDir.toString());
  }

  /**
   * Set the {@link Path} of the task's temporary output directory 
   * for the map-reduce job.
   * 
   * <p><i>Note</i>: Task output path is set by the framework.
   * </p>
   * @param conf The configuration of the job.
   * @param outputDir the {@link Path} of the output directory 
   * for the map-reduce job.
   */
  static void setWorkOutputPath(JobConf conf, Path outputDir) {
    outputDir = new Path(conf.getWorkingDirectory(), outputDir);
    conf.set("mapred.work.output.dir", outputDir.toString());
  }
  
  /**
   * Get the {@link Path} to the output directory for the map-reduce job.
   * 
   * @param conf the configuration of the job.
   * @return the {@link Path} to the output directory for the map-reduce job,
   *         or <code>null</code> if it has not been set.
   * @see FileOutputFormat#getWorkOutputPath(JobConf)
   */
  public static Path getOutputPath(JobConf conf) {
    String name = conf.get("mapred.output.dir");
    return name == null ? null: new Path(name);
  }
  
  /**
   *  Get the {@link Path} to the task's temporary output directory 
   *  for the map-reduce job
   *  
   * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
   * 
   * <p><i>Note:</i> The following is valid only if the {@link OutputCommitter}
   *  is {@link FileOutputCommitter}. If <code>OutputCommitter</code> is not 
   *  a <code>FileOutputCommitter</code>, the task's temporary output
   *  directory is same as {@link #getOutputPath(JobConf)} i.e.
   *  <tt>${mapred.output.dir}</tt></p>
   *  
   * <p>Some applications need to create/write-to side-files, which differ from
   * the actual job-outputs.
   * 
   * <p>In such cases there could be issues with 2 instances of the same TIP 
   * (running simultaneously e.g. speculative tasks) trying to open/write-to the
   * same file (path) on HDFS. Hence the application-writer will have to pick 
   * unique names per task-attempt (e.g. using the attemptid, say 
   * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
   * 
   * <p>To get around this the Map-Reduce framework helps the application-writer 
   * out by maintaining a special 
   * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> 
   * sub-directory for each task-attempt on HDFS where the output of the 
   * task-attempt goes. On successful completion of the task-attempt the files 
   * in the <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> (only) 
   * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the 
   * framework discards the sub-directory of unsuccessful task-attempts. This 
   * is completely transparent to the application.</p>
   * 
   * <p>The application-writer can take advantage of this by creating any 
   * side-files required in <tt>${mapred.work.output.dir}</tt> during execution 
   * of his reduce-task i.e. via {@link #getWorkOutputPath(JobConf)}, and the 
   * framework will move them out similarly - thus she doesn't have to pick 
   * unique paths per task-attempt.</p>
   * 
   * <p><i>Note</i>: the value of <tt>${mapred.work.output.dir}</tt> during 
   * execution of a particular task-attempt is actually 
   * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt>, and this value is 
   * set by the map-reduce framework. So, just create any side-files in the 
   * path returned by {@link #getWorkOutputPath(JobConf)} from map/reduce 
   * task to take advantage of this feature.</p>
   * 
   * <p>The entire discussion holds true for maps of jobs with 
   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
   * goes directly to HDFS.</p> 
   * 
   * @param conf the configuration of the job.
   * @return the {@link Path} to the task's temporary output directory 
   * for the map-reduce job, or <code>null</code> if it has not been set.
   */
  public static Path getWorkOutputPath(JobConf conf) {
    String name = conf.get("mapred.work.output.dir");
    return name == null ? null: new Path(name);
  }

  /**
   * Helper function to create the task's temporary output directory and 
   * return the path to the task's output file.
   * 
   * @param conf job-configuration
   * @param name temporary task-output filename
   * @return path to the task's temporary output file
   * @throws IOException if the job output path is not set
   */
  public static Path getTaskOutputPath(JobConf conf, String name) 
  throws IOException {
    // ${mapred.output.dir}
    Path outputPath = getOutputPath(conf);
    if (outputPath == null) {
      throw new IOException("Undefined job output-path");
    }

    OutputCommitter committer = conf.getOutputCommitter();
    Path workPath = outputPath;
    TaskAttemptContext context = new TaskAttemptContext(conf,
                TaskAttemptID.forName(conf.get("mapred.task.id")));
    // Only a FileOutputCommitter provides a per-attempt work directory;
    // any other committer writes directly into the job output path.
    if (committer instanceof FileOutputCommitter) {
      workPath = ((FileOutputCommitter)committer).getWorkPath(context,
                                                              outputPath);
    }

    // ${mapred.output.dir}/_temporary/_${taskid}/${name}
    return new Path(workPath, name);
  } 

  /**
   * Helper function to generate a name that is unique for the task.
   *
   * <p>The generated name can be used to create custom files from within the
   * different tasks for the job, the names for different tasks will not collide
   * with each other.</p>
   *
   * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
   * reduces and the task partition number. For example, given a name 'test'
   * running on the first map of the job the generated name will be
   * 'test-m-00000'.</p>
   *
   * @param conf the configuration for the job.
   * @param name the name to make unique.
   * @return a unique name across all tasks of the job.
   * @throws IllegalArgumentException if called outside a running task
   *         (i.e. "mapred.task.partition" is not set in the configuration).
   */
  public static String getUniqueName(JobConf conf, String name) {
    int partition = conf.getInt("mapred.task.partition", -1);
    if (partition == -1) {
      throw new IllegalArgumentException(
        "This method can only be called from within a Job");
    }

    String taskType = (conf.getBoolean("mapred.task.is.map", true)) ? "m" : "r";

    // Zero-pad the partition to five digits, without locale grouping
    // separators, e.g. 7 -> "00007".
    NumberFormat numberFormat = NumberFormat.getInstance();
    numberFormat.setMinimumIntegerDigits(5);
    numberFormat.setGroupingUsed(false);

    return name + "-" + taskType + "-" + numberFormat.format(partition);
  }

  /**
   * Helper function to generate a {@link Path} for a file that is unique for
   * the task within the job output directory.
   *
   * <p>The path can be used to create custom files from within the map and
   * reduce tasks. The path name will be unique for each task. The path parent
   * will be the job output directory.</p>
   *
   * <p>This method uses the {@link #getUniqueName} method to make the file name
   * unique for the task.</p>
   *
   * @param conf the configuration for the job.
   * @param name the name for the file.
   * @return a unique path across all tasks of the job.
   */
  public static Path getPathForCustomFile(JobConf conf, String name) {
    return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
  }
}
---|
293 | |
---|