#Licensed to the Apache Software Foundation (ASF) under one #or more contributor license agreements. See the NOTICE file #distributed with this work for additional information #regarding copyright ownership. The ASF licenses this file #to you under the Apache License, Version 2.0 (the #"License"); you may not use this file except in compliance #with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 #Unless required by applicable law or agreed to in writing, software #distributed under the License is distributed on an "AS IS" BASIS, #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #See the License for the specific language governing permissions and #limitations under the License. import os, re, time from hodlib.Common.threads import loop, func from hodlib.Common.threads import simpleCommand from hodlib.Common.util import get_exception_string, hadoopVersion class HadoopJobStatus: """This class represents the status of a single Hadoop job""" def __init__(self, jobId, status): self.__jobId = jobId self.__status = status def getJobId(self): return self.__jobId def getStatus(self): return self.__status class HadoopClientException(Exception): """This class represents an exception that is raised when we fail in running the job client.""" def __init__(self, errorCode): self.errorCode = errorCode class JobTrackerMonitor: """This class monitors the JobTracker of an allocated cluster periodically to detect whether it is idle. If it is found to be idle for more than a configured limit, it calls back registered handlers who can act upon the idle cluster.""" def __init__(self, log, idleJTHandler, interval, limit, hadoopDir, javaHome, servInfoProvider): self.__log = log self.__idlenessLimit = limit self.__idleJobTrackerHandler = idleJTHandler self.__hadoopDir = hadoopDir hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop") #hadoop directory can be from pkgs or a temp location like tarball. Verify once. if not os.path.exists(hadoopPath): raise Exception('Invalid Hadoop path specified: %s' % hadoopPath) self.__javaHome = javaHome # Note that when this object is created, we don't yet know the JT URL. # The service info provider will be polled until we get the URL. self.__serviceInfoProvider = servInfoProvider self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*") self.__jobStatusRegExp = re.compile("(\S+)\s+(\d)\s+\d+\s+\S+$") self.__firstIdleTime = 0 self.__hadoop15Version = { 'major' : '0', 'minor' : '15' } #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker. if not self.__isCompatibleHadoopVersion(self.__hadoop15Version): raise Exception('Incompatible Hadoop Version: Cannot check status') self.__stopFlag = False self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL) self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker, sleep=interval) self.__jobTrackerURL = None def start(self): """This method starts a thread that will determine the JobTracker URL""" self.__jtURLFinderThread.start() def stop(self): self.__log.debug('Joining the monitoring thread.') self.__stopFlag = True if self.__jtMonitorThread.isAlive(): self.__jtMonitorThread.join() self.__log.debug('Joined the monitoring thread.') def getJobTrackerURL(self): """This method periodically checks the service info provider for the JT URL""" self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred') while not self.__stopFlag and not self.__isValidJobTrackerURL(): time.sleep(10) if not self.__stopFlag: self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred') else: break if self.__isValidJobTrackerURL(): self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL) self.__jtMonitorThread.start() def monitorJobTracker(self): """This method is periodically called to monitor the JobTracker of the cluster.""" try: if self.__isIdle(): if self.__idleJobTrackerHandler: self.__log.info('Detected cluster as idle. Calling registered callback handler.') self.__idleJobTrackerHandler.handleIdleJobTracker() except: self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string()) def getJobsStatus(self): """This method should return the status of all jobs that are run on the HOD allocated hadoop cluster""" jobStatusList = [] try: hadoop16Version = { 'major' : '0', 'minor' : '16' } if self.__isCompatibleHadoopVersion(hadoop16Version): jtStatusCommand = self.__initStatusCommand(option='-list all') jtStatusCommand.start() jtStatusCommand.wait() jtStatusCommand.join() if jtStatusCommand.exit_code() == 0: for line in jtStatusCommand.output(): jobStatus = self.__extractJobStatus(line) if jobStatus is not None: jobStatusList.append(jobStatus) except: self.__log.debug('Exception while getting job statuses. %s' % get_exception_string()) return jobStatusList def __isValidJobTrackerURL(self): """This method checks that the passed in URL is not one of the special case strings returned by the getServiceAddr API""" return ((self.__jobTrackerURL != None) and (self.__jobTrackerURL != 'not found') \ and (not self.__jobTrackerURL.startswith('Error'))) def __extractJobStatus(self, line): """This method parses an output line from the job status command and creates the JobStatus object if there is a match""" jobStatus = None line = line.strip() jsMatch = self.__jobStatusRegExp.match(line) if jsMatch: jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2))) return jobStatus def __isIdle(self): """This method checks if the JobTracker is idle beyond a certain limit.""" jobCount = 0 err = False try: jobCount = self.__getJobCount() except HadoopClientException, hce: self.__log.debug('HadoopClientException handled in getting job count. \ Error code: %s' % hce.errorCode) err = True if (jobCount==0) or err: if self.__firstIdleTime == 0: #detecting idleness for the first time self.__firstIdleTime = time.time() else: if ((time.time()-self.__firstIdleTime) >= self.__idlenessLimit): self.__log.info('Idleness limit crossed for cluster') return True else: # reset idleness time self.__firstIdleTime = 0 return False def __getJobCount(self): """This method executes the hadoop job -list command and parses the output to detect the number of running jobs.""" # We assume here that the poll interval is small enough to detect running jobs. # If jobs start and stop within the poll interval, the cluster would be incorrectly # treated as idle. Hadoop 2266 will provide a better mechanism than this. jobs = -1 jtStatusCommand = self.__initStatusCommand() jtStatusCommand.start() jtStatusCommand.wait() jtStatusCommand.join() if jtStatusCommand.exit_code() == 0: for line in jtStatusCommand.output(): match = self.__jobCountRegExp.match(line) if match: jobs = int(match.group(1)) elif jtStatusCommand.exit_code() == 1: # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets # to differentiate and give more granular exit codes, we can check for those errors # corresponding to network errors etc. raise HadoopClientException(jtStatusCommand.exit_code()) return jobs def __isCompatibleHadoopVersion(self, expectedVersion): """This method determines whether the version of hadoop being used is one that is higher than the expectedVersion. This can be used for checking if a particular feature is available or not""" ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log) ret = False if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \ and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])): ret = True return ret def __initStatusCommand(self, option="-list"): """This method initializes the command to run to check the JT status""" cmd = None hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop') cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL) cmdStr = "%s %s" % (cmdStr, option) self.__log.debug('cmd str %s' % cmdStr) env = os.environ env['JAVA_HOME'] = self.__javaHome cmd = simpleCommand('HadoopStatus', cmdStr, env) return cmd