source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestDecommission.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 10.3 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Random;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

/**
 * This class tests the decommissioning of nodes.
 */
public class TestDecommission extends TestCase {
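  // Each test file spans two full blocks: fileSize == 2 * blockSize.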
  static final long seed = 0xDEADBEEFL;
  static final int blockSize = 8192;
  static final int fileSize = 16384;
  static final int numDatanodes = 6;

  Random myrand = new Random();
  Path hostsFile;
  Path excludeFile;

  ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);

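  /** Admin states a datanode can be in during the test. */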
  private enum NodeState { NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED }

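  /**
   * Write the given node names, one per line, to the config file at the
   * given path, replacing any existing file.
   */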
  private void writeConfigFile(FileSystem fs, Path name, ArrayList<String> nodes)
    throws IOException {

    // delete if it already exists
    if (fs.exists(name)) {
      fs.delete(name, true);
    }

    FSDataOutputStream stm = fs.create(name);

    if (nodes != null) {
      for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
        String node = it.next();
        stm.writeBytes(node);
        stm.writeBytes("\n");
      }
    }
    stm.close();
  }

  private void writeFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
    // create and write a file that contains two blocks of data
    FSDataOutputStream stm = fileSys.create(name, true,
                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                            (short)repl, (long)blockSize);
    byte[] buffer = new byte[fileSize];
    Random rand = new Random(seed);
    rand.nextBytes(buffer);
    stm.write(buffer);
    stm.close();
  }

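  /** Wait until every block of the file reaches the given replication. */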
  private void checkFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
    DFSTestUtil.waitReplication(fileSys, name, (short) repl);
  }

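  /** Print the datanodes that host each block of the file. */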
  private void printFileLocations(FileSystem fileSys, Path name)
    throws IOException {
    BlockLocation[] locations = fileSys.getFileBlockLocations(
        fileSys.getFileStatus(name), 0, fileSize);
    for (int idx = 0; idx < locations.length; idx++) {
      String[] loc = locations[idx].getHosts();
      System.out.print("Block[" + idx + "] : ");
      for (int j = 0; j < loc.length; j++) {
        System.out.print(loc[j] + " ");
      }
      System.out.println("");
    }
  }

  /**
   * For blocks that resided on the decommissioned node, verify that the
   * replication factor is one more than the specified one.
   */
  private void checkFile(FileSystem fileSys, Path name, int repl,
                         String downnode) throws IOException {
    // need a raw stream to enumerate the located blocks
    assertTrue("Not HDFS:" + fileSys.getUri(), fileSys instanceof DistributedFileSystem);

    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
      ((DistributedFileSystem)fileSys).open(name);
    Collection<LocatedBlock> dinfo = dis.getAllBlocks();

    for (LocatedBlock blk : dinfo) { // for each block
      int hasdown = 0;
      DatanodeInfo[] nodes = blk.getLocations();
      for (int j = 0; j < nodes.length; j++) {     // for each replica
        if (nodes[j].getName().equals(downnode)) {
          hasdown++;
          System.out.println("Block " + blk.getBlock() + " replica " +
                             nodes[j].getName() + " is decommissioned.");
        }
      }
      System.out.println("Block " + blk.getBlock() + " has " + hasdown +
                         " decommissioned replica(s).");
      assertEquals("Number of replicas for block " + blk.getBlock(),
                   Math.min(numDatanodes, repl + hasdown), nodes.length);
    }
  }

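  /** Delete the file and assert that it is gone. */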
  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    fileSys.delete(name, true);
    assertTrue(!fileSys.exists(name));
  }

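  /** Dump the status report of every datanode in the given array. */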
  private void printDatanodeReport(DatanodeInfo[] info) {
    System.out.println("-------------------------------------------------");
    for (int i = 0; i < info.length; i++) {
      System.out.println(info[i].getDatanodeReport());
      System.out.println();
    }
  }

  /*
   * Decommission one random node: add it to the exclude file and ask the
   * namenode to re-read its host lists.
   */
  private String decommissionNode(NameNode namenode,
                                  Configuration conf,
                                  DFSClient client,
                                  FileSystem localFileSys)
    throws IOException {
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);

    //
    // pick one datanode randomly.
    //
    int index = 0;
    boolean found = false;
    while (!found) {
      index = myrand.nextInt(info.length);
      if (!info[index].isDecommissioned()) {
        found = true;
      }
    }
    String nodename = info[index].getName();
    System.out.println("Decommissioning node: " + nodename);

    // write nodename into the exclude file.
    ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
    nodes.add(nodename);
    writeConfigFile(localFileSys, excludeFile, nodes);
    namenode.namesystem.refreshNodes(conf);
    return nodename;
  }

  /*
   * Check if node is in the requested state.
   */
  private boolean checkNodeState(FileSystem filesys,
                                 String node,
                                 NodeState state) throws IOException {
    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
    boolean done = false;
    boolean foundNode = false;
    DatanodeInfo[] datanodes = dfs.getDataNodeStats();
    for (int i = 0; i < datanodes.length; i++) {
      DatanodeInfo dn = datanodes[i];
      if (dn.getName().equals(node)) {
        if (state == NodeState.DECOMMISSIONED) {
          done = dn.isDecommissioned();
        } else if (state == NodeState.DECOMMISSION_INPROGRESS) {
          done = dn.isDecommissionInProgress();
        } else {
          done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned());
        }
        System.out.println(dn.getDatanodeReport());
        foundNode = true;
      }
    }
    if (!foundNode) {
      throw new IOException("Could not find node: " + node);
    }
    return done;
  }

  /*
   * Wait till the node reaches the requested state.
   */
  private void waitNodeState(FileSystem filesys,
                             String node,
                             NodeState state) throws IOException {
    boolean done = checkNodeState(filesys, node, state);
    while (!done) {
      System.out.println("Waiting for node " + node +
                         " to change state to " + state);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // nothing
      }
      done = checkNodeState(filesys, node, state);
    }
  }

  /**
   * Tests decommissioning of nodes in DFS.
   */
  public void testDecommission() throws IOException {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.replication.considerLoad", false);

    // Set up the hosts/exclude files.
    FileSystem localFileSys = FileSystem.getLocal(conf);
    Path workingDir = localFileSys.getWorkingDirectory();
    Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
    assertTrue(localFileSys.mkdirs(dir));
    hostsFile = new Path(dir, "hosts");
    excludeFile = new Path(dir, "exclude");
    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
    conf.setInt("heartbeat.recheck.interval", 2000);
    conf.setInt("dfs.heartbeat.interval", 1);
    conf.setInt("dfs.replication.pending.timeout.sec", 4);
    writeConfigFile(localFileSys, excludeFile, null);

    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
    cluster.waitActive();
    InetSocketAddress addr = new InetSocketAddress("localhost",
                                                   cluster.getNameNodePort());
    DFSClient client = new DFSClient(addr, conf);
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
    assertEquals("Number of Datanodes ", numDatanodes, info.length);
    FileSystem fileSys = cluster.getFileSystem();

    try {
      for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
        int replicas = numDatanodes - iteration - 1;
        //
        // Decommission one node. Verify that node is decommissioned.
        //
        Path file1 = new Path("decommission.dat");
        writeFile(fileSys, file1, replicas);
        System.out.println("Created file decommission.dat with " +
                           replicas + " replicas.");
        checkFile(fileSys, file1, replicas);
        printFileLocations(fileSys, file1);
        String downnode = decommissionNode(cluster.getNameNode(), conf,
                                           client, localFileSys);
        decommissionedNodes.add(downnode);
        waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
        checkFile(fileSys, file1, replicas, downnode);
        cleanupFile(fileSys, file1);
        cleanupFile(localFileSys, dir);
      }
    } catch (IOException e) {
      info = client.datanodeReport(DatanodeReportType.ALL);
      printDatanodeReport(info);
      throw e;
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }
}