/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import junit.framework.TestCase;

/**
 * This class tests if a balancer schedules tasks correctly.
 *
 * NOTE(review): this file arrived with every span between a '&lt;' and the
 * following '&gt;' stripped (generic type parameters, loop conditions, and
 * whole method bodies were lost). The missing pieces below have been
 * reconstructed to match the surviving fragments and the upstream Apache
 * Hadoop TestBalancer — verify against version control before relying on
 * the reconstructed sections.
 */
public class TestBalancer extends TestCase {
  private static final Configuration CONF = new Configuration();
  final private static long CAPACITY = 500L;
  final private static String RACK0 = "/rack0";
  final private static String RACK1 = "/rack1";
  final private static String RACK2 = "/rack2";
  final static private String fileName = "/tmp.txt";
  final static private Path filePath = new Path(fileName);

  private MiniDFSCluster cluster;

  ClientProtocol client;

  static final int DEFAULT_BLOCK_SIZE = 10;
  private Balancer balancer;
  private Random r = new Random();

  static {
    // Tiny blocks and a fast heartbeat so the balancer converges quickly
    // against the simulated (in-memory) datanode dataset.
    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
    CONF.setLong("dfs.heartbeat.interval", 1L);
    CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    CONF.setLong("dfs.balancer.movedWinWidth", 2000L);
    Balancer.setBlockMoveWaitTime(1000L);
  }

