#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#!/usr/bin/env python
"""hodring launches hadoop commands on a work node and
cleans up all the work dirs afterward
"""
# -*- python -*-
import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
import socket, sets, urllib, csv, signal, pprint, random, re, httplib

from xml.dom import getDOMImplementation
from pprint import pformat
from optparse import OptionParser
from urlparse import urlparse
from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory, isProcessRunning
from hodlib.Common.tcp import tcpSocket, tcpError

binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)

import hodlib.Common.logger

from hodlib.GridServices.service import *
from hodlib.Common.util import *
from hodlib.Common.socketServers import threadedHTTPServer
from hodlib.Common.hodsvc import hodBaseService
from hodlib.Common.threads import simpleCommand
from hodlib.Common.xmlrpc import hodXRClient

mswindows = (sys.platform == "win32")
originalcwd = os.getcwd()

reHdfsURI = re.compile("hdfs://(.*?:\d+)(.*)")
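# Illustrative note (not part of the original code): reHdfsURI splits a URI
# such as "hdfs://namenode:9000/user/foo" into the authority ("namenode:9000")
# and the remaining path ("/user/foo").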

class CommandDesc:
  """A class that represents the commands that
  are run by hodring"""
  def __init__(self, dict, log):
    self.log = log
    self.log.debug("In command desc")
    self.log.debug("Done in command desc")
    dict.setdefault('argv', [])
    dict.setdefault('version', None)
    dict.setdefault('envs', {})
    dict.setdefault('workdirs', [])
    dict.setdefault('attrs', {})
    dict.setdefault('final-attrs', {})
    dict.setdefault('fg', False)
    dict.setdefault('ignorefailures', False)
    dict.setdefault('stdin', None)

    self.log.debug("Printing dict")
    self._checkRequired(dict)
    self.dict = dict

  def _checkRequired(self, dict):
    if 'name' not in dict:
      raise ValueError, "Command description lacks 'name'"
    if 'program' not in dict:
      raise ValueError, "Command description lacks 'program'"
    if 'pkgdirs' not in dict:
      raise ValueError, "Command description lacks 'pkgdirs'"

  def getName(self):
    return self.dict['name']

  def getProgram(self):
    return self.dict['program']

  def getArgv(self):
    return self.dict['argv']

  def getVersion(self):
    return self.dict['version']

  def getEnvs(self):
    return self.dict['envs']

  def getPkgDirs(self):
    return self.dict['pkgdirs']

  def getWorkDirs(self):
    return self.dict['workdirs']

  def getAttrs(self):
    return self.dict['attrs']

  def getfinalAttrs(self):
    return self.dict['final-attrs']

  def isForeground(self):
    return self.dict['fg']

  def isIgnoreFailures(self):
    return self.dict['ignorefailures']

  def getStdin(self):
    return self.dict['stdin']

  def parseDesc(str):

    dict = CommandDesc._parseMap(str)

    dict['argv'] = CommandDesc._parseList(dict['argv'])
    dict['envs'] = CommandDesc._parseMap(dict['envs'])
    dict['pkgdirs'] = CommandDesc._parseList(dict['pkgdirs'], ':')
    dict['workdirs'] = CommandDesc._parseList(dict['workdirs'], ':')
    dict['attrs'] = CommandDesc._parseMap(dict['attrs'])
    dict['final-attrs'] = CommandDesc._parseMap(dict['final-attrs'])

    return CommandDesc(dict)

  parseDesc = staticmethod(parseDesc)

  def _parseList(str, delim = ','):
    list = []
    for row in csv.reader([str], delimiter=delim, escapechar='\\',
                          quoting=csv.QUOTE_NONE, doublequote=False):
      list.extend(row)
    return list

  _parseList = staticmethod(_parseList)

  def _parseMap(str):
    """Parses key value pairs"""
    dict = {}
    for row in csv.reader([str], escapechar='\\', quoting=csv.QUOTE_NONE, doublequote=False):
      for f in row:
        [k, v] = f.split('=', 1)
        dict[k] = v
    return dict

  _parseMap = staticmethod(_parseMap)
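  # Illustrative examples (not part of the original code); both helpers use
  # csv-style parsing with '\' as the escape character:
  #   CommandDesc._parseList("a,b\\,c")        -> ['a', 'b,c']
  #   CommandDesc._parseList("/d1:/d2", ':')   -> ['/d1', '/d2']
  #   CommandDesc._parseMap("k1=v1,k2=v2")     -> {'k1': 'v1', 'k2': 'v2'}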

class MRSystemDirectoryManager:
  """Class that is responsible for managing the MapReduce system directory"""

  def __init__(self, jtPid, mrSysDir, fsName, hadoopPath, log, retries=120):
    self.__jtPid = jtPid
    self.__mrSysDir = mrSysDir
    self.__fsName = fsName
    self.__hadoopPath = hadoopPath
    self.__log = log
    self.__retries = retries

  def toCleanupArgs(self):
    return " --jt-pid %s --mr-sys-dir %s --fs-name %s --hadoop-path %s " \
             % (self.__jtPid, self.__mrSysDir, self.__fsName, self.__hadoopPath)
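  # Illustrative output, with hypothetical values:
  #   " --jt-pid 12345 --mr-sys-dir /mapredsystem/hod.user.123 --fs-name host:9000 --hadoop-path /usr/bin/hadoop "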

  def removeMRSystemDirectory(self):

    jtActive = isProcessRunning(self.__jtPid)
    count = 0 # try for a max of a minute for the process to end
    while jtActive and (count<self.__retries):
      time.sleep(0.5)
      jtActive = isProcessRunning(self.__jtPid)
      count += 1

    if count == self.__retries:
      self.__log.warn('Job Tracker did not exit even after a minute. Not going to try to clean up the system directory')
      return

    self.__log.debug('jt is now inactive')

    cmd = "%s dfs -fs hdfs://%s -rmr %s" % (self.__hadoopPath, self.__fsName, \
                                            self.__mrSysDir)
    self.__log.debug('Command to run to remove system directory: %s' % (cmd))
    try:
      hadoopCommand = simpleCommand('mr-sys-dir-cleaner', cmd)
      hadoopCommand.start()
      hadoopCommand.wait()
      hadoopCommand.join()
      ret = hadoopCommand.exit_code()
      if ret != 0:
        self.__log.warn("Error in removing MapReduce system directory '%s' from '%s' using path '%s'" \
                          % (self.__mrSysDir, self.__fsName, self.__hadoopPath))
        self.__log.warn(pprint.pformat(hadoopCommand.output()))
      else:
        self.__log.info("Removed MapReduce system directory successfully.")
    except:
      self.__log.error('Exception while cleaning up MapReduce system directory. May not be cleaned up. %s', \
                          get_exception_error_string())
      self.__log.debug(get_exception_string())


def createMRSystemDirectoryManager(dict, log):
  keys = [ 'jt-pid', 'mr-sys-dir', 'fs-name', 'hadoop-path' ]
  for key in keys:
    if (not dict.has_key(key)) or (dict[key] is None):
      return None

  mrSysDirManager = MRSystemDirectoryManager(int(dict['jt-pid']), dict['mr-sys-dir'], \
                                             dict['fs-name'], dict['hadoop-path'], log)
  return mrSysDirManager

class HadoopCommand:
  """Runs a single hadoop command"""

  def __init__(self, id, desc, tempdir, tardir, log, javahome,
               mrSysDir, restart=False):
    self.desc = desc
    self.log = log
    self.javahome = javahome
    self.__mrSysDir = mrSysDir
    self.program = desc.getProgram()
    self.name = desc.getName()
    self.workdirs = desc.getWorkDirs()
    self.hadoopdir = tempdir
    self.confdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                                "confdir")
    self.logdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                               "logdir")
    self.out = os.path.join(self.logdir, '%s.out' % self.name)
    self.err = os.path.join(self.logdir, '%s.err' % self.name)

    self.child = None
    self.restart = restart
    self.filledInKeyVals = []
    self._createWorkDirs()
    self._createHadoopSiteXml()
    self._createHadoopLogDir()
    self.__hadoopThread = None
    self.stdErrContents = "" # store the contents of stderr for returning to the user

  def _createWorkDirs(self):
    for dir in self.workdirs:
      if os.path.exists(dir):
        if not os.access(dir, os.F_OK | os.R_OK | os.W_OK | os.X_OK):
          raise ValueError, "Workdir %s does not allow rwx permission." % (dir)
        continue
      try:
        os.makedirs(dir)
      except:
        pass

  def getFilledInKeyValues(self):
    return self.filledInKeyVals

  def createXML(self, doc, attr, topElement, final):
    for k,v in attr.iteritems():
      self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
      if ( v == "fillinport" ):
        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))

      keyvalpair = ''
      if isinstance(v, (tuple, list)):
        for item in v:
          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
        keyvalpair = keyvalpair[:-1]
      else:
        keyvalpair = k + '=' + v

      self.filledInKeyVals.append(keyvalpair)
      if(k == "mapred.job.tracker"): # total hack for time's sake
        keyvalpair = k + "=" + v
        self.filledInKeyVals.append(keyvalpair)

      if ( v == "fillinhostport"):
        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
        self.log.debug('Setting hostname to: %s' % local_fqdn())
        v = local_fqdn() + ':' + port

      keyvalpair = ''
      if isinstance(v, (tuple, list)):
        for item in v:
          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
        keyvalpair = keyvalpair[:-1]
      else:
        keyvalpair = k + '=' + v

      self.filledInKeyVals.append(keyvalpair)
      if ( v == "fillindir"):
        v = self.__mrSysDir
        pass

      prop = None
      if isinstance(v, (tuple, list)):
        for item in v:
          prop = self._createXmlElement(doc, k, item, "No description", final)
          topElement.appendChild(prop)
      else:
        if k == 'fs.default.name':
          prop = self._createXmlElement(doc, k, "hdfs://" + v, "No description", final)
        else:
          prop = self._createXmlElement(doc, k, v, "No description", final)
        topElement.appendChild(prop)

  def _createHadoopSiteXml(self):
    if self.restart:
      if not os.path.exists(self.confdir):
        os.makedirs(self.confdir)
    else:
      assert os.path.exists(self.confdir) == False
      os.makedirs(self.confdir)

    implementation = getDOMImplementation()
    doc = implementation.createDocument('', 'configuration', None)
    comment = doc.createComment("This is an auto generated hadoop-site.xml, do not modify")
    topElement = doc.documentElement
    topElement.appendChild(comment)

    finalAttr = self.desc.getfinalAttrs()
    self.createXML(doc, finalAttr, topElement, True)
    attr = {}
    attr1 = self.desc.getAttrs()
    for k,v in attr1.iteritems():
      if not finalAttr.has_key(k):
        attr[k] = v
    self.createXML(doc, attr, topElement, False)


    siteName = os.path.join(self.confdir, "hadoop-site.xml")
    sitefile = file(siteName, 'w')
    print >> sitefile, topElement.toxml()
    sitefile.close()
    self.log.debug('created %s' % (siteName))

  def _createHadoopLogDir(self):
    if self.restart:
      if not os.path.exists(self.logdir):
        os.makedirs(self.logdir)
    else:
      assert os.path.exists(self.logdir) == False
      os.makedirs(self.logdir)

  def _createXmlElement(self, doc, name, value, description, final):
    prop = doc.createElement("property")
    nameP = doc.createElement("name")
    string = doc.createTextNode(name)
    nameP.appendChild(string)
    valueP = doc.createElement("value")
    string = doc.createTextNode(value)
    valueP.appendChild(string)
    desc = doc.createElement("description")
    string = doc.createTextNode(description)
    desc.appendChild(string)
    prop.appendChild(nameP)
    prop.appendChild(valueP)
    prop.appendChild(desc)
    if (final):
      felement = doc.createElement("final")
      string = doc.createTextNode("true")
      felement.appendChild(string)
      prop.appendChild(felement)
      pass

    return prop
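  # For reference (illustrative only), each generated element serialises roughly as:
  #   <property><name>mapred.job.tracker</name><value>host:port</value>
  #   <description>No description</description><final>true</final></property>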

  def getMRSystemDirectoryManager(self):
    return MRSystemDirectoryManager(self.__hadoopThread.getPid(), self.__mrSysDir, \
                                    self.desc.getfinalAttrs()['fs.default.name'], \
                                    self.path, self.log)

  def run(self, dir):
    status = True
    args = []
    desc = self.desc

    self.log.debug(pprint.pformat(desc.dict))


    self.log.debug("Got package dir of %s" % dir)

    self.path = os.path.join(dir, self.program)

    self.log.debug("path: %s" % self.path)
    args.append(self.path)
    args.extend(desc.getArgv())
    envs = desc.getEnvs()
    fenvs = os.environ

    for k, v in envs.iteritems():
      fenvs[k] = v

    if envs.has_key('HADOOP_OPTS'):
      fenvs['HADOOP_OPTS'] = envs['HADOOP_OPTS']
      self.log.debug("HADOOP_OPTS : %s" % fenvs['HADOOP_OPTS'])

    fenvs['JAVA_HOME'] = self.javahome
    fenvs['HADOOP_CONF_DIR'] = self.confdir
    fenvs['HADOOP_LOG_DIR'] = self.logdir

    self.log.info(pprint.pformat(fenvs))

    hadoopCommand = ''
    for item in args:
      hadoopCommand = "%s%s " % (hadoopCommand, item)

    # Redirecting output and error to self.out and self.err
    hadoopCommand = hadoopCommand + ' 1>%s 2>%s ' % (self.out, self.err)

    self.log.debug('running command: %s' % (hadoopCommand))
    self.log.debug('hadoop env: %s' % fenvs)
    self.log.debug('Command stdout will be redirected to %s ' % self.out + \
                   'and command stderr to %s' % self.err)

    self.__hadoopThread = simpleCommand('hadoop', hadoopCommand, env=fenvs)
    self.__hadoopThread.start()

    while self.__hadoopThread.stdin == None:
      time.sleep(.2)
      self.log.debug("hadoopThread.stdin is still None ...")

    input = desc.getStdin()
    self.log.debug("hadoop input: %s" % input)
    if input:
      if self.__hadoopThread.is_running():
        print >>self.__hadoopThread.stdin, input
      else:
        self.log.error("hadoop command failed to start")

    self.__hadoopThread.stdin.close()

    self.log.debug("isForeground: %s" % desc.isForeground())
    if desc.isForeground():
      self.log.debug("Waiting on hadoop to finish...")
      self.__hadoopThread.wait()

      self.log.debug("Joining hadoop thread...")
      self.__hadoopThread.join()
      if self.__hadoopThread.exit_code() != 0:
        status = False
    else:
      status = self.getCommandStatus()

    self.log.debug("hadoop run status: %s" % status)

    if status == False:
      self.handleFailedCommand()

    if (status == True) or (not desc.isIgnoreFailures()):
      return status
    else:
      self.log.error("Ignoring Failure")
      return True

  def kill(self):
    if self.__hadoopThread:
      self.__hadoopThread.kill()
      self.__hadoopThread.join()

  def addCleanup(self, list):
    list.extend(self.workdirs)
    list.append(self.confdir)

  def getCommandStatus(self):
    status = True
    ec = self.__hadoopThread.exit_code()
    if (ec != 0) and (ec != None):
      status = False
    return status

  def handleFailedCommand(self):
    self.log.error('hadoop error: %s' % (
                     self.__hadoopThread.exit_status_string()))
    # read the contents of redirected stderr to print information back to user
    if os.path.exists(self.err):
      f = None
      try:
        f = open(self.err)
        lines = f.readlines()
        # format
        for line in lines:
          self.stdErrContents = "%s%s" % (self.stdErrContents, line)
      finally:
        if f is not None:
          f.close()
    self.log.error('See %s.out and/or %s.err for details. They are ' % \
                   (self.name, self.name) + \
                   'located in subdirectories under either ' + \
                   'hodring.work-dirs or hodring.log-destination-uri.')

class HodRing(hodBaseService):
  """The main class for hodring that
  polls the commands it runs"""
  def __init__(self, config):
    hodBaseService.__init__(self, 'hodring', config['hodring'])
    self.log = self.logs['main']
    self._http = None
    self.__pkg = None
    self.__pkgDir = None
    self.__tempDir = None
    self.__running = {}
    self.__hadoopLogDirs = []
    self.__init_temp_dir()

  def __init_temp_dir(self):
    self.__tempDir = os.path.join(self._cfg['temp-dir'],
                                  "%s.%s.hodring" % (self._cfg['userid'],
                                                     self._cfg['service-id']))
    if not os.path.exists(self.__tempDir):
      os.makedirs(self.__tempDir)
    os.chdir(self.__tempDir)

  def __fetch(self, url, spath):
    retry = 3
    success = False
    while (retry != 0 and success != True):
      try:
        input = urllib.urlopen(url)
        bufsz = 81920
        buf = input.read(bufsz)
        out = open(spath, 'w')
        while len(buf) > 0:
          out.write(buf)
          buf = input.read(bufsz)
        input.close()
        out.close()
        success = True
      except:
        self.log.debug("Failed to copy file")
        retry = retry - 1
    if (retry == 0 and success != True):
      raise IOError, "Failed to copy the files"


  def __get_name(self, addr):
    parsedUrl = urlparse(addr)
    path = parsedUrl[2]
    split = path.split('/', 1)
    return split[1]
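  # Illustrative example (hypothetical URL):
  #   __get_name("http://host:8030/tmp/hadoop.tar.gz") -> "tmp/hadoop.tar.gz"
  # i.e. the URL path with its leading '/' stripped.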

  def __get_dir(self, name):
    """Return the root directory inside the tarball
    specified by name. Assumes that the tarball begins
    with a root directory."""
    import tarfile
    myTarFile = tarfile.open(name)
    hadoopPackage = myTarFile.getnames()[0]
    self.log.debug("tarball name : %s hadoop package name : %s" % (name, hadoopPackage))
    return hadoopPackage

  def getRunningValues(self):
    return self.__running.values()

  def getTempDir(self):
    return self.__tempDir

  def getHadoopLogDirs(self):
    return self.__hadoopLogDirs

  def __download_package(self, ringClient):
    self.log.debug("Found download address: %s" %
                   self._cfg['download-addr'])
    try:
      addr = 'none'
      downloadTime = self._cfg['tarball-retry-initial-time'] # download time depends on tarball size and network bandwidth

      increment = 0

      addr = ringClient.getTarList(self.hostname)

      while(addr == 'none'):
        rand = self._cfg['tarball-retry-initial-time'] + increment + \
               random.uniform(0, self._cfg['tarball-retry-interval'])
        increment = increment + 1
        self.log.debug("got no tarball. Retrying in %s seconds." % rand)
        time.sleep(rand)
        addr = ringClient.getTarList(self.hostname)


      self.log.debug("got this address %s" % addr)

      tarName = self.__get_name(addr)
      self.log.debug("tar package name: %s" % tarName)

      fetchPath = os.path.join(os.getcwd(), tarName)
      self.log.debug("fetch path: %s" % fetchPath)

      self.__fetch(addr, fetchPath)
      self.log.debug("done fetching")

      tarUrl = "http://%s:%d/%s" % (self._http.server_address[0],
                                    self._http.server_address[1],
                                    tarName)
      try:
        ringClient.registerTarSource(self.hostname, tarUrl, addr)
        #ringClient.tarDone(addr)
      except KeyError, e:
        self.log.error("registerTarSource and tarDone failed: %s" % e)
        raise KeyError(e)

      check = untar(fetchPath, os.getcwd())

      if (check == False):
        raise IOError, "Untarring failed."

      self.__pkg = self.__get_dir(tarName)
      self.__pkgDir = os.path.join(os.getcwd(), self.__pkg)
    except Exception, e:
      self.log.error("Failed to download the tar package: %s" %
                     get_exception_error_string())
      raise Exception(e)

  def __run_hadoop_commands(self, restart=True):
    id = 0
    for desc in self._cfg['commanddesc']:
      self.log.debug(pprint.pformat(desc.dict))
      mrSysDir = getMapredSystemDirectory(self._cfg['mapred-system-dir-root'],
                                          self._cfg['userid'], self._cfg['service-id'])
      self.log.debug('mrsysdir is %s' % mrSysDir)
      cmd = HadoopCommand(id, desc, self.__tempDir, self.__pkgDir, self.log,
                          self._cfg['java-home'], mrSysDir, restart)

      self.__hadoopLogDirs.append(cmd.logdir)
      self.log.debug("hadoop log directory: %s" % self.__hadoopLogDirs)

      try:
        # if the tarball isn't there, we use the pkgs dir given.
        if self.__pkgDir == None:
          pkgdir = desc.getPkgDirs()
        else:
          pkgdir = self.__pkgDir

        self.log.debug('This is the package dir %s ' % (pkgdir))
        if not cmd.run(pkgdir):
          addnInfo = ""
          if cmd.stdErrContents != "":
            addnInfo = " Information from stderr of the command:\n%s" % (cmd.stdErrContents)
          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" % (desc.getName(), pkgdir, addnInfo))
      except Exception, e:
        self.log.debug("Exception running hadoop command: %s\n%s" % (get_exception_error_string(), get_exception_string()))
        self.__running[id] = cmd
        raise Exception(e)

      id += 1
      if desc.isForeground():
        continue
      self.__running[id-1] = cmd

      # ok.. now command is running. If this HodRing got the jobtracker,
      # check that it is ready to accept jobs, and only then return
      self.__check_jobtracker(desc, id-1, pkgdir)

  def __check_jobtracker(self, desc, id, pkgdir):
    # Check jobtracker status. Return properly if it is ready to accept jobs.
    # Currently checks for Jetty to come up, the last thing that can be checked
    # before JT completes initialisation. To be perfectly reliable, we need
    # hadoop support
    name = desc.getName()
    if name == 'jobtracker':
      # Yes I am the Jobtracker
      self.log.debug("Waiting for jobtracker to initialise")
      version = desc.getVersion()
      self.log.debug("jobtracker version : %s" % version)
      hadoopCmd = self.getRunningValues()[id]
      attrs = hadoopCmd.getFilledInKeyValues()
      attrs = parseEquals(attrs)
      jobTrackerAddr = attrs['mapred.job.tracker']
      self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)
      if version < 16:
        jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \
                    attrs['mapred.job.tracker.info.port']
      else:
        jettyAddr = attrs['mapred.job.tracker.http.address']
      self.log.debug("Jobtracker jetty : %s" % jettyAddr)

      # Check for Jetty to come up
      # For this do a http head, and then look at the status
      defaultTimeout = socket.getdefaulttimeout()
      # socket timeout isn't exposed at httplib level. Setting explicitly.
      socket.setdefaulttimeout(1)
      sleepTime = 0.5
      jettyStatus = False
      jettyStatusmsg = ""
      while sleepTime <= 32:
        # There is a possibility that the command might fail after a while.
        # This code will check if the command failed so that a better
        # error message can be returned to the user.
        if not hadoopCmd.getCommandStatus():
          self.log.critical('Hadoop command found to have failed when ' \
                            'checking for jobtracker status')
          hadoopCmd.handleFailedCommand()
          addnInfo = ""
          if hadoopCmd.stdErrContents != "":
            addnInfo = " Information from stderr of the command:\n%s" \
                       % (hadoopCmd.stdErrContents)
          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" \
                          % (desc.getName(), pkgdir, addnInfo))

        try:
          jettyConn = httplib.HTTPConnection(jettyAddr)
          jettyConn.request("HEAD", "/jobtracker.jsp")
          # httplib inherently retries the following till socket timeout
          resp = jettyConn.getresponse()
          if resp.status != 200:
            # Some problem?
            jettyStatus = False
            jettyStatusmsg = "Jetty gave a non-200 response to an HTTP HEAD" + \
                             " request. HTTP Status (Code, Msg): (%s, %s)" % \
                             ( resp.status, resp.reason )
            break
          else:
            self.log.info("Jetty returned a 200 status (%s)" % resp.reason)
            self.log.info("JobTracker successfully initialised")
            return
        except socket.error:
          self.log.debug("Jetty gave a socket error. Sleeping for %s" \
                         % sleepTime)
          time.sleep(sleepTime)
          sleepTime = sleepTime * 2
        except Exception, e:
          jettyStatus = False
          jettyStatusmsg = ("Process (possibly other than jetty) running on" + \
                            " port assigned to jetty is returning invalid http response")
          break
      socket.setdefaulttimeout(defaultTimeout)
      if not jettyStatus:
        self.log.critical("Jobtracker failed to initialise.")
        if jettyStatusmsg:
          self.log.critical( "Reason: %s" % jettyStatusmsg )
        else: self.log.critical( "Reason: Jetty failed to give response")
        raise Exception("JobTracker failed to initialise")

  def stop(self):
    self.log.debug("Entered hodring stop.")
    if self._http:
      self.log.debug("stopping http server...")
      self._http.stop()

    self.log.debug("call hodsvcrgy stop...")
    hodBaseService.stop(self)

  def _xr_method_clusterStart(self, initialize=True):
    return self.clusterStart(initialize)

  def _xr_method_clusterStop(self):
    return self.clusterStop()

  def start(self):
    """Run and maintain hodring commands"""

    try:
      if self._cfg.has_key('download-addr'):
        self._http = threadedHTTPServer('', self._cfg['http-port-range'])
        self.log.info("Starting http server...")
        self._http.serve_forever()
        self.log.debug("http://%s:%d" % (self._http.server_address[0],
                                         self._http.server_address[1]))

      hodBaseService.start(self)

      ringXRAddress = None
      if self._cfg.has_key('ringmaster-xrs-addr'):
        ringXRAddress = "http://%s:%s/" % (self._cfg['ringmaster-xrs-addr'][0],
                                           self._cfg['ringmaster-xrs-addr'][1])
        self.log.debug("Ringmaster at %s" % ringXRAddress)

      self.log.debug("Creating service registry XML-RPC client.")
      serviceClient = hodXRClient(to_http_url(
                                    self._cfg['svcrgy-addr']))
      if ringXRAddress == None:
        self.log.info("Did not get ringmaster XML-RPC address. Fetching information from service registry.")
        ringList = serviceClient.getServiceInfo(self._cfg['userid'],
                     self._cfg['service-id'], 'ringmaster', 'hod')

        self.log.debug(pprint.pformat(ringList))

        if len(ringList):
          if isinstance(ringList, list):
            ringXRAddress = ringList[0]['xrs']

        count = 0
        while (ringXRAddress == None and count < 3000):
          ringList = serviceClient.getServiceInfo(self._cfg['userid'],
                       self._cfg['service-id'], 'ringmaster', 'hod')

          if len(ringList):
            if isinstance(ringList, list):
              ringXRAddress = ringList[0]['xrs']

          count = count + 1
          time.sleep(.2)

      if ringXRAddress == None:
        raise Exception("Could not get ringmaster XML-RPC server address.")

      self.log.debug("Creating ringmaster XML-RPC client.")
      ringClient = hodXRClient(ringXRAddress)

      id = self.hostname + "_" + str(os.getpid())

      if 'download-addr' in self._cfg:
        self.__download_package(ringClient)
      else:
        self.log.debug("Did not find a download address.")

      cmdlist = []
      firstTime = True
      increment = 0
      hadoopStartupTime = 2

      cmdlist = ringClient.getCommand(id)

      while (cmdlist == []):
        if firstTime:
          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + hadoopStartupTime \
                      + random.uniform(0, self._cfg['cmd-retry-interval'])
          firstTime = False
        else:
          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + \
                      random.uniform(0, self._cfg['cmd-retry-interval'])
        self.log.debug("Did not get command list. Waiting for %s seconds." % (sleepTime))
        time.sleep(sleepTime)
        increment = increment + 1
        cmdlist = ringClient.getCommand(id)

      self.log.debug(pformat(cmdlist))
      cmdDescs = []
      for cmds in cmdlist:
        cmdDescs.append(CommandDesc(cmds['dict'], self.log))

      self._cfg['commanddesc'] = cmdDescs

      self.log.info("Running hadoop commands...")

      self.__run_hadoop_commands(False)

      masterParams = []
      for k, cmd in self.__running.iteritems():
        masterParams.extend(cmd.filledInKeyVals)

      self.log.debug("printing getparams")
      self.log.debug(pformat(id))
      self.log.debug(pformat(masterParams))
      # when this is on a required host, the ringMaster already has our masterParams
      if(len(masterParams) > 0):
        ringClient.addMasterParams(id, masterParams)
    except Exception, e:
      raise Exception(e)

  def clusterStart(self, initialize=True):
    """Start a stopped mapreduce/dfs cluster"""
    if initialize:
      self.log.debug('clusterStart Method Invoked - Initialize')
    else:
      self.log.debug('clusterStart Method Invoked - No Initialize')
    try:
      self.log.debug("Creating service registry XML-RPC client.")
      serviceClient = hodXRClient(to_http_url(self._cfg['svcrgy-addr']),
                                  None, None, 0, 0, 0)

      self.log.info("Fetching ringmaster information from service registry.")
      count = 0
      ringXRAddress = None
      while (ringXRAddress == None and count < 3000):
        ringList = serviceClient.getServiceInfo(self._cfg['userid'],
                     self._cfg['service-id'], 'ringmaster', 'hod')
        if len(ringList):
          if isinstance(ringList, list):
            ringXRAddress = ringList[0]['xrs']
        count = count + 1

      if ringXRAddress == None:
        raise Exception("Could not get ringmaster XML-RPC server address.")

      self.log.debug("Creating ringmaster XML-RPC client.")
      ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0)

      id = self.hostname + "_" + str(os.getpid())

      cmdlist = []
      if initialize:
        if 'download-addr' in self._cfg:
          self.__download_package(ringClient)
        else:
          self.log.debug("Did not find a download address.")
        while (cmdlist == []):
          cmdlist = ringClient.getCommand(id)
      else:
        while (cmdlist == []):
          cmdlist = ringClient.getAdminCommand(id)

      self.log.debug(pformat(cmdlist))
      cmdDescs = []
      for cmds in cmdlist:
        cmdDescs.append(CommandDesc(cmds['dict'], self.log))

      self._cfg['commanddesc'] = cmdDescs

      if initialize:
        self.log.info("Running hadoop commands again... - Initialize")
        self.__run_hadoop_commands()
        masterParams = []
        for k, cmd in self.__running.iteritems():
          self.log.debug(cmd)
          masterParams.extend(cmd.filledInKeyVals)

        self.log.debug("printing getparams")
        self.log.debug(pformat(id))
        self.log.debug(pformat(masterParams))
        # when this is on a required host, the ringMaster already has our masterParams
        if(len(masterParams) > 0):
          ringClient.addMasterParams(id, masterParams)
      else:
        self.log.info("Running hadoop commands again... - No Initialize")
        self.__run_hadoop_commands()

    except:
      self.log.error(get_exception_string())

    return True

  def clusterStop(self):
    """Stop a running mapreduce/dfs cluster without stopping the hodring"""
    self.log.debug('clusterStop Method Invoked')
    try:
      for cmd in self.__running.values():
        cmd.kill()
      self.__running = {}
    except:
      self.log.error(get_exception_string())

    return True