source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 20.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.File;
21import java.io.IOException;
22import java.text.SimpleDateFormat;
23import java.util.ArrayList;
24import java.util.Date;
25import java.util.Iterator;
26import java.util.List;
27
28import org.apache.commons.logging.Log;
29import org.apache.commons.logging.LogFactory;
30import org.apache.hadoop.fs.FileSystem;
31import org.apache.hadoop.net.DNSToSwitchMapping;
32import org.apache.hadoop.net.NetUtils;
33import org.apache.hadoop.net.NetworkTopology;
34import org.apache.hadoop.net.StaticMapping;
35import org.apache.hadoop.security.UnixUserGroupInformation;
36
37/**
38 * This class creates a single-process Map-Reduce cluster for junit testing.
39 * One thread is created for each server.
40 */
41public class MiniMRCluster {
42  private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
43   
44  private Thread jobTrackerThread;
45  private JobTrackerRunner jobTracker;
46   
47  private int jobTrackerPort = 0;
48  private int taskTrackerPort = 0;
49  private int jobTrackerInfoPort = 0;
50  private int numTaskTrackers;
51   
52  private List<TaskTrackerRunner> taskTrackerList = new ArrayList<TaskTrackerRunner>();
53  private List<Thread> taskTrackerThreadList = new ArrayList<Thread>();
54   
55  private String namenode;
56  private UnixUserGroupInformation ugi = null;
57  private JobConf conf;
58   
59  private JobConf job;
60 
61  /**
62   * An inner class that runs a job tracker.
63   */
64  class JobTrackerRunner implements Runnable {
65    private JobTracker tracker = null;
66    private volatile boolean isActive = true;
67   
68    JobConf jc = null;
69       
70    public JobTrackerRunner(JobConf conf) {
71      jc = conf;
72    }
73
74    public boolean isUp() {
75      return (tracker != null);
76    }
77       
78    public boolean isActive() {
79      return isActive;
80    }
81
82    public int getJobTrackerPort() {
83      return tracker.getTrackerPort();
84    }
85
86    public int getJobTrackerInfoPort() {
87      return tracker.getInfoPort();
88    }
89 
90    public JobTracker getJobTracker() {
91      return tracker;
92    }
93   
94    /**
95     * Create the job tracker and run it.
96     */
97    public void run() {
98      try {
99        jc = (jc == null) ? createJobConf() : createJobConf(jc);
100        File f = new File("build/test/mapred/local").getAbsoluteFile();
101        jc.set("mapred.local.dir",f.getAbsolutePath());
102        jc.setClass("topology.node.switch.mapping.impl", 
103            StaticMapping.class, DNSToSwitchMapping.class);
104        String id = 
105          new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
106        tracker = JobTracker.startTracker(jc, id);
107        tracker.offerService();
108      } catch (Throwable e) {
109        LOG.error("Job tracker crashed", e);
110        isActive = false;
111      }
112    }
113       
114    /**
115     * Shutdown the job tracker and wait for it to finish.
116     */
117    public void shutdown() {
118      try {
119        if (tracker != null) {
120          tracker.stopTracker();
121        }
122      } catch (Throwable e) {
123        LOG.error("Problem shutting down job tracker", e);
124      }
125      isActive = false;
126    }
127  }
128   
129  /**
130   * An inner class to run the task tracker.
131   */
132  class TaskTrackerRunner implements Runnable {
133    volatile TaskTracker tt;
134    int trackerId;
135    // the localDirs for this taskTracker
136    String[] localDirs;
137    volatile boolean isInitialized = false;
138    volatile boolean isDead = false;
139    int numDir;
140
141    TaskTrackerRunner(int trackerId, int numDir, String hostname, 
142                                    JobConf cfg) 
143    throws IOException {
144      this.trackerId = trackerId;
145      this.numDir = numDir;
146      localDirs = new String[numDir];
147      JobConf conf = null;
148      if (cfg == null) {
149        conf = createJobConf();
150      } else {
151        conf = createJobConf(cfg);
152      }
153      if (hostname != null) {
154        conf.set("slave.host.name", hostname);
155      }
156      conf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
157      conf.set("mapred.task.tracker.report.address", 
158                "127.0.0.1:" + taskTrackerPort);
159      File localDirBase = 
160        new File(conf.get("mapred.local.dir")).getAbsoluteFile();
161      localDirBase.mkdirs();
162      StringBuffer localPath = new StringBuffer();
163      for(int i=0; i < numDir; ++i) {
164        File ttDir = new File(localDirBase, 
165                              Integer.toString(trackerId) + "_" + 0);
166        if (!ttDir.mkdirs()) {
167          if (!ttDir.isDirectory()) {
168            throw new IOException("Mkdirs failed to create " + ttDir);
169          }
170        }
171        localDirs[i] = ttDir.toString();
172        if (i != 0) {
173          localPath.append(",");
174        }
175        localPath.append(localDirs[i]);
176      }
177      conf.set("mapred.local.dir", localPath.toString());
178      LOG.info("mapred.local.dir is " +  localPath);
179      try {
180        tt = new TaskTracker(conf);
181        isInitialized = true;
182      } catch (Throwable e) {
183        isDead = true;
184        tt = null;
185        LOG.error("task tracker " + trackerId + " crashed", e);
186      }
187    }
188       
189    /**
190     * Create and run the task tracker.
191     */
192    public void run() {
193      try {
194        if (tt != null) {
195          tt.run();
196        }
197      } catch (Throwable e) {
198        isDead = true;
199        tt = null;
200        LOG.error("task tracker " + trackerId + " crashed", e);
201      }
202    }
203       
204    /**
205     * Get the local dir for this TaskTracker.
206     * This is there so that we do not break
207     * previous tests.
208     * @return the absolute pathname
209     */
210    public String getLocalDir() {
211      return localDirs[0];
212    }
213       
214    public String[] getLocalDirs(){
215      return localDirs;
216    } 
217   
218    public TaskTracker getTaskTracker() {
219      return tt;
220    }
221   
222    /**
223     * Shut down the server and wait for it to finish.
224     */
225    public void shutdown() {
226      if (tt != null) {
227        try {
228          tt.shutdown();
229        } catch (Throwable e) {
230          LOG.error("task tracker " + trackerId + " could not shut down",
231                    e);
232        }
233      }
234    }
235  }
236   
237  /**
238   * Get the local directory for the Nth task tracker
239   * @param taskTracker the index of the task tracker to check
240   * @return the absolute pathname of the local dir
241   */
242  public String getTaskTrackerLocalDir(int taskTracker) {
243    return (taskTrackerList.get(taskTracker)).getLocalDir();
244  }
245
246  public JobTrackerRunner getJobTrackerRunner() {
247    return jobTracker;
248  }
249 
250  TaskTrackerRunner getTaskTrackerRunner(int id) {
251    return taskTrackerList.get(id);
252  }
253  /**
254   * Get the number of task trackers in the cluster
255   */
256  public int getNumTaskTrackers() {
257    return taskTrackerList.size();
258  }
259   
260  /**
261   * Wait until the system is idle.
262   */
263  public void waitUntilIdle() {
264    waitTaskTrackers();
265   
266    JobClient client;
267    try {
268      client = new JobClient(job);
269      while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) {
270        for(TaskTrackerRunner runner : taskTrackerList) {
271          if(runner.isDead) {
272            throw new RuntimeException("TaskTracker is dead");
273          }
274        }
275        Thread.sleep(1000);
276      }
277    }
278    catch (IOException ex) {
279      throw new RuntimeException(ex);
280    }
281    catch (InterruptedException ex) {
282      throw new RuntimeException(ex);
283    }
284   
285  }
286
287  private void waitTaskTrackers() {
288    for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) {
289      TaskTrackerRunner runner = itr.next();
290      while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
291        if (!runner.isInitialized) {
292          LOG.info("Waiting for task tracker to start.");
293        } else {
294          LOG.info("Waiting for task tracker " + runner.tt.getName() +
295                   " to be idle.");
296        }
297        try {
298          Thread.sleep(1000);
299        } catch (InterruptedException ie) {}
300      }
301    }
302  }
303 
304  /**
305   * Get the actual rpc port used.
306   */
307  public int getJobTrackerPort() {
308    return jobTrackerPort;
309  }
310
311  public JobConf createJobConf() {
312    return createJobConf(new JobConf());
313  }
314
315  public JobConf createJobConf(JobConf conf) {
316    if(conf == null) {
317      conf = new JobConf();
318    }
319    return configureJobConf(conf, namenode, jobTrackerPort, jobTrackerInfoPort, 
320                            ugi);
321  }
322 
323  static JobConf configureJobConf(JobConf conf, String namenode, 
324                                  int jobTrackerPort, int jobTrackerInfoPort, 
325                                  UnixUserGroupInformation ugi) {
326    JobConf result = new JobConf(conf);
327    FileSystem.setDefaultUri(result, namenode);
328    result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
329    result.set("mapred.job.tracker.http.address", 
330                        "127.0.0.1:" + jobTrackerInfoPort);
331    if (ugi != null) {
332      result.set("mapred.system.dir", "/mapred/system");
333      UnixUserGroupInformation.saveToConf(result,
334          UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
335    }
336    // for debugging have all task output sent to the test output
337    JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
338    return result;
339  }
340
341  /**
342   * Create the config and the cluster.
343   * @param numTaskTrackers no. of tasktrackers in the cluster
344   * @param namenode the namenode
345   * @param numDir no. of directories
346   * @throws IOException
347   */
348  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, 
349      String[] racks, String[] hosts) throws IOException {
350    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts);
351  }
352 
353  /**
354   * Create the config and the cluster.
355   * @param numTaskTrackers no. of tasktrackers in the cluster
356   * @param namenode the namenode
357   * @param numDir no. of directories
358   * @param racks Array of racks
359   * @param hosts Array of hosts in the corresponding racks
360   * @param conf Default conf for the jobtracker
361   * @throws IOException
362   */
363  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, 
364                       String[] racks, String[] hosts, JobConf conf) 
365  throws IOException {
366    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
367  }
368
369  /**
370   * Create the config and the cluster.
371   * @param numTaskTrackers no. of tasktrackers in the cluster
372   * @param namenode the namenode
373   * @param numDir no. of directories
374   * @throws IOException
375   */
376  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) 
377    throws IOException {
378    this(0, 0, numTaskTrackers, namenode, numDir);
379  }
380   
381  public MiniMRCluster(int jobTrackerPort,
382      int taskTrackerPort,
383      int numTaskTrackers,
384      String namenode,
385      int numDir)
386  throws IOException {
387    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
388         numDir, null);
389  }
390 
391  public MiniMRCluster(int jobTrackerPort,
392      int taskTrackerPort,
393      int numTaskTrackers,
394      String namenode,
395      int numDir,
396      String[] racks) throws IOException {
397    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
398         numDir, racks, null);
399  }
400 
401  public MiniMRCluster(int jobTrackerPort,
402                       int taskTrackerPort,
403                       int numTaskTrackers,
404                       String namenode,
405                       int numDir,
406                       String[] racks, String[] hosts) throws IOException {
407    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
408         numDir, racks, hosts, null);
409  }
410
411  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
412      int numTaskTrackers, String namenode, 
413      int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi
414      ) throws IOException {
415    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
416         numDir, racks, hosts, ugi, null);
417  }
418
419  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
420      int numTaskTrackers, String namenode, 
421      int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
422      JobConf conf) throws IOException {
423    if (racks != null && racks.length < numTaskTrackers) {
424      LOG.error("Invalid number of racks specified. It should be at least " +
425          "equal to the number of tasktrackers");
426      shutdown();
427    }
428    if (hosts != null && numTaskTrackers > hosts.length ) {
429      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
430          + "] is less than the number of tasktrackers [" + numTaskTrackers + "].");
431    }
432     
433     //Generate rack names if required
434     if (racks == null) {
435       System.out.println("Generating rack names for tasktrackers");
436       racks = new String[numTaskTrackers];
437       for (int i=0; i < racks.length; ++i) {
438         racks[i] = NetworkTopology.DEFAULT_RACK;
439       }
440     }
441     
442    //Generate some hostnames if required
443    if (hosts == null) {
444      System.out.println("Generating host names for tasktrackers");
445      hosts = new String[numTaskTrackers];
446      for (int i = 0; i < numTaskTrackers; i++) {
447        hosts[i] = "host" + i + ".foo.com";
448      }
449    }
450    this.jobTrackerPort = jobTrackerPort;
451    this.taskTrackerPort = taskTrackerPort;
452    this.jobTrackerInfoPort = 0;
453    this.numTaskTrackers = 0;
454    this.namenode = namenode;
455    this.ugi = ugi;
456    this.conf = conf; // this is the conf the mr starts with
457
458    // start the jobtracker
459    startJobTracker();
460
461    // Create the TaskTrackers
462    for (int idx = 0; idx < numTaskTrackers; idx++) {
463      String rack = null;
464      String host = null;
465      if (racks != null) {
466        rack = racks[idx];
467      }
468      if (hosts != null) {
469        host = hosts[idx];
470      }
471     
472      startTaskTracker(host, rack, idx, numDir);
473    }
474
475    this.job = createJobConf(conf);
476    waitUntilIdle();
477  }
478   
479  /**
480   * Get the task completion events
481   */
482  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, 
483                                                          int max) 
484  throws IOException {
485    return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);
486  }
487
488  /**
489   * Change the job's priority
490   */
491  public void setJobPriority(JobID jobId, JobPriority priority) {
492    jobTracker.getJobTracker().setJobPriority(jobId, priority);
493  }
494
495  /**
496   * Get the job's priority
497   */
498  public JobPriority getJobPriority(JobID jobId) {
499    return jobTracker.getJobTracker().getJob(jobId).getPriority();
500  }
501
502  /**
503   * Get the job finish time
504   */
505  public long getJobFinishTime(JobID jobId) {
506    return jobTracker.getJobTracker().getJob(jobId).getFinishTime();
507  }
508
509  /**
510   * Init the job
511   */
512  public void initializeJob(JobID jobId) throws IOException {
513    JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
514    jobTracker.getJobTracker().initJob(job);
515  }
516 
517  /**
518   * Get the events list at the tasktracker
519   */
520  public MapTaskCompletionEventsUpdate
521         getMapTaskCompletionEventsUpdates(int index, JobID jobId, int max) 
522  throws IOException {
523    String jtId = jobTracker.getJobTracker().getTrackerIdentifier();
524    TaskAttemptID dummy = 
525      new TaskAttemptID(jtId, jobId.getId(), false, 0, 0);
526    return taskTrackerList.get(index).getTaskTracker()
527                                     .getMapCompletionEvents(jobId, 0, max, 
528                                                             dummy);
529  }
530 
531  /**
532   * Get jobtracker conf
533   */
534  public JobConf getJobTrackerConf() {
535    return this.conf;
536  }
537 
538  /**
539   * Get num events recovered
540   */
541  public int getNumEventsRecovered() {
542    return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered();
543  }
544
545  public int getFaultCount(String hostName) {
546    return jobTracker.getJobTracker().getFaultCount(hostName);
547  }
548 
549  /**
550   * Start the jobtracker.
551   */
552  public void startJobTracker() {
553    startJobTracker(true);
554  }
555 
556  void startJobTracker(boolean wait) {
557    //  Create the JobTracker
558    jobTracker = new JobTrackerRunner(conf);
559    jobTrackerThread = new Thread(jobTracker);
560       
561    jobTrackerThread.start();
562   
563    if (!wait) {
564        return;
565    }
566   
567    while (jobTracker.isActive() && !jobTracker.isUp()) {
568      try {                                     // let daemons get started
569        Thread.sleep(1000);
570      } catch(InterruptedException e) {
571      }
572    }
573       
574    // is the jobtracker has started then wait for it to init
575    ClusterStatus status = null;
576    if (jobTracker.isUp()) {
577      status = jobTracker.getJobTracker().getClusterStatus(false);
578      while (jobTracker.isActive() && status.getJobTrackerState() 
579             == JobTracker.State.INITIALIZING) {
580        try {
581          LOG.info("JobTracker still initializing. Waiting.");
582          Thread.sleep(1000);
583        } catch(InterruptedException e) {}
584        status = jobTracker.getJobTracker().getClusterStatus(false);
585      }
586    }
587
588    if (!jobTracker.isActive()) {
589      // return if jobtracker has crashed
590      return;
591    }
592 
593    // Set the configuration for the task-trackers
594    this.jobTrackerPort = jobTracker.getJobTrackerPort();
595    this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
596  }
597
598  /**
599   * Kill the jobtracker.
600   */
601  public void stopJobTracker() {
602    //jobTracker.exit(-1);
603    jobTracker.shutdown();
604
605    jobTrackerThread.interrupt();
606    try {
607      jobTrackerThread.join();
608    } catch (InterruptedException ex) {
609      LOG.error("Problem waiting for job tracker to finish", ex);
610    }
611  }
612
613  /**
614   * Kill the tasktracker.
615   */
616  public void stopTaskTracker(int id) {
617    TaskTrackerRunner tracker = taskTrackerList.remove(id);
618    tracker.shutdown();
619
620    Thread thread = taskTrackerThreadList.remove(id);
621    thread.interrupt();
622   
623    try {
624      thread.join();
625      // This will break the wait until idle loop
626      tracker.isDead = true;
627      --numTaskTrackers;
628    } catch (InterruptedException ex) {
629      LOG.error("Problem waiting for task tracker to finish", ex);
630    }
631  }
632 
633  /**
634   * Start the tasktracker.
635   */
636  public void startTaskTracker(String host, String rack, int idx, int numDir) 
637  throws IOException {
638    if (rack != null) {
639      StaticMapping.addNodeToRack(host, rack);
640    }
641    if (host != null) {
642      NetUtils.addStaticResolution(host, "localhost");
643    }
644    TaskTrackerRunner taskTracker;
645    taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
646   
647    Thread taskTrackerThread = new Thread(taskTracker);
648    taskTrackerList.add(taskTracker);
649    taskTrackerThreadList.add(taskTrackerThread);
650    taskTrackerThread.start();
651    ++numTaskTrackers;
652  }
653 
654  /**
655   * Get the tasktrackerID in MiniMRCluster with given trackerName.
656   */
657  int getTaskTrackerID(String trackerName) {
658    for (int id=0; id < numTaskTrackers; id++) {
659      if (taskTrackerList.get(id).getTaskTracker().getName().equals(
660          trackerName)) {
661        return id;
662      }
663    }
664    return -1;
665  }
666 
667  /**
668   * Shut down the servers.
669   */
670  public void shutdown() {
671    try {
672      waitTaskTrackers();
673      for (int idx = 0; idx < numTaskTrackers; idx++) {
674        TaskTrackerRunner taskTracker = taskTrackerList.get(idx);
675        Thread taskTrackerThread = taskTrackerThreadList.get(idx);
676        taskTracker.shutdown();
677        taskTrackerThread.interrupt();
678        try {
679          taskTrackerThread.join();
680        } catch (InterruptedException ex) {
681          LOG.error("Problem shutting down task tracker", ex);
682        }
683      }
684      stopJobTracker();
685    } finally {
686      File configDir = new File("build", "minimr");
687      File siteFile = new File(configDir, "mapred-site.xml");
688      siteFile.delete();
689    }
690  }
691   
692  public static void main(String[] args) throws IOException {
693    LOG.info("Bringing up Jobtracker and tasktrackers.");
694    MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1);
695    LOG.info("JobTracker and TaskTrackers are up.");
696    mr.shutdown();
697    LOG.info("JobTracker and TaskTrackers brought down.");
698  }
699}
700
Note: See TracBrowser for help on using the repository browser.