/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 18 | package org.apache.hadoop.mapred; |
---|
| 19 | |
---|
| 20 | import java.io.IOException; |
---|
| 21 | |
---|
| 22 | import junit.framework.TestCase; |
---|
| 23 | |
---|
| 24 | import org.apache.hadoop.conf.Configuration; |
---|
| 25 | import org.apache.hadoop.fs.FileSystem; |
---|
| 26 | import org.apache.hadoop.fs.Path; |
---|
| 27 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
---|
| 28 | import org.apache.hadoop.mapred.TestRackAwareTaskPlacement; |
---|
| 29 | |
---|
| 30 | /** |
---|
| 31 | * This test checks whether the task caches are created and used properly. |
---|
| 32 | */ |
---|
| 33 | public class TestMultipleLevelCaching extends TestCase { |
---|
| 34 | private static final int MAX_LEVEL = 5; |
---|
| 35 | final Path inDir = new Path("/cachetesting"); |
---|
| 36 | final Path outputPath = new Path("/output"); |
---|
| 37 | |
---|
| 38 | /** |
---|
| 39 | * Returns a string representing a rack with level + 1 nodes in the topology |
---|
| 40 | * for the rack. |
---|
| 41 | * For id = 2, level = 2 we get /a/b2/c2 |
---|
| 42 | * id = 1, level = 3 we get /a/b1/c1/d1 |
---|
| 43 | * NOTE There should always be one shared node i.e /a |
---|
| 44 | * @param id Unique Id for the rack |
---|
| 45 | * @param level The level in the topology where the separation starts |
---|
| 46 | */ |
---|
| 47 | private static String getRack(int id, int level) { |
---|
| 48 | StringBuilder rack = new StringBuilder(); |
---|
| 49 | char alpha = 'a'; |
---|
| 50 | int length = level + 1; |
---|
| 51 | while (length > level) { |
---|
| 52 | rack.append("/"); |
---|
| 53 | rack.append(alpha); |
---|
| 54 | ++alpha; |
---|
| 55 | --length; |
---|
| 56 | } |
---|
| 57 | while (length > 0) { |
---|
| 58 | rack.append("/"); |
---|
| 59 | rack.append(alpha); |
---|
| 60 | rack.append(id); |
---|
| 61 | ++alpha; |
---|
| 62 | --length; |
---|
| 63 | } |
---|
| 64 | return rack.toString(); |
---|
| 65 | } |
---|
| 66 | |
---|
| 67 | public void testMultiLevelCaching() throws IOException { |
---|
| 68 | for (int i = 1 ; i <= MAX_LEVEL; ++i) { |
---|
| 69 | testCachingAtLevel(i); |
---|
| 70 | } |
---|
| 71 | } |
---|
| 72 | |
---|
| 73 | private void testCachingAtLevel(int level) throws IOException { |
---|
| 74 | String namenode = null; |
---|
| 75 | MiniDFSCluster dfs = null; |
---|
| 76 | MiniMRCluster mr = null; |
---|
| 77 | FileSystem fileSys = null; |
---|
| 78 | String testName = "TestMultiLevelCaching"; |
---|
| 79 | try { |
---|
| 80 | final int taskTrackers = 1; |
---|
| 81 | // generate the racks |
---|
| 82 | // use rack1 for data node |
---|
| 83 | String rack1 = getRack(0, level); |
---|
| 84 | // use rack2 for task tracker |
---|
| 85 | String rack2 = getRack(1, level); |
---|
| 86 | Configuration conf = new Configuration(); |
---|
| 87 | // Run a datanode on host1 under /a/b/c/..../d1/e1/f1 |
---|
| 88 | dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, |
---|
| 89 | new String[] {"host1.com"}); |
---|
| 90 | dfs.waitActive(); |
---|
| 91 | fileSys = dfs.getFileSystem(); |
---|
| 92 | if (!fileSys.mkdirs(inDir)) { |
---|
| 93 | throw new IOException("Mkdirs failed to create " + inDir.toString()); |
---|
| 94 | } |
---|
| 95 | UtilsForTests.writeFile(dfs.getNameNode(), conf, |
---|
| 96 | new Path(inDir + "/file"), (short)1); |
---|
| 97 | namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + |
---|
| 98 | (dfs.getFileSystem()).getUri().getPort(); |
---|
| 99 | |
---|
| 100 | // Run a job with the (only)tasktracker on host2 under diff topology |
---|
| 101 | // e.g /a/b/c/..../d2/e2/f2. |
---|
| 102 | JobConf jc = new JobConf(); |
---|
| 103 | // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) |
---|
| 104 | // + 1 (for host) |
---|
| 105 | jc.setInt("mapred.task.cache.levels", level + 2); |
---|
| 106 | mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, |
---|
| 107 | new String[] {"host2.com"}, jc); |
---|
| 108 | |
---|
| 109 | /* The job is configured with 1 map for one (non-splittable) file. |
---|
| 110 | * Since the datanode is running under different subtree, there is no |
---|
| 111 | * node-level data locality but there should be topological locality. |
---|
| 112 | */ |
---|
| 113 | TestRackAwareTaskPlacement.launchJobAndTestCounters( |
---|
| 114 | testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0); |
---|
| 115 | mr.shutdown(); |
---|
| 116 | } finally { |
---|
| 117 | fileSys.delete(inDir, true); |
---|
| 118 | fileSys.delete(outputPath, true); |
---|
| 119 | if (dfs != null) { |
---|
| 120 | dfs.shutdown(); |
---|
| 121 | } |
---|
| 122 | } |
---|
| 123 | } |
---|
| 124 | } |
---|