source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 9.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs.server.datanode;
19
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.net.InetSocketAddress;
24import java.net.Socket;
25import java.util.ArrayList;
26import java.util.Arrays;
27import java.util.List;
28import java.util.Random;
29
30import junit.framework.TestCase;
31
32import org.apache.commons.logging.Log;
33import org.apache.commons.logging.LogFactory;
34import org.apache.hadoop.conf.Configuration;
35import org.apache.hadoop.fs.FileSystem;
36import org.apache.hadoop.fs.Path;
37import org.apache.hadoop.hdfs.DFSClient;
38import org.apache.hadoop.hdfs.DFSTestUtil;
39import org.apache.hadoop.hdfs.MiniDFSCluster;
40import org.apache.hadoop.hdfs.protocol.Block;
41import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
42import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
43import org.apache.hadoop.hdfs.protocol.LocatedBlock;
44import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
45import org.apache.hadoop.hdfs.server.common.HdfsConstants;
46import org.apache.hadoop.hdfs.server.common.Util;
47import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
48import org.apache.hadoop.io.Text;
49import org.apache.hadoop.net.NetUtils;
50/**
51 * This class tests if block replacement request to data nodes work correctly.
52 */
53public class TestBlockReplacement extends TestCase {
54  private static final Log LOG = LogFactory.getLog(
55  "org.apache.hadoop.hdfs.TestBlockReplacement");
56
57  MiniDFSCluster cluster;
58  public void testThrottler() throws IOException {
59    Configuration conf = new Configuration();
60    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
61    long bandwidthPerSec = 1024*1024L;
62    final long TOTAL_BYTES =6*bandwidthPerSec; 
63    long bytesToSend = TOTAL_BYTES; 
64    long start = Util.now();
65    BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
66    long totalBytes = 0L;
67    long bytesSent = 1024*512L; // 0.5MB
68    throttler.throttle(bytesSent);
69    bytesToSend -= bytesSent;
70    bytesSent = 1024*768L; // 0.75MB
71    throttler.throttle(bytesSent);
72    bytesToSend -= bytesSent;
73    try {
74      Thread.sleep(1000);
75    } catch (InterruptedException ignored) {}
76    throttler.throttle(bytesToSend);
77    long end = Util.now();
78    assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
79  }
80 
81  public void testBlockReplacement() throws IOException {
82    final Configuration CONF = new Configuration();
83    final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
84    final String[] NEW_RACKS = {"/RACK2"};
85
86    final short REPLICATION_FACTOR = (short)3;
87    final int DEFAULT_BLOCK_SIZE = 1024;
88    final Random r = new Random();
89   
90    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
91    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE/2);
92    CONF.setLong("dfs.blockreport.intervalMsec",500);
93    cluster = new MiniDFSCluster(
94          CONF, REPLICATION_FACTOR, true, INITIAL_RACKS );
95    try {
96      cluster.waitActive();
97     
98      FileSystem fs = cluster.getFileSystem();
99      Path fileName = new Path("/tmp.txt");
100     
101      // create a file with one block
102      DFSTestUtil.createFile(fs, fileName,
103          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, r.nextLong());
104      DFSTestUtil.waitReplication(fs,fileName, REPLICATION_FACTOR);
105     
106      // get all datanodes
107      InetSocketAddress addr = new InetSocketAddress("localhost",
108          cluster.getNameNodePort());
109      DFSClient client = new DFSClient(addr, CONF);
110      List<LocatedBlock> locatedBlocks = client.namenode.
111        getBlockLocations("/tmp.txt", 0, DEFAULT_BLOCK_SIZE).getLocatedBlocks();
112      assertEquals(1, locatedBlocks.size());
113      LocatedBlock block = locatedBlocks.get(0);
114      DatanodeInfo[]  oldNodes = block.getLocations();
115      assertEquals(oldNodes.length, 3);
116      Block b = block.getBlock();
117     
118      // add a new datanode to the cluster
119      cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
120      cluster.waitActive();
121     
122      DatanodeInfo[] datanodes = client.datanodeReport(DatanodeReportType.ALL);
123
124      // find out the new node
125      DatanodeInfo newNode=null;
126      for(DatanodeInfo node:datanodes) {
127        Boolean isNewNode = true;
128        for(DatanodeInfo oldNode:oldNodes) {
129          if(node.equals(oldNode)) {
130            isNewNode = false;
131            break;
132          }
133        }
134        if(isNewNode) {
135          newNode = node;
136          break;
137        }
138      }
139     
140      assertTrue(newNode!=null);
141      DatanodeInfo source=null;
142      ArrayList<DatanodeInfo> proxies = new ArrayList<DatanodeInfo>(2);
143      for(DatanodeInfo node:datanodes) {
144        if(node != newNode) {
145          if( node.getNetworkLocation().equals(newNode.getNetworkLocation())) {
146            source = node;
147          } else {
148            proxies.add( node );
149          }
150        }
151      }
152      assertTrue(source!=null && proxies.size()==2);
153     
154      // start to replace the block
155      // case 1: proxySource does not contain the block
156      LOG.info("Testcase 1: Proxy " + newNode.getName() 
157          + " does not contain the block " + b.getBlockName() );
158      assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
159      // case 2: destination contains the block
160      LOG.info("Testcase 2: Destination " + proxies.get(1).getName() 
161          + " contains the block " + b.getBlockName() );
162      assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
163      // case 3: correct case
164      LOG.info("Testcase 3: Proxy=" + source.getName() + " source=" + 
165          proxies.get(0).getName() + " destination=" + newNode.getName() );
166      assertTrue(replaceBlock(b, source, proxies.get(0), newNode));
167      // block locations should contain two proxies and newNode
168      checkBlocks(new DatanodeInfo[]{newNode, proxies.get(0), proxies.get(1)},
169          fileName.toString(), 
170          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
171      // case 4: proxies.get(0) is not a valid del hint
172      LOG.info("Testcase 4: invalid del hint " + proxies.get(0).getName() );
173      assertTrue(replaceBlock(b, proxies.get(1), proxies.get(0), source));
174      /* block locations should contain two proxies,
175       * and either of source or newNode
176       */
177      checkBlocks(proxies.toArray(new DatanodeInfo[proxies.size()]), 
178          fileName.toString(), 
179          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
180    } finally {
181      cluster.shutdown();
182    }
183  }
184 
185  /* check if file's blocks exist at includeNodes */
186  private void checkBlocks(DatanodeInfo[] includeNodes, String fileName, 
187      long fileLen, short replFactor, DFSClient client) throws IOException {
188    Boolean notDone;
189    do {
190      try {
191        Thread.sleep(100);
192      } catch(InterruptedException e) {
193      }
194      List<LocatedBlock> blocks = client.namenode.
195      getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
196      assertEquals(1, blocks.size());
197      DatanodeInfo[] nodes = blocks.get(0).getLocations();
198      notDone = (nodes.length != replFactor);
199      if (notDone) {
200        LOG.info("Expected replication factor is " + replFactor +
201            " but the real replication factor is " + nodes.length );
202      } else {
203        List<DatanodeInfo> nodeLocations = Arrays.asList(nodes);
204        for (DatanodeInfo node : includeNodes) {
205          if (!nodeLocations.contains(node) ) {
206            notDone=true; 
207            LOG.info("Block is not located at " + node.getName() );
208            break;
209          }
210        }
211      }
212    } while(notDone);
213  }
214
215  /* Copy a block from sourceProxy to destination. If the block becomes
216   * over-replicated, preferably remove it from source.
217   *
218   * Return true if a block is successfully copied; otherwise false.
219   */
220  private boolean replaceBlock( Block block, DatanodeInfo source,
221      DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
222    Socket sock = new Socket();
223    sock.connect(NetUtils.createSocketAddr(
224        destination.getName()), HdfsConstants.READ_TIMEOUT);
225    sock.setKeepAlive(true);
226    // sendRequest
227    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
228    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
229    out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
230    out.writeLong(block.getBlockId());
231    out.writeLong(block.getGenerationStamp());
232    Text.writeString(out, source.getStorageID());
233    sourceProxy.write(out);
234    out.flush();
235    // receiveResponse
236    DataInputStream reply = new DataInputStream(sock.getInputStream());
237
238    short status = reply.readShort();
239    if(status == DataTransferProtocol.OP_STATUS_SUCCESS) {
240      return true;
241    }
242    return false;
243  }
244
245  /**
246   * @param args
247   */
248  public static void main(String[] args) throws Exception {
249    (new TestBlockReplacement()).testBlockReplacement();
250  }
251
252}
Note: See TracBrowser for help on using the repository browser.