source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 11.1 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.List;
24
25import org.apache.commons.logging.Log;
26import org.apache.commons.logging.LogFactory;
27import org.apache.hadoop.conf.Configuration;
28
29/**
30 * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
31 * by default).
32 */
33class JobQueueTaskScheduler extends TaskScheduler {
34 
35  private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
36  public static final Log LOG = LogFactory.getLog(JobQueueTaskScheduler.class);
37 
38  protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
39  protected EagerTaskInitializationListener eagerTaskInitializationListener;
40  private float padFraction;
41 
42  public JobQueueTaskScheduler() {
43    this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
44  }
45 
46  @Override
47  public synchronized void start() throws IOException {
48    super.start();
49    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
50    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
51    eagerTaskInitializationListener.start();
52    taskTrackerManager.addJobInProgressListener(
53        eagerTaskInitializationListener);
54  }
55 
56  @Override
57  public synchronized void terminate() throws IOException {
58    if (jobQueueJobInProgressListener != null) {
59      taskTrackerManager.removeJobInProgressListener(
60          jobQueueJobInProgressListener);
61    }
62    if (eagerTaskInitializationListener != null) {
63      taskTrackerManager.removeJobInProgressListener(
64          eagerTaskInitializationListener);
65      eagerTaskInitializationListener.terminate();
66    }
67    super.terminate();
68  }
69 
70  @Override
71  public synchronized void setConf(Configuration conf) {
72    super.setConf(conf);
73    padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
74                                 0.01f);
75    this.eagerTaskInitializationListener =
76      new EagerTaskInitializationListener(conf);
77  }
78
79  @Override
80  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
81      throws IOException {
82
83    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
84    final int numTaskTrackers = clusterStatus.getTaskTrackers();
85    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
86    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
87
88    Collection<JobInProgress> jobQueue =
89      jobQueueJobInProgressListener.getJobQueue();
90
91    //
92    // Get map + reduce counts for the current tracker.
93    //
94    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
95    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
96    final int trackerRunningMaps = taskTracker.countMapTasks();
97    final int trackerRunningReduces = taskTracker.countReduceTasks();
98
99    // Assigned tasks
100    List<Task> assignedTasks = new ArrayList<Task>();
101
102    //
103    // Compute (running + pending) map and reduce task numbers across pool
104    //
105    int remainingReduceLoad = 0;
106    int remainingMapLoad = 0;
107    synchronized (jobQueue) {
108      for (JobInProgress job : jobQueue) {
109        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
110          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
111          if (job.scheduleReduces()) {
112            remainingReduceLoad += 
113              (job.desiredReduces() - job.finishedReduces());
114          }
115        }
116      }
117    }
118
119    // Compute the 'load factor' for maps and reduces
120    double mapLoadFactor = 0.0;
121    if (clusterMapCapacity > 0) {
122      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
123    }
124    double reduceLoadFactor = 0.0;
125    if (clusterReduceCapacity > 0) {
126      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
127    }
128       
129    //
130    // In the below steps, we allocate first map tasks (if appropriate),
131    // and then reduce tasks if appropriate.  We go through all jobs
132    // in order of job arrival; jobs only get serviced if their
133    // predecessors are serviced, too.
134    //
135
136    //
137    // We assign tasks to the current taskTracker if the given machine
138    // has a workload that's less than the maximum load of that kind of
139    // task.
140    // However, if the cluster is close to getting loaded i.e. we don't
141    // have enough _padding_ for speculative executions etc., we only
142    // schedule the "highest priority" task i.e. the task from the job
143    // with the highest priority.
144    //
145   
146    final int trackerCurrentMapCapacity = 
147      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
148                              trackerMapCapacity);
149    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
150    boolean exceededMapPadding = false;
151    if (availableMapSlots > 0) {
152      exceededMapPadding = 
153        exceededPadding(true, clusterStatus, trackerMapCapacity);
154    }
155   
156    int numLocalMaps = 0;
157    int numNonLocalMaps = 0;
158    scheduleMaps:
159    for (int i=0; i < availableMapSlots; ++i) {
160      synchronized (jobQueue) {
161        for (JobInProgress job : jobQueue) {
162          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
163            continue;
164          }
165
166          Task t = null;
167         
168          // Try to schedule a node-local or rack-local Map task
169          t = 
170            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
171                                      taskTrackerManager.getNumberOfUniqueHosts());
172          if (t != null) {
173            assignedTasks.add(t);
174            ++numLocalMaps;
175           
176            // Don't assign map tasks to the hilt!
177            // Leave some free slots in the cluster for future task-failures,
178            // speculative tasks etc. beyond the highest priority job
179            if (exceededMapPadding) {
180              break scheduleMaps;
181            }
182           
183            // Try all jobs again for the next Map task
184            break;
185          }
186         
187          // Try to schedule a node-local or rack-local Map task
188          t = 
189            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
190                                   taskTrackerManager.getNumberOfUniqueHosts());
191         
192          if (t != null) {
193            assignedTasks.add(t);
194            ++numNonLocalMaps;
195           
196            // We assign at most 1 off-switch or speculative task
197            // This is to prevent TaskTrackers from stealing local-tasks
198            // from other TaskTrackers.
199            break scheduleMaps;
200          }
201        }
202      }
203    }
204    int assignedMaps = assignedTasks.size();
205
206    //
207    // Same thing, but for reduce tasks
208    // However we _never_ assign more than 1 reduce task per heartbeat
209    //
210    final int trackerCurrentReduceCapacity = 
211      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
212               trackerReduceCapacity);
213    final int availableReduceSlots = 
214      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
215    boolean exceededReducePadding = false;
216    if (availableReduceSlots > 0) {
217      exceededReducePadding = exceededPadding(false, clusterStatus, 
218                                              trackerReduceCapacity);
219      synchronized (jobQueue) {
220        for (JobInProgress job : jobQueue) {
221          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
222              job.numReduceTasks == 0) {
223            continue;
224          }
225
226          Task t = 
227            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
228                                    taskTrackerManager.getNumberOfUniqueHosts()
229                                    );
230          if (t != null) {
231            assignedTasks.add(t);
232            break;
233          }
234         
235          // Don't assign reduce tasks to the hilt!
236          // Leave some free slots in the cluster for future task-failures,
237          // speculative tasks etc. beyond the highest priority job
238          if (exceededReducePadding) {
239            break;
240          }
241        }
242      }
243    }
244   
245    if (LOG.isDebugEnabled()) {
246      LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
247                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
248                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
249                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
250                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + 
251                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + 
252                trackerCurrentReduceCapacity + "," + trackerRunningReduces + 
253                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + 
254                ", " + (assignedTasks.size()-assignedMaps) + "]");
255    }
256
257    return assignedTasks;
258  }
259
260  private boolean exceededPadding(boolean isMapTask, 
261                                  ClusterStatus clusterStatus, 
262                                  int maxTaskTrackerSlots) { 
263    int numTaskTrackers = clusterStatus.getTaskTrackers();
264    int totalTasks = 
265      (isMapTask) ? clusterStatus.getMapTasks() : 
266        clusterStatus.getReduceTasks();
267    int totalTaskCapacity = 
268      isMapTask ? clusterStatus.getMaxMapTasks() : 
269                  clusterStatus.getMaxReduceTasks();
270
271    Collection<JobInProgress> jobQueue =
272      jobQueueJobInProgressListener.getJobQueue();
273
274    boolean exceededPadding = false;
275    synchronized (jobQueue) {
276      int totalNeededTasks = 0;
277      for (JobInProgress job : jobQueue) {
278        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
279            job.numReduceTasks == 0) {
280          continue;
281        }
282
283        //
284        // Beyond the highest-priority task, reserve a little
285        // room for failures and speculative executions; don't
286        // schedule tasks to the hilt.
287        //
288        totalNeededTasks += 
289          isMapTask ? job.desiredMaps() : job.desiredReduces();
290        int padding = 0;
291        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
292          padding = 
293            Math.min(maxTaskTrackerSlots,
294                     (int) (totalNeededTasks * padFraction));
295        }
296        if (totalTasks + padding >= totalTaskCapacity) {
297          exceededPadding = true;
298          break;
299        }
300      }
301    }
302
303    return exceededPadding;
304  }
305
306  @Override
307  public synchronized Collection<JobInProgress> getJobs(String queueName) {
308    return jobQueueJobInProgressListener.getJobQueue();
309  } 
310}
Note: See TracBrowser for help on using the repository browser.