source: proiecte/HadoopJUnit/hadoop-0.20.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 20.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.util.ArrayList;
21import java.util.Collection;
22import java.util.HashMap;
23import java.util.Iterator;
24import java.util.Set;
25import java.util.TreeMap;
26import java.util.Map.Entry;
27import java.util.concurrent.atomic.AtomicInteger;
28
29import org.apache.commons.logging.Log;
30import org.apache.commons.logging.LogFactory;
31import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
32import org.apache.hadoop.util.StringUtils;
33
34/**
35 * This class asynchronously initializes jobs submitted to the
36 * Map/Reduce cluster running with the {@link CapacityTaskScheduler}.
37 *
38 * <p>
39 * The class comprises of a main poller thread, and a set of worker
40 * threads that together initialize the jobs. The poller thread periodically
41 * looks at jobs submitted to the scheduler, and selects a set of them
42 * to be initialized. It passes these to the worker threads for initializing.
43 * Each worker thread is configured to look at jobs submitted to a fixed
44 * set of queues. It initializes jobs in a round robin manner - selecting
45 * the first job in order from each queue ready to be initialized.
46 * </p>
47 *
48 * <p>
49 * An initialized job occupies memory resources on the Job Tracker. Hence,
50 * the poller limits the number of jobs initialized at any given time to
51 * a configured limit. The limit is specified per user per queue.
52 * </p>
53 *
54 * <p>
55 * However, since a job needs to be initialized before the scheduler can
56 * select tasks from it to run, it tries to keep a backlog of jobs
57 * initialized so the scheduler does not need to wait and let empty slots
58 * go waste. The core logic of the poller is to pick up the right jobs,
59 * which have a good potential to be run next by the scheduler. To do this,
60 * it picks up jobs submitted across users and across queues to account
61 * both for guaranteed capacities and user limits. It also always initializes
62 * high priority jobs, whenever they need to be initialized, even if this
63 * means going over the limit for initialized jobs.
64 * </p>
65 */
66public class JobInitializationPoller extends Thread {
67
68  private static final Log LOG = LogFactory
69      .getLog(JobInitializationPoller.class.getName());
70
71  /*
72   * The poller picks up jobs across users to initialize based on user limits.
73   * Suppose the user limit for a queue is 25%, it means atmost 4 users' jobs
74   * can run together. However, in order to account for jobs from a user that
75   * might complete faster than others, it initializes jobs from an additional
76   * number of users as a backlog. This variable defines the additional
77   * number of users whose jobs can be considered for initializing.
78   */
79  private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
80
81  private JobQueuesManager jobQueueManager;
82  private long sleepInterval;
83  private int poolSize;
84
85  /**
86   * A worker thread that initializes jobs in one or more queues assigned to
87   * it.
88   *
89   * Jobs are initialized in a round robin fashion one from each queue at a
90   * time.
91   */
92  class JobInitializationThread extends Thread {
93
94    private JobInProgress initializingJob;
95
96    private volatile boolean startIniting;
97    private AtomicInteger currentJobCount = new AtomicInteger(0); // number of jobs to initialize
98
99    /**
100     * The hash map which maintains relationship between queue to jobs to
101     * initialize per queue.
102     */
103    private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
104
105    public JobInitializationThread() {
106      startIniting = true;
107      jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
108    }
109
110    @Override
111    public void run() {
112      while (startIniting) {
113        initializeJobs(); 
114        try {
115          if (startIniting) {
116            Thread.sleep(sleepInterval);
117          } else {
118            break;
119          }
120        } catch (Throwable t) {
121        }
122      }
123    }
124
125    // The key method that initializes jobs from queues
126    // This method is package-private to allow test cases to call it
127    // synchronously in a controlled manner.
128    void initializeJobs() {
129      // while there are more jobs to initialize...
130      while (currentJobCount.get() > 0) {
131        Set<String> queues = jobsPerQueue.keySet();
132        for (String queue : queues) {
133          JobInProgress job = getFirstJobInQueue(queue);
134          if (job == null) {
135            continue;
136          }
137          LOG.info("Initializing job : " + job.getJobID() + " in Queue "
138              + job.getProfile().getQueueName() + " For user : "
139              + job.getProfile().getUser());
140          if (startIniting) {
141            setInitializingJob(job);
142            ttm.initJob(job);
143            setInitializingJob(null);
144          } else {
145            break;
146          }
147        }
148      }
149    }
150
151    /**
152     * This method returns the first job in the queue and removes the same.
153     *
154     * @param queue
155     *          queue name
156     * @return First job in the queue and removes it.
157     */
158    private JobInProgress getFirstJobInQueue(String queue) {
159      TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
160          .get(queue);
161      synchronized (jobsList) {
162        if (jobsList.isEmpty()) {
163          return null;
164        }
165        Iterator<JobInProgress> jobIterator = jobsList.values().iterator();
166        JobInProgress job = jobIterator.next();
167        jobIterator.remove();
168        currentJobCount.getAndDecrement();
169        return job;
170      }
171    }
172
173    /*
174     * Test method to check if the thread is currently initialising the job
175     */
176    synchronized JobInProgress getInitializingJob() {
177      return this.initializingJob;
178    }
179   
180    synchronized void setInitializingJob(JobInProgress job) {
181      this.initializingJob  = job;
182    }
183
184    void terminate() {
185      startIniting = false;
186    }
187
188    void addJobsToQueue(String queue, JobInProgress job) {
189      TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
190          .get(queue);
191      if (jobs == null) {
192        LOG.error("Invalid queue passed to the thread : " + queue
193            + " For job :: " + job.getJobID());
194      }
195      synchronized (jobs) {
196        JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
197        jobs.put(schedInfo, job);
198        currentJobCount.getAndIncrement();
199      }
200    }
201
202    void addQueue(String queue) {
203      TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
204          jobQueueManager.getComparator(queue));
205      jobsPerQueue.put(queue, jobs);
206    }
207  }
208
209  /**
210   * The queue information class maintains following information per queue:
211   * Maximum users allowed to initialize job in the particular queue. Maximum
212   * jobs allowed to be initialize per user in the queue.
213   *
214   */
215  private class QueueInfo {
216    String queue;
217    int maxUsersAllowedToInitialize;
218    int maxJobsPerUserToInitialize;
219
220    public QueueInfo(String queue, int maxUsersAllowedToInitialize,
221        int maxJobsPerUserToInitialize) {
222      this.queue = queue;
223      this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
224      this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
225    }
226  }
227
228  /**
229   * Map which contains the configuration used for initializing jobs
230   * in that associated to a particular job queue.
231   */
232  private HashMap<String, QueueInfo> jobQueues;
233
234  /**
235   * Set of jobs which have been passed to Initialization threads.
236   * This is maintained so that we dont call initTasks() for same job twice.
237   */
238  private HashMap<JobID,JobInProgress> initializedJobs;
239
240  private volatile boolean running;
241
242  private TaskTrackerManager ttm;
243  /**
244   * The map which provides information which thread should be used to
245   * initialize jobs for a given job queue.
246   */
247  private HashMap<String, JobInitializationThread> threadsToQueueMap;
248
249  public JobInitializationPoller(JobQueuesManager mgr,
250      CapacitySchedulerConf rmConf, Set<String> queue, 
251      TaskTrackerManager ttm) {
252    initializedJobs = new HashMap<JobID,JobInProgress>();
253    jobQueues = new HashMap<String, QueueInfo>();
254    this.jobQueueManager = mgr;
255    threadsToQueueMap = new HashMap<String, JobInitializationThread>();
256    super.setName("JobInitializationPollerThread");
257    running = true;
258    this.ttm = ttm;
259  }
260
261  /*
262   * method to read all configuration values required by the initialisation
263   * poller
264   */
265
266  void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
267    for (String queue : queues) {
268      int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
269      int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
270      int maxJobsPerUserToInitialize = capacityConf
271          .getMaxJobsPerUserToInitialize(queue);
272      QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
273          maxJobsPerUserToInitialize);
274      jobQueues.put(queue, qi);
275    }
276    sleepInterval = capacityConf.getSleepInterval();
277    poolSize = capacityConf.getMaxWorkerThreads();
278    if (poolSize > queues.size()) {
279      poolSize = queues.size();
280    }
281    assignThreadsToQueues();
282    Collection<JobInitializationThread> threads = threadsToQueueMap.values();
283    for (JobInitializationThread t : threads) {
284      if (!t.isAlive()) {
285        t.setDaemon(true);
286        t.start();
287      }
288    }
289  }
290
291  /**
292   * This is main thread of initialization poller, We essentially do
293   * following in the main threads:
294   *
295   * <ol>
296   * <li> Clean up the list of initialized jobs list which poller maintains
297   * </li>
298   * <li> Select jobs to initialize in the polling interval.</li>
299   * </ol>
300   */
301  public void run() {
302    while (running) {
303      try {
304        cleanUpInitializedJobsList();
305        selectJobsToInitialize();
306        if (!this.isInterrupted()) {
307          Thread.sleep(sleepInterval);
308        }
309      } catch (InterruptedException e) {
310        LOG.error("Job Initialization poller interrupted"
311            + StringUtils.stringifyException(e));
312      }
313    }
314  }
315
316  /**
317   * The key method which does selecting jobs to be initalized across
318   * queues and assign those jobs to their appropriate init-worker threads.
319   * <br/>
320   * This method is overriden in test case which is used to test job
321   * initialization poller.
322   *
323   */
324  void selectJobsToInitialize() {
325    for (String queue : jobQueues.keySet()) {
326      ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
327      printJobs(jobsToInitialize);
328      JobInitializationThread t = threadsToQueueMap.get(queue);
329      for (JobInProgress job : jobsToInitialize) {
330        t.addJobsToQueue(queue, job);
331      }
332    }
333  }
334
335  /**
336   * Method used to print log statements about which jobs are being
337   * passed to init-threads.
338   *
339   * @param jobsToInitialize list of jobs which are passed to be
340   * init-threads.
341   */
342  private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
343    for (JobInProgress job : jobsToInitialize) {
344      LOG.info("Passing to Initializer Job Id :" + job.getJobID()
345          + " User: " + job.getProfile().getUser() + " Queue : "
346          + job.getProfile().getQueueName());
347    }
348  }
349
350  /**
351   * This method exists to be overridden by test cases that wish to
352   * create a test-friendly worker thread which can be controlled
353   * synchronously.
354   *
355   * @return Instance of worker init-threads.
356   */
357  JobInitializationThread createJobInitializationThread() {
358    return new JobInitializationThread();
359  }
360 
361  /**
362   * Method which is used by the poller to assign appropriate worker thread
363   * to a queue. The number of threads would be always less than or equal
364   * to number of queues in a system. If number of threads is configured to
365   * be more than number of queues then poller does not create threads more
366   * than number of queues.
367   *
368   */
369  private void assignThreadsToQueues() {
370    int countOfQueues = jobQueues.size();
371    String[] queues = (String[]) jobQueues.keySet().toArray(
372        new String[countOfQueues]);
373    int numberOfQueuesPerThread = countOfQueues / poolSize;
374    int numberOfQueuesAssigned = 0;
375    for (int i = 0; i < poolSize; i++) {
376      JobInitializationThread initializer = createJobInitializationThread();
377      int batch = (i * numberOfQueuesPerThread);
378      for (int j = batch; j < (batch + numberOfQueuesPerThread); j++) {
379        initializer.addQueue(queues[j]);
380        threadsToQueueMap.put(queues[j], initializer);
381        numberOfQueuesAssigned++;
382      }
383    }
384
385    if (numberOfQueuesAssigned < countOfQueues) {
386      // Assign remaining queues in round robin fashion to other queues
387      int startIndex = 0;
388      for (int i = numberOfQueuesAssigned; i < countOfQueues; i++) {
389        JobInitializationThread t = threadsToQueueMap
390            .get(queues[startIndex]);
391        t.addQueue(queues[i]);
392        threadsToQueueMap.put(queues[i], t);
393        startIndex++;
394      }
395    }
396  }
397
398  /**
399   *
400   * Method used to select jobs to be initialized for a given queue. <br/>
401   *
402   * We want to ensure that enough jobs have been initialized, so that when the
403   * Scheduler wants to consider a new job to run, it's ready. We clearly don't
404   * want to initialize too many jobs as each initialized job has a memory
405   * footprint, sometimes significant.
406   *
407   * Number of jobs to be initialized is restricted by two values: - Maximum
408   * number of users whose jobs we want to initialize, which is equal to
409   * the number of concurrent users the queue can support. - Maximum number
410   * of initialized jobs per user. The product of these two gives us the
411   * total number of initialized jobs.
412   *
413   * Note that this is a rough number, meant for decreasing extra memory
414   * footprint. It's OK if we go over it once in a while, if we have to.
415   *
416   * This can happen as follows. Suppose we have initialized 3 jobs for a
417   * user. Now, suppose the user submits a job who's priority is higher than
418   * that of the 3 jobs initialized. This job needs to be initialized, since it
419   * will run earlier than the 3 jobs. We'll now have 4 initialized jobs for the
420   * user. If memory becomes a problem, we should ideally un-initialize one of
421   * the 3 jobs, to keep the count of initialized jobs at 3, but that's
422   * something we don't do for now. This situation can also arise when a new
423   * user submits a high priority job, thus superceeding a user whose jobs have
424   * already been initialized. The latter user's initialized jobs are redundant,
425   * but we'll leave them initialized.
426   *
427   * @param queue name of the queue to pick the jobs to initialize.
428   * @return list of jobs to be initalized in a queue. An empty queue is
429   *         returned if no jobs are found.
430   */
431  ArrayList<JobInProgress> getJobsToInitialize(String queue) {
432    QueueInfo qi = jobQueues.get(queue);
433    ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
434    // use the configuration parameter which is configured for the particular
435    // queue.
436    int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
437    int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
438    int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
439        * maxJobsPerUserAllowedToInitialize;
440    int countOfJobsInitialized = 0;
441    HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
442    Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
443    /*
444     * Walk through the collection of waiting jobs.
445     *  We maintain a map of jobs that have already been initialized. If a
446     *  job exists in that map, increment the count for that job's user
447     *  and move on to the next job.
448     *   
449     *  If the job doesn't exist, see whether we  want to initialize it.
450     *  We initialize it if: - at least one job of the user has already
451     *  been initialized, but the user's total initialized jobs are below
452     *  the limit, OR - this is a new user, and we haven't reached the limit
453     *  for the number of users whose jobs we want to initialize. We break
454     *  when we've reached the limit of maximum jobs to initialize.
455     */
456    for (JobInProgress job : jobs) {
457      String user = job.getProfile().getUser();
458      int numberOfJobs = userJobsInitialized.get(user) == null ? 0
459          : userJobsInitialized.get(user);
460      // If the job is already initialized then add the count against user
461      // then continue.
462      if (initializedJobs.containsKey(job.getJobID())) {
463        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
464        countOfJobsInitialized++;
465        continue;
466      }
467      boolean isUserPresent = userJobsInitialized.containsKey(user);
468      if (!isUserPresent
469          && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
470        // this is a new user being considered and the number of users
471        // is within limits.
472        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
473        jobsToInitialize.add(job);
474        initializedJobs.put(job.getJobID(),job);
475        countOfJobsInitialized++;
476      } else if (isUserPresent
477          && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
478        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
479        jobsToInitialize.add(job);
480        initializedJobs.put(job.getJobID(),job);
481        countOfJobsInitialized++;
482      }
483      /*
484       * if the maximum number of jobs to initalize for a queue is reached
485       * then we stop looking at further jobs. The jobs beyond this number
486       * can be initialized.
487       */
488      if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
489        break;
490      }
491    }
492    return jobsToInitialize;
493  }
494
495
496  /**
497   * Method which is used internally to clean up the initialized jobs
498   * data structure which the job initialization poller uses to check
499   * if a job is initalized or not.
500   *
501   * Algorithm for cleaning up task is as follows:
502   *
503   * <ul>
504   * <li> For jobs in <b>initalizedJobs</b> list </li>
505   * <ul>
506   * <li> If job is running</li>
507   * <ul>
508   * <li> If job is scheduled then remove the job from the waiting queue
509   * of the scheduler and <b>initalizedJobs</b>.<br/>
510   *  The check for a job is scheduled or not is done by following
511   *  formulae:<br/>
512   *  if pending <i>task</i> &lt; desired <i>task</i> then scheduled else
513   *  not scheduled.<br/>
514   *  The formulae would return <i>scheduled</i> if one task has run or failed,
515   *  any cases in which there has been a failure but not enough to mark task
516   *  as failed, we return <i>not scheduled</i> in formulae.
517   * </li>
518   * </ul>
519   *
520   * <li> If job is complete, then remove the job from <b>initalizedJobs</b>.
521   * </li>
522   *
523   * </ul>
524   * </ul>
525   *
526   */
527  void cleanUpInitializedJobsList() {
528    Iterator<Entry<JobID, JobInProgress>> jobsIterator = 
529      initializedJobs.entrySet().iterator();
530    while(jobsIterator.hasNext()) {
531      Entry<JobID,JobInProgress> entry = jobsIterator.next();
532      JobInProgress job = entry.getValue();
533      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
534        if (isScheduled(job)) {
535          LOG.info("Removing scheduled jobs from waiting queue"
536              + job.getJobID());
537          jobsIterator.remove();
538          jobQueueManager.removeJobFromWaitingQueue(job);
539          continue;
540        }
541      }
542      if(job.isComplete()) {
543        LOG.info("Removing killed/completed job from initalized jobs " +
544                        "list : "+ job.getJobID());
545        jobsIterator.remove();
546      }
547    }
548  }
549
550  /**
551   * Convenience method to check if job has been scheduled or not.
552   *
553   * The method may return false in case of job which has failure but
554   * has not failed the tip.
555   * @param job
556   * @return
557   */
558  private boolean isScheduled(JobInProgress job) {
559    return ((job.pendingMaps() < job.desiredMaps()) 
560        || (job.pendingReduces() < job.desiredReduces()));
561  }
562
563  void terminate() {
564    running = false;
565    for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
566        .entrySet()) {
567      JobInitializationThread t = entry.getValue();
568      if (t.isAlive()) {
569        t.terminate();
570        t.interrupt();
571      }
572    }
573  }
574
575  /*
576   * Test method used only for testing purposes.
577   */
578  JobInProgress getInitializingJob(String queue) {
579    JobInitializationThread t = threadsToQueueMap.get(queue);
580    if (t == null) {
581      return null;
582    } else {
583      return t.getInitializingJob();
584    }
585  }
586
587  Set<JobID> getInitializedJobList() {
588    return initializedJobs.keySet();
589  }
590}
Note: See TracBrowser for help on using the repository browser.