source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/FSOutputSummer.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 5.7 KB
Line 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19package org.apache.hadoop.fs;
20
21import java.io.IOException;
22import java.io.OutputStream;
23import java.util.zip.Checksum;
24
/**
 * This is a generic output stream for generating checksums for
 * data before it is written to the underlying stream.
 *
 * <p>Incoming bytes are grouped into chunks of at most the current chunk
 * buffer size. For each chunk the running {@link Checksum} value is encoded
 * into a byte array and handed, together with the chunk data, to
 * {@link #writeChunk(byte[], int, int, byte[])}, which subclasses implement.
 *
 * <p>The public write methods, the flush methods and
 * {@link #resetChecksumChunk(int)} are {@code synchronized} on this stream.
 */
public abstract class FSOutputSummer extends OutputStream {
  /** Running checksum over the bytes of the chunk currently being built. */
  private final Checksum sum;
  /** Internal buffer holding chunk data until it is checksummed and emitted. */
  private byte[] buf;
  /** Scratch array that receives the encoded checksum passed to writeChunk. */
  private final byte[] checksum;
  /** The number of valid bytes in {@link #buf}. */
  private int count;

  /**
   * Creates a summer with an empty chunk buffer.
   *
   * @param sum          checksum implementation updated with every byte written
   * @param maxChunkSize capacity of the chunk buffer, i.e. the largest number
   *                     of data bytes passed to a single {@code writeChunk} call
   * @param checksumSize length of the array that carries the encoded checksum;
   *                     the encoder fills its first four bytes
   */
  protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
    this.sum = sum;
    this.buf = new byte[maxChunkSize];
    this.checksum = new byte[checksumSize];
    this.count = 0;
  }

  /**
   * Writes the data chunk in <code>b</code> starting at <code>offset</code>
   * with a length of <code>len</code>, and its checksum.
   *
   * <p>The {@code checksum} array is reused for every chunk, so an
   * implementation that needs the value later must copy it.
   *
   * @param b        array holding the chunk data
   * @param offset   index of the first chunk byte in {@code b}
   * @param len      number of chunk bytes
   * @param checksum encoded checksum of the chunk
   * @throws IOException if the chunk cannot be written
   */
  protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
      throws IOException;

  /**
   * Writes one byte, flushing the chunk buffer when it becomes full.
   *
   * @param b the byte to write (only the low eight bits are stored)
   * @throws IOException if flushing a completed chunk fails
   */
  @Override
  public synchronized void write(int b) throws IOException {
    sum.update(b);
    buf[count++] = (byte) b;
    if (count == buf.length) {
      flushBuffer();
    }
  }

  /**
   * Writes <code>len</code> bytes from the specified byte array
   * starting at offset <code>off</code> and generates a checksum for
   * each data chunk.
   *
   * <p>This method stores bytes from the given array into this
   * stream's buffer before they get checksummed. The buffer gets checksummed
   * and flushed to the underlying output stream when all data
   * in a checksum chunk are in the buffer. If the buffer is empty and the
   * requested length is at least as large as the chunk size, this method
   * checksums and writes the chunk directly from the caller's array,
   * avoiding an unnecessary data copy.
   *
   * @param b   the data.
   * @param off the start offset in the data.
   * @param len the number of bytes to write.
   * @throws ArrayIndexOutOfBoundsException if {@code off} or {@code len} is
   *         negative, or the range exceeds {@code b.length}
   * @throws IOException if an I/O error occurs, or if data is written while
   *         the chunk buffer has zero capacity
   */
  @Override
  public synchronized void write(byte[] b, int off, int len) throws IOException {
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException(
          "off=" + off + ", len=" + len + ", b.length=" + b.length);
    }
    // With a zero-capacity buffer write1() would emit empty chunks and report
    // zero bytes consumed, so the loop below would never terminate.
    if (len > 0 && buf.length == 0) {
      throw new IOException(
          "Cannot write " + len + " bytes: checksum chunk buffer has zero capacity");
    }

    int written = 0;
    while (written < len) {
      written += write1(b, off + written, len - written);
    }
  }

  /**
   * Writes a portion of an array, flushing to the underlying
   * stream at most once if necessary.
   *
   * @return the number of bytes consumed from {@code b}
   */
  private int write1(byte[] b, int off, int len) throws IOException {
    if (count == 0 && len >= buf.length) {
      // Local buffer is empty and the caller supplied at least one whole
      // chunk: checksum and emit it straight from the caller's array.
      final int length = buf.length;
      sum.update(b, off, length);
      writeChecksumChunk(b, off, length, false);
      return length;
    }

    // Copy as much user data as fits into the remaining buffer space.
    final int bytesToCopy = Math.min(len, buf.length - count);
    sum.update(b, off, bytesToCopy);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // Local buffer is full.
      flushBuffer();
    }
    return bytesToCopy;
  }

  /**
   * Forces any buffered output bytes to be checksummed and written out to
   * the underlying output stream, then starts a fresh chunk.
   *
   * @throws IOException if writing the chunk fails
   */
  protected synchronized void flushBuffer() throws IOException {
    flushBuffer(false);
  }

  /**
   * Forces any buffered output bytes to be checksummed and written out to
   * the underlying output stream. If {@code keep} is true, the buffered
   * byte count and the running checksum are left as they were, so the same
   * partial chunk can be extended and emitted again later.
   *
   * @param keep whether to retain the buffered data and checksum state
   * @throws IOException if writing the chunk fails
   */
  protected synchronized void flushBuffer(boolean keep) throws IOException {
    if (count != 0) {
      int chunkLen = count;
      // The count is cleared before the chunk is handed out and only
      // restored afterwards when the caller asked to keep the state.
      count = 0;
      writeChecksumChunk(buf, 0, chunkLen, keep);
      if (keep) {
        count = chunkLen;
      }
    }
  }

  /**
   * Generates the checksum for the data chunk and outputs the data chunk and
   * checksum to the underlying output stream. If {@code keep} is true the
   * current checksum is left intact; otherwise it is reset for the next chunk.
   */
  private void writeChecksumChunk(byte[] b, int off, int len, boolean keep)
      throws IOException {
    int tempChecksum = (int) sum.getValue();
    if (!keep) {
      sum.reset();
    }
    int2byte(tempChecksum, checksum);
    writeChunk(b, off, len, checksum);
  }

  /**
   * Converts a checksum integer value to a byte stream.
   *
   * @param sum          checksum whose current value is encoded
   * @param checksumSize length of the returned array; must be at least four
   * @return a new array whose first four bytes hold the low 32 bits of the
   *         checksum value, most significant byte first
   */
  public static byte[] convertToByteStream(Checksum sum, int checksumSize) {
    return int2byte((int) sum.getValue(), new byte[checksumSize]);
  }

  /**
   * Stores {@code integer} into the first four elements of {@code bytes},
   * most significant byte first.
   *
   * @param integer value to encode
   * @param bytes   destination array; must hold at least four elements
   * @return {@code bytes}, for call chaining
   */
  static byte[] int2byte(int integer, byte[] bytes) {
    bytes[0] = (byte) ((integer >>> 24) & 0xFF);
    bytes[1] = (byte) ((integer >>> 16) & 0xFF);
    bytes[2] = (byte) ((integer >>>  8) & 0xFF);
    bytes[3] = (byte) ((integer >>>  0) & 0xFF);
    return bytes;
  }

  /**
   * Resets the existing buffer with a new one of the specified size.
   * The running checksum is reset and any bytes still buffered are
   * discarded without being written.
   *
   * @param size capacity of the new chunk buffer
   */
  protected synchronized void resetChecksumChunk(int size) {
    sum.reset();
    this.buf = new byte[size];
    this.count = 0;
  }
}
Note: See TracBrowser for help on using the repository browser.