source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 7.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.HashMap;
24import java.util.List;
25import java.util.Map;
26
27import junit.framework.TestCase;
28
29import org.apache.hadoop.io.IntWritable;
30import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
31import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
32
33public class TestParallelInitialization extends TestCase {
34 
35  private static int jobCounter;
36  private static final int NUM_JOBS = 3;
37  IntWritable numJobsCompleted = new IntWritable();
38 
39  static void resetCounters() {
40    jobCounter = 0;
41  }
42 
43  class FakeJobInProgress extends JobInProgress {
44   
45    public FakeJobInProgress(JobConf jobConf,
46        FakeTaskTrackerManager taskTrackerManager) throws IOException {
47      super(new JobID("test", ++jobCounter), jobConf);
48      this.startTime = System.currentTimeMillis();
49      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
50      this.status.setJobPriority(JobPriority.NORMAL);
51      this.status.setStartTime(startTime);
52    }
53
54    @Override
55    public synchronized void initTasks() throws IOException {
56      try {
57        int jobNumber = this.getJobID().getId();
58        synchronized (numJobsCompleted) {
59          while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
60            numJobsCompleted.wait();
61          }
62          numJobsCompleted.set(numJobsCompleted.get() + 1);
63          numJobsCompleted.notifyAll();
64          LOG.info("JobNumber " + jobNumber + " succeeded");
65        }
66      } catch (InterruptedException ie) {};
67      this.status.setRunState(JobStatus.SUCCEEDED);
68    }
69
70    @Override
71    synchronized void fail() {
72      this.status.setRunState(JobStatus.FAILED);
73    }
74  }
75 
76  static class FakeTaskTrackerManager implements TaskTrackerManager {
77   
78    int maps = 0;
79    int reduces = 0;
80    int maxMapTasksPerTracker = 2;
81    int maxReduceTasksPerTracker = 2;
82    List<JobInProgressListener> listeners =
83      new ArrayList<JobInProgressListener>();
84    QueueManager queueManager;
85   
86    private Map<String, TaskTrackerStatus> trackers =
87      new HashMap<String, TaskTrackerStatus>();
88
89    public FakeTaskTrackerManager() {
90      JobConf conf = new JobConf();
91      queueManager = new QueueManager(conf);
92      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
93                   new ArrayList<TaskStatus>(), 0,
94                   maxMapTasksPerTracker, maxReduceTasksPerTracker));
95    }
96   
97    public ClusterStatus getClusterStatus() {
98      int numTrackers = trackers.size();
99      return new ClusterStatus(numTrackers, 0, 
100                               JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
101                               maps, reduces,
102                               numTrackers * maxMapTasksPerTracker,
103                               numTrackers * maxReduceTasksPerTracker,
104                               JobTracker.State.RUNNING);
105    }
106   
107    public int getNumberOfUniqueHosts() {
108      return 0;
109    }
110
111    public Collection<TaskTrackerStatus> taskTrackers() {
112      return trackers.values();
113    }
114
115    public void addJobInProgressListener(JobInProgressListener listener) {
116      listeners.add(listener);
117    }
118
119    public void removeJobInProgressListener(JobInProgressListener listener) {
120      listeners.remove(listener);
121    }
122   
123   
124    public QueueManager getQueueManager() {
125      return queueManager;
126    }
127   
128    public int getNextHeartbeatInterval() {
129      return MRConstants.HEARTBEAT_INTERVAL_MIN;
130    }
131
132    public void killJob(JobID jobid) {
133      return;
134    }
135
136    public JobInProgress getJob(JobID jobid) {
137      return null;
138    }
139
140    public void initJob(JobInProgress job) {
141      try {
142        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
143        job.initTasks();
144        JobStatus newStatus = (JobStatus)job.getStatus().clone();
145        if (prevStatus.getRunState() != newStatus.getRunState()) {
146          JobStatusChangeEvent event = 
147            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
148                newStatus);
149          for (JobInProgressListener listener : listeners) {
150            listener.jobUpdated(event);
151          }
152        }
153      } catch (Exception ioe) {
154        failJob(job);
155      }
156    }
157    // Test methods
158   
159    public synchronized void failJob(JobInProgress job) {
160      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
161      job.fail();
162      JobStatus newStatus = (JobStatus)job.getStatus().clone();
163      if (prevStatus.getRunState() != newStatus.getRunState()) {
164        JobStatusChangeEvent event = 
165          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
166              newStatus);
167        for (JobInProgressListener listener : listeners) {
168          listener.jobUpdated(event);
169        }
170      }
171    }
172   
173    public void submitJob(JobInProgress job) throws IOException {
174      for (JobInProgressListener listener : listeners) {
175        listener.jobAdded(job);
176      }
177    }
178  }
179 
180  protected JobConf jobConf;
181  protected TaskScheduler scheduler;
182  private FakeTaskTrackerManager taskTrackerManager;
183
184  @Override
185  protected void setUp() throws Exception {
186    resetCounters();
187    jobConf = new JobConf();
188    taskTrackerManager = new FakeTaskTrackerManager();
189    scheduler = createTaskScheduler();
190    scheduler.setConf(jobConf);
191    scheduler.setTaskTrackerManager(taskTrackerManager);
192    scheduler.start();
193  }
194 
195  @Override
196  protected void tearDown() throws Exception {
197    if (scheduler != null) {
198      scheduler.terminate();
199    }
200  }
201 
202  protected TaskScheduler createTaskScheduler() {
203    return new JobQueueTaskScheduler();
204  }
205 
206  public void testParallelInitJobs() throws IOException {
207    FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];
208   
209    // Submit NUM_JOBS jobs in order. The init code will ensure
210    // that the jobs get inited in descending order of Job ids
211    // i.e. highest job id first and the smallest last.
212    // If we were not doing parallel init, the first submitted job
213    // will be inited first and that will hang
214   
215    for (int i = 0; i < NUM_JOBS; i++) {
216      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
217      jobs[i].getStatus().setRunState(JobStatus.PREP);
218      taskTrackerManager.submitJob(jobs[i]);
219    }
220   
221    try {
222      Thread.sleep(1000);
223    } catch (InterruptedException ie) {}
224   
225    for (int i = 0; i < NUM_JOBS; i++) {
226      assertTrue(jobs[i].getStatus().getRunState() == JobStatus.SUCCEEDED);
227    }
228  } 
229}
Note: See TracBrowser for help on using the repository browser.