source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/ChecksumFileSystem.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 17.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs;
20
21import java.io.*;
22import java.util.Arrays;
23import java.util.zip.CRC32;
24
25import org.apache.commons.logging.Log;
26import org.apache.commons.logging.LogFactory;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.fs.permission.FsPermission;
29import org.apache.hadoop.util.Progressable;
30import org.apache.hadoop.util.StringUtils;
31
32/****************************************************************
33 * Abstract checksummed FileSystem.
34 * It provides a basic implementation of a checksummed FileSystem,
35 * which creates a checksum file for each raw file.
36 * It generates and verifies checksums on the client side.
37 *
38 *****************************************************************/
39public abstract class ChecksumFileSystem extends FilterFileSystem {
40  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
41  private int bytesPerChecksum = 512;
42  private boolean verifyChecksum = true;
43
44  public static double getApproxChkSumLength(long size) {
45    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
46  }
47 
48  public ChecksumFileSystem(FileSystem fs) {
49    super(fs);
50  }
51
52  public void setConf(Configuration conf) {
53    super.setConf(conf);
54    if (conf != null) {
55      bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
56    }
57  }
58 
59  /**
60   * Set whether to verify checksum.
61   */
62  public void setVerifyChecksum(boolean verifyChecksum) {
63    this.verifyChecksum = verifyChecksum;
64  }
65
66  /** get the raw file system */
67  public FileSystem getRawFileSystem() {
68    return fs;
69  }
70
71  /** Return the name of the checksum file associated with a file.*/
72  public Path getChecksumFile(Path file) {
73    return new Path(file.getParent(), "." + file.getName() + ".crc");
74  }
75
76  /** Return true iff file is a checksum file name.*/
77  public static boolean isChecksumFile(Path file) {
78    String name = file.getName();
79    return name.startsWith(".") && name.endsWith(".crc");
80  }
81
82  /** Return the length of the checksum file given the size of the
83   * actual file.
84   **/
85  public long getChecksumFileLength(Path file, long fileSize) {
86    return getChecksumLength(fileSize, getBytesPerSum());
87  }
88
89  /** Return the bytes Per Checksum */
90  public int getBytesPerSum() {
91    return bytesPerChecksum;
92  }
93
94  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
95    int defaultBufferSize = getConf().getInt("io.file.buffer.size", 4096);
96    int proportionalBufferSize = bufferSize / bytesPerSum;
97    return Math.max(bytesPerSum,
98                    Math.max(proportionalBufferSize, defaultBufferSize));
99  }
100
101  /*******************************************************
102   * For open()'s FSInputStream
103   * It verifies that data matches checksums.
104   *******************************************************/
105  private static class ChecksumFSInputChecker extends FSInputChecker {
106    public static final Log LOG
107      = LogFactory.getLog(FSInputChecker.class);
108   
109    private ChecksumFileSystem fs;
110    private FSDataInputStream datas;
111    private FSDataInputStream sums;
112   
113    private static final int HEADER_LENGTH = 8;
114   
115    private int bytesPerSum = 1;
116    private long fileLen = -1L;
117   
118    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
119      throws IOException {
120      this(fs, file, fs.getConf().getInt("io.file.buffer.size", 4096));
121    }
122   
123    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
124      throws IOException {
125      super( file, fs.getFileStatus(file).getReplication() );
126      this.datas = fs.getRawFileSystem().open(file, bufferSize);
127      this.fs = fs;
128      Path sumFile = fs.getChecksumFile(file);
129      try {
130        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
131        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
132
133        byte[] version = new byte[CHECKSUM_VERSION.length];
134        sums.readFully(version);
135        if (!Arrays.equals(version, CHECKSUM_VERSION))
136          throw new IOException("Not a checksum file: "+sumFile);
137        this.bytesPerSum = sums.readInt();
138        set(fs.verifyChecksum, new CRC32(), bytesPerSum, 4);
139      } catch (FileNotFoundException e) {         // quietly ignore
140        set(fs.verifyChecksum, null, 1, 0);
141      } catch (IOException e) {                   // loudly ignore
142        LOG.warn("Problem opening checksum file: "+ file + 
143                 ".  Ignoring exception: " + 
144                 StringUtils.stringifyException(e));
145        set(fs.verifyChecksum, null, 1, 0);
146      }
147    }
148   
149    private long getChecksumFilePos( long dataPos ) {
150      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
151    }
152   
153    protected long getChunkPosition( long dataPos ) {
154      return dataPos/bytesPerSum*bytesPerSum;
155    }
156   
157    public int available() throws IOException {
158      return datas.available() + super.available();
159    }
160   
161    public int read(long position, byte[] b, int off, int len)
162      throws IOException {
163      // parameter check
164      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
165        throw new IndexOutOfBoundsException();
166      } else if (len == 0) {
167        return 0;
168      }
169      if( position<0 ) {
170        throw new IllegalArgumentException(
171            "Parameter position can not to be negative");
172      }
173
174      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
175      checker.seek(position);
176      int nread = checker.read(b, off, len);
177      checker.close();
178      return nread;
179    }
180   
181    public void close() throws IOException {
182      datas.close();
183      if( sums != null ) {
184        sums.close();
185      }
186      set(fs.verifyChecksum, null, 1, 0);
187    }
188   
189
190    @Override
191    public boolean seekToNewSource(long targetPos) throws IOException {
192      long sumsPos = getChecksumFilePos(targetPos);
193      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
194      boolean newDataSource = datas.seekToNewSource(targetPos);
195      return sums.seekToNewSource(sumsPos) || newDataSource;
196    }
197
198    @Override
199    protected int readChunk(long pos, byte[] buf, int offset, int len,
200        byte[] checksum) throws IOException {
201      boolean eof = false;
202      if(needChecksum()) {
203        try {
204          long checksumPos = getChecksumFilePos(pos); 
205          if(checksumPos != sums.getPos()) {
206            sums.seek(checksumPos);
207          }
208          sums.readFully(checksum);
209        } catch (EOFException e) {
210          eof = true;
211        }
212        len = bytesPerSum;
213      }
214      if(pos != datas.getPos()) {
215        datas.seek(pos);
216      }
217      int nread = readFully(datas, buf, offset, len);
218      if( eof && nread > 0) {
219        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
220      }
221      return nread;
222    }
223   
224    /* Return the file length */
225    private long getFileLength() throws IOException {
226      if( fileLen==-1L ) {
227        fileLen = fs.getContentSummary(file).getLength();
228      }
229      return fileLen;
230    }
231   
232    /**
233     * Skips over and discards <code>n</code> bytes of data from the
234     * input stream.
235     *
236     *The <code>skip</code> method skips over some smaller number of bytes
237     * when reaching end of file before <code>n</code> bytes have been skipped.
238     * The actual number of bytes skipped is returned.  If <code>n</code> is
239     * negative, no bytes are skipped.
240     *
241     * @param      n   the number of bytes to be skipped.
242     * @return     the actual number of bytes skipped.
243     * @exception  IOException  if an I/O error occurs.
244     *             ChecksumException if the chunk to skip to is corrupted
245     */
246    public synchronized long skip(long n) throws IOException {
247      long curPos = getPos();
248      long fileLength = getFileLength();
249      if( n+curPos > fileLength ) {
250        n = fileLength - curPos;
251      }
252      return super.skip(n);
253    }
254   
255    /**
256     * Seek to the given position in the stream.
257     * The next read() will be from that position.
258     *
259     * <p>This method does not allow seek past the end of the file.
260     * This produces IOException.
261     *
262     * @param      pos   the postion to seek to.
263     * @exception  IOException  if an I/O error occurs or seeks after EOF
264     *             ChecksumException if the chunk to seek to is corrupted
265     */
266
267    public synchronized void seek(long pos) throws IOException {
268      if(pos>getFileLength()) {
269        throw new IOException("Cannot seek after EOF");
270      }
271      super.seek(pos);
272    }
273
274  }
275
276  /**
277   * Opens an FSDataInputStream at the indicated Path.
278   * @param f the file name to open
279   * @param bufferSize the size of the buffer to be used.
280   */
281  @Override
282  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
283    return new FSDataInputStream(
284        new ChecksumFSInputChecker(this, f, bufferSize));
285  }
286
287  /** {@inheritDoc} */
288  public FSDataOutputStream append(Path f, int bufferSize,
289      Progressable progress) throws IOException {
290    throw new IOException("Not supported");
291  }
292
293  /**
294   * Calculated the length of the checksum file in bytes.
295   * @param size the length of the data file in bytes
296   * @param bytesPerSum the number of bytes in a checksum block
297   * @return the number of bytes in the checksum file
298   */
299  public static long getChecksumLength(long size, int bytesPerSum) {
300    //the checksum length is equal to size passed divided by bytesPerSum +
301    //bytes written in the beginning of the checksum file. 
302    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
303             CHECKSUM_VERSION.length + 4; 
304  }
305
306  /** This class provides an output stream for a checksummed file.
307   * It generates checksums for data. */
308  private static class ChecksumFSOutputSummer extends FSOutputSummer {
309    private FSDataOutputStream datas;   
310    private FSDataOutputStream sums;
311    private static final float CHKSUM_AS_FRACTION = 0.01f;
312   
313    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
314                          Path file, 
315                          boolean overwrite, 
316                          short replication,
317                          long blockSize,
318                          Configuration conf)
319      throws IOException {
320      this(fs, file, overwrite, 
321           conf.getInt("io.file.buffer.size", 4096),
322           replication, blockSize, null);
323    }
324   
325    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
326                          Path file, 
327                          boolean overwrite,
328                          int bufferSize,
329                          short replication,
330                          long blockSize,
331                          Progressable progress)
332      throws IOException {
333      super(new CRC32(), fs.getBytesPerSum(), 4);
334      int bytesPerSum = fs.getBytesPerSum();
335      this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
336                                         replication, blockSize, progress);
337      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
338      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
339                                               sumBufferSize, replication,
340                                               blockSize);
341      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
342      sums.writeInt(bytesPerSum);
343    }
344   
345    public void close() throws IOException {
346      flushBuffer();
347      sums.close();
348      datas.close();
349    }
350   
351    @Override
352    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
353    throws IOException {
354      datas.write(b, offset, len);
355      sums.write(checksum);
356    }
357  }
358
359  /** {@inheritDoc} */
360  @Override
361  public FSDataOutputStream create(Path f, FsPermission permission,
362      boolean overwrite, int bufferSize, short replication, long blockSize,
363      Progressable progress) throws IOException {
364    Path parent = f.getParent();
365    if (parent != null && !mkdirs(parent)) {
366      throw new IOException("Mkdirs failed to create " + parent);
367    }
368    final FSDataOutputStream out = new FSDataOutputStream(
369        new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
370            blockSize, progress), null);
371    if (permission != null) {
372      setPermission(f, permission);
373    }
374    return out;
375  }
376
377  /**
378   * Set replication for an existing file.
379   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
380   * @param src file name
381   * @param replication new replication
382   * @throws IOException
383   * @return true if successful;
384   *         false if file does not exist or is a directory
385   */
386  public boolean setReplication(Path src, short replication) throws IOException {
387    boolean value = fs.setReplication(src, replication);
388    if (!value)
389      return false;
390
391    Path checkFile = getChecksumFile(src);
392    if (exists(checkFile))
393      fs.setReplication(checkFile, replication);
394
395    return true;
396  }
397
398  /**
399   * Rename files/dirs
400   */
401  public boolean rename(Path src, Path dst) throws IOException {
402    if (fs.isDirectory(src)) {
403      return fs.rename(src, dst);
404    } else {
405
406      boolean value = fs.rename(src, dst);
407      if (!value)
408        return false;
409
410      Path checkFile = getChecksumFile(src);
411      if (fs.exists(checkFile)) { //try to rename checksum
412        if (fs.isDirectory(dst)) {
413          value = fs.rename(checkFile, dst);
414        } else {
415          value = fs.rename(checkFile, getChecksumFile(dst));
416        }
417      }
418
419      return value;
420    }
421  }
422
423  /**
424   * Implement the delete(Path, boolean) in checksum
425   * file system.
426   */
427  public boolean delete(Path f, boolean recursive) throws IOException{
428    FileStatus fstatus = null;
429    try {
430      fstatus = fs.getFileStatus(f);
431    } catch(FileNotFoundException e) {
432      return false;
433    }
434    if(fstatus.isDir()) {
435      //this works since the crcs are in the same
436      //directories and the files. so we just delete
437      //everything in the underlying filesystem
438      return fs.delete(f, recursive);
439    } else {
440      Path checkFile = getChecksumFile(f);
441      if (fs.exists(checkFile)) {
442        fs.delete(checkFile, true);
443      }
444      return fs.delete(f, true);
445    }
446  }
447   
448  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
449    public boolean accept(Path file) {
450      return !isChecksumFile(file);
451    }
452  };
453
454  /**
455   * List the statuses of the files/directories in the given path if the path is
456   * a directory.
457   *
458   * @param f
459   *          given path
460   * @return the statuses of the files/directories in the given patch
461   * @throws IOException
462   */
463  @Override
464  public FileStatus[] listStatus(Path f) throws IOException {
465    return fs.listStatus(f, DEFAULT_FILTER);
466  }
467 
468  @Override
469  public boolean mkdirs(Path f) throws IOException {
470    return fs.mkdirs(f);
471  }
472
473  @Override
474  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
475    throws IOException {
476    Configuration conf = getConf();
477    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
478  }
479
480  /**
481   * The src file is under FS, and the dst is on the local disk.
482   * Copy it from FS control to the local dst name.
483   */
484  @Override
485  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
486    throws IOException {
487    Configuration conf = getConf();
488    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
489  }
490
491  /**
492   * The src file is under FS, and the dst is on the local disk.
493   * Copy it from FS control to the local dst name.
494   * If src and dst are directories, the copyCrc parameter
495   * determines whether to copy CRC files.
496   */
497  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
498    throws IOException {
499    if (!fs.isDirectory(src)) { // source is a file
500      fs.copyToLocalFile(src, dst);
501      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
502      if (localFs.isDirectory(dst)) {
503        dst = new Path(dst, src.getName());
504      }
505      dst = getChecksumFile(dst);
506      if (localFs.exists(dst)) { //remove old local checksum file
507        localFs.delete(dst, true);
508      }
509      Path checksumFile = getChecksumFile(src);
510      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
511        fs.copyToLocalFile(checksumFile, dst);
512      }
513    } else {
514      FileStatus[] srcs = listStatus(src);
515      for (FileStatus srcFile : srcs) {
516        copyToLocalFile(srcFile.getPath(), 
517                        new Path(dst, srcFile.getPath().getName()), copyCrc);
518      }
519    }
520  }
521
522  @Override
523  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
524    throws IOException {
525    return tmpLocalFile;
526  }
527
528  @Override
529  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
530    throws IOException {
531    moveFromLocalFile(tmpLocalFile, fsOutputFile);
532  }
533
534  /**
535   * Report a checksum error to the file system.
536   * @param f the file name containing the error
537   * @param in the stream open on the file
538   * @param inPos the position of the beginning of the bad data in the file
539   * @param sums the stream open on the checksum file
540   * @param sumsPos the position of the beginning of the bad data in the checksum file
541   * @return if retry is neccessary
542   */
543  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
544                                       long inPos, FSDataInputStream sums, long sumsPos) {
545    return false;
546  }
547}
Note: See TracBrowser for help on using the repository browser.