1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import java.io.IOException; |
---|
21 | import java.net.InetAddress; |
---|
22 | import java.net.InetSocketAddress; |
---|
23 | import java.util.Collection; |
---|
24 | import java.util.List; |
---|
25 | |
---|
26 | import javax.security.auth.login.LoginException; |
---|
27 | |
---|
28 | import org.apache.commons.logging.Log; |
---|
29 | import org.apache.commons.logging.LogFactory; |
---|
30 | import org.apache.hadoop.conf.Configuration; |
---|
31 | import org.apache.hadoop.examples.SleepJob; |
---|
32 | import org.apache.hadoop.fs.FileSystem; |
---|
33 | import org.apache.hadoop.fs.Path; |
---|
34 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
---|
35 | import org.apache.hadoop.ipc.RPC; |
---|
36 | import org.apache.hadoop.net.NetUtils; |
---|
37 | import org.apache.hadoop.security.UnixUserGroupInformation; |
---|
38 | |
---|
39 | import junit.framework.TestCase; |
---|
40 | |
---|
41 | public class TestJobQueueInformation extends TestCase { |
---|
42 | |
---|
43 | private MiniMRCluster mrCluster; |
---|
44 | private MiniDFSCluster dfsCluster; |
---|
45 | private JobConf jc; |
---|
46 | private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO"; |
---|
47 | private static final Path TEST_DIR = |
---|
48 | new Path(System.getProperty("test.build.data","/tmp"), |
---|
49 | "job-queue-info-testing"); |
---|
50 | private static final Path IN_DIR = new Path(TEST_DIR, "input"); |
---|
51 | private static final Path SHARE_DIR = new Path(TEST_DIR, "share"); |
---|
52 | private static final Path OUTPUT_DIR = new Path(TEST_DIR, "output"); |
---|
53 | |
---|
54 | static String getSignalFile() { |
---|
55 | return (new Path(SHARE_DIR, "signal")).toString(); |
---|
56 | } |
---|
57 | |
---|
58 | // configure a waiting job with 2 maps |
---|
59 | private JobConf configureWaitingJob(JobConf conf) throws IOException { |
---|
60 | |
---|
61 | UtilsForTests.configureWaitingJobConf(conf, IN_DIR, OUTPUT_DIR, 2, 0, |
---|
62 | "test-job-queue-info", getSignalFile(), getSignalFile()); |
---|
63 | return conf; |
---|
64 | } |
---|
65 | |
---|
66 | public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler { |
---|
67 | |
---|
68 | @Override |
---|
69 | public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker) |
---|
70 | throws IOException { |
---|
71 | Collection<JobInProgress> jips = jobQueueJobInProgressListener |
---|
72 | .getJobQueue(); |
---|
73 | if (jips != null && !jips.isEmpty()) { |
---|
74 | for (JobInProgress jip : jips) { |
---|
75 | jip.setSchedulingInfo(JOB_SCHEDULING_INFO); |
---|
76 | } |
---|
77 | } |
---|
78 | return super.assignTasks(taskTracker); |
---|
79 | } |
---|
80 | } |
---|
81 | |
---|
82 | @Override |
---|
83 | protected void setUp() throws Exception { |
---|
84 | super.setUp(); |
---|
85 | final int taskTrackers = 4; |
---|
86 | Configuration conf = new Configuration(); |
---|
87 | dfsCluster = new MiniDFSCluster(conf, 4, true, null); |
---|
88 | |
---|
89 | jc = new JobConf(); |
---|
90 | jc.setClass("mapred.jobtracker.taskScheduler", TestTaskScheduler.class, |
---|
91 | TaskScheduler.class); |
---|
92 | jc.setLong("mapred.jobtracker.taskScheduler.maxRunningTasksPerJob", 10L); |
---|
93 | mrCluster = new MiniMRCluster(0, 0, taskTrackers, dfsCluster |
---|
94 | .getFileSystem().getUri().toString(), 1, null, null, null, jc); |
---|
95 | } |
---|
96 | |
---|
97 | @Override |
---|
98 | protected void tearDown() throws Exception { |
---|
99 | super.tearDown(); |
---|
100 | mrCluster.shutdown(); |
---|
101 | dfsCluster.shutdown(); |
---|
102 | } |
---|
103 | |
---|
104 | public void testJobQueues() throws IOException { |
---|
105 | JobClient jc = new JobClient(mrCluster.createJobConf()); |
---|
106 | String expectedQueueInfo = "Maximum Tasks Per Job :: 10"; |
---|
107 | JobQueueInfo[] queueInfos = jc.getQueues(); |
---|
108 | assertNotNull(queueInfos); |
---|
109 | assertEquals(1, queueInfos.length); |
---|
110 | assertEquals("default", queueInfos[0].getQueueName()); |
---|
111 | JobConf conf = mrCluster.createJobConf(); |
---|
112 | FileSystem fileSys = dfsCluster.getFileSystem(); |
---|
113 | |
---|
114 | // configure a waiting job |
---|
115 | conf = configureWaitingJob(conf); |
---|
116 | conf.setJobName("test-job-queue-info-test"); |
---|
117 | |
---|
118 | // clear the signal file if any |
---|
119 | fileSys.delete(SHARE_DIR, true); |
---|
120 | |
---|
121 | RunningJob rJob = jc.submitJob(conf); |
---|
122 | |
---|
123 | while (rJob.getJobState() != JobStatus.RUNNING) { |
---|
124 | UtilsForTests.waitFor(10); |
---|
125 | } |
---|
126 | |
---|
127 | int numberOfJobs = 0; |
---|
128 | |
---|
129 | for (JobQueueInfo queueInfo : queueInfos) { |
---|
130 | JobStatus[] jobStatusList = jc.getJobsFromQueue(queueInfo |
---|
131 | .getQueueName()); |
---|
132 | assertNotNull(queueInfo.getQueueName()); |
---|
133 | assertNotNull(queueInfo.getSchedulingInfo()); |
---|
134 | assertEquals(expectedQueueInfo, queueInfo.getSchedulingInfo()); |
---|
135 | numberOfJobs += jobStatusList.length; |
---|
136 | for (JobStatus status : jobStatusList) { |
---|
137 | assertEquals(JOB_SCHEDULING_INFO, status.getSchedulingInfo()); |
---|
138 | } |
---|
139 | } |
---|
140 | assertEquals(1, numberOfJobs); |
---|
141 | |
---|
142 | UtilsForTests.signalTasks(dfsCluster, fileSys, getSignalFile(), |
---|
143 | getSignalFile(), 4); |
---|
144 | } |
---|
145 | } |
---|