source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 14.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapreduce.lib.input;
20
21import java.io.IOException;
22import java.util.ArrayList;
23import java.util.List;
24
25import org.apache.commons.logging.Log;
26import org.apache.commons.logging.LogFactory;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.fs.FileStatus;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.fs.PathFilter;
32import org.apache.hadoop.fs.BlockLocation;
33import org.apache.hadoop.mapreduce.InputFormat;
34import org.apache.hadoop.mapreduce.InputSplit;
35import org.apache.hadoop.mapreduce.Job;
36import org.apache.hadoop.mapreduce.JobContext;
37import org.apache.hadoop.mapreduce.Mapper;
38import org.apache.hadoop.util.ReflectionUtils;
39import org.apache.hadoop.util.StringUtils;
40
41/**
42 * A base class for file-based {@link InputFormat}s.
43 *
44 * <p><code>FileInputFormat</code> is the base class for all file-based
45 * <code>InputFormat</code>s. This provides a generic implementation of
46 * {@link #getSplits(JobContext)}.
47 * Subclasses of <code>FileInputFormat</code> can also override the
48 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
49 * not split-up and are processed as a whole by {@link Mapper}s.
50 */
51public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
52
53  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
54
55  private static final double SPLIT_SLOP = 1.1;   // 10% slop
56
57  private static final PathFilter hiddenFileFilter = new PathFilter(){
58      public boolean accept(Path p){
59        String name = p.getName(); 
60        return !name.startsWith("_") && !name.startsWith("."); 
61      }
62    }; 
63
64  /**
65   * Proxy PathFilter that accepts a path only if all filters given in the
66   * constructor do. Used by the listPaths() to apply the built-in
67   * hiddenFileFilter together with a user provided one (if any).
68   */
69  private static class MultiPathFilter implements PathFilter {
70    private List<PathFilter> filters;
71
72    public MultiPathFilter(List<PathFilter> filters) {
73      this.filters = filters;
74    }
75
76    public boolean accept(Path path) {
77      for (PathFilter filter : filters) {
78        if (!filter.accept(path)) {
79          return false;
80        }
81      }
82      return true;
83    }
84  }
85
86  /**
87   * Get the lower bound on split size imposed by the format.
88   * @return the number of bytes of the minimal split for this format
89   */
90  protected long getFormatMinSplitSize() {
91    return 1;
92  }
93
94  /**
95   * Is the given filename splitable? Usually, true, but if the file is
96   * stream compressed, it will not be.
97   *
98   * <code>FileInputFormat</code> implementations can override this and return
99   * <code>false</code> to ensure that individual input files are never split-up
100   * so that {@link Mapper}s process entire files.
101   *
102   * @param context the job context
103   * @param filename the file name to check
104   * @return is this file splitable?
105   */
106  protected boolean isSplitable(JobContext context, Path filename) {
107    return true;
108  }
109
110  /**
111   * Set a PathFilter to be applied to the input paths for the map-reduce job.
112   * @param job the job to modify
113   * @param filter the PathFilter class use for filtering the input paths.
114   */
115  public static void setInputPathFilter(Job job,
116                                        Class<? extends PathFilter> filter) {
117    job.getConfiguration().setClass("mapred.input.pathFilter.class", filter, 
118                                    PathFilter.class);
119  }
120
121  /**
122   * Set the minimum input split size
123   * @param job the job to modify
124   * @param size the minimum size
125   */
126  public static void setMinInputSplitSize(Job job,
127                                          long size) {
128    job.getConfiguration().setLong("mapred.min.split.size", size);
129  }
130
131  /**
132   * Get the minimum split size
133   * @param job the job
134   * @return the minimum number of bytes that can be in a split
135   */
136  public static long getMinSplitSize(JobContext job) {
137    return job.getConfiguration().getLong("mapred.min.split.size", 1L);
138  }
139
140  /**
141   * Set the maximum split size
142   * @param job the job to modify
143   * @param size the maximum split size
144   */
145  public static void setMaxInputSplitSize(Job job,
146                                          long size) {
147    job.getConfiguration().setLong("mapred.max.split.size", size);
148  }
149
150  /**
151   * Get the maximum split size.
152   * @param context the job to look at.
153   * @return the maximum number of bytes a split can include
154   */
155  public static long getMaxSplitSize(JobContext context) {
156    return context.getConfiguration().getLong("mapred.max.split.size", 
157                                              Long.MAX_VALUE);
158  }
159
160  /**
161   * Get a PathFilter instance of the filter set for the input paths.
162   *
163   * @return the PathFilter instance set for the job, NULL if none has been set.
164   */
165  public static PathFilter getInputPathFilter(JobContext context) {
166    Configuration conf = context.getConfiguration();
167    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
168        PathFilter.class);
169    return (filterClass != null) ?
170        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
171  }
172
173  /** List input directories.
174   * Subclasses may override to, e.g., select only files matching a regular
175   * expression.
176   *
177   * @param job the job to list input paths for
178   * @return array of FileStatus objects
179   * @throws IOException if zero items.
180   */
181  protected List<FileStatus> listStatus(JobContext job
182                                        ) throws IOException {
183    List<FileStatus> result = new ArrayList<FileStatus>();
184    Path[] dirs = getInputPaths(job);
185    if (dirs.length == 0) {
186      throw new IOException("No input paths specified in job");
187    }
188
189    List<IOException> errors = new ArrayList<IOException>();
190   
191    // creates a MultiPathFilter with the hiddenFileFilter and the
192    // user provided one (if any).
193    List<PathFilter> filters = new ArrayList<PathFilter>();
194    filters.add(hiddenFileFilter);
195    PathFilter jobFilter = getInputPathFilter(job);
196    if (jobFilter != null) {
197      filters.add(jobFilter);
198    }
199    PathFilter inputFilter = new MultiPathFilter(filters);
200   
201    for (int i=0; i < dirs.length; ++i) {
202      Path p = dirs[i];
203      FileSystem fs = p.getFileSystem(job.getConfiguration()); 
204      FileStatus[] matches = fs.globStatus(p, inputFilter);
205      if (matches == null) {
206        errors.add(new IOException("Input path does not exist: " + p));
207      } else if (matches.length == 0) {
208        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
209      } else {
210        for (FileStatus globStat: matches) {
211          if (globStat.isDir()) {
212            for(FileStatus stat: fs.listStatus(globStat.getPath(),
213                inputFilter)) {
214              result.add(stat);
215            }         
216          } else {
217            result.add(globStat);
218          }
219        }
220      }
221    }
222
223    if (!errors.isEmpty()) {
224      throw new InvalidInputException(errors);
225    }
226    LOG.info("Total input paths to process : " + result.size()); 
227    return result;
228  }
229 
230
231  /**
232   * Generate the list of files and make them into FileSplits.
233   */ 
234  public List<InputSplit> getSplits(JobContext job
235                                    ) throws IOException {
236    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
237    long maxSize = getMaxSplitSize(job);
238
239    // generate splits
240    List<InputSplit> splits = new ArrayList<InputSplit>();
241    for (FileStatus file: listStatus(job)) {
242      Path path = file.getPath();
243      FileSystem fs = path.getFileSystem(job.getConfiguration());
244      long length = file.getLen();
245      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
246      if ((length != 0) && isSplitable(job, path)) { 
247        long blockSize = file.getBlockSize();
248        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
249
250        long bytesRemaining = length;
251        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
252          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
253          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
254                                   blkLocations[blkIndex].getHosts()));
255          bytesRemaining -= splitSize;
256        }
257       
258        if (bytesRemaining != 0) {
259          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
260                     blkLocations[blkLocations.length-1].getHosts()));
261        }
262      } else if (length != 0) {
263        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
264      } else { 
265        //Create empty hosts array for zero length files
266        splits.add(new FileSplit(path, 0, length, new String[0]));
267      }
268    }
269    LOG.debug("Total # of splits: " + splits.size());
270    return splits;
271  }
272
273  protected long computeSplitSize(long blockSize, long minSize,
274                                  long maxSize) {
275    return Math.max(minSize, Math.min(maxSize, blockSize));
276  }
277
278  protected int getBlockIndex(BlockLocation[] blkLocations, 
279                              long offset) {
280    for (int i = 0 ; i < blkLocations.length; i++) {
281      // is the offset inside this block?
282      if ((blkLocations[i].getOffset() <= offset) &&
283          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
284        return i;
285      }
286    }
287    BlockLocation last = blkLocations[blkLocations.length -1];
288    long fileLength = last.getOffset() + last.getLength() -1;
289    throw new IllegalArgumentException("Offset " + offset + 
290                                       " is outside of file (0.." +
291                                       fileLength + ")");
292  }
293
294  /**
295   * Sets the given comma separated paths as the list of inputs
296   * for the map-reduce job.
297   *
298   * @param job the job
299   * @param commaSeparatedPaths Comma separated paths to be set as
300   *        the list of inputs for the map-reduce job.
301   */
302  public static void setInputPaths(Job job, 
303                                   String commaSeparatedPaths
304                                   ) throws IOException {
305    setInputPaths(job, StringUtils.stringToPath(
306                        getPathStrings(commaSeparatedPaths)));
307  }
308
309  /**
310   * Add the given comma separated paths to the list of inputs for
311   *  the map-reduce job.
312   *
313   * @param job The job to modify
314   * @param commaSeparatedPaths Comma separated paths to be added to
315   *        the list of inputs for the map-reduce job.
316   */
317  public static void addInputPaths(Job job, 
318                                   String commaSeparatedPaths
319                                   ) throws IOException {
320    for (String str : getPathStrings(commaSeparatedPaths)) {
321      addInputPath(job, new Path(str));
322    }
323  }
324
325  /**
326   * Set the array of {@link Path}s as the list of inputs
327   * for the map-reduce job.
328   *
329   * @param job The job to modify
330   * @param inputPaths the {@link Path}s of the input directories/files
331   * for the map-reduce job.
332   */ 
333  public static void setInputPaths(Job job, 
334                                   Path... inputPaths) throws IOException {
335    Configuration conf = job.getConfiguration();
336    FileSystem fs = FileSystem.get(conf);
337    Path path = inputPaths[0].makeQualified(fs);
338    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
339    for(int i = 1; i < inputPaths.length;i++) {
340      str.append(StringUtils.COMMA_STR);
341      path = inputPaths[i].makeQualified(fs);
342      str.append(StringUtils.escapeString(path.toString()));
343    }
344    conf.set("mapred.input.dir", str.toString());
345  }
346
347  /**
348   * Add a {@link Path} to the list of inputs for the map-reduce job.
349   *
350   * @param job The {@link Job} to modify
351   * @param path {@link Path} to be added to the list of inputs for
352   *            the map-reduce job.
353   */
354  public static void addInputPath(Job job, 
355                                  Path path) throws IOException {
356    Configuration conf = job.getConfiguration();
357    FileSystem fs = FileSystem.get(conf);
358    path = path.makeQualified(fs);
359    String dirStr = StringUtils.escapeString(path.toString());
360    String dirs = conf.get("mapred.input.dir");
361    conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
362  }
363 
364  // This method escapes commas in the glob pattern of the given paths.
365  private static String[] getPathStrings(String commaSeparatedPaths) {
366    int length = commaSeparatedPaths.length();
367    int curlyOpen = 0;
368    int pathStart = 0;
369    boolean globPattern = false;
370    List<String> pathStrings = new ArrayList<String>();
371   
372    for (int i=0; i<length; i++) {
373      char ch = commaSeparatedPaths.charAt(i);
374      switch(ch) {
375        case '{' : {
376          curlyOpen++;
377          if (!globPattern) {
378            globPattern = true;
379          }
380          break;
381        }
382        case '}' : {
383          curlyOpen--;
384          if (curlyOpen == 0 && globPattern) {
385            globPattern = false;
386          }
387          break;
388        }
389        case ',' : {
390          if (!globPattern) {
391            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
392            pathStart = i + 1 ;
393          }
394          break;
395        }
396      }
397    }
398    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
399   
400    return pathStrings.toArray(new String[0]);
401  }
402 
403  /**
404   * Get the list of input {@link Path}s for the map-reduce job.
405   *
406   * @param context The job
407   * @return the list of input {@link Path}s for the map-reduce job.
408   */
409  public static Path[] getInputPaths(JobContext context) {
410    String dirs = context.getConfiguration().get("mapred.input.dir", "");
411    String [] list = StringUtils.split(dirs);
412    Path[] result = new Path[list.length];
413    for (int i = 0; i < list.length; i++) {
414      result[i] = new Path(StringUtils.unEscapeString(list[i]));
415    }
416    return result;
417  }
418
419}
Note: See TracBrowser for help on using the repository browser.