source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Last change on this file was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 13.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs;
19
20import junit.framework.TestCase;
21import java.io.*;
22import java.util.Random;
23import java.net.InetSocketAddress;
24import java.net.Socket;
25import java.nio.ByteBuffer;
26import org.apache.hadoop.conf.Configuration;
27import org.apache.hadoop.fs.FSDataInputStream;
28import org.apache.hadoop.fs.FSDataOutputStream;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.io.IOUtils;
31import org.apache.hadoop.io.Text;
32import org.apache.hadoop.net.NetUtils;
33import org.apache.hadoop.util.DataChecksum;
34import org.apache.hadoop.fs.Path;
35import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
36import org.apache.hadoop.hdfs.protocol.Block;
37import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
38import org.apache.hadoop.hdfs.protocol.DatanodeID;
39import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
40import org.apache.hadoop.hdfs.server.common.HdfsConstants;
41import org.apache.commons.logging.Log;
42import org.apache.commons.logging.LogFactory;
43
44/**
45 * This tests data transfer protocol handling in the Datanode. It sends
46 * various forms of wrong data and verifies that Datanode handles it well.
47 */
48public class TestDataTransferProtocol extends TestCase {
49 
50  private static final Log LOG = LogFactory.getLog(
51                    "org.apache.hadoop.hdfs.TestDataTransferProtocol");
52 
53  DatanodeID datanode;
54  InetSocketAddress dnAddr;
55  ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
56  DataOutputStream sendOut = new DataOutputStream(sendBuf);
57  // byte[] recvBuf = new byte[128];
58  // ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf);
59  ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
60  DataOutputStream recvOut = new DataOutputStream(recvBuf);
61
62  private void sendRecvData(String testDescription,
63                            boolean eofExpected) throws IOException {
64    /* Opens a socket to datanode
65     * sends the data in sendBuf.
66     * If there is data in expectedBuf, expects to receive the data
67     *     from datanode that matches expectedBuf.
68     * If there is an exception while recieving, throws it
69     *     only if exceptionExcepted is false.
70     */
71   
72    Socket sock = null;
73    try {
74     
75      if ( testDescription != null ) {
76        LOG.info("Testing : " + testDescription);
77      }
78      sock = new Socket();
79      sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
80      sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
81     
82      OutputStream out = sock.getOutputStream();
83      // Should we excuse
84      byte[] retBuf = new byte[recvBuf.size()];
85     
86      DataInputStream in = new DataInputStream(sock.getInputStream());
87      out.write(sendBuf.toByteArray());
88      try {
89        in.readFully(retBuf);
90      } catch (EOFException eof) {
91        if ( eofExpected ) {
92          LOG.info("Got EOF as expected.");
93          return;
94        }
95        throw eof;
96      }
97      for (int i=0; i<retBuf.length; i++) {
98        System.out.print(retBuf[i]);
99      }
100      System.out.println(":");
101     
102      if (eofExpected) {
103        throw new IOException("Did not recieve IOException when an exception " +
104                              "is expected while reading from " + 
105                              datanode.getName());
106      }
107     
108      byte[] needed = recvBuf.toByteArray();
109      for (int i=0; i<retBuf.length; i++) {
110        System.out.print(retBuf[i]);
111        assertEquals("checking byte[" + i + "]", needed[i], retBuf[i]);
112      }
113    } finally {
114      IOUtils.closeSocket(sock);
115    }
116  }
117 
118  void createFile(FileSystem fs, Path path, int fileLen) throws IOException {
119    byte [] arr = new byte[fileLen];
120    FSDataOutputStream out = fs.create(path);
121    out.write(arr);
122    out.close();
123  }
124 
125  void readFile(FileSystem fs, Path path, int fileLen) throws IOException {
126    byte [] arr = new byte[fileLen];
127    FSDataInputStream in = fs.open(path);
128    in.readFully(arr);
129  }
130 
131  public void testDataTransferProtocol() throws IOException {
132    Random random = new Random();
133    int oneMil = 1024*1024;
134    Path file = new Path("dataprotocol.dat");
135    int numDataNodes = 1;
136   
137    Configuration conf = new Configuration();
138    conf.setInt("dfs.replication", numDataNodes); 
139    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
140    cluster.waitActive();
141    DFSClient dfsClient = new DFSClient(
142                 new InetSocketAddress("localhost", cluster.getNameNodePort()),
143                 conf);               
144    datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
145    dnAddr = NetUtils.createSocketAddr(datanode.getName());
146    FileSystem fileSys = cluster.getFileSystem();
147   
148    int fileLen = Math.min(conf.getInt("dfs.block.size", 4096), 4096);
149   
150    createFile(fileSys, file, fileLen);
151
152    // get the first blockid for the file
153    Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
154    long newBlockId = firstBlock.getBlockId() + 1;
155
156    recvBuf.reset();
157    sendBuf.reset();
158   
159    // bad version
160    recvOut.writeShort((short)(DataTransferProtocol.DATA_TRANSFER_VERSION-1));
161    sendOut.writeShort((short)(DataTransferProtocol.DATA_TRANSFER_VERSION-1));
162    sendRecvData("Wrong Version", true);
163
164    // bad ops
165    sendBuf.reset();
166    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
167    sendOut.writeByte((byte)(DataTransferProtocol.OP_WRITE_BLOCK-1));
168    sendRecvData("Wrong Op Code", true);
169   
170    /* Test OP_WRITE_BLOCK */
171    sendBuf.reset();
172    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
173    sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
174    sendOut.writeLong(newBlockId); // block id
175    sendOut.writeLong(0);          // generation stamp
176    sendOut.writeInt(0);           // targets in pipeline
177    sendOut.writeBoolean(false);   // recoveryFlag
178    Text.writeString(sendOut, "cl");// clientID
179    sendOut.writeBoolean(false); // no src node info
180    sendOut.writeInt(0);           // number of downstream targets
181    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
182   
183    // bad bytes per checksum
184    sendOut.writeInt(-1-random.nextInt(oneMil));
185    recvBuf.reset();
186    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
187    sendRecvData("wrong bytesPerChecksum while writing", true);
188
189    sendBuf.reset();
190    recvBuf.reset();
191    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
192    sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
193    sendOut.writeLong(newBlockId);
194    sendOut.writeLong(0);          // generation stamp
195    sendOut.writeInt(0);           // targets in pipeline
196    sendOut.writeBoolean(false);   // recoveryFlag
197    Text.writeString(sendOut, "cl");// clientID
198    sendOut.writeBoolean(false); // no src node info
199
200    // bad number of targets
201    sendOut.writeInt(-1-random.nextInt(oneMil));
202    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
203    sendRecvData("bad targets len while writing block " + newBlockId, true);
204
205    sendBuf.reset();
206    recvBuf.reset();
207    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
208    sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
209    sendOut.writeLong(++newBlockId);
210    sendOut.writeLong(0);          // generation stamp
211    sendOut.writeInt(0);           // targets in pipeline
212    sendOut.writeBoolean(false);   // recoveryFlag
213    Text.writeString(sendOut, "cl");// clientID
214    sendOut.writeBoolean(false); // no src node info
215    sendOut.writeInt(0);
216    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
217    sendOut.writeInt((int)512);
218    sendOut.writeInt(4);           // size of packet
219    sendOut.writeLong(0);          // OffsetInBlock
220    sendOut.writeLong(100);        // sequencenumber
221    sendOut.writeBoolean(false);   // lastPacketInBlock
222   
223    // bad data chunk length
224    sendOut.writeInt(-1-random.nextInt(oneMil));
225    Text.writeString(recvOut, ""); // first bad node
226    recvOut.writeLong(100);        // sequencenumber
227    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
228    sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
229                 true);
230
231    // test for writing a valid zero size block
232    sendBuf.reset();
233    recvBuf.reset();
234    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
235    sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
236    sendOut.writeLong(++newBlockId);
237    sendOut.writeLong(0);          // generation stamp
238    sendOut.writeInt(0);           // targets in pipeline
239    sendOut.writeBoolean(false);   // recoveryFlag
240    Text.writeString(sendOut, "cl");// clientID
241    sendOut.writeBoolean(false); // no src node info
242    sendOut.writeInt(0);
243    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
244    sendOut.writeInt((int)512);    // checksum size
245    sendOut.writeInt(8);           // size of packet
246    sendOut.writeLong(0);          // OffsetInBlock
247    sendOut.writeLong(100);        // sequencenumber
248    sendOut.writeBoolean(true);    // lastPacketInBlock
249
250    sendOut.writeInt(0);           // chunk length
251    sendOut.writeInt(0);           // zero checksum
252    //ok finally write a block with 0 len
253    Text.writeString(recvOut, ""); // first bad node
254    recvOut.writeLong(100);        // sequencenumber
255    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
256    sendRecvData("Writing a zero len block blockid " + newBlockId, false);
257   
258    /* Test OP_READ_BLOCK */
259
260    // bad block id
261    sendBuf.reset();
262    recvBuf.reset();
263    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
264    sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
265    newBlockId = firstBlock.getBlockId()-1;
266    sendOut.writeLong(newBlockId);
267    sendOut.writeLong(firstBlock.getGenerationStamp());
268    sendOut.writeLong(0L);
269    sendOut.writeLong(fileLen);
270    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
271    Text.writeString(sendOut, "cl");
272    sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
273
274    // negative block start offset
275    sendBuf.reset();
276    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
277    sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
278    sendOut.writeLong(firstBlock.getBlockId());
279    sendOut.writeLong(firstBlock.getGenerationStamp());
280    sendOut.writeLong(-1L);
281    sendOut.writeLong(fileLen);
282    Text.writeString(sendOut, "cl");
283    sendRecvData("Negative start-offset for read for block " + 
284                 firstBlock.getBlockId(), false);
285
286    // bad block start offset
287    sendBuf.reset();
288    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
289    sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
290    sendOut.writeLong(firstBlock.getBlockId());
291    sendOut.writeLong(firstBlock.getGenerationStamp());
292    sendOut.writeLong(fileLen);
293    sendOut.writeLong(fileLen);
294    Text.writeString(sendOut, "cl");
295    sendRecvData("Wrong start-offset for reading block " +
296                 firstBlock.getBlockId(), false);
297   
298    // negative length is ok. Datanode assumes we want to read the whole block.
299    recvBuf.reset();
300    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);   
301    sendBuf.reset();
302    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
303    sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
304    sendOut.writeLong(firstBlock.getBlockId());
305    sendOut.writeLong(firstBlock.getGenerationStamp());
306    sendOut.writeLong(0);
307    sendOut.writeLong(-1-random.nextInt(oneMil));
308    Text.writeString(sendOut, "cl");
309    sendRecvData("Negative length for reading block " +
310                 firstBlock.getBlockId(), false);
311   
312    // length is more than size of block.
313    recvBuf.reset();
314    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);   
315    sendBuf.reset();
316    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
317    sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
318    sendOut.writeLong(firstBlock.getBlockId());
319    sendOut.writeLong(firstBlock.getGenerationStamp());
320    sendOut.writeLong(0);
321    sendOut.writeLong(fileLen + 1);
322    Text.writeString(sendOut, "cl");
323    sendRecvData("Wrong length for reading block " +
324                 firstBlock.getBlockId(), false);
325   
326    //At the end of all this, read the file to make sure that succeeds finally.
327    sendBuf.reset();
328    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
329    sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
330    sendOut.writeLong(firstBlock.getBlockId());
331    sendOut.writeLong(firstBlock.getGenerationStamp());
332    sendOut.writeLong(0);
333    sendOut.writeLong(fileLen);
334    Text.writeString(sendOut, "cl");
335    readFile(fileSys, file, fileLen);
336  }
337}
Note: See TracBrowser for help on using the repository browser.