source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestQueueManager.java

Last change on this file was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 15.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.util.Set;
23import java.util.TreeSet;
24
25import javax.security.auth.login.LoginException;
26
27import junit.framework.TestCase;
28
29import org.apache.commons.logging.Log;
30import org.apache.commons.logging.LogFactory;
31import org.apache.hadoop.examples.SleepJob;
32import org.apache.hadoop.mapred.JobConf;
33import org.apache.hadoop.fs.FileSystem;
34import org.apache.hadoop.fs.Path;
35import org.apache.hadoop.hdfs.MiniDFSCluster;
36import org.apache.hadoop.security.UserGroupInformation;
37import org.apache.hadoop.security.UnixUserGroupInformation;
38
39public class TestQueueManager extends TestCase {
40
41  private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
42 
43  private MiniDFSCluster miniDFSCluster;
44  private MiniMRCluster miniMRCluster;
45
46  public void testDefaultQueueConfiguration() {
47    JobConf conf = new JobConf();
48    QueueManager qMgr = new QueueManager(conf);
49    Set<String> expQueues = new TreeSet<String>();
50    expQueues.add("default");
51    verifyQueues(expQueues, qMgr.getQueues());
52    // pass true so it will fail if the key is not found.
53    assertFalse(conf.getBoolean("mapred.acls.enabled", true));
54  }
55 
56  public void testMultipleQueues() {
57    JobConf conf = new JobConf();
58    conf.set("mapred.queue.names", "q1,q2,Q3");
59    QueueManager qMgr = new QueueManager(conf);
60    Set<String> expQueues = new TreeSet<String>();
61    expQueues.add("q1");
62    expQueues.add("q2");
63    expQueues.add("Q3");
64    verifyQueues(expQueues, qMgr.getQueues());
65  }
66 
67  public void testSchedulerInfo() {
68    JobConf conf = new JobConf();
69    conf.set("mapred.queue.names", "qq1,qq2");
70    QueueManager qMgr = new QueueManager(conf);
71    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
72    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
73    assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
74    assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
75  }
76 
77  public void testAllEnabledACLForJobSubmission() throws IOException {
78    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
79    verifyJobSubmission(conf, true);
80  }
81 
82  public void testAllDisabledACLForJobSubmission() throws IOException {
83    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
84    verifyJobSubmission(conf, false);
85  }
86 
87  public void testUserDisabledACLForJobSubmission() throws IOException {
88    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", 
89                                "3698-non-existent-user");
90    verifyJobSubmission(conf, false);
91  }
92 
93  public void testDisabledACLForNonDefaultQueue() throws IOException {
94    // allow everyone in default queue
95    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
96    // setup a different queue
97    conf.set("mapred.queue.names", "default,q1");
98    // setup a different acl for this queue.
99    conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
100    // verify job submission to other queue fails.
101    verifyJobSubmission(conf, false, "q1");
102  }
103 
104  public void testSubmissionToInvalidQueue() throws IOException{
105    JobConf conf = new JobConf();
106    conf.set("mapred.queue.names","default");
107    setUpCluster(conf);
108    String queueName = "q1";
109    try {
110      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
111    } catch (IOException ioe) {     
112       assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
113       return;
114    } finally {
115      tearDownCluster();
116    }
117    fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");   
118  }
119 
120  public void testEnabledACLForNonDefaultQueue() throws IOException,
121                                                          LoginException {
122    // login as self...
123    UserGroupInformation ugi = UnixUserGroupInformation.login();
124    String userName = ugi.getUserName();
125    // allow everyone in default queue
126    JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
127    // setup a different queue
128    conf.set("mapred.queue.names", "default,q2");
129    // setup a different acl for this queue.
130    conf.set("mapred.queue.q2.acl-submit-job", userName);
131    // verify job submission to other queue fails.
132    verifyJobSubmission(conf, true, "q2");
133  }
134 
135  public void testUserEnabledACLForJobSubmission() 
136                                    throws IOException, LoginException {
137    // login as self...
138    UserGroupInformation ugi = UnixUserGroupInformation.login();
139    String userName = ugi.getUserName();
140    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
141                                  "3698-junk-user," + userName
142                                    + " 3698-junk-group1,3698-junk-group2");
143    verifyJobSubmission(conf, true);
144  }
145 
146  public void testGroupsEnabledACLForJobSubmission() 
147                                    throws IOException, LoginException {
148    // login as self, get one group, and add in allowed list.
149    UserGroupInformation ugi = UnixUserGroupInformation.login();
150    String[] groups = ugi.getGroupNames();
151    assertTrue(groups.length > 0);
152    JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
153                                "3698-junk-user1,3698-junk-user2 " 
154                                  + groups[groups.length-1] 
155                                           + ",3698-junk-group");
156    verifyJobSubmission(conf, true);
157  }
158 
159  public void testAllEnabledACLForJobKill() throws IOException {
160    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
161    verifyJobKill(conf, true);
162  }
163
164  public void testAllDisabledACLForJobKill() throws IOException {
165    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
166    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
167  }
168 
169  public void testOwnerAllowedForJobKill() throws IOException {
170    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
171                                              "junk-user");
172    verifyJobKill(conf, true);
173  }
174 
175  public void testUserDisabledACLForJobKill() throws IOException {
176    //setup a cluster allowing a user to submit
177    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
178                                              "dummy-user");
179    verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
180  }
181 
182  public void testUserEnabledACLForJobKill() throws IOException, 
183                                                    LoginException {
184    // login as self...
185    UserGroupInformation ugi = UnixUserGroupInformation.login();
186    String userName = ugi.getUserName();
187    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
188                                              "dummy-user,"+userName);
189    verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
190  }
191 
192  public void testUserDisabledForJobPriorityChange() throws IOException {
193    JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
194                              "junk-user");
195    verifyJobPriorityChangeAsOtherUser(conf, false, 
196                              "junk-user,junk-user-group");
197  }
198 
199  private JobConf setupConf(String aclName, String aclValue) {
200    JobConf conf = new JobConf();
201    conf.setBoolean("mapred.acls.enabled", true);
202    conf.set(aclName, aclValue);
203    return conf;
204  }
205 
206  private void verifyQueues(Set<String> expectedQueues, 
207                                          Set<String> actualQueues) {
208    assertEquals(expectedQueues.size(), actualQueues.size());
209    for (String queue : expectedQueues) {
210      assertTrue(actualQueues.contains(queue));
211    }
212  }
213 
214  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed) 
215                                              throws IOException {
216    verifyJobSubmission(conf, shouldSucceed, "default");
217  }
218
219  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
220                                    String queue) throws IOException {
221    setUpCluster(conf);
222    try {
223      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
224      if (shouldSucceed) {
225        assertTrue(rjob.isSuccessful());
226      } else {
227        fail("Job submission should have failed.");
228      }
229    } catch (IOException ioe) {
230      if (shouldSucceed) {
231        throw ioe;
232      } else {
233        LOG.info("exception while submitting job: " + ioe.getMessage());
234        assertTrue(ioe.getMessage().
235            contains("cannot perform operation " +
236            "SUBMIT_JOB on queue " + queue));
237        // check if the system directory gets cleaned up or not
238        JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
239        Path sysDir = new Path(jobtracker.getSystemDir());
240        FileSystem fs = sysDir.getFileSystem(conf);
241        int size = fs.listStatus(sysDir).length;
242        while (size > 1) { // ignore the jobtracker.info file
243          System.out.println("Waiting for the job files in sys directory to be cleaned up");
244          UtilsForTests.waitFor(100);
245          size = fs.listStatus(sysDir).length;
246        }
247      }
248    } finally {
249      tearDownCluster();
250    }
251}
252
253  private void verifyJobKill(JobConf conf, boolean shouldSucceed) 
254                                      throws IOException {
255    setUpCluster(conf);
256    try {
257      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
258      assertFalse(rjob.isComplete());
259      while(rjob.mapProgress() == 0.0f) {
260        try {
261          Thread.sleep(10); 
262        } catch (InterruptedException ie) {
263          break;
264        }
265      }
266      rjob.killJob();
267      while(rjob.cleanupProgress() == 0.0f) {
268        try {
269          Thread.sleep(10); 
270        } catch (InterruptedException ie) {
271          break;
272        }
273      }
274      if (shouldSucceed) {
275        assertTrue(rjob.isComplete());
276      } else {
277        fail("Job kill should have failed.");
278      }
279    } catch (IOException ioe) {
280      if (shouldSucceed) {
281        throw ioe;
282      } else {
283        LOG.info("exception while submitting job: " + ioe.getMessage());
284        assertTrue(ioe.getMessage().
285                        contains("cannot perform operation " +
286                                    "ADMINISTER_JOBS on queue default"));
287      }
288    } finally {
289      tearDownCluster();
290    }
291  }
292
293 
294  private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
295                                        String otherUserInfo) 
296                        throws IOException {
297    setUpCluster(conf);
298    try {
299      // submit a job as another user.
300      String userInfo = otherUserInfo;
301      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
302      assertFalse(rjob.isComplete());
303
304      //try to kill as self
305      try {
306        rjob.killJob();
307        if (!shouldSucceed) {
308          fail("should fail kill operation"); 
309        }
310      } catch (IOException ioe) {
311        if (shouldSucceed) {
312          throw ioe;
313        }
314        //verify it fails
315        LOG.info("exception while submitting job: " + ioe.getMessage());
316        assertTrue(ioe.getMessage().
317                        contains("cannot perform operation " +
318                                    "ADMINISTER_JOBS on queue default"));
319      }
320      //wait for job to complete on its own
321      while (!rjob.isComplete()) {
322        try {
323          Thread.sleep(1000);
324        } catch (InterruptedException ie) {
325          break;
326        }
327      }
328    } finally {
329      tearDownCluster();
330    }
331  }
332 
333  private void verifyJobPriorityChangeAsOtherUser(JobConf conf, 
334                          boolean shouldSucceed, String otherUserInfo)
335                            throws IOException {
336    setUpCluster(conf);
337    try {
338      // submit job as another user.
339      String userInfo = otherUserInfo;
340      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
341      assertFalse(rjob.isComplete());
342     
343      // try to change priority as self
344      try {
345        rjob.setJobPriority("VERY_LOW");
346        if (!shouldSucceed) {
347          fail("changing priority should fail.");
348        }
349      } catch (IOException ioe) {
350        //verify it fails
351        LOG.info("exception while submitting job: " + ioe.getMessage());
352        assertTrue(ioe.getMessage().
353                        contains("cannot perform operation " +
354                                    "ADMINISTER_JOBS on queue default"));
355      }
356      //wait for job to complete on its own
357      while (!rjob.isComplete()) {
358        try {
359          Thread.sleep(1000);
360        } catch (InterruptedException ie) {
361          break;
362        }
363      }
364    } finally {
365      tearDownCluster();
366    }
367  }
368 
369  private void setUpCluster(JobConf conf) throws IOException {
370    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
371    FileSystem fileSys = miniDFSCluster.getFileSystem();
372    String namenode = fileSys.getUri().toString();
373    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
374                      null, null, conf);
375  }
376 
377  private void tearDownCluster() throws IOException {
378    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
379    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
380  }
381 
382  private RunningJob submitSleepJob(int numMappers, int numReducers, 
383                            long mapSleepTime, long reduceSleepTime,
384                            boolean shouldComplete) 
385                              throws IOException {
386    return submitSleepJob(numMappers, numReducers, mapSleepTime,
387                          reduceSleepTime, shouldComplete, null);
388  }
389 
390  private RunningJob submitSleepJob(int numMappers, int numReducers, 
391                                      long mapSleepTime, long reduceSleepTime,
392                                      boolean shouldComplete, String userInfo) 
393                                            throws IOException {
394    return submitSleepJob(numMappers, numReducers, mapSleepTime, 
395                          reduceSleepTime, shouldComplete, userInfo, null);
396  }
397
398  private RunningJob submitSleepJob(int numMappers, int numReducers, 
399                                    long mapSleepTime, long reduceSleepTime,
400                                    boolean shouldComplete, String userInfo,
401                                    String queueName) 
402                                      throws IOException {
403    JobConf clientConf = new JobConf();
404    clientConf.set("mapred.job.tracker", "localhost:"
405        + miniMRCluster.getJobTrackerPort());
406    SleepJob job = new SleepJob();
407    job.setConf(clientConf);
408    clientConf = job.setupJobConf(numMappers, numReducers, 
409        mapSleepTime, (int)mapSleepTime/100,
410        reduceSleepTime, (int)reduceSleepTime/100);
411    if (queueName != null) {
412      clientConf.setQueueName(queueName);
413    }
414    RunningJob rJob = null;
415    if (shouldComplete) {
416      rJob = JobClient.runJob(clientConf); 
417    } else {
418      JobConf jc = new JobConf(clientConf);
419      if (userInfo != null) {
420        jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
421      }
422      rJob = new JobClient(clientConf).submitJob(jc);
423    }
424    return rJob;
425  }
426
427}
Note: See TracBrowser for help on using the repository browser.