source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/io/file/tfile/BCFile.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 29.0 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.io.file.tfile;

import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.file.tfile.CompareUtils.Scalar;
import org.apache.hadoop.io.file.tfile.CompareUtils.ScalarComparator;
import org.apache.hadoop.io.file.tfile.CompareUtils.ScalarLong;
import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
import org.apache.hadoop.io.file.tfile.Utils.Version;

/**
 * Block Compressed file, the underlying physical storage layer for TFile.
 * BCFile provides the basic block-level compression for the data blocks and
 * meta blocks. It is separated from TFile as it may be used for other
 * block-compressed file implementations.
 */
final class BCFile {
  // The current version of the BCFile implementation. Increment the version
  // (major or minor) whenever enough changes have been made.
  static final Version API_VERSION = new Version((short) 1, (short) 0);
  static final Log LOG = LogFactory.getLog(BCFile.class);

  /**
   * Prevent the instantiation of BCFile objects.
   */
  private BCFile() {
    // nothing
  }
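
  // Illustrative note (not in the original source): as far as can be inferred
  // from Writer.close() and the Reader constructor below, the on-disk layout
  // of a BCFile is roughly:
  //
  //   [magic (16 bytes)]
  //   [data blocks ...]
  //   [meta blocks ..., the last being the "data:BCFile.index" meta block]
  //   [meta index]
  //   [offset of the meta index (8-byte long)]
  //   [API version (two shorts)]
  //   [magic (16 bytes)]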

  /**
   * BCFile writer, the entry point for creating a new BCFile.
   */
  static public class Writer implements Closeable {
    private final FSDataOutputStream out;
    private final Configuration conf;
    // the single meta block containing index of compressed data blocks
    final DataIndex dataIndex;
    // index for meta blocks
    final MetaIndex metaIndex;
    boolean blkInProgress = false;
    private boolean metaBlkSeen = false;
    private boolean closed = false;
    long errorCount = 0;
    // reusable buffers.
    private BytesWritable fsOutputBuffer;

    /**
     * Call-back interface to register a block after a block is closed.
     */
    private static interface BlockRegister {
      /**
       * Register a block that is fully closed.
       *
       * @param raw
       *          The size of block in terms of uncompressed bytes.
       * @param offsetStart
       *          The start offset of the block.
       * @param offsetEnd
       *          One byte after the end of the block. Compressed block size is
       *          offsetEnd - offsetStart.
       */
      public void register(long raw, long offsetStart, long offsetEnd);
    }

    /**
     * Intermediate class that maintains the state of a Writable Compression
     * Block.
     */
    private static final class WBlockState {
      private final Algorithm compressAlgo;
      private Compressor compressor; // !null only if using native
      // Hadoop compression
      private final FSDataOutputStream fsOut;
      private final long posStart;
      private final SimpleBufferedOutputStream fsBufferedOutput;
      private OutputStream out;

      /**
       * @param compressionAlgo
       *          The compression algorithm to be used for compression.
       * @throws IOException
       */
      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut,
          BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
        this.compressAlgo = compressionAlgo;
        this.fsOut = fsOut;
        this.posStart = fsOut.getPos();

        fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf));

        this.fsBufferedOutput =
            new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.get());
        this.compressor = compressAlgo.getCompressor();

        try {
          this.out =
              compressionAlgo.createCompressionStream(fsBufferedOutput,
                  compressor, 0);
        } catch (IOException e) {
          compressAlgo.returnCompressor(compressor);
          throw e;
        }
      }

      /**
       * Get the output stream for BlockAppender's consumption.
       *
       * @return the output stream suitable for writing block data.
       */
      OutputStream getOutputStream() {
        return out;
      }

      /**
       * Get the current position in file.
       *
       * @return The current byte offset in underlying file.
       * @throws IOException
       */
      long getCurrentPos() throws IOException {
        return fsOut.getPos() + fsBufferedOutput.size();
      }

      long getStartPos() {
        return posStart;
      }

      /**
       * Current size of compressed data.
       *
       * @return the current number of compressed bytes written out for the
       *         block.
       * @throws IOException
       */
      long getCompressedSize() throws IOException {
        long ret = getCurrentPos() - posStart;
        return ret;
      }

      /**
       * Finish up the current block.
       */
      public void finish() throws IOException {
        try {
          if (out != null) {
            out.flush();
            out = null;
          }
        } finally {
          compressAlgo.returnCompressor(compressor);
          compressor = null;
        }
      }
    }

    /**
     * Access point to stuff data into a block.
     *
     * TODO: Change DataOutputStream to something else that tracks the size as
     * long instead of int. Currently, we will wrap around if the raw block size
     * is greater than 4GB.
     */
    public class BlockAppender extends DataOutputStream {
      private final BlockRegister blockRegister;
      private final WBlockState wBlkState;
      @SuppressWarnings("hiding")
      private boolean closed = false;

      /**
       * Constructor
       *
       * @param register
       *          the block register, which is called when the block is closed.
       * @param wbs
       *          The writable compression block state.
       */
      BlockAppender(BlockRegister register, WBlockState wbs) {
        super(wbs.getOutputStream());
        this.blockRegister = register;
        this.wBlkState = wbs;
      }

      /**
       * Get the raw size of the block.
       *
       * @return the number of uncompressed bytes written through the
       *         BlockAppender so far.
       * @throws IOException
       */
      public long getRawSize() throws IOException {
        /**
         * We expect the size() of a block not to exceed 4GB. size() wraps to a
         * negative integer once it exceeds 2GB, so mask it back to an unsigned
         * 32-bit value.
         */
        return size() & 0x00000000ffffffffL;
      }
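
      // Illustrative note (not in the original source): the mask reinterprets
      // a wrapped (negative) int as its unsigned 32-bit value, e.g.
      //   -2147483648 & 0x00000000ffffffffL == 2147483648L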

      /**
       * Get the compressed size of the block in progress.
       *
       * @return the number of compressed bytes written to the underlying FS
       *         file. The size may be smaller than what is needed to compress
       *         all of the data written, due to internal buffering inside the
       *         compressor.
       * @throws IOException
       */
      public long getCompressedSize() throws IOException {
        return wBlkState.getCompressedSize();
      }

      @Override
      public void flush() {
        // The downstream is a special kind of stream that finishes a
        // compression block upon flush. So we disable flush() here.
      }

      /**
       * Signal the end of writing to the block. The block register will be
       * called to register the finished block.
       */
      @Override
      public void close() throws IOException {
        if (closed) {
          return;
        }
        try {
          ++errorCount;
          wBlkState.finish();
          blockRegister.register(getRawSize(), wBlkState.getStartPos(),
              wBlkState.getCurrentPos());
          --errorCount;
        } finally {
          closed = true;
          blkInProgress = false;
        }
      }
    }

    /**
     * Constructor
     *
     * @param fout
     *          FS output stream.
     * @param compressionName
     *          Name of the compression algorithm, which will be used for all
     *          data blocks.
     * @throws IOException
     * @see Compression#getSupportedAlgorithms
     */
    public Writer(FSDataOutputStream fout, String compressionName,
        Configuration conf) throws IOException {
      if (fout.getPos() != 0) {
        throw new IOException("Output file not at zero offset.");
      }

      this.out = fout;
      this.conf = conf;
      dataIndex = new DataIndex(compressionName);
      metaIndex = new MetaIndex();
      fsOutputBuffer = new BytesWritable();
      Magic.write(fout);
    }

    /**
     * Close the BCFile Writer. Attempting to use the Writer after calling
     * <code>close</code> is not allowed and may lead to undefined results.
     */
    public void close() throws IOException {
      if (closed) {
        return;
      }

      try {
        if (errorCount == 0) {
          if (blkInProgress) {
            throw new IllegalStateException(
                "Close() called with active block appender.");
          }

          // add metaBCFileIndex to metaIndex as the last meta block
          BlockAppender appender =
              prepareMetaBlock(DataIndex.BLOCK_NAME,
                  getDefaultCompressionAlgorithm());
          try {
            dataIndex.write(appender);
          } finally {
            appender.close();
          }

          long offsetIndexMeta = out.getPos();
          metaIndex.write(out);

          // Meta Index and the trailing section are written out directly.
          out.writeLong(offsetIndexMeta);

          API_VERSION.write(out);
          Magic.write(out);
          out.flush();
        }
      } finally {
        closed = true;
      }
    }

    private Algorithm getDefaultCompressionAlgorithm() {
      return dataIndex.getDefaultCompressionAlgorithm();
    }

    private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
        throws IOException, MetaBlockAlreadyExists {
      if (blkInProgress) {
        throw new IllegalStateException(
            "Cannot create Meta Block until previous block is closed.");
      }

      if (metaIndex.getMetaByName(name) != null) {
        throw new MetaBlockAlreadyExists("name=" + name);
      }

      MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
      WBlockState wbs =
          new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
      BlockAppender ba = new BlockAppender(mbr, wbs);
      blkInProgress = true;
      metaBlkSeen = true;
      return ba;
    }

    /**
     * Create a Meta Block and obtain an output stream for adding data into the
     * block. There can only be one BlockAppender stream active at any time.
     * Regular Blocks may not be created after the first Meta Block. The caller
     * must call BlockAppender.close() to conclude the block creation.
     *
     * @param name
     *          The name of the Meta Block. The name must not conflict with
     *          existing Meta Blocks.
     * @param compressionName
     *          The name of the compression algorithm to be used.
     * @return The BlockAppender stream
     * @throws IOException
     * @throws MetaBlockAlreadyExists
     *           If the meta block with the name already exists.
     */
    public BlockAppender prepareMetaBlock(String name, String compressionName)
        throws IOException, MetaBlockAlreadyExists {
      return prepareMetaBlock(name, Compression
          .getCompressionAlgorithmByName(compressionName));
    }

    /**
     * Create a Meta Block and obtain an output stream for adding data into the
     * block. The Meta Block will be compressed with the same compression
     * algorithm as data blocks. There can only be one BlockAppender stream
     * active at any time. Regular Blocks may not be created after the first
     * Meta Block. The caller must call BlockAppender.close() to conclude the
     * block creation.
     *
     * @param name
     *          The name of the Meta Block. The name must not conflict with
     *          existing Meta Blocks.
     * @return The BlockAppender stream
     * @throws MetaBlockAlreadyExists
     *           If the meta block with the name already exists.
     * @throws IOException
     */
    public BlockAppender prepareMetaBlock(String name) throws IOException,
        MetaBlockAlreadyExists {
      return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
    }

    /**
     * Create a Data Block and obtain an output stream for adding data into the
     * block. There can only be one BlockAppender stream active at any time.
     * Data Blocks may not be created after the first Meta Block. The caller
     * must call BlockAppender.close() to conclude the block creation.
     *
     * @return The BlockAppender stream
     * @throws IOException
     */
    public BlockAppender prepareDataBlock() throws IOException {
      if (blkInProgress) {
        throw new IllegalStateException(
            "Cannot create Data Block until previous block is closed.");
      }

      if (metaBlkSeen) {
        throw new IllegalStateException(
            "Cannot create Data Block after Meta Blocks.");
      }

      DataBlockRegister dbr = new DataBlockRegister();

      WBlockState wbs =
          new WBlockState(getDefaultCompressionAlgorithm(), out,
              fsOutputBuffer, conf);
      BlockAppender ba = new BlockAppender(dbr, wbs);
      blkInProgress = true;
      return ba;
    }
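
    // Illustrative usage sketch (not part of the original source). Names such
    // as "fout" and "conf" are assumed to be supplied by the caller; data
    // blocks must be written before any meta blocks:
    //
    //   BCFile.Writer writer = new BCFile.Writer(fout, "gz", conf);
    //   Writer.BlockAppender ba = writer.prepareDataBlock();
    //   try {
    //     ba.writeBytes("some row data");
    //   } finally {
    //     ba.close(); // registers the block in the data index
    //   }
    //   writer.close(); // appends the data index, meta index, version, magic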

    /**
     * Callback to make sure a meta block is added to the internal list when
     * its stream is closed.
     */
    private class MetaBlockRegister implements BlockRegister {
      private final String name;
      private final Algorithm compressAlgo;

      MetaBlockRegister(String name, Algorithm compressAlgo) {
        this.name = name;
        this.compressAlgo = compressAlgo;
      }

      public void register(long raw, long begin, long end) {
        metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
            new BlockRegion(begin, end - begin, raw)));
      }
    }

    /**
     * Callback to make sure a data block is added to the internal list when
     * it's being closed.
     */
    private class DataBlockRegister implements BlockRegister {
      DataBlockRegister() {
        // do nothing
      }

      public void register(long raw, long begin, long end) {
        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
      }
    }
  }

  /**
   * BCFile Reader, interface to read the file's data and meta blocks.
   */
  static public class Reader implements Closeable {
    private final FSDataInputStream in;
    private final Configuration conf;
    final DataIndex dataIndex;
    // Index for meta blocks
    final MetaIndex metaIndex;
    final Version version;

    /**
     * Intermediate class that maintains the state of a Readable Compression
     * Block.
     */
    static private final class RBlockState {
      private final Algorithm compressAlgo;
      private Decompressor decompressor;
      private final BlockRegion region;
      private final InputStream in;

      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin,
          BlockRegion region, Configuration conf) throws IOException {
        this.compressAlgo = compressionAlgo;
        this.region = region;
        this.decompressor = compressionAlgo.getDecompressor();

        try {
          this.in =
              compressAlgo
                  .createDecompressionStream(new BoundedRangeFileInputStream(
                      fsin, this.region.getOffset(), this.region
                          .getCompressedSize()), decompressor, TFile
                      .getFSInputBufferSize(conf));
        } catch (IOException e) {
          compressAlgo.returnDecompressor(decompressor);
          throw e;
        }
      }

      /**
       * Get the input stream for BlockReader's consumption.
       *
       * @return the input stream suitable for reading block data.
       */
      public InputStream getInputStream() {
        return in;
      }

      public String getCompressionName() {
        return compressAlgo.getName();
      }

      public BlockRegion getBlockRegion() {
        return region;
      }

      public void finish() throws IOException {
        try {
          in.close();
        } finally {
          compressAlgo.returnDecompressor(decompressor);
          decompressor = null;
        }
      }
    }

    /**
     * Access point to read a block.
     */
    public static class BlockReader extends DataInputStream {
      private final RBlockState rBlkState;
      private boolean closed = false;

      BlockReader(RBlockState rbs) {
        super(rbs.getInputStream());
        rBlkState = rbs;
      }

      /**
       * Finish reading the block. Release all resources.
       */
      @Override
      public void close() throws IOException {
        if (closed) {
          return;
        }
        try {
          // Do not set rBlkState to null. People may access stats after calling
          // close().
          rBlkState.finish();
        } finally {
          closed = true;
        }
      }

      /**
       * Get the name of the compression algorithm used to compress the block.
       *
       * @return name of the compression algorithm.
       */
      public String getCompressionName() {
        return rBlkState.getCompressionName();
      }

      /**
       * Get the uncompressed size of the block.
       *
       * @return uncompressed size of the block.
       */
      public long getRawSize() {
        return rBlkState.getBlockRegion().getRawSize();
      }

      /**
       * Get the compressed size of the block.
       *
       * @return compressed size of the block.
       */
      public long getCompressedSize() {
        return rBlkState.getBlockRegion().getCompressedSize();
      }

      /**
       * Get the starting position of the block in the file.
       *
       * @return the starting position of the block in the file.
       */
      public long getStartPos() {
        return rBlkState.getBlockRegion().getOffset();
      }
    }

    /**
     * Constructor
     *
     * @param fin
     *          FS input stream.
     * @param fileLength
     *          Length of the corresponding file
     * @throws IOException
     */
    public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
        throws IOException {
      this.in = fin;
      this.conf = conf;

      // move the cursor to the beginning of the tail, containing: offset to the
      // meta block index, version and magic
      fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
          / Byte.SIZE);
      long offsetIndexMeta = fin.readLong();
      version = new Version(fin);
      Magic.readAndVerify(fin);

      if (!version.compatibleWith(BCFile.API_VERSION)) {
        throw new RuntimeException("Incompatible BCFile version.");
      }

      // read meta index
      fin.seek(offsetIndexMeta);
      metaIndex = new MetaIndex(fin);

      // read data:BCFile.index, the data block index
      BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
      try {
        dataIndex = new DataIndex(blockR);
      } finally {
        blockR.close();
      }
    }
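
    // Illustrative note (not in the original source): assuming Version
    // serializes as two shorts (4 bytes) and Magic is 16 bytes, the seek above
    // lands 28 bytes before EOF:
    //   16 (magic) + 4 (version) + 8 (offsetIndexMeta long) = 28 bytes of tail.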

    /**
     * Get the name of the default compression algorithm.
     *
     * @return the name of the default compression algorithm.
     */
    public String getDefaultCompressionName() {
      return dataIndex.getDefaultCompressionAlgorithm().getName();
    }

    /**
     * Get the version of the BCFile file being read.
     *
     * @return version of the BCFile file being read.
     */
    public Version getBCFileVersion() {
      return version;
    }

    /**
     * Get the version of the BCFile API.
     *
     * @return version of the BCFile API.
     */
    public Version getAPIVersion() {
      return API_VERSION;
    }

    /**
     * Finish reading the BCFile. Release all resources.
     */
    public void close() {
      // nothing to be done now
    }

    /**
     * Get the number of data blocks.
     *
     * @return the number of data blocks.
     */
    public int getBlockCount() {
      return dataIndex.getBlockRegionList().size();
    }

    /**
     * Stream access to a Meta Block.
     *
     * @param name
     *          meta block name
     * @return BlockReader input stream for reading the meta block.
     * @throws IOException
     * @throws MetaBlockDoesNotExist
     *           The Meta Block with the given name does not exist.
     */
    public BlockReader getMetaBlock(String name) throws IOException,
        MetaBlockDoesNotExist {
      MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
      if (imeBCIndex == null) {
        throw new MetaBlockDoesNotExist("name=" + name);
      }

      BlockRegion region = imeBCIndex.getRegion();
      return createReader(imeBCIndex.getCompressionAlgorithm(), region);
    }

    /**
     * Stream access to a Data Block.
     *
     * @param blockIndex
     *          0-based data block index.
     * @return BlockReader input stream for reading the data block.
     * @throws IOException
     */
    public BlockReader getDataBlock(int blockIndex) throws IOException {
      if (blockIndex < 0 || blockIndex >= getBlockCount()) {
        throw new IndexOutOfBoundsException(String.format(
            "blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
      }

      BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
    }
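
    // Illustrative usage sketch (not part of the original source). "fin",
    // "fileLength", and "conf" are assumed to come from the caller:
    //
    //   BCFile.Reader reader = new BCFile.Reader(fin, fileLength, conf);
    //   for (int i = 0; i < reader.getBlockCount(); ++i) {
    //     Reader.BlockReader br = reader.getDataBlock(i);
    //     try {
    //       // consume the decompressed block bytes through the DataInputStream
    //     } finally {
    //       br.close();
    //     }
    //   }
    //   reader.close();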

    private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
        throws IOException {
      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf);
      return new BlockReader(rbs);
    }

    /**
     * Find the smallest Block index whose starting offset is greater than or
     * equal to the specified offset.
     *
     * @param offset
     *          User-specified offset.
     * @return the index to the data Block if such block exists; or -1
     *         otherwise.
     */
    public int getBlockIndexNear(long offset) {
      ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
      int idx =
          Utils
              .lowerBound(list, new ScalarLong(offset), new ScalarComparator());

      if (idx == list.size()) {
        return -1;
      }

      return idx;
    }
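
    // Illustrative example (not in the original source): for blocks starting
    // at offsets [0, 100, 250], getBlockIndexNear(100) returns 1,
    // getBlockIndexNear(101) returns 2, and getBlockIndexNear(300) returns -1.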
  }

  /**
   * Index for all Meta blocks.
   */
  static class MetaIndex {
    // use a tree map, for getting a meta block entry by name
    final Map<String, MetaIndexEntry> index;

    // for write
    public MetaIndex() {
      index = new TreeMap<String, MetaIndexEntry>();
    }

    // for read, construct the map from the file
    public MetaIndex(DataInput in) throws IOException {
      int count = Utils.readVInt(in);
      index = new TreeMap<String, MetaIndexEntry>();

      for (int nx = 0; nx < count; nx++) {
        MetaIndexEntry indexEntry = new MetaIndexEntry(in);
        index.put(indexEntry.getMetaName(), indexEntry);
      }
    }

    public void addEntry(MetaIndexEntry indexEntry) {
      index.put(indexEntry.getMetaName(), indexEntry);
    }

    public MetaIndexEntry getMetaByName(String name) {
      return index.get(name);
    }

    public void write(DataOutput out) throws IOException {
      Utils.writeVInt(out, index.size());

      for (MetaIndexEntry indexEntry : index.values()) {
        indexEntry.write(out);
      }
    }
  }

  /**
   * An entry describes a meta block in the MetaIndex.
   */
  static final class MetaIndexEntry {
    private final String metaName;
    private final Algorithm compressionAlgorithm;
    private final static String defaultPrefix = "data:";

    private final BlockRegion region;

    public MetaIndexEntry(DataInput in) throws IOException {
      String fullMetaName = Utils.readString(in);
      if (fullMetaName.startsWith(defaultPrefix)) {
        metaName = fullMetaName.substring(defaultPrefix.length());
      } else {
        throw new IOException("Corrupted Meta region Index");
      }

      compressionAlgorithm =
          Compression.getCompressionAlgorithmByName(Utils.readString(in));
      region = new BlockRegion(in);
    }

    public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm,
        BlockRegion region) {
      this.metaName = metaName;
      this.compressionAlgorithm = compressionAlgorithm;
      this.region = region;
    }

    public String getMetaName() {
      return metaName;
    }

    public Algorithm getCompressionAlgorithm() {
      return compressionAlgorithm;
    }

    public BlockRegion getRegion() {
      return region;
    }

    public void write(DataOutput out) throws IOException {
      Utils.writeString(out, defaultPrefix + metaName);
      Utils.writeString(out, compressionAlgorithm.getName());

      region.write(out);
    }
  }
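
  // Illustrative note (not in the original source): as serialized by
  // MetaIndexEntry.write(), each meta index record is
  //   [string "data:" + metaName][string compression algorithm name]
  //   [BlockRegion: offset, compressedSize, rawSize as VLongs]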

  /**
   * Index of all compressed data blocks.
   */
  static class DataIndex {
    final static String BLOCK_NAME = "BCFile.index";

    private final Algorithm defaultCompressionAlgorithm;

    // for data blocks, each entry specifies a block's offset, compressed size
    // and raw size
    private final ArrayList<BlockRegion> listRegions;

    // for read, deserialized from a file
    public DataIndex(DataInput in) throws IOException {
      defaultCompressionAlgorithm =
          Compression.getCompressionAlgorithmByName(Utils.readString(in));

      int n = Utils.readVInt(in);
      listRegions = new ArrayList<BlockRegion>(n);

      for (int i = 0; i < n; i++) {
        BlockRegion region = new BlockRegion(in);
        listRegions.add(region);
      }
    }

    // for write
    public DataIndex(String defaultCompressionAlgorithmName) {
      this.defaultCompressionAlgorithm =
          Compression
              .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
      listRegions = new ArrayList<BlockRegion>();
    }

    public Algorithm getDefaultCompressionAlgorithm() {
      return defaultCompressionAlgorithm;
    }

    public ArrayList<BlockRegion> getBlockRegionList() {
      return listRegions;
    }

    public void addBlockRegion(BlockRegion region) {
      listRegions.add(region);
    }

    public void write(DataOutput out) throws IOException {
      Utils.writeString(out, defaultCompressionAlgorithm.getName());

      Utils.writeVInt(out, listRegions.size());

      for (BlockRegion region : listRegions) {
        region.write(out);
      }
    }
  }
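
  // Illustrative note (not in the original source): as serialized by
  // DataIndex.write(), the data block index is
  //   [string default compression algorithm name][VInt block count]
  //   [BlockRegion x count]
  // and is itself stored as the last meta block, "data:BCFile.index".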

  /**
   * Magic number uniquely identifying a BCFile in the header/footer.
   */
  static final class Magic {
    private final static byte[] AB_MAGIC_BCFILE =
        {
            // ... total of 16 bytes
            (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91,
            (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf,
            (byte) 0x41, (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1,
            (byte) 0x50 };

    public static void readAndVerify(DataInput in) throws IOException {
      byte[] abMagic = new byte[size()];
      in.readFully(abMagic);

      // check against AB_MAGIC_BCFILE; if not matching, throw an exception
      if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
        throw new IOException("Not a valid BCFile.");
      }
    }

    public static void write(DataOutput out) throws IOException {
      out.write(AB_MAGIC_BCFILE);
    }

    public static int size() {
      return AB_MAGIC_BCFILE.length;
    }
  }

  /**
   * Block region.
   */
  static final class BlockRegion implements Scalar {
    private final long offset;
    private final long compressedSize;
    private final long rawSize;

    public BlockRegion(DataInput in) throws IOException {
      offset = Utils.readVLong(in);
      compressedSize = Utils.readVLong(in);
      rawSize = Utils.readVLong(in);
    }

    public BlockRegion(long offset, long compressedSize, long rawSize) {
      this.offset = offset;
      this.compressedSize = compressedSize;
      this.rawSize = rawSize;
    }

    public void write(DataOutput out) throws IOException {
      Utils.writeVLong(out, offset);
      Utils.writeVLong(out, compressedSize);
      Utils.writeVLong(out, rawSize);
    }

    public long getOffset() {
      return offset;
    }

    public long getCompressedSize() {
      return compressedSize;
    }

    public long getRawSize() {
      return rawSize;
    }

    @Override
    public long magnitude() {
      return offset;
    }
  }
}