/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;

/**
 * This class tests whether block replacement requests to datanodes work
 * correctly.
 */
public class TestBlockReplacement extends TestCase {
  private static final Log LOG = LogFactory.getLog(TestBlockReplacement.class);

  MiniDFSCluster cluster;
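  /**
   * Pushes 6 seconds' worth of data (6 MB at 1 MB/s) through the throttler
   * and verifies that the measured transfer rate never exceeds the
   * configured bandwidth, i.e. the throttle() calls must block long enough
   * for the whole transfer to take at least ~6 seconds.
   */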
  public void testThrottler() throws IOException {
    Configuration conf = new Configuration();
    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
    long bandwidthPerSec = 1024 * 1024L; // 1MB/s
    final long TOTAL_BYTES = 6 * bandwidthPerSec;
    long bytesToSend = TOTAL_BYTES;
    long start = Util.now();
    BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
    long bytesSent = 1024 * 512L; // 0.5MB
    throttler.throttle(bytesSent);
    bytesToSend -= bytesSent;
    bytesSent = 1024 * 768L; // 0.75MB
    throttler.throttle(bytesSent);
    bytesToSend -= bytesSent;
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ignored) {}
    throttler.throttle(bytesToSend);
    long end = Util.now();
    // the measured rate must not exceed the configured bandwidth
    assertTrue(TOTAL_BYTES * 1000 / (end - start) <= bandwidthPerSec);
  }

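  /**
   * Starts a three-rack cluster holding a single-block file with replication
   * three, brings up a fourth datanode on an existing rack, and then drives
   * OP_REPLACE_BLOCK directly against the datanodes to cover four cases:
   * a proxy that does not have the block, a destination that already has it,
   * a successful replacement, and a replacement with an invalid deletion hint.
   */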
  public void testBlockReplacement() throws IOException {
    final Configuration CONF = new Configuration();
    final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
    final String[] NEW_RACKS = {"/RACK2"};

    final short REPLICATION_FACTOR = (short)3;
    final int DEFAULT_BLOCK_SIZE = 1024;
    final Random r = new Random();

    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE / 2);
    CONF.setLong("dfs.blockreport.intervalMsec", 500);
    cluster = new MiniDFSCluster(
        CONF, REPLICATION_FACTOR, true, INITIAL_RACKS);
    try {
      cluster.waitActive();

      FileSystem fs = cluster.getFileSystem();
      Path fileName = new Path("/tmp.txt");

      // create a file with one block
      DFSTestUtil.createFile(fs, fileName,
          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, r.nextLong());
      DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);

      // get all datanodes
      InetSocketAddress addr = new InetSocketAddress("localhost",
          cluster.getNameNodePort());
      DFSClient client = new DFSClient(addr, CONF);
      List<LocatedBlock> locatedBlocks = client.namenode.
          getBlockLocations("/tmp.txt", 0, DEFAULT_BLOCK_SIZE).getLocatedBlocks();
      assertEquals(1, locatedBlocks.size());
      LocatedBlock block = locatedBlocks.get(0);
      DatanodeInfo[] oldNodes = block.getLocations();
      assertEquals(3, oldNodes.length);
      Block b = block.getBlock();

      // add a new datanode to the cluster
      cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
      cluster.waitActive();

      DatanodeInfo[] datanodes = client.datanodeReport(DatanodeReportType.ALL);

      // find out the new node
      DatanodeInfo newNode = null;
      for (DatanodeInfo node : datanodes) {
        boolean isNewNode = true;
        for (DatanodeInfo oldNode : oldNodes) {
          if (node.equals(oldNode)) {
            isNewNode = false;
            break;
          }
        }
        if (isNewNode) {
          newNode = node;
          break;
        }
      }

      assertNotNull(newNode);
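      // The node sharing newNode's rack (/RACK2) is used as the replace
      // "source" (deletion hint); the two nodes on the other racks serve
      // as the proxies that the block is copied from.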
      DatanodeInfo source = null;
      ArrayList<DatanodeInfo> proxies = new ArrayList<DatanodeInfo>(2);
      for (DatanodeInfo node : datanodes) {
        if (!node.equals(newNode)) {
          if (node.getNetworkLocation().equals(newNode.getNetworkLocation())) {
            source = node;
          } else {
            proxies.add(node);
          }
        }
      }
      assertTrue(source != null && proxies.size() == 2);

      // start to replace the block
      // case 1: proxySource does not contain the block
      LOG.info("Testcase 1: Proxy " + newNode.getName()
          + " does not contain the block " + b.getBlockName());
      assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
      // case 2: destination already contains the block
      LOG.info("Testcase 2: Destination " + proxies.get(1).getName()
          + " contains the block " + b.getBlockName());
      assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
      // case 3: correct case
      LOG.info("Testcase 3: source=" + source.getName() + " proxy="
          + proxies.get(0).getName() + " destination=" + newNode.getName());
      assertTrue(replaceBlock(b, source, proxies.get(0), newNode));
      // block locations should contain the two proxies and newNode
      checkBlocks(new DatanodeInfo[]{newNode, proxies.get(0), proxies.get(1)},
          fileName.toString(),
          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
      // case 4: proxies.get(1) is not a valid deletion hint
      LOG.info("Testcase 4: invalid del hint " + proxies.get(1).getName());
      assertTrue(replaceBlock(b, proxies.get(1), proxies.get(0), source));
      /* block locations should contain the two proxies
       * and either source or newNode
       */
      checkBlocks(proxies.toArray(new DatanodeInfo[proxies.size()]),
          fileName.toString(),
          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
    } finally {
      cluster.shutdown();
    }
  }

  /* Poll the namenode until the file's single block is replicated exactly
   * replFactor times and every node in includeNodes holds a copy. */
  private void checkBlocks(DatanodeInfo[] includeNodes, String fileName,
      long fileLen, short replFactor, DFSClient client) throws IOException {
    boolean notDone;
    do {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
      }
      List<LocatedBlock> blocks = client.namenode.
          getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
      assertEquals(1, blocks.size());
      DatanodeInfo[] nodes = blocks.get(0).getLocations();
      notDone = (nodes.length != replFactor);
      if (notDone) {
        LOG.info("Expected replication factor is " + replFactor +
            " but the real replication factor is " + nodes.length);
      } else {
        List<DatanodeInfo> nodeLocations = Arrays.asList(nodes);
        for (DatanodeInfo node : includeNodes) {
          if (!nodeLocations.contains(node)) {
            notDone = true;
            LOG.info("Block is not located at " + node.getName());
            break;
          }
        }
      }
    } while (notDone);
  }

  /* Ask destination to copy a block from sourceProxy. If the block becomes
   * over-replicated, source is passed along as the preferred deletion hint.
   *
   * Returns true if the block is successfully copied; otherwise false.
   */
  private boolean replaceBlock(Block block, DatanodeInfo source,
      DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
    Socket sock = new Socket();
    try {
      sock.connect(NetUtils.createSocketAddr(
          destination.getName()), HdfsConstants.READ_TIMEOUT);
      sock.setKeepAlive(true);
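      // Hand-craft an OP_REPLACE_BLOCK request: transfer version (short),
      // opcode (byte), block ID (long), generation stamp (long), source
      // storage ID (Text), then the proxy's DatanodeInfo.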
      // sendRequest
      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
      out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
      out.writeLong(block.getBlockId());
      out.writeLong(block.getGenerationStamp());
      Text.writeString(out, source.getStorageID());
      sourceProxy.write(out);
      out.flush();
      // receiveResponse
      DataInputStream reply = new DataInputStream(sock.getInputStream());

      short status = reply.readShort();
      return status == DataTransferProtocol.OP_STATUS_SUCCESS;
    } finally {
      sock.close();
    }
  }

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    (new TestBlockReplacement()).testBlockReplacement();
  }

}