source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit project

  • Property svn:executable set to *
File size: 9.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs.server.balancer;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.List;
23import java.util.Random;
24
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.hdfs.DFSClient;
27import org.apache.hadoop.hdfs.DFSTestUtil;
28import org.apache.hadoop.hdfs.MiniDFSCluster;
29import org.apache.hadoop.hdfs.protocol.Block;
30import org.apache.hadoop.hdfs.protocol.ClientProtocol;
31import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
32import org.apache.hadoop.hdfs.protocol.LocatedBlock;
33import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
34import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
35import org.apache.hadoop.fs.FileSystem;
36import org.apache.hadoop.fs.Path;
37
38import junit.framework.TestCase;
39/**
40 * This class tests if a balancer schedules tasks correctly.
41 */
42public class TestBalancer extends TestCase {
43  private static final Configuration CONF = new Configuration();
44  final private static long CAPACITY = 500L;
45  final private static String RACK0 = "/rack0";
46  final private static String RACK1 = "/rack1";
47  final private static String RACK2 = "/rack2";
48  final static private String fileName = "/tmp.txt";
49  final static private Path filePath = new Path(fileName);
50  private MiniDFSCluster cluster;
51
52  ClientProtocol client;
53
54  static final int DEFAULT_BLOCK_SIZE = 10;
55  private Balancer balancer;
56  private Random r = new Random();
57
58  static {
59    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
60    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
61    CONF.setLong("dfs.heartbeat.interval", 1L);
62    CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
63    CONF.setLong("dfs.balancer.movedWinWidth", 2000L);
64    Balancer.setBlockMoveWaitTime(1000L) ;
65  }
66
67  /* create a file with a length of <code>fileLen</code> */
68  private void createFile(long fileLen, short replicationFactor)
69  throws IOException {
70    FileSystem fs = cluster.getFileSystem();
71    DFSTestUtil.createFile(fs, filePath, fileLen, 
72        replicationFactor, r.nextLong());
73    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
74  }
75
76
77  /* fill up a cluster with <code>numNodes</code> datanodes
78   * whose used space to be <code>size</code>
79   */
80  private Block[] generateBlocks(long size, short numNodes) throws IOException {
81    cluster = new MiniDFSCluster( CONF, numNodes, true, null);
82    try {
83      cluster.waitActive();
84      client = DFSClient.createNamenode(CONF);
85
86      short replicationFactor = (short)(numNodes-1);
87      long fileLen = size/replicationFactor;
88      createFile(fileLen, replicationFactor);
89
90      List<LocatedBlock> locatedBlocks = client.
91      getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
92
93      int numOfBlocks = locatedBlocks.size();
94      Block[] blocks = new Block[numOfBlocks];
95      for(int i=0; i<numOfBlocks; i++) {
96        Block b = locatedBlocks.get(i).getBlock();
97        blocks[i] = new Block(b.getBlockId(), b.getNumBytes(), b.getGenerationStamp());
98      }
99
100      return blocks;
101    } finally {
102      cluster.shutdown();
103    }
104  }
105
106  /* Distribute all blocks according to the given distribution */
107  Block[][] distributeBlocks(Block[] blocks, short replicationFactor, 
108      final long[] distribution ) {
109    // make a copy
110    long[] usedSpace = new long[distribution.length];
111    System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);
112
113    List<List<Block>> blockReports = 
114      new ArrayList<List<Block>>(usedSpace.length);
115    Block[][] results = new Block[usedSpace.length][];
116    for(int i=0; i<usedSpace.length; i++) {
117      blockReports.add(new ArrayList<Block>());
118    }
119    for(int i=0; i<blocks.length; i++) {
120      for(int j=0; j<replicationFactor; j++) {
121        boolean notChosen = true;
122        while(notChosen) {
123          int chosenIndex = r.nextInt(usedSpace.length);
124          if( usedSpace[chosenIndex]>0 ) {
125            notChosen = false;
126            blockReports.get(chosenIndex).add(blocks[i]);
127            usedSpace[chosenIndex] -= blocks[i].getNumBytes();
128          }
129        }
130      }
131    }
132    for(int i=0; i<usedSpace.length; i++) {
133      List<Block> nodeBlockList = blockReports.get(i);
134      results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
135    }
136    return results;
137  }
138
139  /* we first start a cluster and fill the cluster up to a certain size.
140   * then redistribute blocks according the required distribution.
141   * Afterwards a balancer is running to balance the cluster.
142   */
143  private void testUnevenDistribution(
144      long distribution[], long capacities[], String[] racks) throws Exception {
145    int numDatanodes = distribution.length;
146    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
147      throw new IllegalArgumentException("Array length is not the same");
148    }
149
150    // calculate total space that need to be filled
151    long totalUsedSpace=0L;
152    for(int i=0; i<distribution.length; i++) {
153      totalUsedSpace += distribution[i];
154    }
155
156    // fill the cluster
157    Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);
158
159    // redistribute blocks
160    Block[][] blocksDN = distributeBlocks(
161        blocks, (short)(numDatanodes-1), distribution);
162
163    // restart the cluster: do NOT format the cluster
164    CONF.set("dfs.safemode.threshold.pct", "0.0f"); 
165    cluster = new MiniDFSCluster(0, CONF, numDatanodes,
166        false, true, null, racks, capacities);
167    cluster.waitActive();
168    client = DFSClient.createNamenode(CONF);
169
170    cluster.injectBlocks(blocksDN);
171
172    long totalCapacity = 0L;
173    for(long capacity:capacities) {
174      totalCapacity += capacity;
175    }
176    runBalancer(totalUsedSpace, totalCapacity);
177  }
178
179  /* wait for one heartbeat */
180  private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
181  throws IOException {
182    long[] status = client.getStats();
183    while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
184      try {
185        Thread.sleep(100L);
186      } catch(InterruptedException ignored) {
187      }
188      status = client.getStats();
189    }
190  }
191
192  /* This test start a one-node cluster, fill the node to be 30% full;
193   * It then adds an empty node and start balancing.
194   * @param newCapacity new node's capacity
195   * @param new
196   */
197  private void test(long[] capacities, String[] racks, 
198      long newCapacity, String newRack) throws Exception {
199    int numOfDatanodes = capacities.length;
200    assertEquals(numOfDatanodes, racks.length);
201    cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null, 
202        racks, capacities);
203    try {
204      cluster.waitActive();
205      client = DFSClient.createNamenode(CONF);
206
207      long totalCapacity=0L;
208      for(long capacity:capacities) {
209        totalCapacity += capacity;
210      }
211      // fill up the cluster to be 30% full
212      long totalUsedSpace = totalCapacity*3/10;
213      createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
214      // start up an empty node with the same capacity and on the same rack
215      cluster.startDataNodes(CONF, 1, true, null,
216          new String[]{newRack}, new long[]{newCapacity});
217
218      totalCapacity += newCapacity;
219
220      // run balancer and validate results
221      runBalancer(totalUsedSpace, totalCapacity);
222    } finally {
223      cluster.shutdown();
224    }
225  }
226
227  /* Start balancer and check if the cluster is balanced after the run */
228  private void runBalancer( long totalUsedSpace, long totalCapacity )
229  throws Exception {
230    waitForHeartBeat(totalUsedSpace, totalCapacity);
231
232    // start rebalancing
233    balancer = new Balancer(CONF);
234    balancer.run(new String[0]);
235
236    waitForHeartBeat(totalUsedSpace, totalCapacity);
237    boolean balanced;
238    do {
239      DatanodeInfo[] datanodeReport = 
240        client.getDatanodeReport(DatanodeReportType.ALL);
241      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
242      balanced = true;
243      double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
244      for(DatanodeInfo datanode:datanodeReport) {
245        if(Math.abs(avgUtilization-
246            ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
247          balanced = false;
248          try {
249            Thread.sleep(100);
250          } catch(InterruptedException ignored) {
251          }
252          break;
253        }
254      }
255    } while(!balanced);
256
257  }
258  /** Test a cluster with even distribution,
259   * then a new empty node is added to the cluster*/
260  public void testBalancer0() throws Exception {
261    /** one-node cluster test*/
262    // add an empty node with half of the CAPACITY & the same rack
263    test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
264
265    /** two-node cluster test */
266    test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
267        CAPACITY, RACK2);
268  }
269
270  /** Test unevenly distributed cluster */
271  public void testBalancer1() throws Exception {
272    testUnevenDistribution(
273        new long[] {50*CAPACITY/100, 10*CAPACITY/100},
274        new long[]{CAPACITY, CAPACITY},
275        new String[] {RACK0, RACK1});
276  }
277
278  /**
279   * @param args
280   */
281  public static void main(String[] args) throws Exception {
282    TestBalancer balancerTest = new TestBalancer();
283    balancerTest.testBalancer0();
284    balancerTest.testBalancer1();
285  }
286}
Note: See TracBrowser for help on using the repository browser.