source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.4 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.TestRackAwareTaskPlacement;

/**
 * This test checks whether the task caches are created and used properly.
 */
public class TestMultipleLevelCaching extends TestCase {
  private static final int MAX_LEVEL = 5;
  final Path inDir = new Path("/cachetesting");
  final Path outputPath = new Path("/output");

  /**
   * Returns a string representing a rack with level + 1 nodes in the topology
   * for the rack.
   * For id = 2, level = 2 we get /a/b2/c2
   *     id = 1, level = 3 we get /a/b1/c1/d1
   * NOTE: There should always be one shared node, i.e. /a
   * @param id Unique id for the rack
   * @param level The level in the topology where the separation starts
   */
  private static String getRack(int id, int level) {
    StringBuilder rack = new StringBuilder();
    char alpha = 'a';
    int length = level + 1;
    while (length > level) {
      rack.append("/");
      rack.append(alpha);
      ++alpha;
      --length;
    }
    while (length > 0) {
      rack.append("/");
      rack.append(alpha);
      rack.append(id);
      ++alpha;
      --length;
    }
    return rack.toString();
  }
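
  // Worked example of getRack (illustrative): the first loop emits only the
  // single shared top-level node "/a"; the second loop then emits `level`
  // id-suffixed nodes, so
  //   getRack(0, 2) -> "/a/b0/c0"   (used below as the datanode rack)
  //   getRack(1, 2) -> "/a/b1/c1"   (used below as the tasktracker rack)
  // i.e. the two racks share only the "/a" prefix.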

  public void testMultiLevelCaching() throws IOException {
    for (int i = 1; i <= MAX_LEVEL; ++i) {
      testCachingAtLevel(i);
    }
  }

  private void testCachingAtLevel(int level) throws IOException {
    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    String testName = "TestMultiLevelCaching";
    try {
      final int taskTrackers = 1;
      // generate the racks
      // use rack1 for the datanode
      String rack1 = getRack(0, level);
      // use rack2 for the tasktracker
      String rack2 = getRack(1, level);
      Configuration conf = new Configuration();
      // Run a datanode on host1 under /a/b/c/..../d1/e1/f1
      dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1},
                               new String[] {"host1.com"});
      dfs.waitActive();
      fileSys = dfs.getFileSystem();
      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }
      UtilsForTests.writeFile(dfs.getNameNode(), conf,
                              new Path(inDir + "/file"), (short) 1);
      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
                 (dfs.getFileSystem()).getUri().getPort();

      // Run a job with the (only) tasktracker on host2 under a different
      // topology, e.g. /a/b/c/..../d2/e2/f2.
      JobConf jc = new JobConf();
      // cache-level = level (unshared levels) + 1 (topmost shared node, i.e. /a)
      //               + 1 (for the host)
      jc.setInt("mapred.task.cache.levels", level + 2);
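      // Worked example (illustrative): for level = 2 the racks below are
      // /a/b0/c0 and /a/b1/c1, so the topology has level + 1 = 3 rack levels
      // plus the host level, i.e. mapred.task.cache.levels = 2 + 2 = 4.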
      mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2},
                             new String[] {"host2.com"}, jc);

      /* The job is configured with 1 map for one (non-splittable) file.
       * Since the datanode is running under a different subtree, there is no
       * node-level data locality but there should be topological locality.
       */
      TestRackAwareTaskPlacement.launchJobAndTestCounters(
                  testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
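      // Note on the counter arguments above (an assumption, not verified
      // against TestRackAwareTaskPlacement): the trailing values are taken to
      // mean 1 launched map, 1 map that is local only at a shared higher
      // topology level, and 0 data-local / 0 rack-local maps, matching the
      // locality expectation described in the block comment above.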
    } finally {
      // Shut down the clusters and clean up, guarding against nulls in case
      // startup failed part-way through.
      if (mr != null) {
        mr.shutdown();
      }
      if (fileSys != null) {
        fileSys.delete(inDir, true);
        fileSys.delete(outputPath, true);
      }
      if (dfs != null) {
        dfs.shutdown();
      }
    }
  }
}