source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/io/compress/BlockCompressorStream.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 5.1 KB
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.OutputStream;

/**
 * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
 * with 'block-based' compression algorithms, as opposed to
 * 'stream-based' compression algorithms.
 *
 * Note that this wrapper does not guarantee that blocks will be sized
 * for the compressor. If the
 * {@link org.apache.hadoop.io.compress.Compressor} requires buffering
 * to effect meaningful compression, it is responsible for it.
 */
public class BlockCompressorStream extends CompressorStream {

  // The 'maximum' size of input data to be compressed, to account
  // for the overhead of the compression algorithm.
  private final int MAX_INPUT_SIZE;

  /**
   * Create a {@link BlockCompressorStream}.
   *
   * @param out underlying output stream
   * @param compressor compressor to be used
   * @param bufferSize size of the internal buffer
   * @param compressionOverhead maximum 'overhead' of the compression
   *                            algorithm with the given bufferSize
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor,
                               int bufferSize, int compressionOverhead) {
    super(out, compressor, bufferSize);
    MAX_INPUT_SIZE = bufferSize - compressionOverhead;
  }
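  // For example, with bufferSize = 64*1024 and a zlib-style overhead
  // bound of bufferSize/100 + 12 = 667 bytes, MAX_INPUT_SIZE comes to
  // 64869 bytes of uncompressed input per block.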

  /**
   * Create a {@link BlockCompressorStream} with the given output stream
   * and compressor, using a default bufferSize of 512 bytes and a
   * compressionOverhead of 18 bytes (1% of bufferSize + 12 bytes, the
   * zlib bound).
   *
   * @param out underlying output stream
   * @param compressor compressor to be used
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor) {
    this(out, compressor, 512, 18);
  }

  /**
   * Write the data provided to the compression codec, compressing no more
   * than the buffer size less the compression overhead as specified during
   * construction for each block.
   *
   * Each block contains the uncompressed length for the block, followed by
   * one or more length-prefixed blocks of compressed data.
   */
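  // On-wire layout produced below (each int is 4 bytes, big-endian,
  // written by rawWriteInt):
  //
  //   <uncompressed block length : int>
  //   <compressed chunk length   : int> <compressed chunk bytes>
  //   ... chunks repeat until the block's input is exhausted ...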
  public void write(byte[] b, int off, int len) throws IOException {
    // Sanity checks
    if (compressor.finished()) {
      throw new IOException("write beyond end of stream");
    }
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
               ((off + len) > b.length)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }

    long limlen = compressor.getBytesRead();
    if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
      // Adding this segment would exceed the maximum size.
      // Flush data if we have it.
      finish();
      compressor.reset();
    }

    if (len > MAX_INPUT_SIZE) {
      // The data we're given exceeds the maximum size. Any buffered
      // data has already been flushed, so we write out this chunk in
      // segments not exceeding the maximum size until it is exhausted.
      rawWriteInt(len);
      do {
        int bufLen = Math.min(len, MAX_INPUT_SIZE);

        compressor.setInput(b, off, bufLen);
        compressor.finish();
        while (!compressor.finished()) {
          compress();
        }
        compressor.reset();
        off += bufLen;
        len -= bufLen;
      } while (len > 0);
      return;
    }

    // Give data to the compressor
    compressor.setInput(b, off, len);
    if (!compressor.needsInput()) {
      // The compressor buffer size might be smaller than the maximum
      // size, so we permit it to flush if required.
      rawWriteInt((int)compressor.getBytesRead());
      do {
        compress();
      } while (!compressor.needsInput());
    }
  }

  public void finish() throws IOException {
    if (!compressor.finished()) {
      rawWriteInt((int)compressor.getBytesRead());
      compressor.finish();
      while (!compressor.finished()) {
        compress();
      }
    }
  }

  protected void compress() throws IOException {
    int len = compressor.compress(buffer, 0, buffer.length);
    if (len > 0) {
      // Write out the compressed chunk
      rawWriteInt(len);
      out.write(buffer, 0, len);
    }
  }

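  // Writes v as a 4-byte big-endian int. The byte order matches
  // java.io.DataOutputStream#writeInt, so a reader can recover these
  // length headers with DataInputStream#readInt.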
  private void rawWriteInt(int v) throws IOException {
    out.write((v >>> 24) & 0xFF);
    out.write((v >>> 16) & 0xFF);
    out.write((v >>>  8) & 0xFF);
    out.write((v >>>  0) & 0xFF);
  }

}
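
For reference, a minimal usage sketch (not part of the file above). It pairs the stream with the pure-Java zlib compressor from this tree, org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater, though any Compressor implementation would do, and feeds it more data than one block holds so several length-prefixed blocks are emitted. The buffer size and overhead values are illustrative, not canonical.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;

public class BlockCompressorStreamDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    Compressor compressor = new BuiltInZlibDeflater();

    // Illustrative sizing: 64 KB blocks, reserving the zlib-style
    // worst-case overhead (1% of the buffer plus 12 bytes), so
    // MAX_INPUT_SIZE is 64*1024 - 667 = 64869 bytes per block.
    int bufferSize = 64 * 1024;
    int compressionOverhead = bufferSize / 100 + 12;

    BlockCompressorStream out =
        new BlockCompressorStream(sink, compressor, bufferSize,
                                  compressionOverhead);

    byte[] data = new byte[200 * 1024];   // deliberately larger than a block
    Arrays.fill(data, (byte) 'a');        // highly compressible input
    out.write(data, 0, data.length);
    out.close();                          // close() finishes the last block

    System.out.println(data.length + " bytes in, " + sink.size() + " bytes out");
  }
}

The companion BlockDecompressorStream in the same package reads this framing back.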