source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestDatanodeDeath.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 13.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs;
19
20import java.io.IOException;
21
22import junit.framework.TestCase;
23
24import org.apache.commons.logging.impl.Log4JLogger;
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.fs.BlockLocation;
27import org.apache.hadoop.fs.FSDataInputStream;
28import org.apache.hadoop.fs.FSDataOutputStream;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
32import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
33import org.apache.hadoop.hdfs.server.datanode.DataNode;
34import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
35import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
36import org.apache.hadoop.hdfs.server.namenode.NameNode;
37import org.apache.log4j.Level;
38
39/**
40 * This class tests that a file need not be closed before its
41 * data can be read by another client.
42 */
43public class TestDatanodeDeath extends TestCase {
44  {
45    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
46    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
47    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
48    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
49    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
50    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
51  }
52
53  static final int blockSize = 8192;
54  static final int numBlocks = 2;
55  static final int fileSize = numBlocks * blockSize + 1;
56  static final int numDatanodes = 15;
57  static final short replication = 3;
58
59  int numberOfFiles = 3;
60  int numThreads = 5;
61  Workload[] workload = null;
62
63  //
64  // an object that does a bunch of transactions
65  //
66  static class Workload extends Thread {
67    private short replication;
68    private int numberOfFiles;
69    private int id;
70    private FileSystem fs;
71    private long stamp;
72    private final long myseed;
73
74    Workload(long myseed, FileSystem fs, int threadIndex, int numberOfFiles, 
75             short replication, long stamp) {
76      this.myseed = myseed;
77      id = threadIndex;
78      this.fs = fs;
79      this.numberOfFiles = numberOfFiles;
80      this.replication = replication;
81      this.stamp = stamp;
82    }
83
84    // create a bunch of files. Write to them and then verify.
85    public void run() {
86      System.out.println("Workload starting ");
87      for (int i = 0; i < numberOfFiles; i++) {
88        Path filename = new Path(id + "." + i);
89        try {
90          System.out.println("Workload processing file " + filename);
91          FSDataOutputStream stm = createFile(fs, filename, replication);
92          DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
93                                                 (stm.getWrappedStream());
94          dfstream.setArtificialSlowdown(1000);
95          writeFile(stm, myseed);
96          stm.close();
97          checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
98        } catch (Throwable e) {
99          System.out.println("Workload exception " + e);
100          assertTrue(e.toString(), false);
101        }
102
103        // increment the stamp to indicate that another file is done.
104        synchronized (this) {
105          stamp++;
106        }
107      }
108    }
109
110    public synchronized void resetStamp() {
111      this.stamp = 0;
112    }
113
114    public synchronized long getStamp() {
115      return stamp;
116    }
117  }
118
119  //
120  // creates a file and returns a descriptor for writing to it.
121  //
122  static private FSDataOutputStream createFile(FileSystem fileSys, Path name, short repl)
123    throws IOException {
124    // create and write a file that contains three blocks of data
125    FSDataOutputStream stm = fileSys.create(name, true,
126                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
127                                            repl, (long)blockSize);
128    return stm;
129  }
130
131  //
132  // writes to file
133  //
134  static private void writeFile(FSDataOutputStream stm, long seed) throws IOException {
135    byte[] buffer = AppendTestUtil.randomBytes(seed, fileSize);
136
137    int mid = fileSize/2;
138    stm.write(buffer, 0, mid);
139    stm.write(buffer, mid, fileSize - mid);
140  }
141
142  //
143  // verify that the data written are sane
144  //
145  static private void checkFile(FileSystem fileSys, Path name, int repl,
146                         int numblocks, int filesize, long seed)
147    throws IOException {
148    boolean done = false;
149    int attempt = 0;
150
151    long len = fileSys.getFileStatus(name).getLen();
152    assertTrue(name + " should be of size " + filesize +
153               " but found to be of size " + len, 
154               len == filesize);
155
156    // wait till all full blocks are confirmed by the datanodes.
157    while (!done) {
158      attempt++;
159      try {
160        Thread.sleep(1000);
161      } catch (InterruptedException e) {}
162      done = true;
163      BlockLocation[] locations = fileSys.getFileBlockLocations(
164          fileSys.getFileStatus(name), 0, filesize);
165
166      if (locations.length < numblocks) {
167        if (attempt > 100) {
168          System.out.println("File " + name + " has only " +
169                             locations.length + " blocks, " +
170                             " but is expected to have " + numblocks +
171                             " blocks.");
172        }
173        done = false;
174        continue;
175      }
176      for (int idx = 0; idx < locations.length; idx++) {
177        if (locations[idx].getHosts().length < repl) {
178          if (attempt > 100) {
179            System.out.println("File " + name + " has " +
180                               locations.length + " blocks: " +
181                               " The " + idx + " block has only " +
182                               locations[idx].getHosts().length + 
183                               " replicas but is expected to have " 
184                               + repl + " replicas.");
185          }
186          done = false;
187          break;
188        }
189      }
190    }
191    FSDataInputStream stm = fileSys.open(name);
192    final byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
193
194    // do a sanity check. Read the file
195    byte[] actual = new byte[filesize];
196    stm.readFully(0, actual);
197    checkData(actual, 0, expected, "Read 1");
198  }
199
200  private static void checkData(byte[] actual, int from, byte[] expected, String message) {
201    for (int idx = 0; idx < actual.length; idx++) {
202      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
203                        expected[from+idx]+" actual "+actual[idx],
204                        actual[idx], expected[from+idx]);
205      actual[idx] = 0;
206    }
207  }
208
209  /**
210   * A class that kills one datanode and recreates a new one. It waits to
211   * ensure that that all workers have finished at least one file since the
212   * last kill of a datanode. This guarantees that all three replicas of
213   * a block do not get killed (otherwise the file will be corrupt and the
214   * test will fail).
215   */
216  class Modify extends Thread {
217    volatile boolean running;
218    MiniDFSCluster cluster;
219    Configuration conf;
220
221    Modify(Configuration conf, MiniDFSCluster cluster) {
222      running = true;
223      this.cluster = cluster;
224      this.conf = conf;
225    }
226
227    public void run() {
228
229      while (running) {
230        try {
231          Thread.sleep(1000);
232        } catch (InterruptedException e) {
233          continue;
234        }
235
236        // check if all threads have a new stamp.
237        // If so, then all workers have finished at least one file
238        // since the last stamp.
239        boolean loop = false;
240        for (int i = 0; i < numThreads; i++) {
241          if (workload[i].getStamp() == 0) {
242            loop = true;
243            break;
244          }
245        }
246        if (loop) {
247          continue;
248        }
249
250        // Now it is guaranteed that there will be at least one valid
251        // replica of a file.
252
253        for (int i = 0; i < replication - 1; i++) {
254          // pick a random datanode to shutdown
255          int victim = AppendTestUtil.nextInt(numDatanodes);
256          try {
257            System.out.println("Stopping datanode " + victim);
258            cluster.restartDataNode(victim);
259            // cluster.startDataNodes(conf, 1, true, null, null);
260          } catch (IOException e) {
261            System.out.println("TestDatanodeDeath Modify exception " + e);
262            assertTrue("TestDatanodeDeath Modify exception " + e, false);
263            running = false;
264          }
265        }
266
267        // set a new stamp for all workers
268        for (int i = 0; i < numThreads; i++) {
269          workload[i].resetStamp();
270        }
271      }
272    }
273
274    // Make the thread exit.
275    void close() {
276      running = false;
277      this.interrupt();
278    }
279  }
280
281  /**
282   * Test that writing to files is good even when datanodes in the pipeline
283   * dies.
284   */
285  private void complexTest() throws IOException {
286    Configuration conf = new Configuration();
287    conf.setInt("heartbeat.recheck.interval", 2000);
288    conf.setInt("dfs.heartbeat.interval", 2);
289    conf.setInt("dfs.replication.pending.timeout.sec", 2);
290    conf.setInt("dfs.socket.timeout", 5000);
291    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
292    cluster.waitActive();
293    FileSystem fs = cluster.getFileSystem();
294    Modify modThread = null;
295
296    try {
297     
298      // Create threads and make them run workload concurrently.
299      workload = new Workload[numThreads];
300      for (int i = 0; i < numThreads; i++) {
301        workload[i] = new Workload(AppendTestUtil.nextLong(), fs, i, numberOfFiles, replication, 0);
302        workload[i].start();
303      }
304
305      // Create a thread that kills existing datanodes and creates new ones.
306      modThread = new Modify(conf, cluster);
307      modThread.start();
308
309      // wait for all transactions to get over
310      for (int i = 0; i < numThreads; i++) {
311        try {
312          System.out.println("Waiting for thread " + i + " to complete...");
313          workload[i].join();
314
315          // if most of the threads are done, then stop restarting datanodes.
316          if (i >= numThreads/2) {
317            modThread.close();
318          }
319         
320        } catch (InterruptedException e) {
321          i--;      // retry
322        }
323      }
324    } finally {
325      if (modThread != null) {
326        modThread.close();
327        try {
328          modThread.join();
329        } catch (InterruptedException e) {}
330      }
331      fs.close();
332      cluster.shutdown();
333    }
334  }
335
336  /**
337   * Write to one file, then kill one datanode in the pipeline and then
338   * close the file.
339   */
340  private void simpleTest(int datanodeToKill) throws IOException {
341    Configuration conf = new Configuration();
342    conf.setInt("heartbeat.recheck.interval", 2000);
343    conf.setInt("dfs.heartbeat.interval", 1);
344    conf.setInt("dfs.replication.pending.timeout.sec", 2);
345    conf.setInt("dfs.socket.timeout", 5000);
346    int myMaxNodes = 5;
347    System.out.println("SimpleTest starting with DataNode to Kill " + 
348                       datanodeToKill);
349    MiniDFSCluster cluster = new MiniDFSCluster(conf, myMaxNodes, true, null);
350    cluster.waitActive();
351    FileSystem fs = cluster.getFileSystem();
352    short repl = 3;
353
354    Path filename = new Path("simpletest.dat");
355    try {
356
357      // create a file and write one block of data
358      System.out.println("SimpleTest creating file " + filename);
359      FSDataOutputStream stm = createFile(fs, filename, repl);
360      DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
361                                             (stm.getWrappedStream());
362
363      // these are test settings
364      dfstream.setChunksPerPacket(5);
365      dfstream.setArtificialSlowdown(3000);
366
367      final long myseed = AppendTestUtil.nextLong();
368      byte[] buffer = AppendTestUtil.randomBytes(myseed, fileSize);
369      int mid = fileSize/4;
370      stm.write(buffer, 0, mid);
371
372      DatanodeInfo[] targets = dfstream.getPipeline();
373      int count = 5;
374      while (count-- > 0 && targets == null) {
375        try {
376          System.out.println("SimpleTest: Waiting for pipeline to be created.");
377          Thread.sleep(1000);
378        } catch (InterruptedException e) {
379        }
380        targets = dfstream.getPipeline();
381      }
382
383      if (targets == null) {
384        int victim = AppendTestUtil.nextInt(myMaxNodes);
385        System.out.println("SimpleTest stopping datanode random " + victim);
386        cluster.stopDataNode(victim);
387      } else {
388        int victim = datanodeToKill;
389        System.out.println("SimpleTest stopping datanode " +
390                            targets[victim].getName());
391        cluster.stopDataNode(targets[victim].getName());
392      }
393      System.out.println("SimpleTest stopping datanode complete");
394
395      // write some more data to file, close and verify
396      stm.write(buffer, mid, fileSize - mid);
397      stm.close();
398
399      checkFile(fs, filename, repl, numBlocks, fileSize, myseed);
400    } catch (Throwable e) {
401      System.out.println("Simple Workload exception " + e);
402      e.printStackTrace();
403      assertTrue(e.toString(), false);
404    } finally {
405      fs.close();
406      cluster.shutdown();
407    }
408  }
409
410  public void testSimple0() throws IOException {simpleTest(0);}
411
412  public void testSimple1() throws IOException {simpleTest(1);}
413
414  public void testSimple2() throws IOException {simpleTest(2);}
415
416  public void testComplex() throws IOException {complexTest();}
417}
Note: See TracBrowser for help on using the repository browser.