source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/Merger.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 20.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.Collections;
23import java.util.Comparator;
24import java.util.List;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.fs.FSDataInputStream;
30import org.apache.hadoop.fs.ChecksumFileSystem;
31import org.apache.hadoop.fs.FileSystem;
32import org.apache.hadoop.fs.LocalDirAllocator;
33import org.apache.hadoop.fs.Path;
34import org.apache.hadoop.io.DataInputBuffer;
35import org.apache.hadoop.io.RawComparator;
36import org.apache.hadoop.io.compress.CompressionCodec;
37import org.apache.hadoop.mapred.IFile.Reader;
38import org.apache.hadoop.mapred.IFile.Writer;
39import org.apache.hadoop.util.PriorityQueue;
40import org.apache.hadoop.util.Progress;
41import org.apache.hadoop.util.Progressable;
42
43class Merger { 
44  private static final Log LOG = LogFactory.getLog(Merger.class);
45
46  // Local directories
47  private static LocalDirAllocator lDirAlloc = 
48    new LocalDirAllocator("mapred.local.dir");
49
50  public static <K extends Object, V extends Object>
51  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
52                            Class<K> keyClass, Class<V> valueClass, 
53                            CompressionCodec codec,
54                            Path[] inputs, boolean deleteInputs, 
55                            int mergeFactor, Path tmpDir,
56                            RawComparator<K> comparator, Progressable reporter,
57                            Counters.Counter readsCounter,
58                            Counters.Counter writesCounter)
59  throws IOException {
60    return 
61      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
62                           reporter).merge(keyClass, valueClass,
63                                           mergeFactor, tmpDir,
64                                           readsCounter, writesCounter);
65  }
66 
67  public static <K extends Object, V extends Object>
68  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
69                                   Class<K> keyClass, Class<V> valueClass,
70                                   CompressionCodec codec,
71                                   List<Segment<K, V>> segments, 
72                                   int mergeFactor, Path tmpDir,
73                                   RawComparator<K> comparator, Progressable reporter,
74                                   Counters.Counter readsCounter,
75                                   Counters.Counter writesCounter)
76      throws IOException {
77    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
78        false, codec).merge(keyClass, valueClass,
79            mergeFactor, tmpDir,
80            readsCounter, writesCounter);
81
82  }
83
84  public static <K extends Object, V extends Object>
85  RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
86                            Class<K> keyClass, Class<V> valueClass, 
87                            List<Segment<K, V>> segments, 
88                            int mergeFactor, Path tmpDir,
89                            RawComparator<K> comparator, Progressable reporter,
90                            Counters.Counter readsCounter,
91                            Counters.Counter writesCounter)
92      throws IOException {
93    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
94                 comparator, reporter, false, readsCounter, writesCounter);
95  }
96
97  public static <K extends Object, V extends Object>
98  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
99                            Class<K> keyClass, Class<V> valueClass,
100                            List<Segment<K, V>> segments,
101                            int mergeFactor, Path tmpDir,
102                            RawComparator<K> comparator, Progressable reporter,
103                            boolean sortSegments,
104                            Counters.Counter readsCounter,
105                            Counters.Counter writesCounter)
106      throws IOException {
107    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
108                           sortSegments).merge(keyClass, valueClass,
109                                               mergeFactor, tmpDir,
110                                               readsCounter, writesCounter);
111  }
112
113  static <K extends Object, V extends Object>
114    RawKeyValueIterator merge(Configuration conf, FileSystem fs,
115                            Class<K> keyClass, Class<V> valueClass,
116                            List<Segment<K, V>> segments,
117                            int mergeFactor, int inMemSegments, Path tmpDir,
118                            RawComparator<K> comparator, Progressable reporter,
119                            boolean sortSegments,
120                            Counters.Counter readsCounter,
121                            Counters.Counter writesCounter)
122      throws IOException {
123    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
124                           sortSegments).merge(keyClass, valueClass,
125                                               mergeFactor, inMemSegments,
126                                               tmpDir,
127                                               readsCounter, writesCounter);
128  }
129
130
131  static <K extends Object, V extends Object>
132  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
133                          Class<K> keyClass, Class<V> valueClass,
134                          CompressionCodec codec,
135                          List<Segment<K, V>> segments,
136                          int mergeFactor, int inMemSegments, Path tmpDir,
137                          RawComparator<K> comparator, Progressable reporter,
138                          boolean sortSegments,
139                          Counters.Counter readsCounter,
140                          Counters.Counter writesCounter)
141    throws IOException {
142  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
143                         sortSegments, codec).merge(keyClass, valueClass,
144                                             mergeFactor, inMemSegments,
145                                             tmpDir,
146                                             readsCounter, writesCounter);
147}
148
149  public static <K extends Object, V extends Object>
150  void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
151                 Progressable progressable, Configuration conf) 
152  throws IOException {
153    long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
154        10000);
155    long recordCtr = 0;
156    while(records.next()) {
157      writer.append(records.getKey(), records.getValue());
158     
159      if (((recordCtr++) % progressBar) == 0) {
160        progressable.progress();
161      }
162    }
163}
164
165  public static class Segment<K extends Object, V extends Object> {
166    Reader<K, V> reader = null;
167    DataInputBuffer key = new DataInputBuffer();
168    DataInputBuffer value = new DataInputBuffer();
169   
170    Configuration conf = null;
171    FileSystem fs = null;
172    Path file = null;
173    boolean preserve = false;
174    CompressionCodec codec = null;
175    long segmentOffset = 0;
176    long segmentLength = -1;
177   
178    public Segment(Configuration conf, FileSystem fs, Path file,
179                   CompressionCodec codec, boolean preserve) throws IOException {
180      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
181    }
182
183    public Segment(Configuration conf, FileSystem fs, Path file,
184        long segmentOffset, long segmentLength, CompressionCodec codec,
185        boolean preserve) throws IOException {
186      this.conf = conf;
187      this.fs = fs;
188      this.file = file;
189      this.codec = codec;
190      this.preserve = preserve;
191
192      this.segmentOffset = segmentOffset;
193      this.segmentLength = segmentLength;
194    }
195   
196    public Segment(Reader<K, V> reader, boolean preserve) {
197      this.reader = reader;
198      this.preserve = preserve;
199     
200      this.segmentLength = reader.getLength();
201    }
202
203    private void init(Counters.Counter readsCounter) throws IOException {
204      if (reader == null) {
205        FSDataInputStream in = fs.open(file);
206        in.seek(segmentOffset);
207        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
208      }
209    }
210   
211    DataInputBuffer getKey() { return key; }
212    DataInputBuffer getValue() { return value; }
213
214    long getLength() { 
215      return (reader == null) ?
216        segmentLength : reader.getLength();
217    }
218   
219    boolean next() throws IOException {
220      return reader.next(key, value);
221    }
222   
223    void close() throws IOException {
224      reader.close();
225     
226      if (!preserve && fs != null) {
227        fs.delete(file, false);
228      }
229    }
230
231    public long getPosition() throws IOException {
232      return reader.getPosition();
233    }
234  }
235 
236  private static class MergeQueue<K extends Object, V extends Object> 
237  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
238    Configuration conf;
239    FileSystem fs;
240    CompressionCodec codec;
241   
242    List<Segment<K, V>> segments = new ArrayList<Segment<K,V>>();
243   
244    RawComparator<K> comparator;
245   
246    private long totalBytesProcessed;
247    private float progPerByte;
248    private Progress mergeProgress = new Progress();
249   
250    Progressable reporter;
251   
252    DataInputBuffer key;
253    DataInputBuffer value;
254   
255    Segment<K, V> minSegment;
256    Comparator<Segment<K, V>> segmentComparator =   
257      new Comparator<Segment<K, V>>() {
258      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
259        if (o1.getLength() == o2.getLength()) {
260          return 0;
261        }
262
263        return o1.getLength() < o2.getLength() ? -1 : 1;
264      }
265    };
266
267   
268    public MergeQueue(Configuration conf, FileSystem fs, 
269                      Path[] inputs, boolean deleteInputs, 
270                      CompressionCodec codec, RawComparator<K> comparator,
271                      Progressable reporter) 
272    throws IOException {
273      this.conf = conf;
274      this.fs = fs;
275      this.codec = codec;
276      this.comparator = comparator;
277      this.reporter = reporter;
278     
279      for (Path file : inputs) {
280        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
281      }
282     
283      // Sort segments on file-lengths
284      Collections.sort(segments, segmentComparator); 
285    }
286   
287    public MergeQueue(Configuration conf, FileSystem fs,
288        List<Segment<K, V>> segments, RawComparator<K> comparator,
289        Progressable reporter) {
290      this(conf, fs, segments, comparator, reporter, false);
291    }
292
293    public MergeQueue(Configuration conf, FileSystem fs, 
294        List<Segment<K, V>> segments, RawComparator<K> comparator,
295        Progressable reporter, boolean sortSegments) {
296      this.conf = conf;
297      this.fs = fs;
298      this.comparator = comparator;
299      this.segments = segments;
300      this.reporter = reporter;
301      if (sortSegments) {
302        Collections.sort(segments, segmentComparator);
303      }
304    }
305
306    public MergeQueue(Configuration conf, FileSystem fs,
307        List<Segment<K, V>> segments, RawComparator<K> comparator,
308        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
309      this(conf, fs, segments, comparator, reporter, sortSegments);
310      this.codec = codec;
311    }
312
313    public void close() throws IOException {
314      Segment<K, V> segment;
315      while((segment = pop()) != null) {
316        segment.close();
317      }
318    }
319
320    public DataInputBuffer getKey() throws IOException {
321      return key;
322    }
323
324    public DataInputBuffer getValue() throws IOException {
325      return value;
326    }
327
328    private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
329      long startPos = reader.getPosition();
330      boolean hasNext = reader.next();
331      long endPos = reader.getPosition();
332      totalBytesProcessed += endPos - startPos;
333      mergeProgress.set(totalBytesProcessed * progPerByte);
334      if (hasNext) {
335        adjustTop();
336      } else {
337        pop();
338        reader.close();
339      }
340    }
341
342    public boolean next() throws IOException {
343      if (size() == 0)
344        return false;
345
346      if (minSegment != null) {
347        //minSegment is non-null for all invocations of next except the first
348        //one. For the first invocation, the priority queue is ready for use
349        //but for the subsequent invocations, first adjust the queue
350        adjustPriorityQueue(minSegment);
351        if (size() == 0) {
352          minSegment = null;
353          return false;
354        }
355      }
356      minSegment = top();
357     
358      key = minSegment.getKey();
359      value = minSegment.getValue();
360
361      return true;
362    }
363
364    @SuppressWarnings("unchecked")
365    protected boolean lessThan(Object a, Object b) {
366      DataInputBuffer key1 = ((Segment<K, V>)a).getKey();
367      DataInputBuffer key2 = ((Segment<K, V>)b).getKey();
368      int s1 = key1.getPosition();
369      int l1 = key1.getLength() - s1;
370      int s2 = key2.getPosition();
371      int l2 = key2.getLength() - s2;
372
373      return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
374    }
375   
376    public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
377                                     int factor, Path tmpDir,
378                                     Counters.Counter readsCounter,
379                                     Counters.Counter writesCounter)
380        throws IOException {
381      return merge(keyClass, valueClass, factor, 0, tmpDir,
382                   readsCounter, writesCounter);
383    }
384
385    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
386                                     int factor, int inMem, Path tmpDir,
387                                     Counters.Counter readsCounter,
388                                     Counters.Counter writesCounter)
389        throws IOException {
390      LOG.info("Merging " + segments.size() + " sorted segments");
391     
392      //create the MergeStreams from the sorted map created in the constructor
393      //and dump the final output to a file
394      int numSegments = segments.size();
395      int origFactor = factor;
396      int passNo = 1;
397      do {
398        //get the factor for this pass of merge. We assume in-memory segments
399        //are the first entries in the segment list and that the pass factor
400        //doesn't apply to them
401        factor = getPassFactor(factor, passNo, numSegments - inMem);
402        if (1 == passNo) {
403          factor += inMem;
404        }
405        List<Segment<K, V>> segmentsToMerge =
406          new ArrayList<Segment<K, V>>();
407        int segmentsConsidered = 0;
408        int numSegmentsToConsider = factor;
409        long startBytes = 0; // starting bytes of segments of this merge
410        while (true) {
411          //extract the smallest 'factor' number of segments 
412          //Call cleanup on the empty segments (no key/value data)
413          List<Segment<K, V>> mStream = 
414            getSegmentDescriptors(numSegmentsToConsider);
415          for (Segment<K, V> segment : mStream) {
416            // Initialize the segment at the last possible moment;
417            // this helps in ensuring we don't use buffers until we need them
418            segment.init(readsCounter);
419            long startPos = segment.getPosition();
420            boolean hasNext = segment.next();
421            long endPos = segment.getPosition();
422            startBytes += endPos - startPos;
423           
424            if (hasNext) {
425              segmentsToMerge.add(segment);
426              segmentsConsidered++;
427            }
428            else {
429              segment.close();
430              numSegments--; //we ignore this segment for the merge
431            }
432          }
433          //if we have the desired number of segments
434          //or looked at all available segments, we break
435          if (segmentsConsidered == factor || 
436              segments.size() == 0) {
437            break;
438          }
439           
440          numSegmentsToConsider = factor - segmentsConsidered;
441        }
442       
443        //feed the streams to the priority queue
444        initialize(segmentsToMerge.size());
445        clear();
446        for (Segment<K, V> segment : segmentsToMerge) {
447          put(segment);
448        }
449       
450        //if we have lesser number of segments remaining, then just return the
451        //iterator, else do another single level merge
452        if (numSegments <= factor) {
453          // Reset totalBytesProcessed to track the progress of the final merge.
454          // This is considered the progress of the reducePhase, the 3rd phase
455          // of reduce task. Currently totalBytesProcessed is not used in sort
456          // phase of reduce task(i.e. when intermediate merges happen).
457          totalBytesProcessed = startBytes;
458         
459          //calculate the length of the remaining segments. Required for
460          //calculating the merge progress
461          long totalBytes = 0;
462          for (int i = 0; i < segmentsToMerge.size(); i++) {
463            totalBytes += segmentsToMerge.get(i).getLength();
464          }
465          if (totalBytes != 0) //being paranoid
466            progPerByte = 1.0f / (float)totalBytes;
467         
468          if (totalBytes != 0)
469            mergeProgress.set(totalBytesProcessed * progPerByte);
470          else
471            mergeProgress.set(1.0f); // Last pass and no segments left - we're done
472         
473          LOG.info("Down to the last merge-pass, with " + numSegments + 
474                   " segments left of total size: " + totalBytes + " bytes");
475          return this;
476        } else {
477          LOG.info("Merging " + segmentsToMerge.size() + 
478                   " intermediate segments out of a total of " + 
479                   (segments.size()+segmentsToMerge.size()));
480         
481          //we want to spread the creation of temp files on multiple disks if
482          //available under the space constraints
483          long approxOutputSize = 0; 
484          for (Segment<K, V> s : segmentsToMerge) {
485            approxOutputSize += s.getLength() + 
486                                ChecksumFileSystem.getApproxChkSumLength(
487                                s.getLength());
488          }
489          Path tmpFilename = 
490            new Path(tmpDir, "intermediate").suffix("." + passNo);
491
492          Path outputFile =  lDirAlloc.getLocalPathForWrite(
493                                              tmpFilename.toString(),
494                                              approxOutputSize, conf);
495
496          Writer<K, V> writer = 
497            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
498                             writesCounter);
499          writeFile(this, writer, reporter, conf);
500          writer.close();
501         
502          //we finished one single level merge; now clean up the priority
503          //queue
504          this.close();
505
506          // Add the newly create segment to the list of segments to be merged
507          Segment<K, V> tempSegment = 
508            new Segment<K, V>(conf, fs, outputFile, codec, false);
509          segments.add(tempSegment);
510          numSegments = segments.size();
511          Collections.sort(segments, segmentComparator);
512         
513          passNo++;
514        }
515        //we are worried about only the first pass merge factor. So reset the
516        //factor to what it originally was
517        factor = origFactor;
518      } while(true);
519    }
520   
521    /**
522     * Determine the number of segments to merge in a given pass. Assuming more
523     * than factor segments, the first pass should attempt to bring the total
524     * number of segments - 1 to be divisible by the factor - 1 (each pass
525     * takes X segments and produces 1) to minimize the number of merges.
526     */
527    private int getPassFactor(int factor, int passNo, int numSegments) {
528      if (passNo > 1 || numSegments <= factor || factor == 1) 
529        return factor;
530      int mod = (numSegments - 1) % (factor - 1);
531      if (mod == 0)
532        return factor;
533      return mod + 1;
534    }
535   
536    /** Return (& remove) the requested number of segment descriptors from the
537     * sorted map.
538     */
539    private List<Segment<K, V>> getSegmentDescriptors(int numDescriptors) {
540      if (numDescriptors > segments.size()) {
541        List<Segment<K, V>> subList = new ArrayList<Segment<K,V>>(segments);
542        segments.clear();
543        return subList;
544      }
545     
546      List<Segment<K, V>> subList = 
547        new ArrayList<Segment<K,V>>(segments.subList(0, numDescriptors));
548      for (int i=0; i < numDescriptors; ++i) {
549        segments.remove(0);
550      }
551      return subList;
552    }
553
554    public Progress getProgress() {
555      return mergeProgress;
556    }
557
558  }
559}
Note: See TracBrowser for help on using the repository browser.