source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.8 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.*;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;

import junit.framework.TestCase;

/**
 * These tests make sure that DFSClient retries fetching data from DFS
 * properly in case of errors.
 */
public class TestDFSClientRetries extends TestCase {
  public static final Log LOG =
    LogFactory.getLog(TestDFSClientRetries.class.getName());

  // Writes 'len' bytes of data to out.
  private static void writeData(OutputStream out, int len) throws IOException {
    byte[] buf = new byte[4096*16];
    while (len > 0) {
      int toWrite = Math.min(len, buf.length);
      out.write(buf, 0, toWrite);
      len -= toWrite;
    }
  }

  /**
   * This makes sure that even when the DN closes the client's socket after
   * the client has successfully connected earlier, the data can still be
   * fetched.
   */
  public void testWriteTimeoutAtDataNode() throws IOException,
                                                  InterruptedException {
    Configuration conf = new Configuration();

    final int writeTimeout = 100; // milliseconds.
    // Set a very short write timeout for the datanode, so that the test runs fast.
    conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout);
    // Set a smaller block size.
    final int blockSize = 10*1024*1024;
    conf.setInt("dfs.block.size", blockSize);
    conf.setInt("dfs.client.max.block.acquire.failures", 1);
    // Set a small buffer size.
    final int bufferSize = 4096;
    conf.setInt("io.file.buffer.size", bufferSize);

    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);

    try {
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      Path filePath = new Path("/testWriteTimeoutAtDataNode");
      OutputStream out = fs.create(filePath, true, bufferSize);

      // Write a 2-block file.
      writeData(out, 2*blockSize);
      out.close();

      byte[] buf = new byte[1024*1024]; // enough to empty TCP buffers.

      InputStream in = fs.open(filePath, bufferSize);

      // First read a few bytes.
      IOUtils.readFully(in, buf, 0, bufferSize/2);
      // Now read a few more chunks of data, sleeping in between:
      for (int i = 0; i < 10; i++) {
        Thread.sleep(2*writeTimeout); // force a write timeout at the datanode.
        // Read enough to empty out the socket buffers.
        IOUtils.readFully(in, buf, 0, buf.length);
      }
      // Successfully read despite write timeouts on the datanodes.
      in.close();
    } finally {
      cluster.shutdown();
    }
  }

  // More tests related to different failure cases can be added here.
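  // For instance (a sketch only, not a test that ships with this file), a
  // read-retry case could tighten the acquire-failure budget before opening
  // a file; both keys below are ones this suite already exercises:
  //
  //   Configuration conf = new Configuration();
  //   // give up on a read after a single failed block-fetch attempt
  //   conf.setInt("dfs.client.max.block.acquire.failures", 1);
  //   // bound how often the writer re-asks the namenode for the next block
  //   conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);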
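
  /**
   * A mock namenode for the retry tests below: every addBlock() call fails
   * with a NotReplicatedYetException (wrapped in a RemoteException), forcing
   * DFSClient into its retry path. Once more calls arrive than the configured
   * retry count allows, a plain IOException is thrown to flag the failure.
   */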
  class TestNameNode implements ClientProtocol
  {
    int num_calls = 0;

    // The total number of calls that can be made to addBlock
    // before an exception is thrown.
    int num_calls_allowed;
    public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from "
                                             + "TestDFSClientRetries::"
                                             + "TestNameNode::addBlock";
    public final String RETRY_CONFIG
          = "dfs.client.block.write.locateFollowingBlock.retries";

    public TestNameNode(Configuration conf) throws IOException
    {
      // +1 because the configuration value is the number of retries and
      // the first call is not a retry (e.g., 2 retries == 3 total
      // calls allowed).
      this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
    }

    public long getProtocolVersion(String protocol,
                                   long clientVersion)
    throws IOException
    {
      return versionID;
    }

    public LocatedBlock addBlock(String src, String clientName)
    throws IOException
    {
      num_calls++;
      if (num_calls > num_calls_allowed) {
        throw new IOException("addBlock called more times than "
                              + RETRY_CONFIG
                              + " allows.");
      } else {
        throw new RemoteException(NotReplicatedYetException.class.getName(),
                                  ADD_BLOCK_EXCEPTION);
      }
    }

    // The following methods are stubs that are not needed by this mock class.

    public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { return null; }

    public void create(String src, FsPermission masked, String clientName, boolean overwrite, short replication, long blockSize) throws IOException {}

    public LocatedBlock append(String src, String clientName) throws IOException { return null; }

    public boolean setReplication(String src, short replication) throws IOException { return false; }

    public void setPermission(String src, FsPermission permission) throws IOException {}

    public void setOwner(String src, String username, String groupname) throws IOException {}

    public void abandonBlock(Block b, String src, String holder) throws IOException {}

    public boolean complete(String src, String clientName) throws IOException { return false; }

    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}

    public boolean rename(String src, String dst) throws IOException { return false; }

    public boolean delete(String src) throws IOException { return false; }

    public boolean delete(String src, boolean recursive) throws IOException { return false; }

    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }

    public FileStatus[] getListing(String src) throws IOException { return null; }

    public void renewLease(String clientName) throws IOException {}

    public long[] getStats() throws IOException { return null; }

    public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException { return null; }

    public long getPreferredBlockSize(String filename) throws IOException { return 0; }

    public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException { return false; }

    public void saveNamespace() throws IOException {}

    public boolean restoreFailedStorage(String arg) throws AccessControlException { return false; }

    public void refreshNodes() throws IOException {}

    public void finalizeUpgrade() throws IOException {}

    public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException { return null; }

    public void metaSave(String filename) throws IOException {}

    public FileStatus getFileInfo(String src) throws IOException { return null; }

    public ContentSummary getContentSummary(String path) throws IOException { return null; }

    public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {}

    public void fsync(String src, String client) throws IOException {}

    public void setTimes(String src, long mtime, long atime) throws IOException {}

  }

  public void testNotYetReplicatedErrors() throws IOException
  {
    Configuration conf = new Configuration();

    // Allow 1 retry (2 total calls).
    conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);

    TestNameNode tnn = new TestNameNode(conf);
    final DFSClient client = new DFSClient(null, tnn, conf, null);
    OutputStream os = client.create("testfile", true);
    os.write(20); // write a single byte

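    // Closing the stream flushes the buffered byte, which makes DFSClient
    // request a block from the mock namenode via addBlock(). Every call
    // fails with NotReplicatedYetException, so the client should retry up
    // to the configured limit and then propagate ADD_BLOCK_EXCEPTION.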
    try {
      os.close();
    } catch (Exception e) {
      assertTrue("Retries are not being stopped correctly",
                 e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
    }
  }

}
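
// A note on running just this suite (assuming the stock Hadoop 0.20 build,
// whose build.xml accepts a 'testcase' property):
//
//   ant test -Dtestcase=TestDFSClientRetries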