[120] | 1 | package org.apache.hadoop.mapred; |
---|
| 2 | |
---|
| 3 | import java.io.DataOutputStream; |
---|
| 4 | import java.io.IOException; |
---|
| 5 | import java.util.Iterator; |
---|
| 6 | import java.util.Map; |
---|
| 7 | import java.util.Set; |
---|
| 8 | import java.util.HashSet; |
---|
| 9 | |
---|
| 10 | import org.apache.commons.logging.Log; |
---|
| 11 | import org.apache.commons.logging.LogFactory; |
---|
| 12 | import org.apache.hadoop.conf.Configuration; |
---|
| 13 | import org.apache.hadoop.examples.RandomWriter; |
---|
| 14 | import org.apache.hadoop.fs.FileSystem; |
---|
| 15 | import org.apache.hadoop.fs.Path; |
---|
| 16 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
---|
| 17 | import org.apache.hadoop.io.IntWritable; |
---|
| 18 | import org.apache.hadoop.io.LongWritable; |
---|
| 19 | import org.apache.hadoop.io.Text; |
---|
| 20 | import org.apache.hadoop.mapred.UtilsForTests; |
---|
| 21 | import org.apache.hadoop.mapred.lib.IdentityMapper; |
---|
| 22 | import org.apache.hadoop.mapred.lib.IdentityReducer; |
---|
| 23 | import org.apache.hadoop.net.Node; |
---|
| 24 | |
---|
| 25 | import junit.framework.TestCase; |
---|
| 26 | |
---|
| 27 | public class TestJobInProgress extends TestCase { |
---|
| 28 | static final Log LOG = LogFactory.getLog(TestJobInProgress.class); |
---|
| 29 | |
---|
| 30 | private MiniMRCluster mrCluster; |
---|
| 31 | |
---|
| 32 | private MiniDFSCluster dfsCluster; |
---|
| 33 | JobTracker jt; |
---|
| 34 | private static Path TEST_DIR = |
---|
| 35 | new Path(System.getProperty("test.build.data","/tmp"), "jip-testing"); |
---|
| 36 | private static int numSlaves = 4; |
---|
| 37 | |
---|
| 38 | public static class FailMapTaskJob extends MapReduceBase implements |
---|
| 39 | Mapper<LongWritable, Text, Text, IntWritable> { |
---|
| 40 | |
---|
| 41 | @Override |
---|
| 42 | public void map(LongWritable key, Text value, |
---|
| 43 | OutputCollector<Text, IntWritable> output, Reporter reporter) |
---|
| 44 | throws IOException { |
---|
| 45 | // reporter.incrCounter(TaskCounts.LaunchedTask, 1); |
---|
| 46 | try { |
---|
| 47 | Thread.sleep(1000); |
---|
| 48 | } catch (InterruptedException e) { |
---|
| 49 | throw new IllegalArgumentException("Interrupted MAP task"); |
---|
| 50 | } |
---|
| 51 | throw new IllegalArgumentException("Failing MAP task"); |
---|
| 52 | } |
---|
| 53 | } |
---|
| 54 | |
---|
| 55 | // Suppressing waring as we just need to write a failing reduce task job |
---|
| 56 | // We don't need to bother about the actual key value pairs which are passed. |
---|
| 57 | @SuppressWarnings("unchecked") |
---|
| 58 | public static class FailReduceTaskJob extends MapReduceBase implements |
---|
| 59 | Reducer { |
---|
| 60 | |
---|
| 61 | @Override |
---|
| 62 | public void reduce(Object key, Iterator values, OutputCollector output, |
---|
| 63 | Reporter reporter) throws IOException { |
---|
| 64 | // reporter.incrCounter(TaskCounts.LaunchedTask, 1); |
---|
| 65 | try { |
---|
| 66 | Thread.sleep(1000); |
---|
| 67 | } catch (InterruptedException e) { |
---|
| 68 | throw new IllegalArgumentException("Failing Reduce task"); |
---|
| 69 | } |
---|
| 70 | throw new IllegalArgumentException("Failing Reduce task"); |
---|
| 71 | } |
---|
| 72 | |
---|
| 73 | } |
---|
| 74 | |
---|
| 75 | @Override |
---|
| 76 | protected void setUp() throws Exception { |
---|
| 77 | // TODO Auto-generated method stub |
---|
| 78 | super.setUp(); |
---|
| 79 | Configuration conf = new Configuration(); |
---|
| 80 | dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); |
---|
| 81 | mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem() |
---|
| 82 | .getUri().toString(), 1); |
---|
| 83 | jt = mrCluster.getJobTrackerRunner().getJobTracker(); |
---|
| 84 | } |
---|
| 85 | |
---|
| 86 | public void testPendingMapTaskCount() throws Exception { |
---|
| 87 | launchTask(FailMapTaskJob.class, IdentityReducer.class); |
---|
| 88 | checkTaskCounts(); |
---|
| 89 | } |
---|
| 90 | |
---|
| 91 | public void testPendingReduceTaskCount() throws Exception { |
---|
| 92 | launchTask(IdentityMapper.class, FailReduceTaskJob.class); |
---|
| 93 | checkTaskCounts(); |
---|
| 94 | } |
---|
| 95 | |
---|
| 96 | /** |
---|
| 97 | * Test if running tasks are correctly maintained for various types of jobs |
---|
| 98 | */ |
---|
| 99 | private void testRunningTaskCount(boolean speculation, boolean locality) |
---|
| 100 | throws Exception { |
---|
| 101 | LOG.info("Testing running jobs with speculation : " + speculation |
---|
| 102 | + ", locality : " + locality); |
---|
| 103 | // cleanup |
---|
| 104 | dfsCluster.getFileSystem().delete(TEST_DIR, true); |
---|
| 105 | |
---|
| 106 | final Path mapSignalFile = new Path(TEST_DIR, "map-signal"); |
---|
| 107 | final Path redSignalFile = new Path(TEST_DIR, "reduce-signal"); |
---|
| 108 | |
---|
| 109 | // configure a waiting job with 2 maps and 2 reducers |
---|
| 110 | JobConf job = |
---|
| 111 | configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1, |
---|
| 112 | locality); |
---|
| 113 | job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString()); |
---|
| 114 | job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString()); |
---|
| 115 | |
---|
| 116 | // Disable slow-start for reduces since this maps don't complete |
---|
| 117 | // in these test-cases... |
---|
| 118 | job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f); |
---|
| 119 | |
---|
| 120 | // test jobs with speculation |
---|
| 121 | job.setSpeculativeExecution(speculation); |
---|
| 122 | JobClient jc = new JobClient(job); |
---|
| 123 | RunningJob running = jc.submitJob(job); |
---|
| 124 | JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker(); |
---|
| 125 | JobInProgress jip = jobtracker.getJob(running.getID()); |
---|
| 126 | LOG.info("Running job " + jip.getJobID()); |
---|
| 127 | |
---|
| 128 | // wait |
---|
| 129 | LOG.info("Waiting for job " + jip.getJobID() + " to be ready"); |
---|
| 130 | waitTillReady(jip, job); |
---|
| 131 | |
---|
| 132 | // check if the running structures are populated |
---|
| 133 | Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>(); |
---|
| 134 | for (Map.Entry<Node, Set<TaskInProgress>> s : |
---|
| 135 | jip.getRunningMapCache().entrySet()) { |
---|
| 136 | uniqueTasks.addAll(s.getValue()); |
---|
| 137 | } |
---|
| 138 | |
---|
| 139 | // add non local map tasks |
---|
| 140 | uniqueTasks.addAll(jip.getNonLocalRunningMaps()); |
---|
| 141 | |
---|
| 142 | assertEquals("Running map count doesnt match for jobs with speculation " |
---|
| 143 | + speculation + ", and locality " + locality, |
---|
| 144 | jip.runningMaps(), uniqueTasks.size()); |
---|
| 145 | |
---|
| 146 | assertEquals("Running reducer count doesnt match for jobs with speculation " |
---|
| 147 | + speculation + ", and locality " + locality, |
---|
| 148 | jip.runningReduces(), jip.getRunningReduces().size()); |
---|
| 149 | |
---|
| 150 | // signal the tasks |
---|
| 151 | LOG.info("Signaling the tasks"); |
---|
| 152 | UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(), |
---|
| 153 | mapSignalFile.toString(), |
---|
| 154 | redSignalFile.toString(), numSlaves); |
---|
| 155 | |
---|
| 156 | // wait for the job to complete |
---|
| 157 | LOG.info("Waiting for job " + jip.getJobID() + " to be complete"); |
---|
| 158 | UtilsForTests.waitTillDone(jc); |
---|
| 159 | |
---|
| 160 | // cleanup |
---|
| 161 | dfsCluster.getFileSystem().delete(TEST_DIR, true); |
---|
| 162 | } |
---|
| 163 | |
---|
| 164 | // wait for the job to start |
---|
| 165 | private void waitTillReady(JobInProgress jip, JobConf job) { |
---|
| 166 | // wait for all the maps to get scheduled |
---|
| 167 | while (jip.runningMaps() < job.getNumMapTasks()) { |
---|
| 168 | UtilsForTests.waitFor(10); |
---|
| 169 | } |
---|
| 170 | |
---|
| 171 | // wait for all the reducers to get scheduled |
---|
| 172 | while (jip.runningReduces() < job.getNumReduceTasks()) { |
---|
| 173 | UtilsForTests.waitFor(10); |
---|
| 174 | } |
---|
| 175 | } |
---|
| 176 | |
---|
| 177 | public void testRunningTaskCount() throws Exception { |
---|
| 178 | // test with spec = false and locality=true |
---|
| 179 | testRunningTaskCount(false, true); |
---|
| 180 | |
---|
| 181 | // test with spec = true and locality=true |
---|
| 182 | testRunningTaskCount(true, true); |
---|
| 183 | |
---|
| 184 | // test with spec = false and locality=false |
---|
| 185 | testRunningTaskCount(false, false); |
---|
| 186 | |
---|
| 187 | // test with spec = true and locality=false |
---|
| 188 | testRunningTaskCount(true, false); |
---|
| 189 | } |
---|
| 190 | |
---|
| 191 | @Override |
---|
| 192 | protected void tearDown() throws Exception { |
---|
| 193 | mrCluster.shutdown(); |
---|
| 194 | dfsCluster.shutdown(); |
---|
| 195 | super.tearDown(); |
---|
| 196 | } |
---|
| 197 | |
---|
| 198 | |
---|
| 199 | void launchTask(Class MapClass,Class ReduceClass) throws Exception{ |
---|
| 200 | JobConf job = configure(MapClass, ReduceClass, 5, 10, true); |
---|
| 201 | try { |
---|
| 202 | JobClient.runJob(job); |
---|
| 203 | } catch (IOException ioe) {} |
---|
| 204 | } |
---|
| 205 | |
---|
| 206 | @SuppressWarnings("unchecked") |
---|
| 207 | JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers, |
---|
| 208 | boolean locality) |
---|
| 209 | throws Exception { |
---|
| 210 | JobConf jobConf = mrCluster.createJobConf(); |
---|
| 211 | final Path inDir = new Path("./failjob/input"); |
---|
| 212 | final Path outDir = new Path("./failjob/output"); |
---|
| 213 | String input = "Test failing job.\n One more line"; |
---|
| 214 | FileSystem inFs = inDir.getFileSystem(jobConf); |
---|
| 215 | FileSystem outFs = outDir.getFileSystem(jobConf); |
---|
| 216 | outFs.delete(outDir, true); |
---|
| 217 | if (!inFs.mkdirs(inDir)) { |
---|
| 218 | throw new IOException("create directory failed" + inDir.toString()); |
---|
| 219 | } |
---|
| 220 | |
---|
| 221 | DataOutputStream file = inFs.create(new Path(inDir, "part-0")); |
---|
| 222 | file.writeBytes(input); |
---|
| 223 | file.close(); |
---|
| 224 | jobConf.setJobName("failmaptask"); |
---|
| 225 | if (locality) { |
---|
| 226 | jobConf.setInputFormat(TextInputFormat.class); |
---|
| 227 | } else { |
---|
| 228 | jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class); |
---|
| 229 | } |
---|
| 230 | jobConf.setOutputKeyClass(Text.class); |
---|
| 231 | jobConf.setOutputValueClass(Text.class); |
---|
| 232 | jobConf.setMapperClass(MapClass); |
---|
| 233 | jobConf.setCombinerClass(ReduceClass); |
---|
| 234 | jobConf.setReducerClass(ReduceClass); |
---|
| 235 | FileInputFormat.setInputPaths(jobConf, inDir); |
---|
| 236 | FileOutputFormat.setOutputPath(jobConf, outDir); |
---|
| 237 | jobConf.setNumMapTasks(maps); |
---|
| 238 | jobConf.setNumReduceTasks(reducers); |
---|
| 239 | return jobConf; |
---|
| 240 | } |
---|
| 241 | |
---|
| 242 | void checkTaskCounts() { |
---|
| 243 | JobStatus[] status = jt.getAllJobs(); |
---|
| 244 | for (JobStatus js : status) { |
---|
| 245 | JobInProgress jip = jt.getJob(js.getJobID()); |
---|
| 246 | Counters counter = jip.getJobCounters(); |
---|
| 247 | long totalTaskCount = counter |
---|
| 248 | .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS) |
---|
| 249 | + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES); |
---|
| 250 | while (jip.getNumTaskCompletionEvents() < totalTaskCount) { |
---|
| 251 | assertEquals(true, (jip.runningMaps() >= 0)); |
---|
| 252 | assertEquals(true, (jip.pendingMaps() >= 0)); |
---|
| 253 | assertEquals(true, (jip.runningReduces() >= 0)); |
---|
| 254 | assertEquals(true, (jip.pendingReduces() >= 0)); |
---|
| 255 | } |
---|
| 256 | } |
---|
| 257 | } |
---|
| 258 | |
---|
| 259 | } |
---|