source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestFileCreationClient.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 4.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs;
19
20import org.apache.commons.logging.impl.Log4JLogger;
21import org.apache.hadoop.conf.Configuration;
22import org.apache.hadoop.fs.FSDataInputStream;
23import org.apache.hadoop.fs.FSDataOutputStream;
24import org.apache.hadoop.fs.FileSystem;
25import org.apache.hadoop.fs.Path;
26import org.apache.hadoop.hdfs.server.datanode.DataNode;
27import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
28import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
29import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
30import org.apache.hadoop.io.IOUtils;
31import org.apache.log4j.Level;
32
33/**
34 * This class tests that a file need not be closed before its
35 * data can be read by another client.
36 */
37public class TestFileCreationClient extends junit.framework.TestCase {
38  static final String DIR = "/" + TestFileCreationClient.class.getSimpleName() + "/";
39
40  {
41    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
42    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
43    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
44    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
45  }
46
47  /** Test lease recovery Triggered by DFSClient. */
48  public void testClientTriggeredLeaseRecovery() throws Exception {
49    final int REPLICATION = 3;
50    Configuration conf = new Configuration();
51    conf.setInt("dfs.datanode.handler.count", 1);
52    conf.setInt("dfs.replication", REPLICATION);
53    MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
54
55    try {
56      final FileSystem fs = cluster.getFileSystem();
57      final Path dir = new Path("/wrwelkj");
58     
59      SlowWriter[] slowwriters = new SlowWriter[10];
60      for(int i = 0; i < slowwriters.length; i++) {
61        slowwriters[i] = new SlowWriter(fs, new Path(dir, "file" + i));
62      }
63
64      try {
65        for(int i = 0; i < slowwriters.length; i++) {
66          slowwriters[i].start();
67        }
68
69        Thread.sleep(1000);                       // let writers get started
70
71        //stop a datanode, it should have least recover.
72        cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));
73       
74        //let the slow writer writes a few more seconds
75        System.out.println("Wait a few seconds");
76        Thread.sleep(5000);
77      }
78      finally {
79        for(int i = 0; i < slowwriters.length; i++) {
80          if (slowwriters[i] != null) {
81            slowwriters[i].running = false;
82            slowwriters[i].interrupt();
83          }
84        }
85        for(int i = 0; i < slowwriters.length; i++) {
86          if (slowwriters[i] != null) {
87            slowwriters[i].join();
88          }
89        }
90      }
91
92      //Verify the file
93      System.out.println("Verify the file");
94      for(int i = 0; i < slowwriters.length; i++) {
95        System.out.println(slowwriters[i].filepath + ": length="
96            + fs.getFileStatus(slowwriters[i].filepath).getLen());
97        FSDataInputStream in = null;
98        try {
99          in = fs.open(slowwriters[i].filepath);
100          for(int j = 0, x; (x = in.read()) != -1; j++) {
101            assertEquals(j, x);
102          }
103        }
104        finally {
105          IOUtils.closeStream(in);
106        }
107      }
108    } finally {
109      if (cluster != null) {cluster.shutdown();}
110    }
111  }
112
113  static class SlowWriter extends Thread {
114    final FileSystem fs;
115    final Path filepath;
116    boolean running = true;
117   
118    SlowWriter(FileSystem fs, Path filepath) {
119      super(SlowWriter.class.getSimpleName() + ":" + filepath);
120      this.fs = fs;
121      this.filepath = filepath;
122    }
123
124    public void run() {
125      FSDataOutputStream out = null;
126      int i = 0;
127      try {
128        out = fs.create(filepath);
129        for(; running; i++) {
130          System.out.println(getName() + " writes " + i);
131          out.write(i);
132          out.sync();
133          sleep(100);
134        }
135      }
136      catch(Exception e) {
137        System.out.println(getName() + " dies: e=" + e);
138      }
139      finally {
140        System.out.println(getName() + ": i=" + i);
141        IOUtils.closeStream(out);
142      }
143    }       
144  }
145}
Note: See TracBrowser for help on using the repository browser.