source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestReplication.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 16.5 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs;
19
20import junit.framework.TestCase;
21import java.io.*;
22import java.util.Iterator;
23import java.util.Random;
24import java.net.*;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.hdfs.protocol.ClientProtocol;
30import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
31import org.apache.hadoop.hdfs.protocol.LocatedBlock;
32import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
33import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
34import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
35import org.apache.hadoop.fs.FSDataOutputStream;
36import org.apache.hadoop.fs.FileSystem;
37import org.apache.hadoop.fs.Path;
38import org.apache.hadoop.fs.FileStatus;
39import org.apache.hadoop.fs.BlockLocation;
40
41/**
42 * This class tests the replication of a DFS file.
43 */
44public class TestReplication extends TestCase {
45  private static final long seed = 0xDEADBEEFL;
46  private static final int blockSize = 8192;
47  private static final int fileSize = 16384;
48  private static final String racks[] = new String[] {
49    "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"
50  };
51  private static final int numDatanodes = racks.length;
52  private static final Log LOG = LogFactory.getLog(
53                                       "org.apache.hadoop.hdfs.TestReplication");
54
55  private void writeFile(FileSystem fileSys, Path name, int repl)
56    throws IOException {
57    // create and write a file that contains three blocks of data
58    FSDataOutputStream stm = fileSys.create(name, true,
59                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
60                                            (short)repl, (long)blockSize);
61    byte[] buffer = new byte[fileSize];
62    Random rand = new Random(seed);
63    rand.nextBytes(buffer);
64    stm.write(buffer);
65    stm.close();
66  }
67 
68  /* check if there are at least two nodes are on the same rack */
69  private void checkFile(FileSystem fileSys, Path name, int repl)
70    throws IOException {
71    Configuration conf = fileSys.getConf();
72    ClientProtocol namenode = DFSClient.createNamenode(conf);
73     
74    waitForBlockReplication(name.toString(), namenode, 
75                            Math.min(numDatanodes, repl), -1);
76   
77    LocatedBlocks locations = namenode.getBlockLocations(name.toString(),0,
78                                                         Long.MAX_VALUE);
79    FileStatus stat = fileSys.getFileStatus(name);
80    BlockLocation[] blockLocations = fileSys.getFileBlockLocations(stat,0L,
81                                                         Long.MAX_VALUE);
82    // verify that rack locations match
83    assertTrue(blockLocations.length == locations.locatedBlockCount());
84    for (int i = 0; i < blockLocations.length; i++) {
85      LocatedBlock blk = locations.get(i);
86      DatanodeInfo[] datanodes = blk.getLocations();
87      String[] topologyPaths = blockLocations[i].getTopologyPaths();
88      assertTrue(topologyPaths.length == datanodes.length);
89      for (int j = 0; j < topologyPaths.length; j++) {
90        boolean found = false;
91        for (int k = 0; k < racks.length; k++) {
92          if (topologyPaths[j].startsWith(racks[k])) {
93            found = true;
94            break;
95          }
96        }
97        assertTrue(found);
98      }
99    }
100
101    boolean isOnSameRack = true, isNotOnSameRack = true;
102    for (LocatedBlock blk : locations.getLocatedBlocks()) {
103      DatanodeInfo[] datanodes = blk.getLocations();
104      if (datanodes.length <= 1) break;
105      if (datanodes.length == 2) {
106        isNotOnSameRack = !(datanodes[0].getNetworkLocation().equals(
107                                                                     datanodes[1].getNetworkLocation()));
108        break;
109      }
110      isOnSameRack = false;
111      isNotOnSameRack = false;
112      for (int i = 0; i < datanodes.length-1; i++) {
113        LOG.info("datanode "+ i + ": "+ datanodes[i].getName());
114        boolean onRack = false;
115        for( int j=i+1; j<datanodes.length; j++) {
116           if( datanodes[i].getNetworkLocation().equals(
117            datanodes[j].getNetworkLocation()) ) {
118             onRack = true;
119           }
120        }
121        if (onRack) {
122          isOnSameRack = true;
123        }
124        if (!onRack) {
125          isNotOnSameRack = true;                     
126        }
127        if (isOnSameRack && isNotOnSameRack) break;
128      }
129      if (!isOnSameRack || !isNotOnSameRack) break;
130    }
131    assertTrue(isOnSameRack);
132    assertTrue(isNotOnSameRack);
133  }
134 
135  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
136    assertTrue(fileSys.exists(name));
137    fileSys.delete(name, true);
138    assertTrue(!fileSys.exists(name));
139  }
140
141  /*
142   * Test if Datanode reports bad blocks during replication request
143   */
144  public void testBadBlockReportOnTransfer() throws Exception {
145    Configuration conf = new Configuration();
146    FileSystem fs = null;
147    DFSClient dfsClient = null;
148    LocatedBlocks blocks = null;
149    int replicaCount = 0;
150    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
151    cluster.waitActive();
152    fs = cluster.getFileSystem();
153    dfsClient = new DFSClient(new InetSocketAddress("localhost",
154                              cluster.getNameNodePort()), conf);
155 
156    // Create file with replication factor of 1
157    Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
158    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
159    DFSTestUtil.waitReplication(fs, file1, (short)1);
160 
161    // Corrupt the block belonging to the created file
162    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
163    cluster.corruptBlockOnDataNodes(block);
164 
165    // Increase replication factor, this should invoke transfer request
166    // Receiving datanode fails on checksum and reports it to namenode
167    fs.setReplication(file1, (short)2);
168 
169    // Now get block details and check if the block is corrupt
170    blocks = dfsClient.namenode.
171              getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
172    while (blocks.get(0).isCorrupt() != true) {
173      try {
174        LOG.info("Waiting until block is marked as corrupt...");
175        Thread.sleep(1000);
176      } catch (InterruptedException ie) {
177      }
178      blocks = dfsClient.namenode.
179                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
180    }
181    replicaCount = blocks.get(0).getLocations().length;
182    assertTrue(replicaCount == 1);
183    cluster.shutdown();
184  }
185 
186  /**
187   * Tests replication in DFS.
188   */
189  public void runReplication(boolean simulated) throws IOException {
190    Configuration conf = new Configuration();
191    conf.setBoolean("dfs.replication.considerLoad", false);
192    if (simulated) {
193      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
194    }
195    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, racks);
196    cluster.waitActive();
197   
198    InetSocketAddress addr = new InetSocketAddress("localhost",
199                                                   cluster.getNameNodePort());
200    DFSClient client = new DFSClient(addr, conf);
201   
202    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
203    assertEquals("Number of Datanodes ", numDatanodes, info.length);
204    FileSystem fileSys = cluster.getFileSystem();
205    try {
206      Path file1 = new Path("/smallblocktest.dat");
207      writeFile(fileSys, file1, 3);
208      checkFile(fileSys, file1, 3);
209      cleanupFile(fileSys, file1);
210      writeFile(fileSys, file1, 10);
211      checkFile(fileSys, file1, 10);
212      cleanupFile(fileSys, file1);
213      writeFile(fileSys, file1, 4);
214      checkFile(fileSys, file1, 4);
215      cleanupFile(fileSys, file1);
216      writeFile(fileSys, file1, 1);
217      checkFile(fileSys, file1, 1);
218      cleanupFile(fileSys, file1);
219      writeFile(fileSys, file1, 2);
220      checkFile(fileSys, file1, 2);
221      cleanupFile(fileSys, file1);
222    } finally {
223      fileSys.close();
224      cluster.shutdown();
225    }
226  }
227
228
229  public void testReplicationSimulatedStorag() throws IOException {
230    runReplication(true);
231  }
232 
233 
234  public void testReplication() throws IOException {
235    runReplication(false);
236  }
237 
238  // Waits for all of the blocks to have expected replication
239  private void waitForBlockReplication(String filename, 
240                                       ClientProtocol namenode,
241                                       int expected, long maxWaitSec) 
242                                       throws IOException {
243    long start = System.currentTimeMillis();
244   
245    //wait for all the blocks to be replicated;
246    LOG.info("Checking for block replication for " + filename);
247    int iters = 0;
248    while (true) {
249      boolean replOk = true;
250      LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, 
251                                                        Long.MAX_VALUE);
252     
253      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
254           iter.hasNext();) {
255        LocatedBlock block = iter.next();
256        int actual = block.getLocations().length;
257        if ( actual < expected ) {
258          if (true || iters > 0) {
259            LOG.info("Not enough replicas for " + block.getBlock() +
260                               " yet. Expecting " + expected + ", got " + 
261                               actual + ".");
262          }
263          replOk = false;
264          break;
265        }
266      }
267     
268      if (replOk) {
269        return;
270      }
271     
272      iters++;
273     
274      if (maxWaitSec > 0 && 
275          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
276        throw new IOException("Timedout while waiting for all blocks to " +
277                              " be replicated for " + filename);
278      }
279     
280      try {
281        Thread.sleep(500);
282      } catch (InterruptedException ignored) {}
283    }
284  }
285 
286  /* This test makes sure that NameNode retries all the available blocks
287   * for under replicated blocks.
288   *
289   * It creates a file with one block and replication of 4. It corrupts
290   * two of the blocks and removes one of the replicas. Expected behaviour is
291   * that missing replica will be copied from one valid source.
292   */
293  public void testPendingReplicationRetry() throws IOException {
294   
295    MiniDFSCluster cluster = null;
296    int numDataNodes = 4;
297    String testFile = "/replication-test-file";
298    Path testPath = new Path(testFile);
299   
300    byte buffer[] = new byte[1024];
301    for (int i=0; i<buffer.length; i++) {
302      buffer[i] = '1';
303    }
304   
305    try {
306      Configuration conf = new Configuration();
307      conf.set("dfs.replication", Integer.toString(numDataNodes));
308      //first time format
309      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
310                                   true, null, null);
311      cluster.waitActive();
312      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
313                                            cluster.getNameNodePort()),
314                                            conf);
315     
316      OutputStream out = cluster.getFileSystem().create(testPath);
317      out.write(buffer);
318      out.close();
319     
320      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
321
322      // get first block of the file.
323      String block = dfsClient.namenode.
324                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
325                       get(0).getBlock().getBlockName();
326     
327      cluster.shutdown();
328      cluster = null;
329     
330      //Now mess up some of the replicas.
331      //Delete the first and corrupt the next two.
332      File baseDir = new File(System.getProperty("test.build.data"), 
333                                                 "dfs/data");
334      for (int i=0; i<25; i++) {
335        buffer[i] = '0';
336      }
337     
338      int fileCount = 0;
339      for (int i=0; i<6; i++) {
340        File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
341        LOG.info("Checking for file " + blockFile);
342       
343        if (blockFile.exists()) {
344          if (fileCount == 0) {
345            LOG.info("Deleting file " + blockFile);
346            assertTrue(blockFile.delete());
347          } else {
348            // corrupt it.
349            LOG.info("Corrupting file " + blockFile);
350            long len = blockFile.length();
351            assertTrue(len > 50);
352            RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
353            try {
354              blockOut.seek(len/3);
355              blockOut.write(buffer, 0, 25);
356            } finally {
357              blockOut.close();
358            }
359          }
360          fileCount++;
361        }
362      }
363      assertEquals(3, fileCount);
364     
365      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
366       * to a datanode node fails, same block can not be written to it
367       * immediately. In our case some replication attempts will fail.
368       */
369     
370      LOG.info("Restarting minicluster after deleting a replica and corrupting 2 crcs");
371      conf = new Configuration();
372      conf.set("dfs.replication", Integer.toString(numDataNodes));
373      conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
374      conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
375      conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
376     
377      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
378                                   true, null, null);
379      cluster.waitActive();
380     
381      dfsClient = new DFSClient(new InetSocketAddress("localhost",
382                                  cluster.getNameNodePort()),
383                                  conf);
384     
385      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
386     
387    } finally {
388      if (cluster != null) {
389        cluster.shutdown();
390      }
391    } 
392  }
393 
394  /**
395   * Test if replication can detect mismatched length on-disk blocks
396   * @throws Exception
397   */
398  public void testReplicateLenMismatchedBlock() throws Exception {
399    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
400    try {
401      cluster.waitActive();
402      // test truncated block
403      changeBlockLen(cluster, -1);
404      // test extended block
405      changeBlockLen(cluster, 1);
406    } finally {
407      cluster.shutdown();
408    }
409  }
410 
411  private void changeBlockLen(MiniDFSCluster cluster, 
412      int lenDelta) throws IOException, InterruptedException {
413    final Path fileName = new Path("/file1");
414    final short REPLICATION_FACTOR = (short)1;
415    final FileSystem fs = cluster.getFileSystem();
416    final int fileLen = fs.getConf().getInt("io.bytes.per.checksum", 512);
417    DFSTestUtil.createFile(fs, fileName, fileLen, REPLICATION_FACTOR, 0);
418    DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
419
420    String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
421
422    // Change the length of a replica
423    for (int i=0; i<cluster.getDataNodes().size(); i++) {
424      if (TestDatanodeBlockScanner.changeReplicaLength(block, i, lenDelta)) {
425        break;
426      }
427    }
428
429    // increase the file's replication factor
430    fs.setReplication(fileName, (short)(REPLICATION_FACTOR+1));
431
432    // block replication triggers corrupt block detection
433    DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", 
434        cluster.getNameNodePort()), fs.getConf());
435    LocatedBlocks blocks = dfsClient.namenode.getBlockLocations(
436        fileName.toString(), 0, fileLen);
437    if (lenDelta < 0) { // replica truncated
438        while (!blocks.get(0).isCorrupt() || 
439                        REPLICATION_FACTOR != blocks.get(0).getLocations().length) {
440                Thread.sleep(100);
441                blocks = dfsClient.namenode.getBlockLocations(
442                                fileName.toString(), 0, fileLen);
443        }
444    } else { // no corruption detected; block replicated
445        while (REPLICATION_FACTOR+1 != blocks.get(0).getLocations().length) {
446                Thread.sleep(100);
447                blocks = dfsClient.namenode.getBlockLocations(
448                                fileName.toString(), 0, fileLen);
449        }
450    }
451    fs.delete(fileName, true);
452  }
453}
Note: See TracBrowser for help on using the repository browser.