source: proiecte/HadoopJUnit/hadoop-0.20.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 14.6 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
/**
 * Reads a block from the disk and sends it to a recipient.
 */
class BlockSender implements java.io.Closeable, FSConstants {
  public static final Log LOG = DataNode.LOG;
  static final Log ClientTraceLog = DataNode.ClientTraceLog;

  private Block block; // the block to read from
  private InputStream blockIn; // data stream
  private long blockInPosition = -1; // updated while using transferTo().
  private DataInputStream checksumIn; // checksum datastream
  private DataChecksum checksum; // checksum read from the block's metadata header
  private long offset; // starting position to read
  private long endOffset; // ending position
  private long blockLength;
  private int bytesPerChecksum; // chunk size
  private int checksumSize; // checksum size
  private boolean corruptChecksumOk; // if true, tolerate a corrupt or missing checksum
  private boolean chunkOffsetOK; // if true, send the starting chunk offset
  private long seqno; // sequence number of packet

  private boolean transferToAllowed = true;
  private boolean blockReadFully; // set when the whole block is read
  private boolean verifyChecksum; // if true, the checksum is verified while reading
  private BlockTransferThrottler throttler;
  private final String clientTraceFmt; // format of client trace log message

  /**
   * Minimum buffer used while sending data to clients. Used only if
   * transferTo() is enabled. 64KB is not especially large; a bigger buffer
   * could be used, but it is unlikely to bring much further improvement.
   */
  private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;

  BlockSender(Block block, long startOffset, long length,
              boolean corruptChecksumOk, boolean chunkOffsetOK,
              boolean verifyChecksum, DataNode datanode) throws IOException {
    this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
         verifyChecksum, datanode, null);
  }

  BlockSender(Block block, long startOffset, long length,
              boolean corruptChecksumOk, boolean chunkOffsetOK,
              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
      throws IOException {
    try {
      this.block = block;
      this.chunkOffsetOK = chunkOffsetOK;
      this.corruptChecksumOk = corruptChecksumOk;
      this.verifyChecksum = verifyChecksum;
      this.blockLength = datanode.data.getLength(block);
      this.transferToAllowed = datanode.transferToAllowed;
      this.clientTraceFmt = clientTraceFmt;

      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
        checksumIn = new DataInputStream(
                new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
                                        BUFFER_SIZE));

        // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
        short version = header.getVersion();

        if (version != FSDataset.METADATA_VERSION) {
          LOG.warn("Wrong version (" + version + ") for metadata file for "
              + block + " ignoring ...");
        }
        checksum = header.getChecksum();
      } else {
        LOG.warn("Could not find metadata file for " + block);
        // This only decides the buffer size. Use BUFFER_SIZE?
        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
            16 * 1024);
      }

      /* If bytesPerChecksum is very large, then the metadata file is
       * probably corrupted. For now just truncate bytesPerChecksum to
       * blockLength.
       */
      bytesPerChecksum = checksum.getBytesPerChecksum();
      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
        checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
                                   Math.max((int)blockLength, 10*1024*1024));
        bytesPerChecksum = checksum.getBytesPerChecksum();
      }
      checksumSize = checksum.getChecksumSize();

      if (length < 0) {
        length = blockLength;
      }

      endOffset = blockLength;
      if (startOffset < 0 || startOffset > endOffset
          || (length + startOffset) > endOffset) {
        String msg = " Offset " + startOffset + " and length " + length
            + " don't match block " + block + " ( blockLen " + endOffset + " )";
        LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
        throw new IOException(msg);
      }

      offset = (startOffset - (startOffset % bytesPerChecksum));
      if (length >= 0) {
        // Make sure endOffset points to the end of a checksummed chunk.
        long tmpLen = startOffset + length;
        if (tmpLen % bytesPerChecksum != 0) {
          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
        }
        if (tmpLen < endOffset) {
          endOffset = tmpLen;
        }
      }
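      // For example (illustrative numbers, not from the source): with
      // bytesPerChecksum = 512, a request for startOffset = 700 and
      // length = 100 starts reading at offset 512 and rounds endOffset up
      // to 1024, so only whole checksummed chunks are ever sent and the
      // receiver is expected to discard the extra leading bytes itself.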

      // seek to the right offsets
      if (offset > 0) {
        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
        // note: the block input stream is positioned at the right offset
        // when it is created below
        if (checksumSkip > 0) {
          // Should we use seek() for the checksum file as well?
          IOUtils.skipFully(checksumIn, checksumSkip);
        }
      }
      seqno = 0;

      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
    } catch (IOException ioe) {
      IOUtils.closeStream(this);
      IOUtils.closeStream(blockIn);
      throw ioe;
    }
  }

  /**
   * close opened files.
   */
  public void close() throws IOException {
    IOException ioe = null;
    // close checksum file
    if(checksumIn!=null) {
      try {
        checksumIn.close();
      } catch (IOException e) {
        ioe = e;
      }
      checksumIn = null;
    }
    // close data file
    if(blockIn!=null) {
      try {
        blockIn.close();
      } catch (IOException e) {
        ioe = e;
      }
      blockIn = null;
    }
    // throw IOException if there is any
    if(ioe!= null) {
      throw ioe;
    }
  }

  /**
   * Converts an IOException (not its subclasses) to a SocketException.
   * This is typically done to indicate to upper layers that the error
   * was a socket error rather than an often more serious exception such
   * as a disk error.
   */
  private static IOException ioeToSocketException(IOException ioe) {
    if (ioe.getClass().equals(IOException.class)) {
      // "se" could be a new class instead of SocketException.
      IOException se = new SocketException("Original Exception : " + ioe);
      se.initCause(ioe);
      /* Change the stacktrace so that the original trace is not truncated
       * when printed. */
      se.setStackTrace(ioe.getStackTrace());
      return se;
    }
    // otherwise just return the same exception.
    return ioe;
  }

  /**
   * Sends up to maxChunks chunks of data.
   *
   * When blockInPosition is >= 0, assumes 'out' is a
   * {@link SocketOutputStream} and tries
   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
   * send data (and updates blockInPosition).
   */
  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
                         throws IOException {
    // Sends multiple chunks in one packet with a single write().

    int len = Math.min((int) (endOffset - offset),
                       bytesPerChecksum*maxChunks);
    if (len == 0) {
      return 0;
    }

    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
    int packetLen = len + numChunks*checksumSize + 4;
    pkt.clear();

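    /* Packet layout built below (the field names here are descriptive only,
     * they are not identifiers from this file):
     *   int32  packetLen  : len + numChunks*checksumSize + 4
     *   int64  offset     : byte offset of this packet's first chunk
     *   int64  seqno      : packet sequence number
     *   byte   lastPacket : 1 if this packet reaches endOffset, else 0
     *   int32  len        : number of data bytes in the packet
     *   byte[] checksums  : numChunks * checksumSize bytes
     *   byte[] data       : len bytes (streamed via transferTo() instead of
     *                       being copied into the buffer when blockInPosition >= 0)
     */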
    // write packet header
    pkt.putInt(packetLen);
    pkt.putLong(offset);
    pkt.putLong(seqno);
    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
               //why no ByteBuf.putBoolean()?
    pkt.putInt(len);

    int checksumOff = pkt.position();
    int checksumLen = numChunks * checksumSize;
    byte[] buf = pkt.array();

    if (checksumSize > 0 && checksumIn != null) {
      try {
        checksumIn.readFully(buf, checksumOff, checksumLen);
      } catch (IOException e) {
        LOG.warn(" Could not read or failed to verify checksum for data" +
                 " at offset " + offset + " for block " + block + " got : "
                 + StringUtils.stringifyException(e));
        IOUtils.closeStream(checksumIn);
        checksumIn = null;
        if (corruptChecksumOk) {
          if (checksumLen > 0) {
            // Just fill the checksum portion of the buffer with zeros.
            Arrays.fill(buf, checksumOff, checksumOff + checksumLen, (byte) 0);
          }
        } else {
          throw e;
        }
      }
    }

    int dataOff = checksumOff + checksumLen;

    if (blockInPosition < 0) {
      //normal transfer
      IOUtils.readFully(blockIn, buf, dataOff, len);

      if (verifyChecksum) {
        int dOff = dataOff;
        int cOff = checksumOff;
        int dLeft = len;

        for (int i=0; i<numChunks; i++) {
          checksum.reset();
          int dLen = Math.min(dLeft, bytesPerChecksum);
          checksum.update(buf, dOff, dLen);
          if (!checksum.compare(buf, cOff)) {
            throw new ChecksumException("Checksum failed at " +
                                        (offset + len - dLeft), len);
          }
          dLeft -= dLen;
          dOff += dLen;
          cOff += checksumSize;
        }
      }
      //writing is done below (mainly to handle IOException)
    }

    try {
      if (blockInPosition >= 0) {
        //use transferTo(). Checks on out and blockIn are already done.

        SocketOutputStream sockOut = (SocketOutputStream)out;
        // first write the packet header and checksums
        sockOut.write(buf, 0, dataOff);
        // no need to flush since we know 'out' is not a buffered stream.

        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
                                blockInPosition, len);

        blockInPosition += len;
      } else {
        // normal transfer
        out.write(buf, 0, dataOff + len);
      }

    } catch (IOException e) {
      /* Exception while writing to the client (with transferTo() it could
       * also be while reading from the local file).
       */
      throw ioeToSocketException(e);
    }

    if (throttler != null) { // rebalancing so throttle
      throttler.throttle(packetLen);
    }

    return len;
  }

  /**
   * sendBlock() is used to read a block and its metadata and stream the data
   * to either a client or to another datanode.
   *
   * @param out  stream to which the block is written
   * @param baseStream optional. if non-null, <code>out</code> is assumed to
   *        be a wrapper over this stream. This enables optimizations for
   *        sending the data, e.g.
   *        {@link SocketOutputStream#transferToFully(FileChannel,
   *        long, int)}.
   * @param throttler used to throttle the rate at which the data is sent.
   * @return total bytes read, including checksum data.
   */
  long sendBlock(DataOutputStream out, OutputStream baseStream,
                 BlockTransferThrottler throttler) throws IOException {
    if( out == null ) {
      throw new IOException( "out stream is null" );
    }
    this.throttler = throttler;

    long initialOffset = offset;
    long totalRead = 0;
    OutputStream streamForSendChunks = out;

    try {
      try {
        checksum.writeHeader(out);
        if ( chunkOffsetOK ) {
          out.writeLong( offset );
        }
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }

      int maxChunksPerPacket;
      int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;

      if (transferToAllowed && !verifyChecksum &&
          baseStream instanceof SocketOutputStream &&
          blockIn instanceof FileInputStream) {

        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();

        // blockInPosition also indicates sendChunks() uses transferTo.
        blockInPosition = fileChannel.position();
        streamForSendChunks = baseStream;

        // assure a minimum buffer size.
        maxChunksPerPacket = (Math.max(BUFFER_SIZE,
                                       MIN_BUFFER_WITH_TRANSFERTO)
                              + bytesPerChecksum - 1)/bytesPerChecksum;

        // allocate a smaller buffer while using transferTo().
        pktSize += checksumSize * maxChunksPerPacket;
      } else {
        maxChunksPerPacket = Math.max(1,
                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
      }
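      // For example (illustrative defaults, not asserted by this file):
      // with io.file.buffer.size = 4 KB and 512-byte checksum chunks, the
      // transferTo() path above packs (64*1024 + 511)/512 = 128 chunks'
      // worth of checksums per packet, while the normal path packs
      // (4096 + 511)/512 = 8 chunks of checksums plus their data bytes.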

      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);

      while (endOffset > offset) {
        long len = sendChunks(pktBuf, maxChunksPerPacket,
                              streamForSendChunks);
        offset += len;
        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
                            checksumSize);
        seqno++;
      }
      try {
        out.writeInt(0); // mark the end of block
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }
    } finally {
      if (clientTraceFmt != null) {
        ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
      }
      close();
    }

    blockReadFully = (initialOffset == 0 && offset >= blockLength);

    return totalRead;
  }

  boolean isBlockReadFully() {
    return blockReadFully;
  }
}
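For context, a sketch of how a caller (for example, the DataNode's block-serving path) might drive this class is shown below. It is illustrative only: the DataNode, Block, and stream arguments are assumed to be supplied by the surrounding server code, and the wrapper method name sendBlockToClient is invented for the example.

long sendBlockToClient(DataNode datanode, Block block,
                       DataOutputStream out, OutputStream baseStream,
                       long startOffset, long length) throws IOException {
  // Illustrative sketch, not code from the file above.
  BlockSender sender = null;
  try {
    // corruptChecksumOk=false, chunkOffsetOK=true, verifyChecksum=false:
    // send the starting chunk offset, fail on missing or bad metadata, and
    // leave checksum verification to the receiver.
    sender = new BlockSender(block, startOffset, length,
                             false, true, false, datanode);
    // Passing baseStream lets sendBlock() use transferTo() when its
    // conditions are met; a null throttler means no rate limiting.
    return sender.sendBlock(out, baseStream, null);
  } finally {
    IOUtils.closeStream(sender); // harmless even though sendBlock() also closes
  }
}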