#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
16 | import os, re, time |
---|
17 | from hodlib.Common.threads import loop, func |
---|
18 | from hodlib.Common.threads import simpleCommand |
---|
19 | from hodlib.Common.util import get_exception_string, hadoopVersion |
---|
20 | |
---|
class HadoopJobStatus:
  """Immutable snapshot of the status of one Hadoop job, as reported by
  the hadoop job client."""

  def __init__(self, jobId, status):
    # Keep both fields private; callers go through the accessors below.
    self.__jobId, self.__status = jobId, status

  def getJobId(self):
    """Return the identifier of the job this status describes."""
    return self.__jobId

  def getStatus(self):
    """Return the numeric run-state of the job."""
    return self.__status
33 | |
---|
class HadoopClientException(Exception):
  """This class represents an exception that is raised when we fail in
  running the job client.

  The exit code of the failed job client invocation is available as the
  'errorCode' attribute."""

  def __init__(self, errorCode):
    # Also hand the code to the Exception base class so that str(e),
    # repr(e) and e.args carry the error code instead of being empty.
    Exception.__init__(self, errorCode)
    self.errorCode = errorCode
40 | |
---|
class JobTrackerMonitor:
  """This class monitors the JobTracker of an allocated cluster
  periodically to detect whether it is idle. If it is found
  to be idle for more than a configured limit, it calls back
  registered handlers who can act upon the idle cluster."""

  def __init__(self, log, idleJTHandler, interval, limit,
                    hadoopDir, javaHome, servInfoProvider):
    """Set up the monitor (threads are created here but started via start()).

    log               -- logger used for all diagnostic output.
    idleJTHandler     -- object exposing handleIdleJobTracker(); called when
                         the cluster has been idle longer than 'limit'.
    interval          -- seconds between successive JobTracker polls.
    limit             -- seconds of continuous idleness before the callback.
    hadoopDir         -- hadoop install dir; must contain bin/hadoop.
    javaHome          -- JAVA_HOME exported when running the job client.
    servInfoProvider  -- object whose getServiceAddr('mapred') returns the
                         JobTracker URL (or a 'not found'/'Error...' string).

    Raises Exception if bin/hadoop is missing or the hadoop version is
    older than 0.15."""
    self.__log = log
    self.__idlenessLimit = limit
    self.__idleJobTrackerHandler = idleJTHandler
    self.__hadoopDir = hadoopDir
    hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
    #hadoop directory can be from pkgs or a temp location like tarball. Verify once.
    if not os.path.exists(hadoopPath):
      raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
    self.__javaHome = javaHome
    # Note that when this object is created, we don't yet know the JT URL.
    # The service info provider will be polled until we get the URL.
    self.__serviceInfoProvider = servInfoProvider
    # Raw strings so \S, \s and \d reach the regex engine verbatim.
    self.__jobCountRegExp = re.compile(r"([0-9]+) jobs currently running.*")
    self.__jobStatusRegExp = re.compile(r"(\S+)\s+(\d)\s+\d+\s+\S+$")
    self.__firstIdleTime = 0
    self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
    #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
    if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
      raise Exception('Incompatible Hadoop Version: Cannot check status')
    self.__stopFlag = False
    self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
    self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker,
                                  sleep=interval)
    self.__jobTrackerURL = None

  def start(self):
    """This method starts a thread that will determine the JobTracker URL"""
    self.__jtURLFinderThread.start()

  def stop(self):
    """Signal the monitor to stop and join the monitoring thread if it ran."""
    self.__log.debug('Joining the monitoring thread.')
    self.__stopFlag = True
    if self.__jtMonitorThread.isAlive():
      self.__jtMonitorThread.join()
    self.__log.debug('Joined the monitoring thread.')

  def getJobTrackerURL(self):
    """This method periodically checks the service info provider for the JT URL.
    Once a valid URL is found the monitoring loop thread is started."""
    self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
    while not self.__stopFlag and not self.__isValidJobTrackerURL():
      time.sleep(10)
      if not self.__stopFlag:
        self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
      else:
        break

    if self.__isValidJobTrackerURL():
      self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
      self.__jtMonitorThread.start()

  def monitorJobTracker(self):
    """This method is periodically called to monitor the JobTracker of the cluster."""
    try:
      if self.__isIdle():
        if self.__idleJobTrackerHandler:
          self.__log.info('Detected cluster as idle. Calling registered callback handler.')
          self.__idleJobTrackerHandler.handleIdleJobTracker()
    except:
      # Best-effort poll: never let an exception kill the monitor loop.
      self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())

  def getJobsStatus(self):
    """This method should return the status of all jobs that are run on the HOD allocated
    hadoop cluster.

    Returns a (possibly empty) list of HadoopJobStatus objects."""
    jobStatusList = []
    try:
      hadoop16Version = { 'major' : '0', 'minor' : '16' }
      # 'job -list all' (which includes completed jobs) only exists from 0.16.
      if self.__isCompatibleHadoopVersion(hadoop16Version):
        jtStatusCommand = self.__initStatusCommand(option='-list all')
        jtStatusCommand.start()
        jtStatusCommand.wait()
        jtStatusCommand.join()
        if jtStatusCommand.exit_code() == 0:
          for line in jtStatusCommand.output():
            jobStatus = self.__extractJobStatus(line)
            if jobStatus is not None:
              jobStatusList.append(jobStatus)
    except:
      self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
    return jobStatusList

  def __isValidJobTrackerURL(self):
    """This method checks that the passed in URL is not one of the special case strings
    returned by the getServiceAddr API"""
    return ((self.__jobTrackerURL is not None) and (self.__jobTrackerURL != 'not found')
            and (not self.__jobTrackerURL.startswith('Error')))

  def __extractJobStatus(self, line):
    """This method parses an output line from the job status command and creates
    the JobStatus object if there is a match.

    Returns a HadoopJobStatus, or None if the line is not a job-status line."""
    jobStatus = None
    line = line.strip()
    jsMatch = self.__jobStatusRegExp.match(line)
    if jsMatch:
      jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
    return jobStatus

  def __isIdle(self):
    """This method checks if the JobTracker is idle beyond a certain limit.
    A failure to query the job count is also treated as idleness, so that a
    wedged JobTracker does not hold the cluster forever."""
    jobCount = 0
    err = False

    try:
      jobCount = self.__getJobCount()
    except HadoopClientException as hce:
      self.__log.debug('HadoopClientException handled in getting job count. \
Error code: %s' % hce.errorCode)
      err = True

    if (jobCount == 0) or err:
      if self.__firstIdleTime == 0:
        #detecting idleness for the first time
        self.__firstIdleTime = time.time()
      else:
        if (time.time() - self.__firstIdleTime) >= self.__idlenessLimit:
          self.__log.info('Idleness limit crossed for cluster')
          return True
    else:
      # reset idleness time
      self.__firstIdleTime = 0

    return False

  def __getJobCount(self):
    """This method executes the hadoop job -list command and parses the output to detect
    the number of running jobs.

    Returns the parsed count, or -1 if the count line was not found.
    Raises HadoopClientException when the job client exits with code 1."""

    # We assume here that the poll interval is small enough to detect running jobs.
    # If jobs start and stop within the poll interval, the cluster would be incorrectly
    # treated as idle. Hadoop 2266 will provide a better mechanism than this.
    jobs = -1
    jtStatusCommand = self.__initStatusCommand()
    jtStatusCommand.start()
    jtStatusCommand.wait()
    jtStatusCommand.join()
    if jtStatusCommand.exit_code() == 0:
      for line in jtStatusCommand.output():
        match = self.__jobCountRegExp.match(line)
        if match:
          jobs = int(match.group(1))
    elif jtStatusCommand.exit_code() == 1:
      # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets
      # to differentiate and give more granular exit codes, we can check for those errors
      # corresponding to network errors etc.
      raise HadoopClientException(jtStatusCommand.exit_code())
    return jobs

  def __isCompatibleHadoopVersion(self, expectedVersion):
    """This method determines whether the version of hadoop being used is one that
    is higher than the expectedVersion.
    This can be used for checking if a particular feature is available or not.

    expectedVersion -- dict with string 'major' and 'minor' entries."""
    ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
    if (ver['major'] is None) or (ver['minor'] is None):
      return False
    # Compare as a (major, minor) tuple: a release such as 1.0 is newer than
    # 0.15 even though its minor number is smaller. The previous independent
    # pairwise >= comparison wrongly rejected such versions.
    return (int(ver['major']), int(ver['minor'])) >= \
           (int(expectedVersion['major']), int(expectedVersion['minor']))

  def __initStatusCommand(self, option="-list"):
    """This method initializes the command to run to check the JT status.

    option -- extra arguments for 'hadoop job' (default '-list').
    Returns an unstarted simpleCommand."""
    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
    cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
    cmdStr = "%s %s" % (cmdStr, option)
    self.__log.debug('cmd str %s' % cmdStr)
    # Copy the environment so that exporting JAVA_HOME for the child command
    # does not mutate os.environ for the whole process.
    env = os.environ.copy()
    env['JAVA_HOME'] = self.__javaHome
    return simpleCommand('HadoopStatus', cmdStr, env)
218 | |
---|