1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import java.io.IOException; |
---|
21 | |
---|
22 | import junit.framework.TestCase; |
---|
23 | |
---|
24 | import org.apache.hadoop.conf.Configuration; |
---|
25 | import org.apache.hadoop.fs.FileSystem; |
---|
26 | import org.apache.hadoop.fs.Path; |
---|
27 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
---|
28 | import org.apache.hadoop.mapred.TestRackAwareTaskPlacement; |
---|
29 | |
---|
30 | /** |
---|
31 | * This test checks whether the task caches are created and used properly. |
---|
32 | */ |
---|
33 | public class TestMultipleLevelCaching extends TestCase { |
---|
34 | private static final int MAX_LEVEL = 5; |
---|
35 | final Path inDir = new Path("/cachetesting"); |
---|
36 | final Path outputPath = new Path("/output"); |
---|
37 | |
---|
38 | /** |
---|
39 | * Returns a string representing a rack with level + 1 nodes in the topology |
---|
40 | * for the rack. |
---|
41 | * For id = 2, level = 2 we get /a/b2/c2 |
---|
42 | * id = 1, level = 3 we get /a/b1/c1/d1 |
---|
43 | * NOTE There should always be one shared node i.e /a |
---|
44 | * @param id Unique Id for the rack |
---|
45 | * @param level The level in the topology where the separation starts |
---|
46 | */ |
---|
47 | private static String getRack(int id, int level) { |
---|
48 | StringBuilder rack = new StringBuilder(); |
---|
49 | char alpha = 'a'; |
---|
50 | int length = level + 1; |
---|
51 | while (length > level) { |
---|
52 | rack.append("/"); |
---|
53 | rack.append(alpha); |
---|
54 | ++alpha; |
---|
55 | --length; |
---|
56 | } |
---|
57 | while (length > 0) { |
---|
58 | rack.append("/"); |
---|
59 | rack.append(alpha); |
---|
60 | rack.append(id); |
---|
61 | ++alpha; |
---|
62 | --length; |
---|
63 | } |
---|
64 | return rack.toString(); |
---|
65 | } |
---|
66 | |
---|
67 | public void testMultiLevelCaching() throws IOException { |
---|
68 | for (int i = 1 ; i <= MAX_LEVEL; ++i) { |
---|
69 | testCachingAtLevel(i); |
---|
70 | } |
---|
71 | } |
---|
72 | |
---|
73 | private void testCachingAtLevel(int level) throws IOException { |
---|
74 | String namenode = null; |
---|
75 | MiniDFSCluster dfs = null; |
---|
76 | MiniMRCluster mr = null; |
---|
77 | FileSystem fileSys = null; |
---|
78 | String testName = "TestMultiLevelCaching"; |
---|
79 | try { |
---|
80 | final int taskTrackers = 1; |
---|
81 | // generate the racks |
---|
82 | // use rack1 for data node |
---|
83 | String rack1 = getRack(0, level); |
---|
84 | // use rack2 for task tracker |
---|
85 | String rack2 = getRack(1, level); |
---|
86 | Configuration conf = new Configuration(); |
---|
87 | // Run a datanode on host1 under /a/b/c/..../d1/e1/f1 |
---|
88 | dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, |
---|
89 | new String[] {"host1.com"}); |
---|
90 | dfs.waitActive(); |
---|
91 | fileSys = dfs.getFileSystem(); |
---|
92 | if (!fileSys.mkdirs(inDir)) { |
---|
93 | throw new IOException("Mkdirs failed to create " + inDir.toString()); |
---|
94 | } |
---|
95 | UtilsForTests.writeFile(dfs.getNameNode(), conf, |
---|
96 | new Path(inDir + "/file"), (short)1); |
---|
97 | namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + |
---|
98 | (dfs.getFileSystem()).getUri().getPort(); |
---|
99 | |
---|
100 | // Run a job with the (only)tasktracker on host2 under diff topology |
---|
101 | // e.g /a/b/c/..../d2/e2/f2. |
---|
102 | JobConf jc = new JobConf(); |
---|
103 | // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) |
---|
104 | // + 1 (for host) |
---|
105 | jc.setInt("mapred.task.cache.levels", level + 2); |
---|
106 | mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, |
---|
107 | new String[] {"host2.com"}, jc); |
---|
108 | |
---|
109 | /* The job is configured with 1 map for one (non-splittable) file. |
---|
110 | * Since the datanode is running under different subtree, there is no |
---|
111 | * node-level data locality but there should be topological locality. |
---|
112 | */ |
---|
113 | TestRackAwareTaskPlacement.launchJobAndTestCounters( |
---|
114 | testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0); |
---|
115 | mr.shutdown(); |
---|
116 | } finally { |
---|
117 | fileSys.delete(inDir, true); |
---|
118 | fileSys.delete(outputPath, true); |
---|
119 | if (dfs != null) { |
---|
120 | dfs.shutdown(); |
---|
121 | } |
---|
122 | } |
---|
123 | } |
---|
124 | } |
---|