[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | package org.apache.hadoop.mapred; |
---|
| 19 | |
---|
| 20 | import java.io.IOException; |
---|
| 21 | import java.util.List; |
---|
| 22 | |
---|
| 23 | import org.apache.commons.logging.Log; |
---|
| 24 | import org.apache.commons.logging.LogFactory; |
---|
| 25 | import org.apache.hadoop.examples.SleepJob; |
---|
| 26 | import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin; |
---|
| 27 | import org.apache.hadoop.util.MemoryCalculatorPlugin; |
---|
| 28 | import org.apache.hadoop.util.ToolRunner; |
---|
| 29 | |
---|
| 30 | import junit.framework.TestCase; |
---|
| 31 | |
---|
| 32 | /** |
---|
| 33 | * This test class tests the functionality related to configuring, reporting |
---|
| 34 | * and computing memory related parameters in a Map/Reduce cluster. |
---|
| 35 | * |
---|
| 36 | * Each test sets up a {@link MiniMRCluster} with a locally defined |
---|
| 37 | * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates |
---|
| 38 | * the memory related configuration is correctly computed and reported from |
---|
| 39 | * the tasktracker in |
---|
| 40 | * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}. |
---|
| 41 | */ |
---|
| 42 | public class TestTTMemoryReporting extends TestCase { |
---|
| 43 | |
---|
| 44 | static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class); |
---|
| 45 | |
---|
| 46 | private MiniMRCluster miniMRCluster; |
---|
| 47 | |
---|
| 48 | /** |
---|
| 49 | * Fake scheduler to test the proper reporting of memory values by TT |
---|
| 50 | */ |
---|
| 51 | public static class FakeTaskScheduler extends JobQueueTaskScheduler { |
---|
| 52 | |
---|
| 53 | private boolean hasPassed = true; |
---|
| 54 | private String message; |
---|
| 55 | |
---|
| 56 | public FakeTaskScheduler() { |
---|
| 57 | super(); |
---|
| 58 | } |
---|
| 59 | |
---|
| 60 | public boolean hasTestPassed() { |
---|
| 61 | return hasPassed; |
---|
| 62 | } |
---|
| 63 | |
---|
| 64 | public String getFailureMessage() { |
---|
| 65 | return message; |
---|
| 66 | } |
---|
| 67 | |
---|
| 68 | @Override |
---|
| 69 | public List<Task> assignTasks(TaskTrackerStatus status) |
---|
| 70 | throws IOException { |
---|
| 71 | |
---|
| 72 | long totalVirtualMemoryOnTT = |
---|
| 73 | getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); |
---|
| 74 | long totalPhysicalMemoryOnTT = |
---|
| 75 | getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); |
---|
| 76 | long mapSlotMemorySize = |
---|
| 77 | getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT); |
---|
| 78 | long reduceSlotMemorySize = |
---|
| 79 | getConf() |
---|
| 80 | .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT); |
---|
| 81 | |
---|
| 82 | long reportedTotalVirtualMemoryOnTT = |
---|
| 83 | status.getResourceStatus().getTotalVirtualMemory(); |
---|
| 84 | long reportedTotalPhysicalMemoryOnTT = |
---|
| 85 | status.getResourceStatus().getTotalPhysicalMemory(); |
---|
| 86 | long reportedMapSlotMemorySize = |
---|
| 87 | status.getResourceStatus().getMapSlotMemorySizeOnTT(); |
---|
| 88 | long reportedReduceSlotMemorySize = |
---|
| 89 | status.getResourceStatus().getReduceSlotMemorySizeOnTT(); |
---|
| 90 | |
---|
| 91 | message = |
---|
| 92 | "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, " |
---|
| 93 | + "mapSlotMemSize, reduceSlotMemorySize) = (" |
---|
| 94 | + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + "," |
---|
| 95 | + mapSlotMemorySize + "," + reduceSlotMemorySize + ")"; |
---|
| 96 | message += |
---|
| 97 | "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, " |
---|
| 98 | + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = (" |
---|
| 99 | + reportedTotalVirtualMemoryOnTT |
---|
| 100 | + ", " |
---|
| 101 | + reportedTotalPhysicalMemoryOnTT |
---|
| 102 | + "," |
---|
| 103 | + reportedMapSlotMemorySize |
---|
| 104 | + "," |
---|
| 105 | + reportedReduceSlotMemorySize |
---|
| 106 | + ")"; |
---|
| 107 | LOG.info(message); |
---|
| 108 | if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT |
---|
| 109 | || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT |
---|
| 110 | || mapSlotMemorySize != reportedMapSlotMemorySize |
---|
| 111 | || reduceSlotMemorySize != reportedReduceSlotMemorySize) { |
---|
| 112 | hasPassed = false; |
---|
| 113 | } |
---|
| 114 | return super.assignTasks(status); |
---|
| 115 | } |
---|
| 116 | } |
---|
| 117 | |
---|
| 118 | /** |
---|
| 119 | * Test that verifies default values are configured and reported correctly. |
---|
| 120 | * |
---|
| 121 | * @throws Exception |
---|
| 122 | */ |
---|
| 123 | public void testDefaultMemoryValues() |
---|
| 124 | throws Exception { |
---|
| 125 | JobConf conf = new JobConf(); |
---|
| 126 | try { |
---|
| 127 | // Memory values are disabled by default. |
---|
| 128 | conf.setClass( |
---|
| 129 | TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, |
---|
| 130 | DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); |
---|
| 131 | setUpCluster(conf); |
---|
| 132 | runSleepJob(miniMRCluster.createJobConf()); |
---|
| 133 | verifyTestResults(); |
---|
| 134 | } finally { |
---|
| 135 | tearDownCluster(); |
---|
| 136 | } |
---|
| 137 | } |
---|
| 138 | |
---|
| 139 | /** |
---|
| 140 | * Test that verifies that configured values are reported correctly. |
---|
| 141 | * |
---|
| 142 | * @throws Exception |
---|
| 143 | */ |
---|
| 144 | public void testConfiguredMemoryValues() |
---|
| 145 | throws Exception { |
---|
| 146 | JobConf conf = new JobConf(); |
---|
| 147 | conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L); |
---|
| 148 | conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L); |
---|
| 149 | conf.setLong("mapSlotMemorySize", 1 * 512L); |
---|
| 150 | conf.setLong("reduceSlotMemorySize", 1 * 1024L); |
---|
| 151 | |
---|
| 152 | conf.setClass( |
---|
| 153 | TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, |
---|
| 154 | DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); |
---|
| 155 | conf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, |
---|
| 156 | 4 * 1024 * 1024 * 1024L); |
---|
| 157 | conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY, |
---|
| 158 | 2 * 1024 * 1024 * 1024L); |
---|
| 159 | conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, |
---|
| 160 | 512L); |
---|
| 161 | conf.setLong( |
---|
| 162 | JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L); |
---|
| 163 | |
---|
| 164 | try { |
---|
| 165 | setUpCluster(conf); |
---|
| 166 | JobConf jobConf = miniMRCluster.createJobConf(); |
---|
| 167 | jobConf.setMemoryForMapTask(1 * 1024L); |
---|
| 168 | jobConf.setMemoryForReduceTask(2 * 1024L); |
---|
| 169 | runSleepJob(jobConf); |
---|
| 170 | verifyTestResults(); |
---|
| 171 | } finally { |
---|
| 172 | tearDownCluster(); |
---|
| 173 | } |
---|
| 174 | } |
---|
| 175 | |
---|
| 176 | /** |
---|
| 177 | * Test that verifies that total memory values are calculated and reported |
---|
| 178 | * correctly. |
---|
| 179 | * |
---|
| 180 | * @throws Exception |
---|
| 181 | */ |
---|
| 182 | public void testMemoryValuesOnLinux() |
---|
| 183 | throws Exception { |
---|
| 184 | if (!System.getProperty("os.name").startsWith("Linux")) { |
---|
| 185 | return; |
---|
| 186 | } |
---|
| 187 | |
---|
| 188 | JobConf conf = new JobConf(); |
---|
| 189 | LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin(); |
---|
| 190 | conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize()); |
---|
| 191 | conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize()); |
---|
| 192 | |
---|
| 193 | try { |
---|
| 194 | setUpCluster(conf); |
---|
| 195 | runSleepJob(miniMRCluster.createJobConf()); |
---|
| 196 | verifyTestResults(); |
---|
| 197 | } finally { |
---|
| 198 | tearDownCluster(); |
---|
| 199 | } |
---|
| 200 | } |
---|
| 201 | |
---|
| 202 | private void setUpCluster(JobConf conf) |
---|
| 203 | throws Exception { |
---|
| 204 | conf.setClass("mapred.jobtracker.taskScheduler", |
---|
| 205 | TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class); |
---|
| 206 | conf.set("mapred.job.tracker.handler.count", "1"); |
---|
| 207 | miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf); |
---|
| 208 | } |
---|
| 209 | |
---|
| 210 | private void runSleepJob(JobConf conf) throws Exception { |
---|
| 211 | String[] args = { "-m", "1", "-r", "1", |
---|
| 212 | "-mt", "10", "-rt", "10" }; |
---|
| 213 | ToolRunner.run(conf, new SleepJob(), args); |
---|
| 214 | } |
---|
| 215 | |
---|
| 216 | private void verifyTestResults() { |
---|
| 217 | FakeTaskScheduler scheduler = |
---|
| 218 | (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner(). |
---|
| 219 | getJobTracker().getTaskScheduler(); |
---|
| 220 | assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed()); |
---|
| 221 | } |
---|
| 222 | |
---|
| 223 | private void tearDownCluster() { |
---|
| 224 | if (miniMRCluster != null) { |
---|
| 225 | miniMRCluster.shutdown(); |
---|
| 226 | } |
---|
| 227 | } |
---|
| 228 | } |
---|