source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/io/file/tfile/Chunk.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 10.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17package org.apache.hadoop.io.file.tfile;
18
19import java.io.DataInputStream;
20import java.io.DataOutputStream;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24
25/**
26 * Several related classes to support chunk-encoded sub-streams on top of a
27 * regular stream.
28 */
29final class Chunk {
30
31  /**
32   * Prevent the instantiation of class.
33   */
34  private Chunk() {
35    // nothing
36  }
37
38  /**
39   * Decoding a chain of chunks encoded through ChunkEncoder or
40   * SingleChunkEncoder.
41   */
42  static public class ChunkDecoder extends InputStream {
43    private DataInputStream in = null;
44    private boolean lastChunk;
45    private int remain = 0;
46    private boolean closed;
47
48    public ChunkDecoder() {
49      lastChunk = true;
50      closed = true;
51    }
52
53    public void reset(DataInputStream downStream) {
54      // no need to wind forward the old input.
55      in = downStream;
56      lastChunk = false;
57      remain = 0;
58      closed = false;
59    }
60
61    /**
62     * Constructor
63     *
64     * @param in
65     *          The source input stream which contains chunk-encoded data
66     *          stream.
67     */
68    public ChunkDecoder(DataInputStream in) {
69      this.in = in;
70      lastChunk = false;
71      closed = false;
72    }
73
74    /**
75     * Have we reached the last chunk.
76     *
77     * @return true if we have reached the last chunk.
78     * @throws java.io.IOException
79     */
80    public boolean isLastChunk() throws IOException {
81      checkEOF();
82      return lastChunk;
83    }
84
85    /**
86     * How many bytes remain in the current chunk?
87     *
88     * @return remaining bytes left in the current chunk.
89     * @throws java.io.IOException
90     */
91    public int getRemain() throws IOException {
92      checkEOF();
93      return remain;
94    }
95
96    /**
97     * Reading the length of next chunk.
98     *
99     * @throws java.io.IOException
100     *           when no more data is available.
101     */
102    private void readLength() throws IOException {
103      remain = Utils.readVInt(in);
104      if (remain >= 0) {
105        lastChunk = true;
106      } else {
107        remain = -remain;
108      }
109    }
110
111    /**
112     * Check whether we reach the end of the stream.
113     *
114     * @return false if the chunk encoded stream has more data to read (in which
115     *         case available() will be greater than 0); true otherwise.
116     * @throws java.io.IOException
117     *           on I/O errors.
118     */
119    private boolean checkEOF() throws IOException {
120      if (isClosed()) return true;
121      while (true) {
122        if (remain > 0) return false;
123        if (lastChunk) return true;
124        readLength();
125      }
126    }
127
128    @Override
129    /*
130     * This method never blocks the caller. Returning 0 does not mean we reach
131     * the end of the stream.
132     */
133    public int available() {
134      return remain;
135    }
136
137    @Override
138    public int read() throws IOException {
139      if (checkEOF()) return -1;
140      int ret = in.read();
141      if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
142      --remain;
143      return ret;
144    }
145
146    @Override
147    public int read(byte[] b) throws IOException {
148      return read(b, 0, b.length);
149    }
150
151    @Override
152    public int read(byte[] b, int off, int len) throws IOException {
153      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
154        throw new IndexOutOfBoundsException();
155      }
156
157      if (!checkEOF()) {
158        int n = Math.min(remain, len);
159        int ret = in.read(b, off, n);
160        if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
161        remain -= ret;
162        return ret;
163      }
164      return -1;
165    }
166
167    @Override
168    public long skip(long n) throws IOException {
169      if (!checkEOF()) {
170        long ret = in.skip(Math.min(remain, n));
171        remain -= ret;
172        return ret;
173      }
174      return 0;
175    }
176
177    @Override
178    public boolean markSupported() {
179      return false;
180    }
181
182    public boolean isClosed() {
183      return closed;
184    }
185
186    @Override
187    public void close() throws IOException {
188      if (closed == false) {
189        try {
190          while (!checkEOF()) {
191            skip(Integer.MAX_VALUE);
192          }
193        } finally {
194          closed = true;
195        }
196      }
197    }
198  }
199
200  /**
201   * Chunk Encoder. Encoding the output data into a chain of chunks in the
202   * following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
203   * byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
204   * chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
205   * cannot have length 0. All lengths are in the range of 0 to
206   * Integer.MAX_VALUE and are encoded in Utils.VInt format.
207   */
208  static public class ChunkEncoder extends OutputStream {
209    /**
210     * The data output stream it connects to.
211     */
212    private DataOutputStream out;
213
214    /**
215     * The internal buffer that is only used when we do not know the advertised
216     * size.
217     */
218    private byte buf[];
219
220    /**
221     * The number of valid bytes in the buffer. This value is always in the
222     * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
223     * through <tt>buf[count-1]</tt> contain valid byte data.
224     */
225    private int count;
226
227    /**
228     * Constructor.
229     *
230     * @param out
231     *          the underlying output stream.
232     * @param buf
233     *          user-supplied buffer. The buffer would be used exclusively by
234     *          the ChunkEncoder during its life cycle.
235     */
236    public ChunkEncoder(DataOutputStream out, byte[] buf) {
237      this.out = out;
238      this.buf = buf;
239      this.count = 0;
240    }
241
242    /**
243     * Write out a chunk.
244     *
245     * @param chunk
246     *          The chunk buffer.
247     * @param offset
248     *          Offset to chunk buffer for the beginning of chunk.
249     * @param len
250     * @param last
251     *          Is this the last call to flushBuffer?
252     */
253    private void writeChunk(byte[] chunk, int offset, int len, boolean last)
254        throws IOException {
255      if (last) { // always write out the length for the last chunk.
256        Utils.writeVInt(out, len);
257        if (len > 0) {
258          out.write(chunk, offset, len);
259        }
260      } else {
261        if (len > 0) {
262          Utils.writeVInt(out, -len);
263          out.write(chunk, offset, len);
264        }
265      }
266    }
267
268    /**
269     * Write out a chunk that is a concatenation of the internal buffer plus
270     * user supplied data. This will never be the last block.
271     *
272     * @param data
273     *          User supplied data buffer.
274     * @param offset
275     *          Offset to user data buffer.
276     * @param len
277     *          User data buffer size.
278     */
279    private void writeBufData(byte[] data, int offset, int len)
280        throws IOException {
281      if (count + len > 0) {
282        Utils.writeVInt(out, -(count + len));
283        out.write(buf, 0, count);
284        count = 0;
285        out.write(data, offset, len);
286      }
287    }
288
289    /**
290     * Flush the internal buffer.
291     *
292     * Is this the last call to flushBuffer?
293     *
294     * @throws java.io.IOException
295     */
296    private void flushBuffer() throws IOException {
297      if (count > 0) {
298        writeChunk(buf, 0, count, false);
299        count = 0;
300      }
301    }
302
303    @Override
304    public void write(int b) throws IOException {
305      if (count >= buf.length) {
306        flushBuffer();
307      }
308      buf[count++] = (byte) b;
309    }
310
311    @Override
312    public void write(byte b[]) throws IOException {
313      write(b, 0, b.length);
314    }
315
316    @Override
317    public void write(byte b[], int off, int len) throws IOException {
318      if ((len + count) >= buf.length) {
319        /*
320         * If the input data do not fit in buffer, flush the output buffer and
321         * then write the data directly. In this way buffered streams will
322         * cascade harmlessly.
323         */
324        writeBufData(b, off, len);
325        return;
326      }
327
328      System.arraycopy(b, off, buf, count, len);
329      count += len;
330    }
331
332    @Override
333    public void flush() throws IOException {
334      flushBuffer();
335      out.flush();
336    }
337
338    @Override
339    public void close() throws IOException {
340      if (buf != null) {
341        try {
342          writeChunk(buf, 0, count, true);
343        } finally {
344          buf = null;
345          out = null;
346        }
347      }
348    }
349  }
350
351  /**
352   * Encode the whole stream as a single chunk. Expecting to know the size of
353   * the chunk up-front.
354   */
355  static public class SingleChunkEncoder extends OutputStream {
356    /**
357     * The data output stream it connects to.
358     */
359    private final DataOutputStream out;
360
361    /**
362     * The remaining bytes to be written.
363     */
364    private int remain;
365    private boolean closed = false;
366
367    /**
368     * Constructor.
369     *
370     * @param out
371     *          the underlying output stream.
372     * @param size
373     *          The total # of bytes to be written as a single chunk.
374     * @throws java.io.IOException
375     *           if an I/O error occurs.
376     */
377    public SingleChunkEncoder(DataOutputStream out, int size)
378        throws IOException {
379      this.out = out;
380      this.remain = size;
381      Utils.writeVInt(out, size);
382    }
383
384    @Override
385    public void write(int b) throws IOException {
386      if (remain > 0) {
387        out.write(b);
388        --remain;
389      } else {
390        throw new IOException("Writing more bytes than advertised size.");
391      }
392    }
393
394    @Override
395    public void write(byte b[]) throws IOException {
396      write(b, 0, b.length);
397    }
398
399    @Override
400    public void write(byte b[], int off, int len) throws IOException {
401      if (remain >= len) {
402        out.write(b, off, len);
403        remain -= len;
404      } else {
405        throw new IOException("Writing more bytes than advertised size.");
406      }
407    }
408
409    @Override
410    public void flush() throws IOException {
411      out.flush();
412    }
413
414    @Override
415    public void close() throws IOException {
416      if (closed == true) {
417        return;
418      }
419
420      try {
421        if (remain > 0) {
422          throw new IOException("Writing less bytes than advertised size.");
423        }
424      } finally {
425        closed = true;
426      }
427    }
428  }
429}
Note: See TracBrowser for help on using the repository browser.