source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/io/MapFile.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 24.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.io;
20
21import java.io.*;
22
23import org.apache.commons.logging.Log;
24import org.apache.commons.logging.LogFactory;
25import org.apache.hadoop.fs.*;
26import org.apache.hadoop.conf.*;
27import org.apache.hadoop.util.Progressable;
28import org.apache.hadoop.util.ReflectionUtils;
29import org.apache.hadoop.io.SequenceFile.CompressionType;
30import org.apache.hadoop.io.compress.CompressionCodec;
31import org.apache.hadoop.io.compress.DefaultCodec;
32
33/** A file-based map from keys to values.
34 *
35 * <p>A map is a directory containing two files, the <code>data</code> file,
36 * containing all keys and values in the map, and a smaller <code>index</code>
37 * file, containing a fraction of the keys.  The fraction is determined by
38 * {@link Writer#getIndexInterval()}.
39 *
40 * <p>The index file is read entirely into memory.  Thus key implementations
41 * should try to keep themselves small.
42 *
43 * <p>Map files are created by adding entries in-order.  To maintain a large
44 * database, perform updates by copying the previous version of a database and
45 * merging in a sorted change list, to create a new version of the database in
46 * a new file.  Sorting large change lists can be done with {@link
47 * SequenceFile.Sorter}.
48 */
49public class MapFile {
  /** Logger for this class. */
  private static final Log LOG = LogFactory.getLog(MapFile.class);

  /** The name of the index file inside a map directory. */
  public static final String INDEX_FILE_NAME = "index";

  /** The name of the data file inside a map directory. */
  public static final String DATA_FILE_NAME = "data";

  /** Protected so that the class can be extended but not instantiated publicly. */
  protected MapFile() {}                          // no public ctor
59
60  /** Writes a new map. */
61  public static class Writer implements java.io.Closeable {
    // writer for the data file (all keys and values)
    private SequenceFile.Writer data;
    // writer for the index file (every indexInterval-th key and its data offset)
    private SequenceFile.Writer index;

    // configuration key that overrides the default index interval
    final private static String INDEX_INTERVAL = "io.map.index.interval";
    // number of appended entries between two index entries
    private int indexInterval = 128;

    // number of entries appended so far
    private long size;
    // reusable holder for the data-file offset written into the index
    private LongWritable position = new LongWritable();

    // the following fields are used only for checking key order
    private WritableComparator comparator;
    private DataInputBuffer inBuf = new DataInputBuffer();
    private DataOutputBuffer outBuf = new DataOutputBuffer();
    // private copy of the most recently appended key
    private WritableComparable lastKey;
76
77
78    /** Create the named map for keys of the named class. */
79    public Writer(Configuration conf, FileSystem fs, String dirName,
80                  Class<? extends WritableComparable> keyClass, Class valClass)
81      throws IOException {
82      this(conf, fs, dirName,
83           WritableComparator.get(keyClass), valClass,
84           SequenceFile.getCompressionType(conf));
85    }
86
87    /** Create the named map for keys of the named class. */
88    public Writer(Configuration conf, FileSystem fs, String dirName,
89                  Class<? extends WritableComparable> keyClass, Class valClass,
90                  CompressionType compress, Progressable progress)
91      throws IOException {
92      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
93           compress, progress);
94    }
95
96    /** Create the named map for keys of the named class. */
97    public Writer(Configuration conf, FileSystem fs, String dirName,
98                  Class<? extends WritableComparable> keyClass, Class valClass,
99                  CompressionType compress, CompressionCodec codec,
100                  Progressable progress)
101      throws IOException {
102      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
103           compress, codec, progress);
104    }
105
106    /** Create the named map for keys of the named class. */
107    public Writer(Configuration conf, FileSystem fs, String dirName,
108                  Class<? extends WritableComparable> keyClass, Class valClass,
109                  CompressionType compress)
110      throws IOException {
111      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
112    }
113
114    /** Create the named map using the named key comparator. */
115    public Writer(Configuration conf, FileSystem fs, String dirName,
116                  WritableComparator comparator, Class valClass)
117      throws IOException {
118      this(conf, fs, dirName, comparator, valClass,
119           SequenceFile.getCompressionType(conf));
120    }
121    /** Create the named map using the named key comparator. */
122    public Writer(Configuration conf, FileSystem fs, String dirName,
123                  WritableComparator comparator, Class valClass,
124                  SequenceFile.CompressionType compress)
125      throws IOException {
126      this(conf, fs, dirName, comparator, valClass, compress, null);
127    }
128    /** Create the named map using the named key comparator. */
129    public Writer(Configuration conf, FileSystem fs, String dirName,
130                  WritableComparator comparator, Class valClass,
131                  SequenceFile.CompressionType compress,
132                  Progressable progress)
133      throws IOException {
134      this(conf, fs, dirName, comparator, valClass, 
135           compress, new DefaultCodec(), progress);
136    }
137    /** Create the named map using the named key comparator. */
138    public Writer(Configuration conf, FileSystem fs, String dirName,
139                  WritableComparator comparator, Class valClass,
140                  SequenceFile.CompressionType compress, CompressionCodec codec,
141                  Progressable progress)
142      throws IOException {
143
144      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
145
146      this.comparator = comparator;
147      this.lastKey = comparator.newKey();
148
149      Path dir = new Path(dirName);
150      if (!fs.mkdirs(dir)) {
151        throw new IOException("Mkdirs failed to create directory " + dir.toString());
152      }
153      Path dataFile = new Path(dir, DATA_FILE_NAME);
154      Path indexFile = new Path(dir, INDEX_FILE_NAME);
155
156      Class keyClass = comparator.getKeyClass();
157      this.data =
158        SequenceFile.createWriter
159        (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
160      this.index =
161        SequenceFile.createWriter
162        (fs, conf, indexFile, keyClass, LongWritable.class,
163         CompressionType.BLOCK, progress);
164    }
165   
166    /** The number of entries that are added before an index entry is added.*/
167    public int getIndexInterval() { return indexInterval; }
168
169    /** Sets the index interval.
170     * @see #getIndexInterval()
171     */
172    public void setIndexInterval(int interval) { indexInterval = interval; }
173
174    /** Sets the index interval and stores it in conf
175     * @see #getIndexInterval()
176     */
177    public static void setIndexInterval(Configuration conf, int interval) {
178      conf.setInt(INDEX_INTERVAL, interval);
179    }
180
181    /** Close the map. */
182    public synchronized void close() throws IOException {
183      data.close();
184      index.close();
185    }
186
187    /** Append a key/value pair to the map.  The key must be greater or equal
188     * to the previous key added to the map. */
189    public synchronized void append(WritableComparable key, Writable val)
190      throws IOException {
191
192      checkKey(key);
193     
194      if (size % indexInterval == 0) {            // add an index entry
195        position.set(data.getLength());           // point to current eof
196        index.append(key, position);
197      }
198
199      data.append(key, val);                      // append key/value to data
200      size++;
201    }
202
203    private void checkKey(WritableComparable key) throws IOException {
204      // check that keys are well-ordered
205      if (size != 0 && comparator.compare(lastKey, key) > 0)
206        throw new IOException("key out of order: "+key+" after "+lastKey);
207         
208      // update lastKey with a copy of key by writing and reading
209      outBuf.reset();
210      key.write(outBuf);                          // write new key
211
212      inBuf.reset(outBuf.getData(), outBuf.getLength());
213      lastKey.readFields(inBuf);                  // read into lastKey
214    }
215
216  }
217 
218  /** Provide access to an existing map. */
219  public static class Reader implements java.io.Closeable {
220     
    /** Number of index entries to skip between each entry.  Zero by default.
     * Setting this to values larger than zero can facilitate opening large map
     * files using less memory. */
    private int INDEX_SKIP = 0;
     
    // comparator used for key ordering; set in open()
    private WritableComparator comparator;

    // key most recently read by seekInternal(); allocated lazily
    private WritableComparable nextKey;
    // data-file position the last seekInternal() started scanning from
    private long seekPosition = -1;
    // index slot chosen by the last seekInternal(); -1 when there is none
    // or when the key belongs before the first index entry
    private int seekIndex = -1;
    // data-file position recorded right after the data file was opened
    private long firstPosition;

    // the data, on disk
    private SequenceFile.Reader data;
    private SequenceFile.Reader index;

    // whether the index Reader was closed
    private boolean indexClosed = false;

    // the index, in memory
    // count is -1 until readIndex() has run, then the number of loaded entries
    private int count = -1;
    private WritableComparable[] keys;
    private long[] positions;
244
245    /** Returns the class of keys in this file. */
246    public Class<?> getKeyClass() { return data.getKeyClass(); }
247
248    /** Returns the class of values in this file. */
249    public Class<?> getValueClass() { return data.getValueClass(); }
250
251    /** Construct a map reader for the named map.*/
252    public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
253      this(fs, dirName, null, conf);
254      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
255    }
256
257    /** Construct a map reader for the named map using the named comparator.*/
258    public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
259      throws IOException {
260      this(fs, dirName, comparator, conf, true);
261    }
262   
263    /**
264     * Hook to allow subclasses to defer opening streams until further
265     * initialization is complete.
266     * @see #createDataFileReader(FileSystem, Path, Configuration)
267     */
268    protected Reader(FileSystem fs, String dirName,
269        WritableComparator comparator, Configuration conf, boolean open)
270      throws IOException {
271     
272      if (open) {
273        open(fs, dirName, comparator, conf);
274      }
275    }
276   
277    protected synchronized void open(FileSystem fs, String dirName,
278        WritableComparator comparator, Configuration conf) throws IOException {
279      Path dir = new Path(dirName);
280      Path dataFile = new Path(dir, DATA_FILE_NAME);
281      Path indexFile = new Path(dir, INDEX_FILE_NAME);
282
283      // open the data
284      this.data = createDataFileReader(fs, dataFile, conf);
285      this.firstPosition = data.getPosition();
286
287      if (comparator == null)
288        this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
289      else
290        this.comparator = comparator;
291
292      // open the index
293      this.index = new SequenceFile.Reader(fs, indexFile, conf);
294    }
295
296    /**
297     * Override this method to specialize the type of
298     * {@link SequenceFile.Reader} returned.
299     */
300    protected SequenceFile.Reader createDataFileReader(FileSystem fs,
301        Path dataFile, Configuration conf) throws IOException {
302      return new SequenceFile.Reader(fs, dataFile,  conf);
303    }
304
305    private void readIndex() throws IOException {
306      // read the index entirely into memory
307      if (this.keys != null)
308        return;
309      this.count = 0;
310      this.keys = new WritableComparable[1024];
311      this.positions = new long[1024];
312      try {
313        int skip = INDEX_SKIP;
314        LongWritable position = new LongWritable();
315        WritableComparable lastKey = null;
316        while (true) {
317          WritableComparable k = comparator.newKey();
318
319          if (!index.next(k, position))
320            break;
321
322          // check order to make sure comparator is compatible
323          if (lastKey != null && comparator.compare(lastKey, k) > 0)
324            throw new IOException("key out of order: "+k+" after "+lastKey);
325          lastKey = k;
326         
327          if (skip > 0) {
328            skip--;
329            continue;                             // skip this entry
330          } else {
331            skip = INDEX_SKIP;                    // reset skip
332          }
333
334          if (count == keys.length) {                // time to grow arrays
335            int newLength = (keys.length*3)/2;
336            WritableComparable[] newKeys = new WritableComparable[newLength];
337            long[] newPositions = new long[newLength];
338            System.arraycopy(keys, 0, newKeys, 0, count);
339            System.arraycopy(positions, 0, newPositions, 0, count);
340            keys = newKeys;
341            positions = newPositions;
342          }
343
344          keys[count] = k;
345          positions[count] = position.get();
346          count++;
347        }
348      } catch (EOFException e) {
349        LOG.warn("Unexpected EOF reading " + index +
350                              " at entry #" + count + ".  Ignoring.");
351      } finally {
352        indexClosed = true;
353        index.close();
354      }
355    }
356
357    /** Re-positions the reader before its first key. */
358    public synchronized void reset() throws IOException {
359      data.seek(firstPosition);
360    }
361
362    /** Get the key at approximately the middle of the file.
363     *
364     * @throws IOException
365     */
366    public synchronized WritableComparable midKey() throws IOException {
367
368      readIndex();
369      int pos = ((count - 1) / 2);              // middle of the index
370      if (pos < 0) {
371        throw new IOException("MapFile empty");
372      }
373     
374      return keys[pos];
375    }
376   
377    /** Reads the final key from the file.
378     *
379     * @param key key to read into
380     */
381    public synchronized void finalKey(WritableComparable key)
382      throws IOException {
383
384      long originalPosition = data.getPosition(); // save position
385      try {
386        readIndex();                              // make sure index is valid
387        if (count > 0) {
388          data.seek(positions[count-1]);          // skip to last indexed entry
389        } else {
390          reset();                                // start at the beginning
391        }
392        while (data.next(key)) {}                 // scan to eof
393
394      } finally {
395        data.seek(originalPosition);              // restore position
396      }
397    }
398
399    /** Positions the reader at the named key, or if none such exists, at the
400     * first entry after the named key.  Returns true iff the named key exists
401     * in this map.
402     */
403    public synchronized boolean seek(WritableComparable key) throws IOException {
404      return seekInternal(key) == 0;
405    }
406
407    /**
408     * Positions the reader at the named key, or if none such exists, at the
409     * first entry after the named key.
410     *
411     * @return  0   - exact match found
412     *          < 0 - positioned at next record
413     *          1   - no more records in file
414     */
415    private synchronized int seekInternal(WritableComparable key)
416      throws IOException {
417      return seekInternal(key, false);
418    }
419
    /**
     * Positions the reader at the named key, or if none such exists, at the
     * key that falls just before or just after dependent on how the
     * <code>before</code> parameter is set.
     *
     * <p>The in-memory index picks the data-file position to start from; the
     * data file is then scanned forward from there, reading each key into
     * <code>nextKey</code>, until a key at or beyond <code>key</code> is
     * found or the file ends.
     *
     * @param key key to look for
     * @param before - IF true, and <code>key</code> does not exist, position
     * file at entry that falls just before <code>key</code>.  Otherwise,
     * position file at record that sorts just after.
     * @return  0    - exact match found
     *          &lt; 0 - positioned at a record that sorts after <code>key</code>
     *          1    - no more records in file, or (with <code>before</code>)
     *                 backed up to the record just before <code>key</code>
     * @throws IOException if reading or seeking fails
     */
    private synchronized int seekInternal(WritableComparable key,
        final boolean before)
      throws IOException {
      readIndex();                                // make sure index is read

      // Fast path: reuse the previous seekPosition when key lies at or after
      // the key read by the previous seek and before the next index entry.
      if (seekIndex != -1                         // seeked before
          && seekIndex+1 < count           
          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
          && comparator.compare(key, nextKey)
          >= 0) {                                 // but after last seeked
        // do nothing
      } else {
        seekIndex = binarySearch(key);
        // a negative result encodes -(insertionPoint + 1); convert it to the
        // slot just before the insertion point (-1 when key precedes slot 0)
        if (seekIndex < 0)                        // decode insertion point
          seekIndex = -seekIndex-2;

        if (seekIndex == -1)                      // belongs before first entry
          seekPosition = firstPosition;           // use beginning of file
        else
          seekPosition = positions[seekIndex];    // else use index
      }
      data.seek(seekPosition);
     
      // allocate the reusable key holder on first use
      if (nextKey == null)
        nextKey = comparator.newKey();
     
      // If we're looking for the key before, we need to keep track
      // of the position we got the current key as well as the position
      // of the key before it.
      long prevPosition = -1;
      long curPosition = seekPosition;

      while (data.next(nextKey)) {
        int c = comparator.compare(key, nextKey);
        if (c <= 0) {                             // at or beyond desired
          if (before && c != 0) {
            if (prevPosition == -1) {
              // We're on the first record of this index block
              // and we've already passed the search key. Therefore
              // we must be at the beginning of the file, so seek
              // to the beginning of this block and return c
              data.seek(curPosition);
            } else {
              // We have a previous record to back up to
              data.seek(prevPosition);
              data.next(nextKey);
              // now that we've rewound, the search key must be greater than this key
              return 1;
            }
          }
          return c;
        }
        // remember where this record and the one after it start, so that a
        // later overshoot can back up by one record
        if (before) {
          prevPosition = curPosition;
          curPosition = data.getPosition();
        }
      }

      // ran off the end of the data file without reaching key
      return 1;
    }
