1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import java.io.File; |
---|
21 | import java.io.IOException; |
---|
22 | import java.text.SimpleDateFormat; |
---|
23 | import java.util.ArrayList; |
---|
24 | import java.util.Date; |
---|
25 | import java.util.Iterator; |
---|
26 | import java.util.List; |
---|
27 | |
---|
28 | import org.apache.commons.logging.Log; |
---|
29 | import org.apache.commons.logging.LogFactory; |
---|
30 | import org.apache.hadoop.fs.FileSystem; |
---|
31 | import org.apache.hadoop.net.DNSToSwitchMapping; |
---|
32 | import org.apache.hadoop.net.NetUtils; |
---|
33 | import org.apache.hadoop.net.NetworkTopology; |
---|
34 | import org.apache.hadoop.net.StaticMapping; |
---|
35 | import org.apache.hadoop.security.UnixUserGroupInformation; |
---|
36 | |
---|
37 | /** |
---|
38 | * This class creates a single-process Map-Reduce cluster for junit testing. |
---|
39 | * One thread is created for each server. |
---|
40 | */ |
---|
41 | public class MiniMRCluster { |
---|
42 | private static final Log LOG = LogFactory.getLog(MiniMRCluster.class); |
---|
43 | |
---|
44 | private Thread jobTrackerThread; |
---|
45 | private JobTrackerRunner jobTracker; |
---|
46 | |
---|
47 | private int jobTrackerPort = 0; |
---|
48 | private int taskTrackerPort = 0; |
---|
49 | private int jobTrackerInfoPort = 0; |
---|
50 | private int numTaskTrackers; |
---|
51 | |
---|
52 | private List<TaskTrackerRunner> taskTrackerList = new ArrayList<TaskTrackerRunner>(); |
---|
53 | private List<Thread> taskTrackerThreadList = new ArrayList<Thread>(); |
---|
54 | |
---|
55 | private String namenode; |
---|
56 | private UnixUserGroupInformation ugi = null; |
---|
57 | private JobConf conf; |
---|
58 | |
---|
59 | private JobConf job; |
---|
60 | |
---|
61 | /** |
---|
62 | * An inner class that runs a job tracker. |
---|
63 | */ |
---|
64 | class JobTrackerRunner implements Runnable { |
---|
65 | private JobTracker tracker = null; |
---|
66 | private volatile boolean isActive = true; |
---|
67 | |
---|
68 | JobConf jc = null; |
---|
69 | |
---|
70 | public JobTrackerRunner(JobConf conf) { |
---|
71 | jc = conf; |
---|
72 | } |
---|
73 | |
---|
74 | public boolean isUp() { |
---|
75 | return (tracker != null); |
---|
76 | } |
---|
77 | |
---|
78 | public boolean isActive() { |
---|
79 | return isActive; |
---|
80 | } |
---|
81 | |
---|
82 | public int getJobTrackerPort() { |
---|
83 | return tracker.getTrackerPort(); |
---|
84 | } |
---|
85 | |
---|
86 | public int getJobTrackerInfoPort() { |
---|
87 | return tracker.getInfoPort(); |
---|
88 | } |
---|
89 | |
---|
90 | public JobTracker getJobTracker() { |
---|
91 | return tracker; |
---|
92 | } |
---|
93 | |
---|
94 | /** |
---|
95 | * Create the job tracker and run it. |
---|
96 | */ |
---|
97 | public void run() { |
---|
98 | try { |
---|
99 | jc = (jc == null) ? createJobConf() : createJobConf(jc); |
---|
100 | File f = new File("build/test/mapred/local").getAbsoluteFile(); |
---|
101 | jc.set("mapred.local.dir",f.getAbsolutePath()); |
---|
102 | jc.setClass("topology.node.switch.mapping.impl", |
---|
103 | StaticMapping.class, DNSToSwitchMapping.class); |
---|
104 | String id = |
---|
105 | new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()); |
---|
106 | tracker = JobTracker.startTracker(jc, id); |
---|
107 | tracker.offerService(); |
---|
108 | } catch (Throwable e) { |
---|
109 | LOG.error("Job tracker crashed", e); |
---|
110 | isActive = false; |
---|
111 | } |
---|
112 | } |
---|
113 | |
---|
114 | /** |
---|
115 | * Shutdown the job tracker and wait for it to finish. |
---|
116 | */ |
---|
117 | public void shutdown() { |
---|
118 | try { |
---|
119 | if (tracker != null) { |
---|
120 | tracker.stopTracker(); |
---|
121 | } |
---|
122 | } catch (Throwable e) { |
---|
123 | LOG.error("Problem shutting down job tracker", e); |
---|
124 | } |
---|
125 | isActive = false; |
---|
126 | } |
---|
127 | } |
---|
128 | |
---|
129 | /** |
---|
130 | * An inner class to run the task tracker. |
---|
131 | */ |
---|
132 | class TaskTrackerRunner implements Runnable { |
---|
133 | volatile TaskTracker tt; |
---|
134 | int trackerId; |
---|
135 | // the localDirs for this taskTracker |
---|
136 | String[] localDirs; |
---|
137 | volatile boolean isInitialized = false; |
---|
138 | volatile boolean isDead = false; |
---|
139 | int numDir; |
---|
140 | |
---|
141 | TaskTrackerRunner(int trackerId, int numDir, String hostname, |
---|
142 | JobConf cfg) |
---|
143 | throws IOException { |
---|
144 | this.trackerId = trackerId; |
---|
145 | this.numDir = numDir; |
---|
146 | localDirs = new String[numDir]; |
---|
147 | JobConf conf = null; |
---|
148 | if (cfg == null) { |
---|
149 | conf = createJobConf(); |
---|
150 | } else { |
---|
151 | conf = createJobConf(cfg); |
---|
152 | } |
---|
153 | if (hostname != null) { |
---|
154 | conf.set("slave.host.name", hostname); |
---|
155 | } |
---|
156 | conf.set("mapred.task.tracker.http.address", "0.0.0.0:0"); |
---|
157 | conf.set("mapred.task.tracker.report.address", |
---|
158 | "127.0.0.1:" + taskTrackerPort); |
---|
159 | File localDirBase = |
---|
160 | new File(conf.get("mapred.local.dir")).getAbsoluteFile(); |
---|
161 | localDirBase.mkdirs(); |
---|
162 | StringBuffer localPath = new StringBuffer(); |
---|
163 | for(int i=0; i < numDir; ++i) { |
---|
164 | File ttDir = new File(localDirBase, |
---|
165 | Integer.toString(trackerId) + "_" + 0); |
---|
166 | if (!ttDir.mkdirs()) { |
---|
167 | if (!ttDir.isDirectory()) { |
---|
168 | throw new IOException("Mkdirs failed to create " + ttDir); |
---|
169 | } |
---|
170 | } |
---|
171 | localDirs[i] = ttDir.toString(); |
---|
172 | if (i != 0) { |
---|
173 | localPath.append(","); |
---|
174 | } |
---|
175 | localPath.append(localDirs[i]); |
---|
176 | } |
---|
177 | conf.set("mapred.local.dir", localPath.toString()); |
---|
178 | LOG.info("mapred.local.dir is " + localPath); |
---|
179 | try { |
---|
180 | tt = new TaskTracker(conf); |
---|
181 | isInitialized = true; |
---|
182 | } catch (Throwable e) { |
---|
183 | isDead = true; |
---|
184 | tt = null; |
---|
185 | LOG.error("task tracker " + trackerId + " crashed", e); |
---|
186 | } |
---|
187 | } |
---|
188 | |
---|
189 | /** |
---|
190 | * Create and run the task tracker. |
---|
191 | */ |
---|
192 | public void run() { |
---|
193 | try { |
---|
194 | if (tt != null) { |
---|
195 | tt.run(); |
---|
196 | } |
---|
197 | } catch (Throwable e) { |
---|
198 | isDead = true; |
---|
199 | tt = null; |
---|
200 | LOG.error("task tracker " + trackerId + " crashed", e); |
---|
201 | } |
---|
202 | } |
---|
203 | |
---|
204 | /** |
---|
205 | * Get the local dir for this TaskTracker. |
---|
206 | * This is there so that we do not break |
---|
207 | * previous tests. |
---|
208 | * @return the absolute pathname |
---|
209 | */ |
---|
210 | public String getLocalDir() { |
---|
211 | return localDirs[0]; |
---|
212 | } |
---|
213 | |
---|
214 | public String[] getLocalDirs(){ |
---|
215 | return localDirs; |
---|
216 | } |
---|
217 | |
---|
218 | public TaskTracker getTaskTracker() { |
---|
219 | return tt; |
---|
220 | } |
---|
221 | |
---|
222 | /** |
---|
223 | * Shut down the server and wait for it to finish. |
---|
224 | */ |
---|
225 | public void shutdown() { |
---|
226 | if (tt != null) { |
---|
227 | try { |
---|
228 | tt.shutdown(); |
---|
229 | } catch (Throwable e) { |
---|
230 | LOG.error("task tracker " + trackerId + " could not shut down", |
---|
231 | e); |
---|
232 | } |
---|
233 | } |
---|
234 | } |
---|
235 | } |
---|
236 | |
---|
237 | /** |
---|
238 | * Get the local directory for the Nth task tracker |
---|
239 | * @param taskTracker the index of the task tracker to check |
---|
240 | * @return the absolute pathname of the local dir |
---|
241 | */ |
---|
242 | public String getTaskTrackerLocalDir(int taskTracker) { |
---|
243 | return (taskTrackerList.get(taskTracker)).getLocalDir(); |
---|
244 | } |
---|
245 | |
---|
246 | public JobTrackerRunner getJobTrackerRunner() { |
---|
247 | return jobTracker; |
---|
248 | } |
---|
249 | |
---|
250 | TaskTrackerRunner getTaskTrackerRunner(int id) { |
---|
251 | return taskTrackerList.get(id); |
---|
252 | } |
---|
253 | /** |
---|
254 | * Get the number of task trackers in the cluster |
---|
255 | */ |
---|
256 | public int getNumTaskTrackers() { |
---|
257 | return taskTrackerList.size(); |
---|
258 | } |
---|
259 | |
---|
260 | /** |
---|
261 | * Wait until the system is idle. |
---|
262 | */ |
---|
263 | public void waitUntilIdle() { |
---|
264 | waitTaskTrackers(); |
---|
265 | |
---|
266 | JobClient client; |
---|
267 | try { |
---|
268 | client = new JobClient(job); |
---|
269 | while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) { |
---|
270 | for(TaskTrackerRunner runner : taskTrackerList) { |
---|
271 | if(runner.isDead) { |
---|
272 | throw new RuntimeException("TaskTracker is dead"); |
---|
273 | } |
---|
274 | } |
---|
275 | Thread.sleep(1000); |
---|
276 | } |
---|
277 | } |
---|
278 | catch (IOException ex) { |
---|
279 | throw new RuntimeException(ex); |
---|
280 | } |
---|
281 | catch (InterruptedException ex) { |
---|
282 | throw new RuntimeException(ex); |
---|
283 | } |
---|
284 | |
---|
285 | } |
---|
286 | |
---|
287 | private void waitTaskTrackers() { |
---|
288 | for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) { |
---|
289 | TaskTrackerRunner runner = itr.next(); |
---|
290 | while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) { |
---|
291 | if (!runner.isInitialized) { |
---|
292 | LOG.info("Waiting for task tracker to start."); |
---|
293 | } else { |
---|
294 | LOG.info("Waiting for task tracker " + runner.tt.getName() + |
---|
295 | " to be idle."); |
---|
296 | } |
---|
297 | try { |
---|
298 | Thread.sleep(1000); |
---|
299 | } catch (InterruptedException ie) {} |
---|
300 | } |
---|
301 | } |
---|
302 | } |
---|
303 | |
---|
304 | /** |
---|
305 | * Get the actual rpc port used. |
---|
306 | */ |
---|
307 | public int getJobTrackerPort() { |
---|
308 | return jobTrackerPort; |
---|
309 | } |
---|
310 | |
---|
311 | public JobConf createJobConf() { |
---|
312 | return createJobConf(new JobConf()); |
---|
313 | } |
---|
314 | |
---|
315 | public JobConf createJobConf(JobConf conf) { |
---|
316 | if(conf == null) { |
---|
317 | conf = new JobConf(); |
---|
318 | } |
---|
319 | return configureJobConf(conf, namenode, jobTrackerPort, jobTrackerInfoPort, |
---|
320 | ugi); |
---|
321 | } |
---|
322 | |
---|
323 | static JobConf configureJobConf(JobConf conf, String namenode, |
---|
324 | int jobTrackerPort, int jobTrackerInfoPort, |
---|
325 | UnixUserGroupInformation ugi) { |
---|
326 | JobConf result = new JobConf(conf); |
---|
327 | FileSystem.setDefaultUri(result, namenode); |
---|
328 | result.set("mapred.job.tracker", "localhost:"+jobTrackerPort); |
---|
329 | result.set("mapred.job.tracker.http.address", |
---|
330 | "127.0.0.1:" + jobTrackerInfoPort); |
---|
331 | if (ugi != null) { |
---|
332 | result.set("mapred.system.dir", "/mapred/system"); |
---|
333 | UnixUserGroupInformation.saveToConf(result, |
---|
334 | UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi); |
---|
335 | } |
---|
336 | // for debugging have all task output sent to the test output |
---|
337 | JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL); |
---|
338 | return result; |
---|
339 | } |
---|
340 | |
---|
341 | /** |
---|
342 | * Create the config and the cluster. |
---|
343 | * @param numTaskTrackers no. of tasktrackers in the cluster |
---|
344 | * @param namenode the namenode |
---|
345 | * @param numDir no. of directories |
---|
346 | * @throws IOException |
---|
347 | */ |
---|
348 | public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, |
---|
349 | String[] racks, String[] hosts) throws IOException { |
---|
350 | this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts); |
---|
351 | } |
---|
352 | |
---|
353 | /** |
---|
354 | * Create the config and the cluster. |
---|
355 | * @param numTaskTrackers no. of tasktrackers in the cluster |
---|
356 | * @param namenode the namenode |
---|
357 | * @param numDir no. of directories |
---|
358 | * @param racks Array of racks |
---|
359 | * @param hosts Array of hosts in the corresponding racks |
---|
360 | * @param conf Default conf for the jobtracker |
---|
361 | * @throws IOException |
---|
362 | */ |
---|
363 | public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, |
---|
364 | String[] racks, String[] hosts, JobConf conf) |
---|
365 | throws IOException { |
---|
366 | this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf); |
---|
367 | } |
---|
368 | |
---|
369 | /** |
---|
370 | * Create the config and the cluster. |
---|
371 | * @param numTaskTrackers no. of tasktrackers in the cluster |
---|
372 | * @param namenode the namenode |
---|
373 | * @param numDir no. of directories |
---|
374 | * @throws IOException |
---|
375 | */ |
---|
376 | public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) |
---|
377 | throws IOException { |
---|
378 | this(0, 0, numTaskTrackers, namenode, numDir); |
---|
379 | } |
---|
380 | |
---|
381 | public MiniMRCluster(int jobTrackerPort, |
---|
382 | int taskTrackerPort, |
---|
383 | int numTaskTrackers, |
---|
384 | String namenode, |
---|
385 | int numDir) |
---|
386 | throws IOException { |
---|
387 | this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
---|
388 | numDir, null); |
---|
389 | } |
---|
390 | |
---|
391 | public MiniMRCluster(int jobTrackerPort, |
---|
392 | int taskTrackerPort, |
---|
393 | int numTaskTrackers, |
---|
394 | String namenode, |
---|
395 | int numDir, |
---|
396 | String[] racks) throws IOException { |
---|
397 | this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
---|
398 | numDir, racks, null); |
---|
399 | } |
---|
400 | |
---|
401 | public MiniMRCluster(int jobTrackerPort, |
---|
402 | int taskTrackerPort, |
---|
403 | int numTaskTrackers, |
---|
404 | String namenode, |
---|
405 | int numDir, |
---|
406 | String[] racks, String[] hosts) throws IOException { |
---|
407 | this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
---|
408 | numDir, racks, hosts, null); |
---|
409 | } |
---|
410 | |
---|
411 | public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, |
---|
412 | int numTaskTrackers, String namenode, |
---|
413 | int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi |
---|
414 | ) throws IOException { |
---|
415 | this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, |
---|
416 | numDir, racks, hosts, ugi, null); |
---|
417 | } |
---|
418 | |
---|
419 | public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, |
---|
420 | int numTaskTrackers, String namenode, |
---|
421 | int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi, |
---|
422 | JobConf conf) throws IOException { |
---|
423 | if (racks != null && racks.length < numTaskTrackers) { |
---|
424 | LOG.error("Invalid number of racks specified. It should be at least " + |
---|
425 | "equal to the number of tasktrackers"); |
---|
426 | shutdown(); |
---|
427 | } |
---|
428 | if (hosts != null && numTaskTrackers > hosts.length ) { |
---|
429 | throw new IllegalArgumentException( "The length of hosts [" + hosts.length |
---|
430 | + "] is less than the number of tasktrackers [" + numTaskTrackers + "]."); |
---|
431 | } |
---|
432 | |
---|
433 | //Generate rack names if required |
---|
434 | if (racks == null) { |
---|
435 | System.out.println("Generating rack names for tasktrackers"); |
---|
436 | racks = new String[numTaskTrackers]; |
---|
437 | for (int i=0; i < racks.length; ++i) { |
---|
438 | racks[i] = NetworkTopology.DEFAULT_RACK; |
---|
439 | } |
---|
440 | } |
---|
441 | |
---|
442 | //Generate some hostnames if required |
---|
443 | if (hosts == null) { |
---|
444 | System.out.println("Generating host names for tasktrackers"); |
---|
445 | hosts = new String[numTaskTrackers]; |
---|
446 | for (int i = 0; i < numTaskTrackers; i++) { |
---|
447 | hosts[i] = "host" + i + ".foo.com"; |
---|
448 | } |
---|
449 | } |
---|
450 | this.jobTrackerPort = jobTrackerPort; |
---|
451 | this.taskTrackerPort = taskTrackerPort; |
---|
452 | this.jobTrackerInfoPort = 0; |
---|
453 | this.numTaskTrackers = 0; |
---|
454 | this.namenode = namenode; |
---|
455 | this.ugi = ugi; |
---|
456 | this.conf = conf; // this is the conf the mr starts with |
---|
457 | |
---|
458 | // start the jobtracker |
---|
459 | startJobTracker(); |
---|
460 | |
---|
461 | // Create the TaskTrackers |
---|
462 | for (int idx = 0; idx < numTaskTrackers; idx++) { |
---|
463 | String rack = null; |
---|
464 | String host = null; |
---|
465 | if (racks != null) { |
---|
466 | rack = racks[idx]; |
---|
467 | } |
---|
468 | if (hosts != null) { |
---|
469 | host = hosts[idx]; |
---|
470 | } |
---|
471 | |
---|
472 | startTaskTracker(host, rack, idx, numDir); |
---|
473 | } |
---|
474 | |
---|
475 | this.job = createJobConf(conf); |
---|
476 | waitUntilIdle(); |
---|
477 | } |
---|
478 | |
---|
479 | /** |
---|
480 | * Get the task completion events |
---|
481 | */ |
---|
482 | public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, |
---|
483 | int max) |
---|
484 | throws IOException { |
---|
485 | return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max); |
---|
486 | } |
---|
487 | |
---|
488 | /** |
---|
489 | * Change the job's priority |
---|
490 | */ |
---|
491 | public void setJobPriority(JobID jobId, JobPriority priority) { |
---|
492 | jobTracker.getJobTracker().setJobPriority(jobId, priority); |
---|
493 | } |
---|
494 | |
---|
495 | /** |
---|
496 | * Get the job's priority |
---|
497 | */ |
---|
498 | public JobPriority getJobPriority(JobID jobId) { |
---|
499 | return jobTracker.getJobTracker().getJob(jobId).getPriority(); |
---|
500 | } |
---|
501 | |
---|
502 | /** |
---|
503 | * Get the job finish time |
---|
504 | */ |
---|
505 | public long getJobFinishTime(JobID jobId) { |
---|
506 | return jobTracker.getJobTracker().getJob(jobId).getFinishTime(); |
---|
507 | } |
---|
508 | |
---|
509 | /** |
---|
510 | * Init the job |
---|
511 | */ |
---|
512 | public void initializeJob(JobID jobId) throws IOException { |
---|
513 | JobInProgress job = jobTracker.getJobTracker().getJob(jobId); |
---|
514 | jobTracker.getJobTracker().initJob(job); |
---|
515 | } |
---|
516 | |
---|
517 | /** |
---|
518 | * Get the events list at the tasktracker |
---|
519 | */ |
---|
520 | public MapTaskCompletionEventsUpdate |
---|
521 | getMapTaskCompletionEventsUpdates(int index, JobID jobId, int max) |
---|
522 | throws IOException { |
---|
523 | String jtId = jobTracker.getJobTracker().getTrackerIdentifier(); |
---|
524 | TaskAttemptID dummy = |
---|
525 | new TaskAttemptID(jtId, jobId.getId(), false, 0, 0); |
---|
526 | return taskTrackerList.get(index).getTaskTracker() |
---|
527 | .getMapCompletionEvents(jobId, 0, max, |
---|
528 | dummy); |
---|
529 | } |
---|
530 | |
---|
531 | /** |
---|
532 | * Get jobtracker conf |
---|
533 | */ |
---|
534 | public JobConf getJobTrackerConf() { |
---|
535 | return this.conf; |
---|
536 | } |
---|
537 | |
---|
538 | /** |
---|
539 | * Get num events recovered |
---|
540 | */ |
---|
541 | public int getNumEventsRecovered() { |
---|
542 | return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered(); |
---|
543 | } |
---|
544 | |
---|
545 | public int getFaultCount(String hostName) { |
---|
546 | return jobTracker.getJobTracker().getFaultCount(hostName); |
---|
547 | } |
---|
548 | |
---|
549 | /** |
---|
550 | * Start the jobtracker. |
---|
551 | */ |
---|
552 | public void startJobTracker() { |
---|
553 | startJobTracker(true); |
---|
554 | } |
---|
555 | |
---|
556 | void startJobTracker(boolean wait) { |
---|
557 | // Create the JobTracker |
---|
558 | jobTracker = new JobTrackerRunner(conf); |
---|
559 | jobTrackerThread = new Thread(jobTracker); |
---|
560 | |
---|
561 | jobTrackerThread.start(); |
---|
562 | |
---|
563 | if (!wait) { |
---|
564 | return; |
---|
565 | } |
---|
566 | |
---|
567 | while (jobTracker.isActive() && !jobTracker.isUp()) { |
---|
568 | try { // let daemons get started |
---|
569 | Thread.sleep(1000); |
---|
570 | } catch(InterruptedException e) { |
---|
571 | } |
---|
572 | } |
---|
573 | |
---|
574 | // is the jobtracker has started then wait for it to init |
---|
575 | ClusterStatus status = null; |
---|
576 | if (jobTracker.isUp()) { |
---|
577 | status = jobTracker.getJobTracker().getClusterStatus(false); |
---|
578 | while (jobTracker.isActive() && status.getJobTrackerState() |
---|
579 | == JobTracker.State.INITIALIZING) { |
---|
580 | try { |
---|
581 | LOG.info("JobTracker still initializing. Waiting."); |
---|
582 | Thread.sleep(1000); |
---|
583 | } catch(InterruptedException e) {} |
---|
584 | status = jobTracker.getJobTracker().getClusterStatus(false); |
---|
585 | } |
---|
586 | } |
---|
587 | |
---|
588 | if (!jobTracker.isActive()) { |
---|
589 | // return if jobtracker has crashed |
---|
590 | return; |
---|
591 | } |
---|
592 | |
---|
593 | // Set the configuration for the task-trackers |
---|
594 | this.jobTrackerPort = jobTracker.getJobTrackerPort(); |
---|
595 | this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort(); |
---|
596 | } |
---|
597 | |
---|
598 | /** |
---|
599 | * Kill the jobtracker. |
---|
600 | */ |
---|
601 | public void stopJobTracker() { |
---|
602 | //jobTracker.exit(-1); |
---|
603 | jobTracker.shutdown(); |
---|
604 | |
---|
605 | jobTrackerThread.interrupt(); |
---|
606 | try { |
---|
607 | jobTrackerThread.join(); |
---|
608 | } catch (InterruptedException ex) { |
---|
609 | LOG.error("Problem waiting for job tracker to finish", ex); |
---|
610 | } |
---|
611 | } |
---|
612 | |
---|
613 | /** |
---|
614 | * Kill the tasktracker. |
---|
615 | */ |
---|
616 | public void stopTaskTracker(int id) { |
---|
617 | TaskTrackerRunner tracker = taskTrackerList.remove(id); |
---|
618 | tracker.shutdown(); |
---|
619 | |
---|
620 | Thread thread = taskTrackerThreadList.remove(id); |
---|
621 | thread.interrupt(); |
---|
622 | |
---|
623 | try { |
---|
624 | thread.join(); |
---|
625 | // This will break the wait until idle loop |
---|
626 | tracker.isDead = true; |
---|
627 | --numTaskTrackers; |
---|
628 | } catch (InterruptedException ex) { |
---|
629 | LOG.error("Problem waiting for task tracker to finish", ex); |
---|
630 | } |
---|
631 | } |
---|
632 | |
---|
633 | /** |
---|
634 | * Start the tasktracker. |
---|
635 | */ |
---|
636 | public void startTaskTracker(String host, String rack, int idx, int numDir) |
---|
637 | throws IOException { |
---|
638 | if (rack != null) { |
---|
639 | StaticMapping.addNodeToRack(host, rack); |
---|
640 | } |
---|
641 | if (host != null) { |
---|
642 | NetUtils.addStaticResolution(host, "localhost"); |
---|
643 | } |
---|
644 | TaskTrackerRunner taskTracker; |
---|
645 | taskTracker = new TaskTrackerRunner(idx, numDir, host, conf); |
---|
646 | |
---|
647 | Thread taskTrackerThread = new Thread(taskTracker); |
---|
648 | taskTrackerList.add(taskTracker); |
---|
649 | taskTrackerThreadList.add(taskTrackerThread); |
---|
650 | taskTrackerThread.start(); |
---|
651 | ++numTaskTrackers; |
---|
652 | } |
---|
653 | |
---|
654 | /** |
---|
655 | * Get the tasktrackerID in MiniMRCluster with given trackerName. |
---|
656 | */ |
---|
657 | int getTaskTrackerID(String trackerName) { |
---|
658 | for (int id=0; id < numTaskTrackers; id++) { |
---|
659 | if (taskTrackerList.get(id).getTaskTracker().getName().equals( |
---|
660 | trackerName)) { |
---|
661 | return id; |
---|
662 | } |
---|
663 | } |
---|
664 | return -1; |
---|
665 | } |
---|
666 | |
---|
667 | /** |
---|
668 | * Shut down the servers. |
---|
669 | */ |
---|
670 | public void shutdown() { |
---|
671 | try { |
---|
672 | waitTaskTrackers(); |
---|
673 | for (int idx = 0; idx < numTaskTrackers; idx++) { |
---|
674 | TaskTrackerRunner taskTracker = taskTrackerList.get(idx); |
---|
675 | Thread taskTrackerThread = taskTrackerThreadList.get(idx); |
---|
676 | taskTracker.shutdown(); |
---|
677 | taskTrackerThread.interrupt(); |
---|
678 | try { |
---|
679 | taskTrackerThread.join(); |
---|
680 | } catch (InterruptedException ex) { |
---|
681 | LOG.error("Problem shutting down task tracker", ex); |
---|
682 | } |
---|
683 | } |
---|
684 | stopJobTracker(); |
---|
685 | } finally { |
---|
686 | File configDir = new File("build", "minimr"); |
---|
687 | File siteFile = new File(configDir, "mapred-site.xml"); |
---|
688 | siteFile.delete(); |
---|
689 | } |
---|
690 | } |
---|
691 | |
---|
692 | public static void main(String[] args) throws IOException { |
---|
693 | LOG.info("Bringing up Jobtracker and tasktrackers."); |
---|
694 | MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1); |
---|
695 | LOG.info("JobTracker and TaskTrackers are up."); |
---|
696 | mr.shutdown(); |
---|
697 | LOG.info("JobTracker and TaskTrackers brought down."); |
---|
698 | } |
---|
699 | } |
---|
700 | |
---|