source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 8.1 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred.jobcontrol;
20
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.Hashtable;
24import java.util.Map;
25
26/** This class encapsulates a set of MapReduce jobs and its dependency. It tracks
27 *  the states of the jobs by placing them into different tables according to their
28 *  states.
29 * 
30 *  This class provides APIs for the client app to add a job to the group and to get
31 *  the jobs in the group in different states. When a
32 *  job is added, an ID unique to the group is assigned to the job.
33 * 
34 *  This class has a thread that submits jobs when they become ready, monitors the
35 *  states of the running jobs, and updates the states of jobs based on the state changes
36 *  of their depending jobs states. The class provides APIs for suspending/resuming
37 *  the thread,and for stopping the thread.
38 * 
39 */
40public class JobControl implements Runnable{
41
42  // The thread can be in one of the following state
43  private static final int RUNNING = 0;
44  private static final int SUSPENDED = 1;
45  private static final int STOPPED = 2;
46  private static final int STOPPING = 3;
47  private static final int READY = 4;
48       
49  private int runnerState;                      // the thread state
50       
51  private Map<String, Job> waitingJobs;
52  private Map<String, Job> readyJobs;
53  private Map<String, Job> runningJobs;
54  private Map<String, Job> successfulJobs;
55  private Map<String, Job> failedJobs;
56       
57  private long nextJobID;
58  private String groupName;
59       
60  /**
61   * Construct a job control for a group of jobs.
62   * @param groupName a name identifying this group
63   */
64  public JobControl(String groupName) {
65    this.waitingJobs = new Hashtable<String, Job>();
66    this.readyJobs = new Hashtable<String, Job>();
67    this.runningJobs = new Hashtable<String, Job>();
68    this.successfulJobs = new Hashtable<String, Job>();
69    this.failedJobs = new Hashtable<String, Job>();
70    this.nextJobID = -1;
71    this.groupName = groupName;
72    this.runnerState = JobControl.READY;
73  }
74       
75  private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
76    ArrayList<Job> retv = new ArrayList<Job>();
77    synchronized (jobs) {
78      for (Job job : jobs.values()) {
79        retv.add(job);
80      }
81    }
82    return retv;
83  }
84       
85  /**
86   * @return the jobs in the waiting state
87   */
88  public ArrayList<Job> getWaitingJobs() {
89    return JobControl.toArrayList(this.waitingJobs);
90  }
91       
92  /**
93   * @return the jobs in the running state
94   */
95  public ArrayList<Job> getRunningJobs() {
96    return JobControl.toArrayList(this.runningJobs);
97  }
98       
99  /**
100   * @return the jobs in the ready state
101   */
102  public ArrayList<Job> getReadyJobs() {
103    return JobControl.toArrayList(this.readyJobs);
104  }
105       
106  /**
107   * @return the jobs in the success state
108   */
109  public ArrayList<Job> getSuccessfulJobs() {
110    return JobControl.toArrayList(this.successfulJobs);
111  }
112       
113  public ArrayList<Job> getFailedJobs() {
114    return JobControl.toArrayList(this.failedJobs);
115  }
116       
117  private String getNextJobID() {
118    nextJobID += 1;
119    return this.groupName + this.nextJobID;
120  }
121       
122  private static void addToQueue(Job aJob, Map<String, Job> queue) {
123    synchronized(queue) {
124      queue.put(aJob.getJobID(), aJob);
125    }           
126  }
127       
128  private void addToQueue(Job aJob) {
129    Map<String, Job> queue = getQueue(aJob.getState());
130    addToQueue(aJob, queue);   
131  }
132       
133  private Map<String, Job> getQueue(int state) {
134    Map<String, Job> retv = null;
135    if (state == Job.WAITING) {
136      retv = this.waitingJobs;
137    } else if (state == Job.READY) {
138      retv = this.readyJobs;
139    } else if (state == Job.RUNNING) {
140      retv = this.runningJobs;
141    } else if (state == Job.SUCCESS) {
142      retv = this.successfulJobs;
143    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
144      retv = this.failedJobs;
145    } 
146    return retv;
147  }
148
149  /**
150   * Add a new job.
151   * @param aJob the new job
152   */
153  synchronized public String addJob(Job aJob) {
154    String id = this.getNextJobID();
155    aJob.setJobID(id);
156    aJob.setState(Job.WAITING);
157    this.addToQueue(aJob);
158    return id; 
159  }
160       
161  /**
162   * Add a collection of jobs
163   *
164   * @param jobs
165   */
166  public void addJobs(Collection<Job> jobs) {
167    for (Job job : jobs) {
168      addJob(job);
169    }
170  }
171       
172  /**
173   * @return the thread state
174   */
175  public int getState() {
176    return this.runnerState;
177  }
178       
179  /**
180   * set the thread state to STOPPING so that the
181   * thread will stop when it wakes up.
182   */
183  public void stop() {
184    this.runnerState = JobControl.STOPPING;
185  }
186       
187  /**
188   * suspend the running thread
189   */
190  public void suspend () {
191    if (this.runnerState == JobControl.RUNNING) {
192      this.runnerState = JobControl.SUSPENDED;
193    }
194  }
195       
196  /**
197   * resume the suspended thread
198   */
199  public void resume () {
200    if (this.runnerState == JobControl.SUSPENDED) {
201      this.runnerState = JobControl.RUNNING;
202    }
203  }
204       
205  synchronized private void checkRunningJobs() {
206               
207    Map<String, Job> oldJobs = null;
208    oldJobs = this.runningJobs;
209    this.runningJobs = new Hashtable<String, Job>();
210               
211    for (Job nextJob : oldJobs.values()) {
212      int state = nextJob.checkState();
213      /*
214        if (state != Job.RUNNING) {
215        System.out.println("The state of the running job " +
216        nextJob.getJobName() + " has changed to: " + nextJob.getState());
217        }
218      */
219      this.addToQueue(nextJob);
220    }
221  }
222       
223  synchronized private void checkWaitingJobs() {
224    Map<String, Job> oldJobs = null;
225    oldJobs = this.waitingJobs;
226    this.waitingJobs = new Hashtable<String, Job>();
227               
228    for (Job nextJob : oldJobs.values()) {
229      int state = nextJob.checkState();
230      /*
231        if (state != Job.WAITING) {
232        System.out.println("The state of the waiting job " +
233        nextJob.getJobName() + " has changed to: " + nextJob.getState());
234        }
235      */
236      this.addToQueue(nextJob);
237    }
238  }
239       
240  synchronized private void startReadyJobs() {
241    Map<String, Job> oldJobs = null;
242    oldJobs = this.readyJobs;
243    this.readyJobs = new Hashtable<String, Job>();
244               
245    for (Job nextJob : oldJobs.values()) {
246      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
247      nextJob.submit();
248      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
249      this.addToQueue(nextJob);
250    }   
251  }
252       
253  synchronized public boolean allFinished() {
254    return this.waitingJobs.size() == 0 &&
255      this.readyJobs.size() == 0 &&
256      this.runningJobs.size() == 0;
257  }
258       
259  /**
260   *  The main loop for the thread.
261   *  The loop does the following:
262   *    Check the states of the running jobs
263   *    Update the states of waiting jobs
264   *    Submit the jobs in ready state
265   */
266  public void run() {
267    this.runnerState = JobControl.RUNNING;
268    while (true) {
269      while (this.runnerState == JobControl.SUSPENDED) {
270        try {
271          Thread.sleep(5000);
272        }
273        catch (Exception e) {
274                                       
275        }
276      }
277      checkRunningJobs();       
278      checkWaitingJobs();               
279      startReadyJobs();         
280      if (this.runnerState != JobControl.RUNNING && 
281          this.runnerState != JobControl.SUSPENDED) {
282        break;
283      }
284      try {
285        Thread.sleep(5000);
286      }
287      catch (Exception e) {
288                               
289      }
290      if (this.runnerState != JobControl.RUNNING && 
291          this.runnerState != JobControl.SUSPENDED) {
292        break;
293      }
294    }
295    this.runnerState = JobControl.STOPPED;
296  }
297
298}
Note: See TracBrowser for help on using the repository browser.