source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/RingMaster/idleJobTracker.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.9 KB
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os, re, time
from hodlib.Common.threads import loop, func
from hodlib.Common.threads import simpleCommand
from hodlib.Common.util import get_exception_string, hadoopVersion

class HadoopJobStatus:
  """This class represents the status of a single Hadoop job"""

  def __init__(self, jobId, status):
    self.__jobId = jobId
    self.__status = status

  def getJobId(self):
    return self.__jobId

  def getStatus(self):
    return self.__status

class HadoopClientException(Exception):
  """This class represents an exception that is raised when running the
     job client fails."""

  def __init__(self, errorCode):
    self.errorCode = errorCode

class JobTrackerMonitor:
  """This class monitors the JobTracker of an allocated cluster
     periodically to detect whether it is idle. If it is found
     to be idle for more than a configured limit, it calls back
     registered handlers who can act upon the idle cluster."""
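  # The idleJTHandler object passed to the constructor is expected to expose a
  # handleIdleJobTracker() method; that is the only callback invoked when the
  # cluster is detected as idle (see monitorJobTracker below).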

  def __init__(self, log, idleJTHandler, interval, limit,
                      hadoopDir, javaHome, servInfoProvider):
    self.__log = log
    self.__idlenessLimit = limit
    self.__idleJobTrackerHandler = idleJTHandler
    self.__hadoopDir = hadoopDir
    hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
    #The hadoop directory can come from packages or a temporary location such
    #as an extracted tarball, so verify it once here.
    if not os.path.exists(hadoopPath):
      raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
    self.__javaHome = javaHome
    # Note that when this object is created, we don't yet know the JT URL.
    # The service info provider will be polled until we get the URL.
    self.__serviceInfoProvider = servInfoProvider
    self.__jobCountRegExp = re.compile(r"([0-9]+) jobs currently running.*")
    self.__jobStatusRegExp = re.compile(r"(\S+)\s+(\d)\s+\d+\s+\S+$")
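    # These patterns parse the output of 'hadoop job -list': a summary line of
    # the form "<n> jobs currently running ..." and one whitespace-separated
    # row per job whose first two fields are the job id and a single-digit
    # numeric state (the remaining fields are presumably the start time and
    # the submitting user).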
    self.__firstIdleTime = 0
    self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
    #Assumption: versions older than 0.15 are not supported for the idle
    #JobTracker feature.
    if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
      raise Exception('Incompatible Hadoop Version: Cannot check status')
    self.__stopFlag = False
    self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
    self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker,
                                  sleep=interval)
    self.__jobTrackerURL = None

  def start(self):
    """This method starts a thread that will determine the JobTracker URL"""
    self.__jtURLFinderThread.start()

  def stop(self):
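    """This method stops monitoring by setting the stop flag and joining the
       monitoring thread if it is running."""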
    self.__log.debug('Joining the monitoring thread.')
    self.__stopFlag = True
    if self.__jtMonitorThread.isAlive():
      self.__jtMonitorThread.join()
    self.__log.debug('Joined the monitoring thread.')

  def getJobTrackerURL(self):
    """This method periodically checks the service info provider for the JT URL"""
    self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
    while not self.__stopFlag and not self.__isValidJobTrackerURL():
      time.sleep(10)
      if not self.__stopFlag:
        self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
      else:
        break

    if self.__isValidJobTrackerURL():
      self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
      self.__jtMonitorThread.start()

  def monitorJobTracker(self):
    """This method is periodically called to monitor the JobTracker of the cluster."""
    try:
      if self.__isIdle():
        if self.__idleJobTrackerHandler:
          self.__log.info('Detected cluster as idle. Calling registered callback handler.')
          self.__idleJobTrackerHandler.handleIdleJobTracker()
    except:
      self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())

  def getJobsStatus(self):
    """This method returns the status of all jobs run on the HOD-allocated
       Hadoop cluster"""
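    # 'hadoop job -list all' (which also reports completed jobs) is issued only
    # when the cluster is at least version 0.16; on older versions this simply
    # returns an empty list.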
    jobStatusList = []
    try:
      hadoop16Version = { 'major' : '0', 'minor' : '16' }
      if self.__isCompatibleHadoopVersion(hadoop16Version):
        jtStatusCommand = self.__initStatusCommand(option='-list all')
        jtStatusCommand.start()
        jtStatusCommand.wait()
        jtStatusCommand.join()
        if jtStatusCommand.exit_code() == 0:
          for line in jtStatusCommand.output():
            jobStatus = self.__extractJobStatus(line)
            if jobStatus is not None:
              jobStatusList.append(jobStatus)
    except:
      self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
    return jobStatusList

  def __isValidJobTrackerURL(self):
    """This method checks that the JobTracker URL obtained from getServiceAddr
       is not one of the special case strings returned by that API"""
    return ((self.__jobTrackerURL != None) and (self.__jobTrackerURL != 'not found') \
              and (not self.__jobTrackerURL.startswith('Error')))

  def __extractJobStatus(self, line):
    """This method parses an output line from the job status command and creates
       a HadoopJobStatus object if there is a match"""
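    # The numeric state appears to map to Hadoop's JobStatus constants in this
    # Hadoop line (e.g. 1 = RUNNING, 2 = SUCCEEDED, 3 = FAILED, 4 = PREP).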
    jobStatus = None
    line = line.strip()
    jsMatch = self.__jobStatusRegExp.match(line)
    if jsMatch:
      jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
    return jobStatus

  def __isIdle(self):
    """This method checks if the JobTracker has been idle beyond the configured limit."""
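    # A job count of zero (or a failure to obtain the count) starts or continues
    # an idle period: the first such observation records a timestamp, and once
    # the idle period reaches the configured limit this returns True. Any poll
    # that finds running jobs resets the idle timer.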
    jobCount = 0
    err = False

    try:
      jobCount = self.__getJobCount()
    except HadoopClientException, hce:
      self.__log.debug('HadoopClientException handled in getting job count. \
                                      Error code: %s' % hce.errorCode)
      err = True

    if (jobCount==0) or err:
      if self.__firstIdleTime == 0:
        #detecting idleness for the first time
        self.__firstIdleTime = time.time()
      else:
        if ((time.time()-self.__firstIdleTime) >= self.__idlenessLimit):
          self.__log.info('Idleness limit crossed for cluster')
          return True
    else:
      # reset idleness time
      self.__firstIdleTime = 0

    return False

  def __getJobCount(self):
    """This method executes the hadoop job -list command and parses the output to detect
       the number of running jobs."""

    # We assume here that the poll interval is small enough to detect running jobs.
    # If jobs start and stop within the poll interval, the cluster would be incorrectly
    # treated as idle. HADOOP-2266 will provide a better mechanism than this.
    jobs = -1
    jtStatusCommand = self.__initStatusCommand()
    jtStatusCommand.start()
    jtStatusCommand.wait()
    jtStatusCommand.join()
    if jtStatusCommand.exit_code() == 0:
      for line in jtStatusCommand.output():
        match = self.__jobCountRegExp.match(line)
        if match:
          jobs = int(match.group(1))
    elif jtStatusCommand.exit_code() == 1:
      # For now, exit code 1 is returned for any exception raised by JobClient.
      # If Hadoop later differentiates with more granular exit codes, we can
      # check specifically for errors such as network failures.
      raise HadoopClientException(jtStatusCommand.exit_code())
    return jobs

  def __isCompatibleHadoopVersion(self, expectedVersion):
    """This method determines whether the version of hadoop being used is at least
       the expectedVersion.
       This can be used for checking if a particular feature is available or not"""
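    # Note: the major and minor components are compared independently, which is
    # sufficient while Hadoop versions stay in the 0.x series that HOD targets.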
    ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
    ret = False

    if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \
      and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):
      ret = True
    return ret

  def __initStatusCommand(self, option="-list"):
    """This method initializes the command to run to check the JT status"""
    cmd = None
    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
    cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
    cmdStr = "%s %s" % (cmdStr, option)
    self.__log.debug('cmd str %s' % cmdStr)
    env = os.environ
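    # Note that this is os.environ itself, not a copy, so JAVA_HOME is also set
    # in the environment of the current process, not just the child command.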
    env['JAVA_HOME'] = self.__javaHome
    cmd = simpleCommand('HadoopStatus', cmdStr, env)
    return cmd