  /* create a file with a length of fileLen and wait until it is
   * fully replicated at the given replication factor */
  private void createFile(long fileLen, short replicationFactor)
      throws IOException {
    FileSystem fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, filePath, fileLen,
        replicationFactor, r.nextLong());
    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
  }

  /* fill up a cluster with numNodes datanodes
   * whose used space to be size; returns the blocks of the created file
   * so they can later be redistributed across a restarted cluster */
  private Block[] generateBlocks(long size, short numNodes)
      throws IOException {
    cluster = new MiniDFSCluster(CONF, numNodes, true, null);
    try {
      cluster.waitActive();
      client = DFSClient.createNamenode(CONF);

      short replicationFactor = (short) (numNodes - 1);
      long fileLen = size / replicationFactor;
      createFile(fileLen, replicationFactor);

      List<LocatedBlock> locatedBlocks = client
          .getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();

      int numOfBlocks = locatedBlocks.size();
      Block[] blocks = new Block[numOfBlocks];
      for (int i = 0; i < numOfBlocks; i++) {
        // copy the block, detaching it from the soon-to-be-shut-down cluster
        Block b = locatedBlocks.get(i).getBlock();
        blocks[i] = new Block(b.getBlockId(), b.getNumBytes(),
            b.getGenerationStamp());
      }

      return blocks;
    } finally {
      cluster.shutdown();
    }
  }

  /* Distribute all blocks according to the given distribution:
   * each of the replicationFactor replicas of each block is assigned,
   * at random, to a node that still has unassigned space left */
  Block[][] distributeBlocks(Block[] blocks, short replicationFactor,
      long[] distribution) {
    // make a copy so the caller's distribution array is not mutated
    long[] usedSpace = new long[distribution.length];
    System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);

    List<List<Block>> blockReports =
        new ArrayList<List<Block>>(usedSpace.length);
    Block[][] results = new Block[usedSpace.length][];
    for (int i = 0; i < usedSpace.length; i++) {
      blockReports.add(new ArrayList<Block>());
    }
    for (int i = 0; i < blocks.length; i++) {
      for (int j = 0; j < replicationFactor; j++) {
        boolean notChosen = true;
        while (notChosen) {
          // retry until we hit a node with remaining space budget
          int chosenIndex = r.nextInt(usedSpace.length);
          if (usedSpace[chosenIndex] > 0) {
            notChosen = false;
            blockReports.get(chosenIndex).add(blocks[i]);
            usedSpace[chosenIndex] -= blocks[i].getNumBytes();
          }
        }
      }
    }
    for (int i = 0; i < usedSpace.length; i++) {
      List<Block> nodeBlockList = blockReports.get(i);
      results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
    }
    return results;
  }

  /* we first start a cluster and fill the cluster up to a certain size.
   * then redistribute blocks according the required distribution.
   * Afterwards a balancer is running to balance the cluster.
   */
  private void testUnevenDistribution(
      long distribution[], long capacities[], String[] racks)
      throws Exception {
    int numDatanodes = distribution.length;
    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
      throw new IllegalArgumentException("Array length is not the same");
    }

    // calculate total space that need to be filled
    long totalUsedSpace = 0L;
    for (int i = 0; i < distribution.length; i++) {
      totalUsedSpace += distribution[i];
    }

    // NOTE(review): the remainder of this method was lost to markup
    // stripping and is reconstructed from the upstream file — confirm.
    // fill the cluster with a file whose blocks we can redistribute
    Block[] blocks = generateBlocks(totalUsedSpace, (short) numDatanodes);

    // redistribute blocks according to the requested per-node usage
    Block[][] blocksDN = distributeBlocks(
        blocks, (short) (numDatanodes - 1), distribution);

    // restart the cluster: do NOT format the cluster
    CONF.set("dfs.safemode.threshold.pct", "0.0f");
    cluster = new MiniDFSCluster(0, CONF, numDatanodes,
        false, true, null, racks, capacities);
    cluster.waitActive();
    client = DFSClient.createNamenode(CONF);

    cluster.injectBlocks(blocksDN);

    long totalCapacity = 0L;
    for (long capacity : capacities) {
      totalCapacity += capacity;
    }
    runBalancer(totalUsedSpace, totalCapacity);
  }

  /* wait until the namenode's stats report the expected total and used
   * space, i.e. every datanode has heartbeated its current usage.
   * NOTE(review): reconstructed — lost to markup stripping; confirm. */
  private void waitForHeartBeat(long expectedUsedSpace,
      long expectedTotalSpace) throws IOException {
    long[] status = client.getStats();
    while (status[0] != expectedTotalSpace
        || status[1] != expectedUsedSpace) {
      try {
        Thread.sleep(100L);
      } catch (InterruptedException ignored) {
        // keep polling; test teardown will stop us
      }
      status = client.getStats();
    }
  }

  /* Start a cluster with the given capacities and racks, fill it to 30%
   * full, then add one empty node (newCapacity/newRack) and balance.
   * NOTE(review): reconstructed — lost to markup stripping; confirm. */
  private void test(long[] capacities, String[] racks,
      long newCapacity, String newRack) throws Exception {
    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    cluster = new MiniDFSCluster(0, CONF, capacities.length,
        true, true, null, racks, capacities);
    try {
      cluster.waitActive();
      client = DFSClient.createNamenode(CONF);

      long totalCapacity = 0L;
      for (long capacity : capacities) {
        totalCapacity += capacity;
      }
      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
      // start up an empty node with the given capacity and rack
      cluster.startDataNodes(CONF, 1, true, null,
          new String[]{newRack}, new long[]{newCapacity});

      totalCapacity += newCapacity;

      // run balancer and validate results
      runBalancer(totalUsedSpace, totalCapacity);
    } finally {
      cluster.shutdown();
    }
  }

  /* Start the balancer, then poll datanode reports until every node's
   * utilization is within 10 percentage points of the cluster average.
   * (The trailing "…>10) { balanced = false; …" fragment survived the
   * corruption; the head of this method is reconstructed — confirm.) */
  private void runBalancer(long totalUsedSpace, long totalCapacity)
      throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity);

    // start rebalancing
    balancer = new Balancer(CONF);
    balancer.run(new String[0]);

    waitForHeartBeat(totalUsedSpace, totalCapacity);
    boolean balanced;
    do {
      DatanodeInfo[] datanodeReport =
          client.getDatanodeReport(DatanodeReportType.ALL);
      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
      balanced = true;
      double avgUtilization = ((double) totalUsedSpace) / totalCapacity * 100;
      for (DatanodeInfo datanode : datanodeReport) {
        if (Math.abs(avgUtilization
            - ((double) datanode.getDfsUsed()) / datanode.getCapacity() * 100)
            > 10) {
          balanced = false;
          try {
            Thread.sleep(100);
          } catch (InterruptedException ignored) {
            // ignore and re-check the report
          }
          break;
        }
      }
    } while (!balanced);
  }

  /** Test a cluster with even distribution,
   * then a new empty node is added to the cluster*/
  public void testBalancer0() throws Exception {
    /** one-node cluster test*/
    // add an empty node with half of the CAPACITY & the same rack
    test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY / 2, RACK0);

    /** two-node cluster test */
    test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2);
  }

  /** Test unevenly distributed cluster */
  public void testBalancer1() throws Exception {
    testUnevenDistribution(
        new long[]{50 * CAPACITY / 100, 10 * CAPACITY / 100},
        new long[]{CAPACITY, CAPACITY},
        new String[]{RACK0, RACK1});
  }

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    TestBalancer balancerTest = new TestBalancer();
    balancerTest.testBalancer0();
    balancerTest.testBalancer1();
  }
}