source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 15.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.util;
20
21import java.io.BufferedReader;
22import java.io.File;
23import java.io.FileNotFoundException;
24import java.io.FileReader;
25import java.io.IOException;
26import java.util.ArrayList;
27import java.util.List;
28import java.util.Map;
29import java.util.HashMap;
30import java.util.regex.Matcher;
31import java.util.regex.Pattern;
32import java.util.Arrays;
33import java.util.LinkedList;
34
35import org.apache.commons.logging.Log;
36import org.apache.commons.logging.LogFactory;
37
38import org.apache.hadoop.util.Shell.ExitCodeException;
39import org.apache.hadoop.util.Shell.ShellCommandExecutor;
40
41/**
42 * A Proc file-system based ProcessTree. Works only on Linux.
43 */
44public class ProcfsBasedProcessTree {
45
46  private static final Log LOG = LogFactory
47      .getLog("org.apache.hadoop.mapred.ProcfsBasedProcessTree");
48
49  private static final String PROCFS = "/proc/";
50  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
51  private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
52  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
53      .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
54
55  // to enable testing, using this variable which can be configured
56  // to a test directory.
57  private String procfsDir;
58 
59  private Integer pid = -1;
60
61  private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
62
63  public ProcfsBasedProcessTree(String pid) {
64    this(pid, PROCFS);
65  }
66
67  public ProcfsBasedProcessTree(String pid, String procfsDir) {
68    this.pid = getValidPID(pid);
69    this.procfsDir = procfsDir;
70  }
71 
72  public void setSigKillInterval(long interval) {
73    sleepTimeBeforeSigKill = interval;
74  }
75
76  /**
77   * Checks if the ProcfsBasedProcessTree is available on this system.
78   *
79   * @return true if ProcfsBasedProcessTree is available. False otherwise.
80   */
81  public static boolean isAvailable() {
82    try {
83      String osName = System.getProperty("os.name");
84      if (!osName.startsWith("Linux")) {
85        LOG.info("ProcfsBasedProcessTree currently is supported only on "
86            + "Linux.");
87        return false;
88      }
89    } catch (SecurityException se) {
90      LOG.warn("Failed to get Operating System name. " + se);
91      return false;
92    }
93    return true;
94  }
95
96  /**
97   * Get the process-tree with latest state. If the root-process is not alive,
98   * an empty tree will be returned.
99   *
100   * @return the process-tree with latest state.
101   */
102  public ProcfsBasedProcessTree getProcessTree() {
103    if (pid != -1) {
104      // Get the list of processes
105      List<Integer> processList = getProcessList();
106
107      Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
108     
109      // cache the processTree to get the age for processes
110      Map<Integer, ProcessInfo> oldProcs = 
111              new HashMap<Integer, ProcessInfo>(processTree);
112      processTree.clear();
113
114      ProcessInfo me = null;
115      for (Integer proc : processList) {
116        // Get information for each process
117        ProcessInfo pInfo = new ProcessInfo(proc);
118        if (constructProcessInfo(pInfo, procfsDir) != null) {
119          allProcessInfo.put(proc, pInfo);
120          if (proc.equals(this.pid)) {
121            me = pInfo; // cache 'me'
122            processTree.put(proc, pInfo);
123          }
124        }
125      }
126
127      if (me == null) {
128        return this; 
129      }
130
131      // Add each process to its parent.
132      for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
133        Integer pID = entry.getKey();
134        if (pID != 1) {
135          ProcessInfo pInfo = entry.getValue();
136          ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
137          if (parentPInfo != null) {
138            parentPInfo.addChild(pInfo);
139          }
140        }
141      }
142
143      // now start constructing the process-tree
144      LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>();
145      pInfoQueue.addAll(me.getChildren());
146      while (!pInfoQueue.isEmpty()) {
147        ProcessInfo pInfo = pInfoQueue.remove();
148        if (!processTree.containsKey(pInfo.getPid())) {
149          processTree.put(pInfo.getPid(), pInfo);
150        }
151        pInfoQueue.addAll(pInfo.getChildren());
152      }
153
154      // update age values.
155      for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
156        ProcessInfo oldInfo = oldProcs.get(procs.getKey());
157        if (oldInfo != null) {
158          if (procs.getValue() != null) {
159            procs.getValue().updateAge(oldInfo); 
160          }
161        }
162      }
163
164      if (LOG.isDebugEnabled()) {
165        // Log.debug the ProcfsBasedProcessTree
166        LOG.debug(this.toString());
167      }
168    }
169    return this;
170  }
171
172  /**
173   * Is the process-tree alive? Currently we care only about the status of the
174   * root-process.
175   *
176   * @return true if the process-true is alive, false otherwise.
177   */
178  public boolean isAlive() {
179    if (pid == -1) {
180      return false;
181    } else {
182      return this.isAlive(pid);
183    }
184  }
185
186  /**
187   * Destroy the process-tree. Currently we only make sure the root process is
188   * gone. It is the responsibility of the root process to make sure that all
189   * its descendants are cleaned up.
190   */
191  public void destroy() {
192    LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
193    if (pid == -1) {
194      return;
195    }
196    ShellCommandExecutor shexec = null;
197
198    if (isAlive(this.pid)) {
199      try {
200        String[] args = { "kill", this.pid.toString() };
201        shexec = new ShellCommandExecutor(args);
202        shexec.execute();
203      } catch (IOException ioe) {
204        LOG.warn("Error executing shell command " + ioe);
205      } finally {
206        LOG.info("Killing " + pid + " with SIGTERM. Exit code "
207            + shexec.getExitCode());
208      }
209    }
210
211    SigKillThread sigKillThread = new SigKillThread();
212    sigKillThread.setDaemon(true);
213    sigKillThread.start();
214  }
215
216  /**
217   * Get the cumulative virtual memory used by all the processes in the
218   * process-tree.
219   *
220   * @return cumulative virtual memory used by the process-tree in bytes.
221   */
222  public long getCumulativeVmem() {
223    // include all processes.. all processes will be older than 0.
224    return getCumulativeVmem(0);
225  }
226
227  /**
228   * Get the cumulative virtual memory used by all the processes in the
229   * process-tree that are older than the passed in age.
230   *
231   * @param olderThanAge processes above this age are included in the
232   *                      memory addition
233   * @return cumulative virtual memory used by the process-tree in bytes,
234   *          for processes older than this age.
235   */
236  public long getCumulativeVmem(int olderThanAge) {
237    long total = 0;
238    for (ProcessInfo p : processTree.values()) {
239      if ((p != null) && (p.getAge() > olderThanAge)) {
240        total += p.getVmem();
241      }
242    }
243    return total;
244  }
245
246  /**
247   * Get PID from a pid-file.
248   *
249   * @param pidFileName
250   *          Name of the pid-file.
251   * @return the PID string read from the pid-file. Returns null if the
252   *         pidFileName points to a non-existing file or if read fails from the
253   *         file.
254   */
255  public static String getPidFromPidFile(String pidFileName) {
256    BufferedReader pidFile = null;
257    FileReader fReader = null;
258    String pid = null;
259
260    try {
261      fReader = new FileReader(pidFileName);
262      pidFile = new BufferedReader(fReader);
263    } catch (FileNotFoundException f) {
264      LOG.debug("PidFile doesn't exist : " + pidFileName);
265      return pid;
266    }
267
268    try {
269      pid = pidFile.readLine();
270    } catch (IOException i) {
271      LOG.error("Failed to read from " + pidFileName);
272    } finally {
273      try {
274        if (fReader != null) {
275          fReader.close();
276        }
277        try {
278          if (pidFile != null) {
279            pidFile.close();
280          }
281        } catch (IOException i) {
282          LOG.warn("Error closing the stream " + pidFile);
283        }
284      } catch (IOException i) {
285        LOG.warn("Error closing the stream " + fReader);
286      }
287    }
288    return pid;
289  }
290
291  private Integer getValidPID(String pid) {
292    Integer retPid = -1;
293    try {
294      retPid = Integer.parseInt((String) pid);
295      if (retPid <= 0) {
296        retPid = -1;
297      }
298    } catch (NumberFormatException nfe) {
299      retPid = -1;
300    }
301    return retPid;
302  }
303
304  /**
305   * Get the list of all processes in the system.
306   */
307  private List<Integer> getProcessList() {
308    String[] processDirs = (new File(procfsDir)).list();
309    List<Integer> processList = new ArrayList<Integer>();
310
311    for (String dir : processDirs) {
312      try {
313        int pd = Integer.parseInt(dir);
314        if ((new File(procfsDir, dir)).isDirectory()) {
315          processList.add(Integer.valueOf(pd));
316        }
317      } catch (NumberFormatException n) {
318        // skip this directory
319      } catch (SecurityException s) {
320        // skip this process
321      }
322    }
323    return processList;
324  }
325
326  /**
327   *
328   * Construct the ProcessInfo using the process' PID and procfs and return the
329   * same. Returns null on failing to read from procfs,
330   */
331  private ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
332    return constructProcessInfo(pinfo, PROCFS);
333  }
334
335  /**
336   * Construct the ProcessInfo using the process' PID and procfs rooted at the
337   * specified directory and return the same. It is provided mainly to assist
338   * testing purposes.
339   *
340   * Returns null on failing to read from procfs,
341   *
342   * @param pinfo ProcessInfo that needs to be updated
343   * @param procfsDir root of the proc file system
344   * @return updated ProcessInfo, null on errors.
345   */
346  private ProcessInfo constructProcessInfo(ProcessInfo pinfo, 
347                                                    String procfsDir) {
348    ProcessInfo ret = null;
349    // Read "procfsDir/<pid>/stat" file
350    BufferedReader in = null;
351    FileReader fReader = null;
352    try {
353      File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
354      fReader = new FileReader(new File(pidDir, "/stat"));
355      in = new BufferedReader(fReader);
356    } catch (FileNotFoundException f) {
357      // The process vanished in the interim!
358      return ret;
359    }
360
361    ret = pinfo;
362    try {
363      String str = in.readLine(); // only one line
364      Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
365      boolean mat = m.find();
366      if (mat) {
367        // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
368        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
369            .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
370            .parseLong(m.group(7)));
371      }
372    } catch (IOException io) {
373      LOG.warn("Error reading the stream " + io);
374      ret = null;
375    } finally {
376      // Close the streams
377      try {
378        if (fReader != null) {
379          fReader.close();
380        }
381        try {
382          if (in != null) {
383            in.close();
384          }
385        } catch (IOException i) {
386          LOG.warn("Error closing the stream " + in);
387        }
388      } catch (IOException i) {
389        LOG.warn("Error closing the stream " + fReader);
390      }
391    }
392
393    return ret;
394  }
395 
396  /**
397   * Is the process with PID pid still alive?
398   */
399  private boolean isAlive(Integer pid) {
400    // This method assumes that isAlive is called on a pid that was alive not
401    // too long ago, and hence assumes no chance of pid-wrapping-around.
402    ShellCommandExecutor shexec = null;
403    try {
404      String[] args = { "kill", "-0", pid.toString() };
405      shexec = new ShellCommandExecutor(args);
406      shexec.execute();
407    } catch (ExitCodeException ee) {
408      return false;
409    } catch (IOException ioe) {
410      LOG.warn("Error executing shell command "
411          + Arrays.toString(shexec.getExecString()) + ioe);
412      return false;
413    }
414    return (shexec.getExitCode() == 0 ? true : false);
415  }
416
417  /**
418   * Helper thread class that kills process-tree with SIGKILL in background
419   */
420  private class SigKillThread extends Thread {
421
422    public void run() {
423      this.setName(this.getClass().getName() + "-" + String.valueOf(pid));
424      ShellCommandExecutor shexec = null;
425
426      try {
427        // Sleep for some time before sending SIGKILL
428        Thread.sleep(sleepTimeBeforeSigKill);
429      } catch (InterruptedException i) {
430        LOG.warn("Thread sleep is interrupted.");
431      }
432
433      // Kill the root process with SIGKILL if it is still alive
434      if (ProcfsBasedProcessTree.this.isAlive(pid)) {
435        try {
436          String[] args = { "kill", "-9", pid.toString() };
437          shexec = new ShellCommandExecutor(args);
438          shexec.execute();
439        } catch (IOException ioe) {
440          LOG.warn("Error executing shell command " + ioe);
441        } finally {
442          LOG.info("Killing " + pid + " with SIGKILL. Exit code "
443              + shexec.getExitCode());
444        }
445      }
446    }
447  }
448  /**
449   * Returns a string printing PIDs of process present in the
450   * ProcfsBasedProcessTree. Output format : [pid pid ..]
451   */
452  public String toString() {
453    StringBuffer pTree = new StringBuffer("[ ");
454    for (Integer p : processTree.keySet()) {
455      pTree.append(p);
456      pTree.append(" ");
457    }
458    return pTree.substring(0, pTree.length()) + "]";
459  }
460
461  /**
462   *
463   * Class containing information of a process.
464   *
465   */
466  private static class ProcessInfo {
467    private Integer pid; // process-id
468    private String name; // command name
469    private Integer pgrpId; // process group-id
470    private Integer ppid; // parent process-id
471    private Integer sessionId; // session-id
472    private Long vmem; // virtual memory usage
473    // how many times has this process been seen alive
474    private int age; 
475    private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
476
477    public ProcessInfo(int pid) {
478      this.pid = Integer.valueOf(pid);
479      // seeing this the first time.
480      this.age = 1;
481    }
482
483    public Integer getPid() {
484      return pid;
485    }
486
487    public String getName() {
488      return name;
489    }
490
491    public Integer getPgrpId() {
492      return pgrpId;
493    }
494
495    public Integer getPpid() {
496      return ppid;
497    }
498
499    public Integer getSessionId() {
500      return sessionId;
501    }
502
503    public Long getVmem() {
504      return vmem;
505    }
506
507    public int getAge() {
508      return age;
509    }
510   
511    public boolean isParent(ProcessInfo p) {
512      if (pid.equals(p.getPpid())) {
513        return true;
514      }
515      return false;
516    }
517
518    public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
519        Integer sessionId, Long vmem) {
520      this.name = name;
521      this.ppid = ppid;
522      this.pgrpId = pgrpId;
523      this.sessionId = sessionId;
524      this.vmem = vmem;
525    }
526
527    public void updateAge(ProcessInfo oldInfo) {
528      this.age = oldInfo.age + 1;
529    }
530   
531    public boolean addChild(ProcessInfo p) {
532      return children.add(p);
533    }
534
535    public List<ProcessInfo> getChildren() {
536      return children;
537    }
538  }
539}
Note: See TracBrowser for help on using the repository browser.