492
493    private int binarySearch(WritableComparable key) {
494      int low = 0;
495      int high = count-1;
496
497      while (low <= high) {
498        int mid = (low + high) >>> 1;
499        WritableComparable midVal = keys[mid];
500        int cmp = comparator.compare(midVal, key);
501
502        if (cmp < 0)
503          low = mid + 1;
504        else if (cmp > 0)
505          high = mid - 1;
506        else
507          return mid;                             // key found
508      }
509      return -(low + 1);                          // key not found.
510    }
511
512    /** Read the next key/value pair in the map into <code>key</code> and
513     * <code>val</code>.  Returns true if such a pair exists and false when at
514     * the end of the map */
515    public synchronized boolean next(WritableComparable key, Writable val)
516      throws IOException {
517      return data.next(key, val);
518    }
519
520    /** Return the value for the named key, or null if none exists. */
521    public synchronized Writable get(WritableComparable key, Writable val)
522      throws IOException {
523      if (seek(key)) {
524        data.getCurrentValue(val);
525        return val;
526      } else
527        return null;
528    }
529
530    /**
531     * Finds the record that is the closest match to the specified key.
532     * Returns <code>key</code> or if it does not exist, at the first entry
533     * after the named key.
534     *
535-     * @param key       - key that we're trying to find
536-     * @param val       - data value if key is found
537-     * @return          - the key that was the closest match or null if eof.
538     */
539    public synchronized WritableComparable getClosest(WritableComparable key,
540      Writable val)
541    throws IOException {
542      return getClosest(key, val, false);
543    }
544
545    /**
546     * Finds the record that is the closest match to the specified key.
547     *
548     * @param key       - key that we're trying to find
549     * @param val       - data value if key is found
550     * @param before    - IF true, and <code>key</code> does not exist, return
551     * the first entry that falls just before the <code>key</code>.  Otherwise,
552     * return the record that sorts just after.
553     * @return          - the key that was the closest match or null if eof.
554     */
555    public synchronized WritableComparable getClosest(WritableComparable key,
556        Writable val, final boolean before)
557      throws IOException {
558     
559      int c = seekInternal(key, before);
560
561      // If we didn't get an exact match, and we ended up in the wrong
562      // direction relative to the query key, return null since we
563      // must be at the beginning or end of the file.
564      if ((!before && c > 0) ||
565          (before && c < 0)) {
566        return null;
567      }
568
569      data.getCurrentValue(val);
570      return nextKey;
571    }
572
573    /** Close the map. */
574    public synchronized void close() throws IOException {
575      if (!indexClosed) {
576        index.close();
577      }
578      data.close();
579    }
580
581  }
582
583  /** Renames an existing map directory. */
584  public static void rename(FileSystem fs, String oldName, String newName)
585    throws IOException {
586    Path oldDir = new Path(oldName);
587    Path newDir = new Path(newName);
588    if (!fs.rename(oldDir, newDir)) {
589      throw new IOException("Could not rename " + oldDir + " to " + newDir);
590    }
591  }
592
593  /** Deletes the named map file. */
594  public static void delete(FileSystem fs, String name) throws IOException {
595    Path dir = new Path(name);
596    Path data = new Path(dir, DATA_FILE_NAME);
597    Path index = new Path(dir, INDEX_FILE_NAME);
598
599    fs.delete(data, true);
600    fs.delete(index, true);
601    fs.delete(dir, true);
602  }
603
604  /**
605   * This method attempts to fix a corrupt MapFile by re-creating its index.
606   * @param fs filesystem
607   * @param dir directory containing the MapFile data and index
608   * @param keyClass key class (has to be a subclass of Writable)
609   * @param valueClass value class (has to be a subclass of Writable)
610   * @param dryrun do not perform any changes, just report what needs to be done
611   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
612   * @throws Exception
613   */
614  public static long fix(FileSystem fs, Path dir,
615                         Class<? extends Writable> keyClass,
616                         Class<? extends Writable> valueClass, boolean dryrun,
617                         Configuration conf) throws Exception {
618    String dr = (dryrun ? "[DRY RUN ] " : "");
619    Path data = new Path(dir, DATA_FILE_NAME);
620    Path index = new Path(dir, INDEX_FILE_NAME);
621    int indexInterval = 128;
622    if (!fs.exists(data)) {
623      // there's nothing we can do to fix this!
624      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
625    }
626    if (fs.exists(index)) {
627      // no fixing needed
628      return -1;
629    }
630    SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
631    if (!dataReader.getKeyClass().equals(keyClass)) {
632      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
633                          ", got " + dataReader.getKeyClass().getName());
634    }
635    if (!dataReader.getValueClass().equals(valueClass)) {
636      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
637                          ", got " + dataReader.getValueClass().getName());
638    }
639    long cnt = 0L;
640    Writable key = ReflectionUtils.newInstance(keyClass, conf);
641    Writable value = ReflectionUtils.newInstance(valueClass, conf);
642    SequenceFile.Writer indexWriter = null;
643    if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
644    try {
645      long pos = 0L;
646      LongWritable position = new LongWritable();
647      while(dataReader.next(key, value)) {
648        cnt++;
649        if (cnt % indexInterval == 0) {
650          position.set(pos);
651          if (!dryrun) indexWriter.append(key, position);
652        }
653        pos = dataReader.getPosition();
654      }
655    } catch(Throwable t) {
656      // truncated data file. swallow it.
657    }
658    dataReader.close();
659    if (!dryrun) indexWriter.close();
660    return cnt;
661  }
662
663
664  public static void main(String[] args) throws Exception {
665    String usage = "Usage: MapFile inFile outFile";
666     
667    if (args.length != 2) {
668      System.err.println(usage);
669      System.exit(-1);
670    }
671     
672    String in = args[0];
673    String out = args[1];
674
675    Configuration conf = new Configuration();
676    FileSystem fs = FileSystem.getLocal(conf);
677    MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
678    MapFile.Writer writer =
679      new MapFile.Writer(conf, fs, out,
680          reader.getKeyClass().asSubclass(WritableComparable.class),
681          reader.getValueClass());
682
683    WritableComparable key =
684      ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
685    Writable value =
686      ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
687
688    while (reader.next(key, value))               // copy all entries
689      writer.append(key, value);
690
691    writer.close();
692  }
693
694}
Note: See TracBrowser for help on using the repository browser.