source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestFSInputChecker.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit project.

  • Property svn:executable set to *
File size: 10.8 KB
Line 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18package org.apache.hadoop.hdfs;
19
20import junit.framework.TestCase;
21import java.io.*;
22import java.util.Random;
23import org.apache.hadoop.conf.Configuration;
24import org.apache.hadoop.fs.ChecksumException;
25import org.apache.hadoop.fs.ChecksumFileSystem;
26import org.apache.hadoop.fs.FSDataInputStream;
27import org.apache.hadoop.fs.FSDataOutputStream;
28import org.apache.hadoop.fs.FileSystem;
29import org.apache.hadoop.fs.LocalFileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.fs.permission.FsPermission;
32import org.apache.hadoop.io.IOUtils;
33
34/**
35 * This class tests if FSInputChecker works correctly.
36 */
37public class TestFSInputChecker extends TestCase {
38  static final long seed = 0xDEADBEEFL;
39  static final int BYTES_PER_SUM = 10;
40  static final int BLOCK_SIZE = 2*BYTES_PER_SUM;
41  static final int HALF_CHUNK_SIZE = BYTES_PER_SUM/2;
42  static final int FILE_SIZE = 2*BLOCK_SIZE-1;
43  static final short NUM_OF_DATANODES = 2;
44  byte[] expected = new byte[FILE_SIZE];
45  byte[] actual;
46  FSDataInputStream stm;
47  Random rand = new Random(seed);
48
49  /* create a file */
50  private void writeFile(FileSystem fileSys, Path name) throws IOException {
51    // create and write a file that contains three blocks of data
52    FSDataOutputStream stm = fileSys.create(name, new FsPermission((short)0777),
53        true, fileSys.getConf().getInt("io.file.buffer.size", 4096),
54        (short)NUM_OF_DATANODES, BLOCK_SIZE, null);
55    stm.write(expected);
56    stm.close();
57  }
58 
59  /*validate data*/
60  private void checkAndEraseData(byte[] actual, int from, byte[] expected, 
61      String message) throws Exception {
62    for (int idx = 0; idx < actual.length; idx++) {
63      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
64                        expected[from+idx]+" actual "+actual[idx],
65                        actual[idx], expected[from+idx]);
66      actual[idx] = 0;
67    }
68  }
69 
70  /* test read and getPos */
71  private void checkReadAndGetPos() throws Exception {
72    actual = new byte[FILE_SIZE];
73    // test reads that do not cross checksum boundary
74    stm.seek(0);
75    int offset;
76    for(offset=0; offset<BLOCK_SIZE+BYTES_PER_SUM;
77                  offset += BYTES_PER_SUM ) {
78      assertEquals(stm.getPos(), offset);
79      stm.readFully(actual, offset, BYTES_PER_SUM);
80    }
81    stm.readFully(actual, offset, FILE_SIZE-BLOCK_SIZE-BYTES_PER_SUM);
82    assertEquals(stm.getPos(), FILE_SIZE);
83    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
84   
85    // test reads that cross checksum boundary
86    stm.seek(0L);
87    assertEquals(stm.getPos(), 0L);
88    stm.readFully(actual, 0, HALF_CHUNK_SIZE);
89    assertEquals(stm.getPos(), HALF_CHUNK_SIZE);
90    stm.readFully(actual, HALF_CHUNK_SIZE, BLOCK_SIZE-HALF_CHUNK_SIZE);
91    assertEquals(stm.getPos(), BLOCK_SIZE);
92    stm.readFully(actual, BLOCK_SIZE, BYTES_PER_SUM+HALF_CHUNK_SIZE);
93    assertEquals(stm.getPos(), BLOCK_SIZE+BYTES_PER_SUM+HALF_CHUNK_SIZE);
94    stm.readFully(actual, 2*BLOCK_SIZE-HALF_CHUNK_SIZE, 
95        FILE_SIZE-(2*BLOCK_SIZE-HALF_CHUNK_SIZE));
96    assertEquals(stm.getPos(), FILE_SIZE);
97    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
98   
99    // test read that cross block boundary
100    stm.seek(0L);
101    stm.readFully(actual, 0, BYTES_PER_SUM+HALF_CHUNK_SIZE);
102    assertEquals(stm.getPos(), BYTES_PER_SUM+HALF_CHUNK_SIZE);
103    stm.readFully(actual, BYTES_PER_SUM+HALF_CHUNK_SIZE, BYTES_PER_SUM);
104    assertEquals(stm.getPos(), BLOCK_SIZE+HALF_CHUNK_SIZE);
105    stm.readFully(actual, BLOCK_SIZE+HALF_CHUNK_SIZE,
106        FILE_SIZE-BLOCK_SIZE-HALF_CHUNK_SIZE);
107    assertEquals(stm.getPos(), FILE_SIZE);
108    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
109  }
110 
111  /* test if one seek is correct */
112  private void testSeek1(int offset) 
113  throws Exception {
114    stm.seek(offset);
115    assertEquals(offset, stm.getPos());
116    stm.readFully(actual);
117    checkAndEraseData(actual, offset, expected, "Read Sanity Test");
118  }
119
120  /* test seek() */
121  private void checkSeek( ) throws Exception {
122    actual = new byte[HALF_CHUNK_SIZE];
123   
124    // test seeks to checksum boundary
125    testSeek1(0);
126    testSeek1(BYTES_PER_SUM);
127    testSeek1(BLOCK_SIZE);
128   
129    // test seek to non-checksum-boundary pos
130    testSeek1(BLOCK_SIZE+HALF_CHUNK_SIZE);
131    testSeek1(HALF_CHUNK_SIZE);
132   
133    // test seek to a position at the same checksum chunk
134    testSeek1(HALF_CHUNK_SIZE/2);
135    testSeek1(HALF_CHUNK_SIZE*3/2);
136   
137    // test end of file
138    actual = new byte[1];
139    testSeek1(FILE_SIZE-1);
140   
141    String errMsg = null;
142    try {
143      stm.seek(FILE_SIZE);
144    } catch (IOException e) {
145      errMsg = e.getMessage();
146    }
147    assertTrue(errMsg==null);
148  }
149
150  /* test if one skip is correct */
151  private void testSkip1(int skippedBytes) 
152  throws Exception {
153    long oldPos = stm.getPos();
154    long nSkipped = stm.skip(skippedBytes);
155    long newPos = oldPos+nSkipped;
156    assertEquals(stm.getPos(), newPos);
157    stm.readFully(actual);
158    checkAndEraseData(actual, (int)newPos, expected, "Read Sanity Test");
159  }
160
161  /* test skip() */
162  private void checkSkip( ) throws Exception {
163    actual = new byte[HALF_CHUNK_SIZE];
164   
165    // test skip to a checksum boundary
166    stm.seek(0);
167    testSkip1(BYTES_PER_SUM);
168    testSkip1(HALF_CHUNK_SIZE);
169    testSkip1(HALF_CHUNK_SIZE);
170   
171    // test skip to non-checksum-boundary pos
172    stm.seek(0);
173    testSkip1(HALF_CHUNK_SIZE+1);
174    testSkip1(BYTES_PER_SUM);
175    testSkip1(HALF_CHUNK_SIZE);
176   
177    // test skip to a position at the same checksum chunk
178    stm.seek(0);
179    testSkip1(1);
180    testSkip1(1);
181   
182    // test skip to end of file
183    stm.seek(0);
184    actual = new byte[1];
185    testSkip1(FILE_SIZE-1);
186   
187    stm.seek(0);
188    assertEquals(stm.skip(FILE_SIZE), FILE_SIZE);
189    assertEquals(stm.skip(10), 0);
190   
191    stm.seek(0);
192    assertEquals(stm.skip(FILE_SIZE+10), FILE_SIZE);
193    stm.seek(10);
194    assertEquals(stm.skip(FILE_SIZE), FILE_SIZE-10);
195  }
196
197  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
198    assertTrue(fileSys.exists(name));
199    fileSys.delete(name, true);
200    assertTrue(!fileSys.exists(name));
201  }
202 
203  /**
204   * Tests read/seek/getPos/skipped opeation for input stream.
205   */
206  private void testChecker(ChecksumFileSystem fileSys, boolean readCS)
207  throws Exception {
208    Path file = new Path("try.dat");
209    if( readCS ) {
210      writeFile(fileSys, file);
211    } else {
212      writeFile(fileSys.getRawFileSystem(), file);
213    }
214    stm = fileSys.open(file);
215    checkReadAndGetPos();
216    checkSeek();
217    checkSkip();
218    //checkMark
219    assertFalse(stm.markSupported());
220    stm.close();
221    cleanupFile(fileSys, file);
222  }
223 
224  private void testFileCorruption(LocalFileSystem fileSys) throws IOException {
225    // create a file and verify that checksum corruption results in
226    // a checksum exception on LocalFS
227   
228    String dir = System.getProperty("test.build.data", ".");
229    Path file = new Path(dir + "/corruption-test.dat");
230    Path crcFile = new Path(dir + "/.corruption-test.dat.crc");
231   
232    writeFile(fileSys, file);
233   
234    int fileLen = (int)fileSys.getFileStatus(file).getLen();
235   
236    byte [] buf = new byte[fileLen];
237
238    InputStream in = fileSys.open(file);
239    IOUtils.readFully(in, buf, 0, buf.length);
240    in.close();
241   
242    // check .crc corruption
243    checkFileCorruption(fileSys, file, crcFile);
244    fileSys.delete(file, true);
245   
246    writeFile(fileSys, file);
247   
248    // check data corrutpion
249    checkFileCorruption(fileSys, file, file);
250   
251    fileSys.delete(file, true);
252  }
253 
254  private void checkFileCorruption(LocalFileSystem fileSys, Path file, 
255                                   Path fileToCorrupt) throws IOException {
256   
257    // corrupt the file
258    RandomAccessFile out = 
259      new RandomAccessFile(new File(fileToCorrupt.toString()), "rw");
260   
261    byte[] buf = new byte[(int)fileSys.getFileStatus(file).getLen()];   
262    int corruptFileLen = (int)fileSys.getFileStatus(fileToCorrupt).getLen();
263    assertTrue(buf.length >= corruptFileLen);
264   
265    rand.nextBytes(buf);
266    out.seek(corruptFileLen/2);
267    out.write(buf, 0, corruptFileLen/4);
268    out.close();
269
270    boolean gotException = false;
271   
272    InputStream in = fileSys.open(file);
273    try {
274      IOUtils.readFully(in, buf, 0, buf.length);
275    } catch (ChecksumException e) {
276      gotException = true;
277    }
278    assertTrue(gotException);
279    in.close();   
280  }
281 
282  public void testFSInputChecker() throws Exception {
283    Configuration conf = new Configuration();
284    conf.setLong("dfs.block.size", BLOCK_SIZE);
285    conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
286    conf.set("fs.hdfs.impl",
287             "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
288    rand.nextBytes(expected);
289
290    // test DFS
291    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
292    ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem();
293    try {
294      testChecker(fileSys, true);
295      testChecker(fileSys, false);
296      testSeekAndRead(fileSys);
297    } finally {
298      fileSys.close();
299      cluster.shutdown();
300    }
301   
302   
303    // test Local FS
304    fileSys = FileSystem.getLocal(conf);
305    try {
306      testChecker(fileSys, true);
307      testChecker(fileSys, false);
308      testFileCorruption((LocalFileSystem)fileSys);
309      testSeekAndRead(fileSys);
310    }finally {
311      fileSys.close();
312    }
313  }
314
315  private void testSeekAndRead(ChecksumFileSystem fileSys)
316  throws IOException {
317    Path file = new Path("try.dat");
318    writeFile(fileSys, file);
319    stm = fileSys.open(file,
320        fileSys.getConf().getInt("io.file.buffer.size", 4096));
321    checkSeekAndRead();
322    stm.close();
323    cleanupFile(fileSys, file);
324  }
325
326  private void checkSeekAndRead() throws IOException {
327    int position = 1;
328    int len = 2 * BYTES_PER_SUM - (int) position;
329    readAndCompare(stm, position, len);
330
331    position = BYTES_PER_SUM;
332    len = BYTES_PER_SUM;
333    readAndCompare(stm, position, len);
334  }
335
336  private void readAndCompare(FSDataInputStream in, int position, int len)
337      throws IOException {
338    byte[] b = new byte[len];
339    in.seek(position);
340    IOUtils.readFully(in, b, 0, b.length);
341
342    for (int i = 0; i < b.length; i++) {
343      assertEquals(expected[position + i], b[i]);
344    }
345  }
346}
Note: See TracBrowser for help on using the repository browser.