/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import junit.framework.TestCase;
import java.io.*;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;

/**
 * This class tests if FSInputChecker works correctly.
 */
public class TestFSInputChecker extends TestCase {
  static final long seed = 0xDEADBEEFL;
  static final int BYTES_PER_SUM = 10;
  static final int BLOCK_SIZE = 2*BYTES_PER_SUM;
  static final int HALF_CHUNK_SIZE = BYTES_PER_SUM/2;
  static final int FILE_SIZE = 2*BLOCK_SIZE-1;
  static final short NUM_OF_DATANODES = 2;
  byte[] expected = new byte[FILE_SIZE];
  byte[] actual;
  FSDataInputStream stm;
  Random rand = new Random(seed);

  /* create a file */
  private void writeFile(FileSystem fileSys, Path name) throws IOException {
    // create and write a file that spans two blocks (the second one partial)
    FSDataOutputStream stm = fileSys.create(name, new FsPermission((short)0777),
        true, fileSys.getConf().getInt("io.file.buffer.size", 4096),
        (short)NUM_OF_DATANODES, BLOCK_SIZE, null);
    stm.write(expected);
    stm.close();
  }

  /* validate data */
  private void checkAndEraseData(byte[] actual, int from,
      byte[] expected, String message) throws Exception {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
expected "+ expected[from+idx]+" actual "+actual[idx], actual[idx], expected[from+idx]); actual[idx] = 0; } } /* test read and getPos */ private void checkReadAndGetPos() throws Exception { actual = new byte[FILE_SIZE]; // test reads that do not cross checksum boundary stm.seek(0); int offset; for(offset=0; offset= corruptFileLen); rand.nextBytes(buf); out.seek(corruptFileLen/2); out.write(buf, 0, corruptFileLen/4); out.close(); boolean gotException = false; InputStream in = fileSys.open(file); try { IOUtils.readFully(in, buf, 0, buf.length); } catch (ChecksumException e) { gotException = true; } assertTrue(gotException); in.close(); } public void testFSInputChecker() throws Exception { Configuration conf = new Configuration(); conf.setLong("dfs.block.size", BLOCK_SIZE); conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem"); rand.nextBytes(expected); // test DFS MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem(); try { testChecker(fileSys, true); testChecker(fileSys, false); testSeekAndRead(fileSys); } finally { fileSys.close(); cluster.shutdown(); } // test Local FS fileSys = FileSystem.getLocal(conf); try { testChecker(fileSys, true); testChecker(fileSys, false); testFileCorruption((LocalFileSystem)fileSys); testSeekAndRead(fileSys); }finally { fileSys.close(); } } private void testSeekAndRead(ChecksumFileSystem fileSys) throws IOException { Path file = new Path("try.dat"); writeFile(fileSys, file); stm = fileSys.open(file, fileSys.getConf().getInt("io.file.buffer.size", 4096)); checkSeekAndRead(); stm.close(); cleanupFile(fileSys, file); } private void checkSeekAndRead() throws IOException { int position = 1; int len = 2 * BYTES_PER_SUM - (int) position; readAndCompare(stm, position, len); position = BYTES_PER_SUM; len = BYTES_PER_SUM; readAndCompare(stm, position, len); } private void readAndCompare(FSDataInputStream in, int position, int len) throws IOException { byte[] b = new byte[len]; in.seek(position); IOUtils.readFully(in, b, 0, b.length); for (int i = 0; i < b.length; i++) { assertEquals(expected[position + i], b[i]); } } }