source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import junit.framework.TestCase;

/**
 * This test verifies that block verification occurs on the datanode
 */
public class TestDatanodeBlockScanner extends TestCase {

  private static final Log LOG = 
                 LogFactory.getLog(TestDatanodeBlockScanner.class);

  private static Pattern pattern = 
             Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
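  // The block scanner report is expected to contain one line per block,
  // of the form "blk_<id> ... scan time : <millis>"; group(1) captures
  // the block name and group(2) the most recent verification time.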
  /**
   * This connects to datanode and fetches block verification data.
   * It repeats this until the given block has a verification time > 0.
   */
  private static long waitForVerification(DatanodeInfo dn, FileSystem fs, 
                                          Path file) throws IOException {
    URL url = new URL("http://localhost:" + dn.getInfoPort() +
                      "/blockScannerReport?listblocks");
    long lastWarnTime = System.currentTimeMillis();
    long verificationTime = 0;

    String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();

    while (verificationTime <= 0) {
      String response = DFSTestUtil.urlGet(url);
      for(Matcher matcher = pattern.matcher(response); matcher.find();) {
        if (block.equals(matcher.group(1))) {
          verificationTime = Long.parseLong(matcher.group(2));
          break;
        }
      }

      if (verificationTime <= 0) {
        long now = System.currentTimeMillis();
        if ((now - lastWarnTime) >= 5*1000) {
          LOG.info("Waiting for verification of " + block);
          lastWarnTime = now; 
        }
        try {
          Thread.sleep(500);
        } catch (InterruptedException ignored) {}
      }
    }

    return verificationTime;
  }

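  /**
   * Writes two files on a single-datanode cluster and checks that both
   * blocks get verified: the first one after a cluster restart, the
   * second one after a client read, which itself verifies checksums.
   */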
  public void testDatanodeBlockScanner() throws IOException {

    long startTime = System.currentTimeMillis();

    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    cluster.waitActive();

    FileSystem fs = cluster.getFileSystem();
    Path file1 = new Path("/tmp/testBlockVerification/file1");
    Path file2 = new Path("/tmp/testBlockVerification/file2");

    /*
     * Write the first file and restart the cluster.
     */
    DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
    cluster.shutdown();
    cluster = new MiniDFSCluster(conf, 1, false, null);
    cluster.waitActive();

    DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", 
                                        cluster.getNameNodePort()), conf);
    fs = cluster.getFileSystem();
    DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];

    /*
     * The cluster restarted. The block should be verified by now.
     */
    assertTrue(waitForVerification(dn, fs, file1) > startTime);

    /*
     * Create a new file and read the block. The block should be marked
     * verified since the client reads the block and verifies checksum.
     */
    DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
    IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), 
                      conf, true); 
    assertTrue(waitForVerification(dn, fs, file2) > startTime);

    cluster.shutdown();
  }

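  /**
   * Corrupts the given replica by overwriting a few bytes of its block
   * file on disk. Assumes the MiniDFSCluster storage layout under
   * test.build.data/dfs/data, where datanode <replica> owns the two
   * directories data<2*replica+1> and data<2*replica+2>.
   *
   * @return true if a block file was found and corrupted
   */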
  public static boolean corruptReplica(String blockName, int replica) throws IOException {
    Random random = new Random();
    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
    boolean corrupted = false;
    for (int i=replica*2; i<replica*2+2; i++) {
      File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
                               blockName);
      if (blockFile.exists()) {
        // Corrupt the replica by overwriting bytes at a random offset
        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
        FileChannel channel = raFile.getChannel();
        String badString = "BADBAD";
        int rand = random.nextInt((int)channel.size()/2);
        raFile.seek(rand);
        raFile.write(badString.getBytes());
        raFile.close();
        corrupted = true;
      }
    }
    return corrupted;
  }

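  /**
   * Verifies how the namenode handles corrupt replicas: a single corrupt
   * replica is dropped from the returned locations while the block stays
   * healthy, whereas corrupting all three replicas marks the block as
   * corrupt and all of its replicas are returned to the client.
   */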
  public void testBlockCorruptionPolicy() throws IOException {
    Configuration conf = new Configuration();
    conf.setLong("dfs.blockreport.intervalMsec", 1000L);
    Random random = new Random();
    FileSystem fs = null;
    DFSClient dfsClient = null;
    LocatedBlocks blocks = null;
    int blockCount = 0;
    int rand = random.nextInt(3);

    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
    cluster.waitActive();
    fs = cluster.getFileSystem();
    Path file1 = new Path("/tmp/testBlockVerification/file1");
    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();

    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
                                        cluster.getNameNodePort()), conf);
    do {
      blocks = dfsClient.namenode.
                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      blockCount = blocks.get(0).getLocations().length;
      try {
        LOG.info("Looping until expected blockCount of 3 is received");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
    } while (blockCount != 3);
    assertFalse(blocks.get(0).isCorrupt());

    // Corrupt a random replica of the block
    corruptReplica(block, rand);

    // Restart the datanode so that the corrupt replica gets reported
    cluster.restartDataNode(rand);

    // We still have 2 good replicas and the block is not corrupt
    do {
      blocks = dfsClient.namenode.
                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      blockCount = blocks.get(0).getLocations().length;
      try {
        LOG.info("Looping until expected blockCount of 2 is received");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
    } while (blockCount != 2);
    assertFalse(blocks.get(0).isCorrupt());

    // Corrupt all replicas. Now the block should be marked as corrupt
    // and we should get back all the replicas
    corruptReplica(block, 0);
    corruptReplica(block, 1);
    corruptReplica(block, 2);

    // Read the file to trigger reportBadBlocks by the client
    try {
      IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
                        conf, true);
    } catch (IOException e) {
      // Ignore exception
    }

    // The block should now be marked as corrupt and all of its
    // replicas are returned to the client
    do {
      blocks = dfsClient.namenode.
                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      blockCount = blocks.get(0).getLocations().length;
      try {
        LOG.info("Looping until expected blockCount of 3 is received");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
    } while (blockCount != 3);
    assertTrue(blocks.get(0).isCorrupt());

    cluster.shutdown();
  }

  /**
   * testBlockCorruptionRecoveryPolicy.
   * This tests recovery of corrupt replicas, first for one corrupt replica
   * then for two. The test invokes blockCorruptionRecoveryPolicy which
   * 1. Creates a block with the desired number of replicas.
   * 2. Corrupts the desired number of replicas and restarts the datanodes
   *    containing the corrupt replicas. Additionally we also read the block
   *    in case restarting does not report the corrupt replicas.
   *    Restarting or reading from the datanode triggers reportBadBlocks
   *    to the namenode, which adds the block to corruptReplicasMap and
   *    neededReplication.
   * 3. The test waits until all corrupt replicas are reported; meanwhile
   *    re-replication brings the block back to a healthy state.
   * 4. The test again waits until the block is reported with the expected
   *    number of good replicas.
   */
  public void testBlockCorruptionRecoveryPolicy() throws IOException {
    // Test recovery of 1 corrupt replica
    LOG.info("Testing corrupt replica recovery for one corrupt replica");
    blockCorruptionRecoveryPolicy(4, (short)3, 1);

    // Test recovery of 2 corrupt replicas
    LOG.info("Testing corrupt replica recovery for two corrupt replicas");
    blockCorruptionRecoveryPolicy(5, (short)3, 2);
  }

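  /**
   * @param numDataNodes       number of datanodes to start in the cluster
   * @param numReplicas        replication factor of the test file
   * @param numCorruptReplicas how many replicas to corrupt; this should be
   *                           smaller than numReplicas so that at least one
   *                           good replica remains for re-replication
   */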
  private void blockCorruptionRecoveryPolicy(int numDataNodes, 
                                             short numReplicas,
                                             int numCorruptReplicas) 
                                             throws IOException {
    Configuration conf = new Configuration();
    conf.setLong("dfs.blockreport.intervalMsec", 30L);
    conf.setLong("dfs.replication.interval", 30);
    conf.setLong("dfs.heartbeat.interval", 30L);
    conf.setBoolean("dfs.replication.considerLoad", false);
    FileSystem fs = null;
    DFSClient dfsClient = null;
    LocatedBlocks blocks = null;
    int replicaCount = 0;

    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
    cluster.waitActive();
    fs = cluster.getFileSystem();
    Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
    DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
    Block blk = DFSTestUtil.getFirstBlock(fs, file1);
    String block = blk.getBlockName();

    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
                                        cluster.getNameNodePort()), conf);
    blocks = dfsClient.namenode.
               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
    replicaCount = blocks.get(0).getLocations().length;

    // Wait until the block is replicated to numReplicas
    while (replicaCount != numReplicas) {
      try {
        LOG.info("Looping until expected replicaCount of " + numReplicas +
                 " is reached");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      blocks = dfsClient.namenode.
                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      replicaCount = blocks.get(0).getLocations().length;
    }
    assertFalse(blocks.get(0).isCorrupt());

    // Corrupt numCorruptReplicas replicas of the block
    int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
    for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
      if (corruptReplica(block, i)) 
        corruptReplicasDNIDs[j++] = i;
    }

    // Restart the datanodes containing corrupt replicas
    // so they would be reported to namenode and re-replicated
    for (int i=0; i < numCorruptReplicas; i++) 
      cluster.restartDataNode(corruptReplicasDNIDs[i]);

    // Loop until all corrupt replicas are reported
    int corruptReplicaSize = cluster.getNameNode().namesystem.
                              corruptReplicas.numCorruptReplicas(blk);
    while (corruptReplicaSize != numCorruptReplicas) {
      try {
        IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
                          conf, true);
      } catch (IOException e) {
        // Ignore exception: reading a corrupt block is expected to fail
      }
      try {
        LOG.info("Looping until the expected " + numCorruptReplicas +
                 " corrupt replicas are reported. Currently reported: " +
                 corruptReplicaSize);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      corruptReplicaSize = cluster.getNameNode().namesystem.
                              corruptReplicas.numCorruptReplicas(blk);
    }

    // Loop until the block recovers after re-replication
    blocks = dfsClient.namenode.
               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
    replicaCount = blocks.get(0).getLocations().length;
    while (replicaCount != numReplicas) {
      try {
        LOG.info("Looping until block gets rereplicated to " + numReplicas);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      blocks = dfsClient.namenode.
                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      replicaCount = blocks.get(0).getLocations().length;
    }

    // Make sure the corrupt replica is invalidated and removed from
    // corruptReplicasMap
    corruptReplicaSize = cluster.getNameNode().namesystem.
                          corruptReplicas.numCorruptReplicas(blk);
    while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
      try {
        LOG.info("Looping until corrupt replica is invalidated");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      corruptReplicaSize = cluster.getNameNode().namesystem.
                            corruptReplicas.numCorruptReplicas(blk);
      blocks = dfsClient.namenode.
                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      replicaCount = blocks.get(0).getLocations().length;
    }
    // Make sure the block is healthy
    assertEquals(0, corruptReplicaSize);
    assertEquals(numReplicas, replicaCount);
    assertFalse(blocks.get(0).isCorrupt());
    cluster.shutdown();
  }

  /** Test that the NameNode handles truncated blocks in the block report */
  public void testTruncatedBlockReport() throws Exception {
    final Configuration conf = new Configuration();
    final short REPLICATION_FACTOR = (short)2;

    MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    try {
      final Path fileName = new Path("/file1");
      DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
      DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);

      String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();

      // Truncate a replica of the block
      changeReplicaLength(block, 0, -1);

      cluster.shutdown();

      // restart the cluster
      cluster = new MiniDFSCluster(
          0, conf, REPLICATION_FACTOR, false, true, null, null, null);
      cluster.startDataNodes(conf, 1, true, null, null);
      cluster.waitActive();  // now we have 3 datanodes

      // wait for the truncated block to be detected and re-replicated
      DFSTestUtil.waitReplication(
          cluster.getFileSystem(), fileName, REPLICATION_FACTOR);

      // Make sure that the truncated block gets deleted
      waitForBlockDeleted(block, 0);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Change the length of the block file at datanode dnIndex by lenDelta
   * bytes, and return true if the block file was found.
   */
  static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
    for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
      File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
                               blockName);
      if (blockFile.exists()) {
        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
        raFile.setLength(raFile.length()+lenDelta);
        raFile.close();
        return true;
      }
    }
    return false;
  }

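  /**
   * Polls the two storage directories of datanode dnIndex every 100 ms
   * until the named block file has disappeared from both.
   */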
  private static void waitForBlockDeleted(String blockName, int dnIndex) 
  throws IOException, InterruptedException {
    File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
    File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1)+ "/current/" + 
        blockName);
    File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2)+ "/current/" + 
        blockName);
    while (blockFile1.exists() || blockFile2.exists()) {
      Thread.sleep(100);
    }
  }
}