source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 10.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.HashMap;
24import java.util.List;
25import java.util.Map;
26
27import junit.framework.TestCase;
28
29import org.apache.hadoop.io.BytesWritable;
30
31public class TestJobQueueTaskScheduler extends TestCase {
32 
33  private static int jobCounter;
34  private static int taskCounter;
35 
36  static void resetCounters() {
37    jobCounter = 0;
38    taskCounter = 0;
39  }
40 
41  static class FakeJobInProgress extends JobInProgress {
42   
43    private FakeTaskTrackerManager taskTrackerManager;
44   
45    public FakeJobInProgress(JobConf jobConf,
46        FakeTaskTrackerManager taskTrackerManager) throws IOException {
47      super(new JobID("test", ++jobCounter), jobConf);
48      this.taskTrackerManager = taskTrackerManager;
49      this.startTime = System.currentTimeMillis();
50      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
51      this.status.setJobPriority(JobPriority.NORMAL);
52      this.status.setStartTime(startTime);
53    }
54
55    @Override
56    public synchronized void initTasks() throws IOException {
57      // do nothing
58    }
59
60    @Override
61    public Task obtainNewLocalMapTask(TaskTrackerStatus tts, int clusterSize, 
62                                      int ignored) 
63    throws IOException {
64      return obtainNewMapTask(tts, clusterSize, ignored);
65    }
66   
67    @Override
68    public Task obtainNewNonLocalMapTask(TaskTrackerStatus tts, int clusterSize, 
69                                         int ignored) 
70    throws IOException {
71      return obtainNewMapTask(tts, clusterSize, ignored);
72    }
73   
74    @Override
75    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
76        int ignored) throws IOException {
77      TaskAttemptID attemptId = getTaskAttemptID(true);
78      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
79        @Override
80        public String toString() {
81          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
82        }
83      };
84      taskTrackerManager.update(tts.getTrackerName(), task);
85      runningMapTasks++;
86      return task;
87    }
88   
89    @Override
90    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
91        int clusterSize, int ignored) throws IOException {
92      TaskAttemptID attemptId = getTaskAttemptID(false);
93      Task task = new ReduceTask("", attemptId, 0, 10) {
94        @Override
95        public String toString() {
96          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
97        }
98      };
99      taskTrackerManager.update(tts.getTrackerName(), task);
100      runningReduceTasks++;
101      return task;
102    }
103   
104    private TaskAttemptID getTaskAttemptID(boolean isMap) {
105      JobID jobId = getJobID();
106      return new TaskAttemptID(jobId.getJtIdentifier(),
107          jobId.getId(), isMap, ++taskCounter, 0);
108    }
109  }
110 
111  static class FakeTaskTrackerManager implements TaskTrackerManager {
112   
113    int maps = 0;
114    int reduces = 0;
115    int maxMapTasksPerTracker = 2;
116    int maxReduceTasksPerTracker = 2;
117    List<JobInProgressListener> listeners =
118      new ArrayList<JobInProgressListener>();
119    QueueManager queueManager;
120   
121    private Map<String, TaskTrackerStatus> trackers =
122      new HashMap<String, TaskTrackerStatus>();
123
124    public FakeTaskTrackerManager() {
125      JobConf conf = new JobConf();
126      queueManager = new QueueManager(conf);
127      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
128                   new ArrayList<TaskStatus>(), 0,
129                   maxMapTasksPerTracker, maxReduceTasksPerTracker));
130      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
131                   new ArrayList<TaskStatus>(), 0,
132                   maxMapTasksPerTracker, maxReduceTasksPerTracker));
133    }
134   
135    @Override
136    public ClusterStatus getClusterStatus() {
137      int numTrackers = trackers.size();
138      return new ClusterStatus(numTrackers, 0, 
139                               JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
140                               maps, reduces,
141                               numTrackers * maxMapTasksPerTracker,
142                               numTrackers * maxReduceTasksPerTracker,
143                               JobTracker.State.RUNNING);
144    }
145
146    @Override
147    public int getNumberOfUniqueHosts() {
148      return 0;
149    }
150
151    @Override
152    public Collection<TaskTrackerStatus> taskTrackers() {
153      return trackers.values();
154    }
155
156
157    @Override
158    public void addJobInProgressListener(JobInProgressListener listener) {
159      listeners.add(listener);
160    }
161
162    @Override
163    public void removeJobInProgressListener(JobInProgressListener listener) {
164      listeners.remove(listener);
165    }
166   
167    @Override
168    public QueueManager getQueueManager() {
169      return queueManager;
170    }
171   
172    @Override
173    public int getNextHeartbeatInterval() {
174      return MRConstants.HEARTBEAT_INTERVAL_MIN;
175    }
176
177    @Override
178    public void killJob(JobID jobid) {
179      return;
180    }
181
182    @Override
183    public JobInProgress getJob(JobID jobid) {
184      return null;
185    }
186
187    public void initJob(JobInProgress job) {
188      // do nothing
189    }
190   
191    public void failJob(JobInProgress job) {
192      // do nothing
193    }
194   
195    // Test methods
196   
197    public void submitJob(JobInProgress job) throws IOException {
198      for (JobInProgressListener listener : listeners) {
199        listener.jobAdded(job);
200      }
201    }
202   
203    public TaskTrackerStatus getTaskTracker(String trackerID) {
204      return trackers.get(trackerID);
205    }
206   
207    public void update(String taskTrackerName, final Task t) {
208      if (t.isMapTask()) {
209        maps++;
210      } else {
211        reduces++;
212      }
213      TaskStatus status = new TaskStatus() {
214        @Override
215        public boolean getIsMap() {
216          return t.isMapTask();
217        }
218      };
219      status.setRunState(TaskStatus.State.RUNNING);
220      trackers.get(taskTrackerName).getTaskReports().add(status);
221    }
222   
223  }
224 
225  protected JobConf jobConf;
226  protected TaskScheduler scheduler;
227  private FakeTaskTrackerManager taskTrackerManager;
228
229  @Override
230  protected void setUp() throws Exception {
231    resetCounters();
232    jobConf = new JobConf();
233    jobConf.setNumMapTasks(10);
234    jobConf.setNumReduceTasks(10);
235    taskTrackerManager = new FakeTaskTrackerManager();
236    scheduler = createTaskScheduler();
237    scheduler.setConf(jobConf);
238    scheduler.setTaskTrackerManager(taskTrackerManager);
239    scheduler.start();
240  }
241 
242  @Override
243  protected void tearDown() throws Exception {
244    if (scheduler != null) {
245      scheduler.terminate();
246    }
247  }
248 
249  protected TaskScheduler createTaskScheduler() {
250    return new JobQueueTaskScheduler();
251  }
252 
253  static void submitJobs(FakeTaskTrackerManager taskTrackerManager, JobConf jobConf, 
254                         int numJobs, int state)
255    throws IOException {
256    for (int i = 0; i < numJobs; i++) {
257      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
258      job.getStatus().setRunState(state);
259      taskTrackerManager.submitJob(job);
260    }
261  }
262
263  public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
264    assertEquals(0, scheduler.assignTasks(tracker(taskTrackerManager, "tt1")).size());
265  }
266
267  public void testNonRunningJobsAreIgnored() throws IOException {
268    submitJobs(taskTrackerManager, jobConf, 1, JobStatus.PREP);
269    submitJobs(taskTrackerManager, jobConf, 1, JobStatus.SUCCEEDED);
270    submitJobs(taskTrackerManager, jobConf, 1, JobStatus.FAILED);
271    submitJobs(taskTrackerManager, jobConf, 1, JobStatus.KILLED);
272    assertEquals(0, scheduler.assignTasks(tracker(taskTrackerManager, "tt1")).size());
273  }
274 
275  public void testDefaultTaskAssignment() throws IOException {
276    submitJobs(taskTrackerManager, jobConf, 2, JobStatus.RUNNING);
277    // All slots are filled with job 1
278    checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), 
279                    new String[] {"attempt_test_0001_m_000001_0 on tt1", 
280                                  "attempt_test_0001_m_000002_0 on tt1", 
281                                  "attempt_test_0001_r_000003_0 on tt1"});
282    checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), 
283                    new String[] {"attempt_test_0001_r_000004_0 on tt1"});
284    checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), new String[] {});
285    checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), 
286                    new String[] {"attempt_test_0001_m_000005_0 on tt2", 
287                                         "attempt_test_0001_m_000006_0 on tt2", 
288                                         "attempt_test_0001_r_000007_0 on tt2"});
289    checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), 
290                    new String[] {"attempt_test_0001_r_000008_0 on tt2"});
291    checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), new String[] {});
292    checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), new String[] {});
293    checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), new String[] {});
294  }
295
296  static TaskTrackerStatus tracker(FakeTaskTrackerManager taskTrackerManager,
297                                      String taskTrackerName) {
298    return taskTrackerManager.getTaskTracker(taskTrackerName);
299  }
300 
301  static void checkAssignment(TaskScheduler scheduler, TaskTrackerStatus tts,
302      String[] expectedTaskStrings) throws IOException {
303    List<Task> tasks = scheduler.assignTasks(tts);
304    assertNotNull(tasks);
305    assertEquals(expectedTaskStrings.length, tasks.size());
306    for (int i=0; i < expectedTaskStrings.length; ++i) {
307      assertEquals(expectedTaskStrings[i], tasks.get(i).toString());
308    }
309  }
310 
311}
Note: See TracBrowser for help on using the repository browser.