source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit project

  • Property svn:executable set to *
File size: 7.5 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import junit.framework.TestCase;
import java.io.*;
import java.util.HashSet;
import java.util.Set;
import java.net.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

/**
 * This class tests the replication and injection of blocks of a DFS file for simulated storage.
 */
public class TestInjectionForSimulatedStorage extends TestCase {
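  // Deliberately tiny sizes: 16-byte checksum chunks and 32-byte blocks give
  // a 4-block, 128-byte test file, which keeps the simulated test fast.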
  private int checksumSize = 16;
  private int blockSize = checksumSize*2;
  private int numBlocks = 4;
  private int filesize = blockSize*numBlocks;
  private int numDataNodes = 4;
  private static final Log LOG = LogFactory.getLog(
      "org.apache.hadoop.hdfs.TestInjectionForSimulatedStorage");

  private void writeFile(FileSystem fileSys, Path name, int repl)
                                                throws IOException {
    // create and write a file that contains numBlocks blocks of data
    FSDataOutputStream stm = fileSys.create(name, true,
                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                            (short)repl, (long)blockSize);
    byte[] buffer = new byte[filesize];
    for (int i = 0; i < buffer.length; i++) {
      buffer[i] = '1';
    }
    stm.write(buffer);
    stm.close();
  }

  // Waits for all of the blocks of the file to reach the expected replication factor.
  private void waitForBlockReplication(String filename,
                                       ClientProtocol namenode,
                                       int expected, long maxWaitSec)
                                       throws IOException {
    long start = System.currentTimeMillis();

    // wait for all the blocks to be replicated
    LOG.info("Checking for block replication for " + filename);

    LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
    assertEquals(numBlocks, blocks.locatedBlockCount());

    for (int i = 0; i < numBlocks; ++i) {
      LOG.info("Checking for block:" + (i+1));
      while (true) { // Loop to check for block i (usually when block 0 is done, all are done)
        blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
        assertEquals(numBlocks, blocks.locatedBlockCount());
        LocatedBlock block = blocks.get(i);
        int actual = block.getLocations().length;
        if (actual == expected) {
          LOG.info("Got enough replicas for " + (i+1) + "th block " + block.getBlock() +
                   ", got " + actual + ".");
          break;
        }
        LOG.info("Not enough replicas for " + (i+1) + "th block " + block.getBlock() +
                 " yet. Expecting " + expected + ", got " + actual + ".");

        if (maxWaitSec > 0 &&
            (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
          throw new IOException("Timed out while waiting for all blocks to" +
                                " be replicated for " + filename);
        }

        try {
          Thread.sleep(500);
        } catch (InterruptedException ignored) {}
      }
    }
  }

  /* This test makes sure that the NameNode re-replicates all the available
   * blocks of an under-replicated file. It uses simulated storage and its
   * ability to inject blocks.
   *
   * It creates a file with several blocks and a replication factor of 4.
   * The cluster is then shut down - the NN retains its state, but the DNs are
   * all simulated and hence lose their blocks.
   * The blocks are then injected into one of the DNs. The expected behaviour
   * is that the NN will arrange for the missing replicas to be copied from a
   * valid source.
   */
  public void testInjection() throws IOException {

    MiniDFSCluster cluster = null;

    String testFile = "/replication-test-file";
    Path testPath = new Path(testFile);

    byte buffer[] = new byte[1024];
    for (int i = 0; i < buffer.length; i++) {
      buffer[i] = '1';
    }

    try {
      Configuration conf = new Configuration();
      conf.set("dfs.replication", Integer.toString(numDataNodes));
      conf.setInt("io.bytes.per.checksum", checksumSize);
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
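      // With SimulatedFSDataset enabled, datanodes keep block data in memory
      // only, so a cluster restart wipes every replica they were holding.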
      // first time format
      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
                                   true, null, null);
      cluster.waitActive();
      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                            cluster.getNameNodePort()),
                                            conf);

      writeFile(cluster.getFileSystem(), testPath, numDataNodes);

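      // Cap the initial wait at 20 seconds so a replication stall fails the
      // test (via IOException) instead of hanging it.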
      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, 20);

      Block[][] blocksList = cluster.getAllBlockReports();
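      // blocksList[i] is the block report of datanode i; together the reports
      // cover every replica that existed before the shutdown.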

      cluster.shutdown();
      cluster = null;

      /* Restart the MiniDFSCluster with more datanodes, since once a writeBlock
       * to a datanode fails, the same block cannot be written to it
       * immediately. In our case some replication attempts will fail.
       */

      LOG.info("Restarting minicluster");
      conf = new Configuration();
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
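      // A 0.0f safemode threshold lets the NN leave safe mode immediately,
      // even though the freshly started (simulated) DNs report no blocks.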
      conf.set("dfs.safemode.threshold.pct", "0.0f");

      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
                                   true, null, null);
      cluster.waitActive();
      Set<Block> uniqueBlocks = new HashSet<Block>();
      for (int i = 0; i < blocksList.length; ++i) {
        for (int j = 0; j < blocksList[i].length; ++j) {
          uniqueBlocks.add(blocksList[i][j]);
        }
      }
      // Insert all the blocks in the first data node
      LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
      Block[] blocks = uniqueBlocks.toArray(new Block[uniqueBlocks.size()]);
      cluster.injectBlocks(0, blocks);
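      // Every surviving replica now lives on datanode 0 alone; the NN must
      // re-replicate each block back up to numDataNodes copies.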

      dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                  cluster.getNameNodePort()),
                                  conf);

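      // A maxWaitSec of -1 disables the timeout in waitForBlockReplication,
      // so this call blocks until every block reaches full replication.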
      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);

    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}