source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/io/file/tfile/Compression.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 11.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17package org.apache.hadoop.io.file.tfile;
18
19import java.io.BufferedInputStream;
20import java.io.BufferedOutputStream;
21import java.io.FilterOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
24import java.io.OutputStream;
25import java.util.ArrayList;
26
27import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.io.compress.CodecPool;
31import org.apache.hadoop.io.compress.CompressionCodec;
32import org.apache.hadoop.io.compress.CompressionInputStream;
33import org.apache.hadoop.io.compress.CompressionOutputStream;
34import org.apache.hadoop.io.compress.Compressor;
35import org.apache.hadoop.io.compress.Decompressor;
36import org.apache.hadoop.io.compress.DefaultCodec;
37import org.apache.hadoop.util.ReflectionUtils;
38
39/**
40 * Compression related stuff.
41 */
42final class Compression {
43  static final Log LOG = LogFactory.getLog(Compression.class);
44
45  /**
46   * Prevent the instantiation of class.
47   */
48  private Compression() {
49    // nothing
50  }
51
52  static class FinishOnFlushCompressionStream extends FilterOutputStream {
53    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
54      super(cout);
55    }
56
57    @Override
58    public void write(byte b[], int off, int len) throws IOException {
59      out.write(b, off, len);
60    }
61
62    @Override
63    public void flush() throws IOException {
64      CompressionOutputStream cout = (CompressionOutputStream) out;
65      cout.finish();
66      cout.flush();
67      cout.resetState();
68    }
69  }
70
71  /**
72   * Compression algorithms.
73   */
74  static enum Algorithm {
75    LZO(TFile.COMPRESSION_LZO) {
76      private transient boolean checked = false;
77      private static final String defaultClazz =
78          "org.apache.hadoop.io.compress.LzoCodec";
79      private transient CompressionCodec codec = null;
80
81      @Override
82      public synchronized boolean isSupported() {
83        if (!checked) {
84          checked = true;
85          String extClazz =
86              (conf.get(CONF_LZO_CLASS) == null ? System
87                  .getProperty(CONF_LZO_CLASS) : null);
88          String clazz = (extClazz != null) ? extClazz : defaultClazz;
89          try {
90            LOG.info("Trying to load Lzo codec class: " + clazz);
91            codec =
92                (CompressionCodec) ReflectionUtils.newInstance(Class
93                    .forName(clazz), conf);
94          } catch (ClassNotFoundException e) {
95            // that is okay
96          }
97        }
98        return codec != null;
99      }
100
101      @Override
102      CompressionCodec getCodec() throws IOException {
103        if (!isSupported()) {
104          throw new IOException(
105              "LZO codec class not specified. Did you forget to set property "
106                  + CONF_LZO_CLASS + "?");
107        }
108
109        return codec;
110      }
111
112      @Override
113      public synchronized InputStream createDecompressionStream(
114          InputStream downStream, Decompressor decompressor,
115          int downStreamBufferSize) throws IOException {
116        if (!isSupported()) {
117          throw new IOException(
118              "LZO codec class not specified. Did you forget to set property "
119                  + CONF_LZO_CLASS + "?");
120        }
121        InputStream bis1 = null;
122        if (downStreamBufferSize > 0) {
123          bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
124        } else {
125          bis1 = downStream;
126        }
127        conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
128        CompressionInputStream cis =
129            codec.createInputStream(bis1, decompressor);
130        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
131        return bis2;
132      }
133
134      @Override
135      public synchronized OutputStream createCompressionStream(
136          OutputStream downStream, Compressor compressor,
137          int downStreamBufferSize) throws IOException {
138        if (!isSupported()) {
139          throw new IOException(
140              "LZO codec class not specified. Did you forget to set property "
141                  + CONF_LZO_CLASS + "?");
142        }
143        OutputStream bos1 = null;
144        if (downStreamBufferSize > 0) {
145          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
146        } else {
147          bos1 = downStream;
148        }
149        conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
150        CompressionOutputStream cos =
151            codec.createOutputStream(bos1, compressor);
152        BufferedOutputStream bos2 =
153            new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
154                DATA_OBUF_SIZE);
155        return bos2;
156      }
157    },
158
159    GZ(TFile.COMPRESSION_GZ) {
160      private transient DefaultCodec codec;
161
162      @Override
163      synchronized CompressionCodec getCodec() {
164        if (codec == null) {
165          codec = new DefaultCodec();
166          codec.setConf(conf);
167        }
168
169        return codec;
170      }
171
172      @Override
173      public synchronized InputStream createDecompressionStream(
174          InputStream downStream, Decompressor decompressor,
175          int downStreamBufferSize) throws IOException {
176        // Set the internal buffer size to read from down stream.
177        if (downStreamBufferSize > 0) {
178          codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
179        }
180        CompressionInputStream cis =
181            codec.createInputStream(downStream, decompressor);
182        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
183        return bis2;
184      }
185
186      @Override
187      public synchronized OutputStream createCompressionStream(
188          OutputStream downStream, Compressor compressor,
189          int downStreamBufferSize) throws IOException {
190        OutputStream bos1 = null;
191        if (downStreamBufferSize > 0) {
192          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
193        } else {
194          bos1 = downStream;
195        }
196        codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
197        CompressionOutputStream cos =
198            codec.createOutputStream(bos1, compressor);
199        BufferedOutputStream bos2 =
200            new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
201                DATA_OBUF_SIZE);
202        return bos2;
203      }
204
205      @Override
206      public boolean isSupported() {
207        return true;
208      }
209    },
210
211    NONE(TFile.COMPRESSION_NONE) {
212      @Override
213      CompressionCodec getCodec() {
214        return null;
215      }
216
217      @Override
218      public synchronized InputStream createDecompressionStream(
219          InputStream downStream, Decompressor decompressor,
220          int downStreamBufferSize) throws IOException {
221        if (downStreamBufferSize > 0) {
222          return new BufferedInputStream(downStream, downStreamBufferSize);
223        }
224        return downStream;
225      }
226
227      @Override
228      public synchronized OutputStream createCompressionStream(
229          OutputStream downStream, Compressor compressor,
230          int downStreamBufferSize) throws IOException {
231        if (downStreamBufferSize > 0) {
232          return new BufferedOutputStream(downStream, downStreamBufferSize);
233        }
234
235        return downStream;
236      }
237
238      @Override
239      public boolean isSupported() {
240        return true;
241      }
242    };
243
244    // We require that all compression related settings are configured
245    // statically in the Configuration object.
246    protected static final Configuration conf = new Configuration();
247    private final String compressName;
248    // data input buffer size to absorb small reads from application.
249    private static final int DATA_IBUF_SIZE = 1 * 1024;
250    // data output buffer size to absorb small writes from application.
251    private static final int DATA_OBUF_SIZE = 4 * 1024;
252    public static final String CONF_LZO_CLASS =
253        "io.compression.codec.lzo.class";
254
255    Algorithm(String name) {
256      this.compressName = name;
257    }
258
259    abstract CompressionCodec getCodec() throws IOException;
260
261    public abstract InputStream createDecompressionStream(
262        InputStream downStream, Decompressor decompressor,
263        int downStreamBufferSize) throws IOException;
264
265    public abstract OutputStream createCompressionStream(
266        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
267        throws IOException;
268
269    public abstract boolean isSupported();
270
271    public Compressor getCompressor() throws IOException {
272      CompressionCodec codec = getCodec();
273      if (codec != null) {
274        Compressor compressor = CodecPool.getCompressor(codec);
275        if (compressor != null) {
276          if (compressor.finished()) {
277            // Somebody returns the compressor to CodecPool but is still using
278            // it.
279            LOG.warn("Compressor obtained from CodecPool already finished()");
280          } else {
281            LOG.debug("Got a compressor: " + compressor.hashCode());
282          }
283          /**
284           * Following statement is necessary to get around bugs in 0.18 where a
285           * compressor is referenced after returned back to the codec pool.
286           */
287          compressor.reset();
288        }
289        return compressor;
290      }
291      return null;
292    }
293
294    public void returnCompressor(Compressor compressor) {
295      if (compressor != null) {
296        LOG.debug("Return a compressor: " + compressor.hashCode());
297        CodecPool.returnCompressor(compressor);
298      }
299    }
300
301    public Decompressor getDecompressor() throws IOException {
302      CompressionCodec codec = getCodec();
303      if (codec != null) {
304        Decompressor decompressor = CodecPool.getDecompressor(codec);
305        if (decompressor != null) {
306          if (decompressor.finished()) {
307            // Somebody returns the decompressor to CodecPool but is still using
308            // it.
309            LOG.warn("Deompressor obtained from CodecPool already finished()");
310          } else {
311            LOG.debug("Got a decompressor: " + decompressor.hashCode());
312          }
313          /**
314           * Following statement is necessary to get around bugs in 0.18 where a
315           * decompressor is referenced after returned back to the codec pool.
316           */
317          decompressor.reset();
318        }
319        return decompressor;
320      }
321
322      return null;
323    }
324
325    public void returnDecompressor(Decompressor decompressor) {
326      if (decompressor != null) {
327        LOG.debug("Returned a decompressor: " + decompressor.hashCode());
328        CodecPool.returnDecompressor(decompressor);
329      }
330    }
331
332    public String getName() {
333      return compressName;
334    }
335  }
336
337  static Algorithm getCompressionAlgorithmByName(String compressName) {
338    Algorithm[] algos = Algorithm.class.getEnumConstants();
339
340    for (Algorithm a : algos) {
341      if (a.getName().equals(compressName)) {
342        return a;
343      }
344    }
345
346    throw new IllegalArgumentException(
347        "Unsupported compression algorithm name: " + compressName);
348  }
349
350  static String[] getSupportedAlgorithms() {
351    Algorithm[] algos = Algorithm.class.getEnumConstants();
352
353    ArrayList<String> ret = new ArrayList<String>();
354    for (Algorithm a : algos) {
355      if (a.isSupported()) {
356        ret.add(a.getName());
357      }
358    }
359    return ret.toArray(new String[ret.size()]);
360  }
361}
Note: See TracBrowser for help on using the repository browser.