source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 5.0 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;

import javax.security.auth.login.LoginException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;

import junit.framework.TestCase;

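/**
 * Verifies that job queue information (queue name, scheduling info and the
 * jobs in the "default" queue) is reported correctly through the JobClient
 * API while a job is held running on a MiniMRCluster backed by a
 * MiniDFSCluster.
 */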
public class TestJobQueueInformation extends TestCase {

  private MiniMRCluster mrCluster;
  private MiniDFSCluster dfsCluster;
  private JobConf jc;
  private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO";
  private static final Path TEST_DIR =
    new Path(System.getProperty("test.build.data","/tmp"),
             "job-queue-info-testing");
  private static final Path IN_DIR = new Path(TEST_DIR, "input");
  private static final Path SHARE_DIR = new Path(TEST_DIR, "share");
  private static final Path OUTPUT_DIR = new Path(TEST_DIR, "output");

  static String getSignalFile() {
    return (new Path(SHARE_DIR, "signal")).toString();
  }

  // configure a waiting job with 2 maps
  private JobConf configureWaitingJob(JobConf conf) throws IOException {

    UtilsForTests.configureWaitingJobConf(conf, IN_DIR, OUTPUT_DIR, 2, 0,
        "test-job-queue-info", getSignalFile(), getSignalFile());
    return conf;
  }

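  /**
   * Scheduler used by the test: before delegating to
   * LimitTasksPerJobTaskScheduler, it stamps every job in the queue with a
   * known scheduling-info string, which the test later reads back through
   * the JobClient API.
   */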
  public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {

    @Override
    public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
        throws IOException {
      Collection<JobInProgress> jips = jobQueueJobInProgressListener
          .getJobQueue();
      if (jips != null && !jips.isEmpty()) {
        for (JobInProgress jip : jips) {
          jip.setSchedulingInfo(JOB_SCHEDULING_INFO);
        }
      }
      return super.assignTasks(taskTracker);
    }
  }

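  // Bring up a MiniDFSCluster with 4 datanodes and a MiniMRCluster with 4
  // task trackers, using the custom scheduler limited to 10 running tasks
  // per job.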
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    final int taskTrackers = 4;
    Configuration conf = new Configuration();
    dfsCluster = new MiniDFSCluster(conf, 4, true, null);

    jc = new JobConf();
    jc.setClass("mapred.jobtracker.taskScheduler", TestTaskScheduler.class,
        TaskScheduler.class);
    jc.setLong("mapred.jobtracker.taskScheduler.maxRunningTasksPerJob", 10L);
    mrCluster = new MiniMRCluster(0, 0, taskTrackers, dfsCluster
        .getFileSystem().getUri().toString(), 1, null, null, null, jc);
  }

  @Override
  protected void tearDown() throws Exception {
    super.tearDown();
    mrCluster.shutdown();
    dfsCluster.shutdown();
  }

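  // Submits a waiting job, then checks that the single "default" queue
  // reports the expected scheduling info and exactly one job, and that the
  // job's status carries JOB_SCHEDULING_INFO; finally signals the tasks so
  // the job can complete.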
  public void testJobQueues() throws IOException {
    JobClient jc = new JobClient(mrCluster.createJobConf());
    String expectedQueueInfo = "Maximum Tasks Per Job :: 10";
    JobQueueInfo[] queueInfos = jc.getQueues();
    assertNotNull(queueInfos);
    assertEquals(1, queueInfos.length);
    assertEquals("default", queueInfos[0].getQueueName());
    JobConf conf = mrCluster.createJobConf();
    FileSystem fileSys = dfsCluster.getFileSystem();

    // configure a waiting job
    conf = configureWaitingJob(conf);
    conf.setJobName("test-job-queue-info-test");

    // clear the signal file if any
    fileSys.delete(SHARE_DIR, true);

    RunningJob rJob = jc.submitJob(conf);

    while (rJob.getJobState() != JobStatus.RUNNING) {
      UtilsForTests.waitFor(10);
    }

    int numberOfJobs = 0;

    for (JobQueueInfo queueInfo : queueInfos) {
      JobStatus[] jobStatusList = jc.getJobsFromQueue(queueInfo
          .getQueueName());
      assertNotNull(queueInfo.getQueueName());
      assertNotNull(queueInfo.getSchedulingInfo());
      assertEquals(expectedQueueInfo, queueInfo.getSchedulingInfo());
      numberOfJobs += jobStatusList.length;
      for (JobStatus status : jobStatusList) {
        assertEquals(JOB_SCHEDULING_INFO, status.getSchedulingInfo());
      }
    }
    assertEquals(1, numberOfJobs);

    UtilsForTests.signalTasks(dfsCluster, fileSys, getSignalFile(),
                              getSignalFile(), 4);
  }
}