[120] | 1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
| 2 | #or more contributor license agreements. See the NOTICE file |
---|
| 3 | #distributed with this work for additional information |
---|
| 4 | #regarding copyright ownership. The ASF licenses this file |
---|
| 5 | #to you under the Apache License, Version 2.0 (the |
---|
| 6 | #"License"); you may not use this file except in compliance |
---|
| 7 | #with the License. You may obtain a copy of the License at |
---|
| 8 | |
---|
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 10 | |
---|
| 11 | #Unless required by applicable law or agreed to in writing, software |
---|
| 12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 14 | #See the License for the specific language governing permissions and |
---|
| 15 | #limitations under the License. |
---|
| 16 | #!/usr/bin/env python |
---|
| 17 | """hodring launches hadoop commands on work node and |
---|
| 18 | cleans up all the work dirs afterward |
---|
| 19 | """ |
---|
| 20 | # -*- python -*- |
---|
| 21 | import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom |
---|
| 22 | import socket, sets, urllib, csv, signal, pprint, random, re, httplib |
---|
| 23 | |
---|
| 24 | from xml.dom import getDOMImplementation |
---|
| 25 | from pprint import pformat |
---|
| 26 | from optparse import OptionParser |
---|
| 27 | from urlparse import urlparse |
---|
| 28 | from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory, isProcessRunning |
---|
| 29 | from hodlib.Common.tcp import tcpSocket, tcpError |
---|
| 30 | |
---|
# Bootstrap the module search path: add the parent of the directory this
# script runs from, which presumably holds the hodlib package tree.
# NOTE(review): the hodlib imports above this point already succeeded, so
# this append looks redundant for them — confirm it is needed for the
# star-imports below, or for subprocesses inheriting sys.path.
binfile = sys.path[0]            # directory containing the running script
libdir = os.path.dirname(binfile)  # its parent directory
sys.path.append(libdir)
---|
| 34 | |
---|
| 35 | import hodlib.Common.logger |
---|
| 36 | |
---|
| 37 | from hodlib.GridServices.service import * |
---|
| 38 | from hodlib.Common.util import * |
---|
| 39 | from hodlib.Common.socketServers import threadedHTTPServer |
---|
| 40 | from hodlib.Common.hodsvc import hodBaseService |
---|
| 41 | from hodlib.Common.threads import simpleCommand |
---|
| 42 | from hodlib.Common.xmlrpc import hodXRClient |
---|
| 43 | |
---|
# True when running on native Windows (used to pick platform behaviour).
mswindows = (sys.platform == "win32")
# Working directory at import time, before __init_temp_dir() chdirs away.
originalcwd = os.getcwd()

# Matches an HDFS URI, capturing (host:port, path).
# Raw string: the original used "\d" in a plain string, which is an
# invalid escape sequence (a DeprecationWarning, and an error in newer
# Pythons); the compiled pattern is identical.
reHdfsURI = re.compile(r"hdfs://(.*?:\d+)(.*)")
---|
| 48 | |
---|
class CommandDesc:
  """A class that represents the commands that
  are run by hodring.

  Wraps a plain dict of command attributes (name, program, argv, envs,
  pkgdirs, workdirs, attrs, final-attrs, ...) behind getters, filling in
  defaults for the optional keys and validating the required ones.
  """

  def __init__(self, dict, log=None):
    """dict: command attribute map; mutated in place to add defaults.
    log: optional logger. Defaults to None (fix: parseDesc constructs
    instances without a logger, which previously raised TypeError).

    Raises ValueError if 'name', 'program' or 'pkgdirs' is missing.
    """
    self.log = log
    if self.log:
      self.log.debug("In command desc")
      self.log.debug("Done in command desc")
    # Fill defaults for the optional keys so getters never KeyError.
    dict.setdefault('argv', [])
    dict.setdefault('version', None)
    dict.setdefault('envs', {})
    dict.setdefault('workdirs', [])
    dict.setdefault('attrs', {})
    dict.setdefault('final-attrs', {})
    dict.setdefault('fg', False)
    dict.setdefault('ignorefailures', False)
    dict.setdefault('stdin', None)

    if self.log:
      self.log.debug("Printing dict")
    self._checkRequired(dict)
    self.dict = dict

  def _checkRequired(self, dict):
    """Raise ValueError unless all mandatory keys are present."""
    # Call-form raise: identical behaviour to the old 'raise E, msg'
    # statement form, but also valid on Python 3.
    if 'name' not in dict:
      raise ValueError("Command description lacks 'name'")
    if 'program' not in dict:
      raise ValueError("Command description lacks 'program'")
    if 'pkgdirs' not in dict:
      raise ValueError("Command description lacks 'pkgdirs'")

  def getName(self):
    return self.dict['name']

  def getProgram(self):
    return self.dict['program']

  def getArgv(self):
    return self.dict['argv']

  def getVersion(self):
    return self.dict['version']

  def getEnvs(self):
    return self.dict['envs']

  def getPkgDirs(self):
    return self.dict['pkgdirs']

  def getWorkDirs(self):
    return self.dict['workdirs']

  def getAttrs(self):
    return self.dict['attrs']

  def getfinalAttrs(self):
    return self.dict['final-attrs']

  def isForeground(self):
    return self.dict['fg']

  def isIgnoreFailures(self):
    return self.dict['ignorefailures']

  def getStdin(self):
    return self.dict['stdin']

  def parseDesc(str):
    """Parse a serialized command description.

    The top level is a comma-separated k=v map; the list-valued fields
    ('argv', 'pkgdirs', 'workdirs') and nested maps ('envs', 'attrs',
    'final-attrs') are then parsed out of their string values.
    """
    dict = CommandDesc._parseMap(str)

    dict['argv'] = CommandDesc._parseList(dict['argv'])
    dict['envs'] = CommandDesc._parseMap(dict['envs'])
    dict['pkgdirs'] = CommandDesc._parseList(dict['pkgdirs'], ':')
    dict['workdirs'] = CommandDesc._parseList(dict['workdirs'], ':')
    dict['attrs'] = CommandDesc._parseMap(dict['attrs'])
    dict['final-attrs'] = CommandDesc._parseMap(dict['final-attrs'])

    # Previously CommandDesc(dict) with a mandatory log argument — a
    # guaranteed TypeError. Works now that log defaults to None.
    return CommandDesc(dict)

  parseDesc = staticmethod(parseDesc)

  def _parseList(str, delim = ','):
    """Split *str* on *delim*, honouring backslash escapes (csv-based)."""
    items = []
    for row in csv.reader([str], delimiter=delim, escapechar='\\',
                          quoting=csv.QUOTE_NONE, doublequote=False):
      items.extend(row)
    return items

  _parseList = staticmethod(_parseList)

  def _parseMap(str):
    """Parses key value pairs"""
    kv = {}
    for row in csv.reader([str], escapechar='\\', quoting=csv.QUOTE_NONE, doublequote=False):
      for f in row:
        # split on the first '=' only, so values may themselves contain '='
        [k, v] = f.split('=', 1)
        kv[k] = v
    return kv

  _parseMap = staticmethod(_parseMap)
---|
| 148 | |
---|
class MRSystemDirectoryManager:
  """Class that is responsible for managing the MapReduce system directory.

  Waits for the jobtracker process to exit, then removes the mapred
  system directory from the DFS using the hadoop command line.
  """

  def __init__(self, jtPid, mrSysDir, fsName, hadoopPath, log, retries=120):
    """jtPid: pid of the jobtracker process to wait for.
    mrSysDir: mapred system directory path to remove.
    fsName: host:port of the DFS namenode holding the directory.
    hadoopPath: path to the hadoop executable used for the removal.
    log: logger with warn/info/debug/error methods.
    retries: number of 0.5s polls to wait for the jobtracker to exit
             (default 120 => one minute).
    """
    self.__jtPid = jtPid
    self.__mrSysDir = mrSysDir
    self.__fsName = fsName
    self.__hadoopPath = hadoopPath
    self.__log = log
    self.__retries = retries

  def toCleanupArgs(self):
    """Serialize the cleanup parameters as command-line arguments, in the
    format createMRSystemDirectoryManager() parses back."""
    return " --jt-pid %s --mr-sys-dir %s --fs-name %s --hadoop-path %s " \
              % (self.__jtPid, self.__mrSysDir, self.__fsName, self.__hadoopPath)

  def removeMRSystemDirectory(self):
    """Remove the mapred system directory once the jobtracker has exited.

    Gives up (with a warning) if the jobtracker is still alive after
    retries * 0.5 seconds. Failures to remove are logged, not raised.
    """
    jtActive = isProcessRunning(self.__jtPid)
    count = 0 # try for a max of a minute for the process to end
    while jtActive and (count<self.__retries):
      time.sleep(0.5)
      jtActive = isProcessRunning(self.__jtPid)
      count += 1

    if count == self.__retries:
      self.__log.warn('Job Tracker did not exit even after a minute. Not going to try and cleanup the system directory')
      return

    self.__log.debug('jt is now inactive')

    # Run 'hadoop dfs -rmr' against the cluster's DFS to drop the dir.
    cmd = "%s dfs -fs hdfs://%s -rmr %s" % (self.__hadoopPath, self.__fsName, \
                                            self.__mrSysDir)
    self.__log.debug('Command to run to remove system directory: %s' % (cmd))
    try:
      hadoopCommand = simpleCommand('mr-sys-dir-cleaner', cmd)
      hadoopCommand.start()
      hadoopCommand.wait()
      hadoopCommand.join()
      ret = hadoopCommand.exit_code()
      if ret != 0:
        self.__log.warn("Error in removing MapReduce system directory '%s' from '%s' using path '%s'" \
                          % (self.__mrSysDir, self.__fsName, self.__hadoopPath))
        self.__log.warn(pprint.pformat(hadoopCommand.output()))
      else:
        self.__log.info("Removed MapReduce system directory successfully.")
    except Exception:
      # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit
      # propagate; cleanup stays best-effort for ordinary failures.
      self.__log.error('Exception while cleaning up MapReduce system directory. May not be cleaned up. %s', \
                          get_exception_error_string())
      self.__log.debug(get_exception_string())
---|
| 198 | |
---|
| 199 | |
---|
def createMRSystemDirectoryManager(dict, log):
  """Build an MRSystemDirectoryManager from a parsed-options *dict*.

  Returns None unless all of 'jt-pid', 'mr-sys-dir', 'fs-name' and
  'hadoop-path' are present and non-None (i.e. cleanup is fully
  specified); otherwise returns a configured manager.
  """
  keys = [ 'jt-pid', 'mr-sys-dir', 'fs-name', 'hadoop-path' ]
  for key in keys:
    # 'key not in dict' replaces the deprecated dict.has_key(key);
    # identical semantics, and also valid on Python 3.
    if (key not in dict) or (dict[key] is None):
      return None

  mrSysDirManager = MRSystemDirectoryManager(int(dict['jt-pid']), dict['mr-sys-dir'], \
                                             dict['fs-name'], dict['hadoop-path'], log)
  return mrSysDirManager
---|
| 209 | |
---|
class HadoopCommand:
  """Runs a single hadoop command.

  Generates the command's hadoop-site.xml and log directory under the
  hodring temp dir, launches the program with stdout/stderr redirected
  to files, and tracks the running process.
  """

  def __init__(self, id, desc, tempdir, tardir, log, javahome,
               mrSysDir, restart=False):
    """id: ordinal of this command in the hodring; used in dir names.
    desc: CommandDesc with program, argv, envs and config attributes.
    tempdir: hodring temp dir under which conf/log dirs are created.
    tardir: accepted for interface compatibility; not used in this class.
    log: logger.
    javahome: exported to the child as JAVA_HOME.
    mrSysDir: mapred system dir, substituted for 'fillindir' attr values.
    restart: True when relaunching; conf/log dirs may then pre-exist.
    """
    self.desc = desc
    self.log = log
    self.javahome = javahome
    self.__mrSysDir = mrSysDir
    self.program = desc.getProgram()
    self.name = desc.getName()
    self.workdirs = desc.getWorkDirs()
    self.hadoopdir = tempdir
    # every command gets its own '<id>-<name>' subtree for conf and logs
    self.confdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                                "confdir")
    self.logdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                               "logdir")
    # the child's stdout/stderr are shell-redirected into these files
    self.out = os.path.join(self.logdir, '%s.out' % self.name)
    self.err = os.path.join(self.logdir, '%s.err' % self.name)

    self.child = None
    self.restart = restart
    self.filledInKeyVals = []
    self._createWorkDirs()
    self._createHadoopSiteXml()
    self._createHadoopLogDir()
    self.__hadoopThread = None
    self.stdErrContents = "" # store list of contents for returning to user

  def _createWorkDirs(self):
    """Ensure each configured work dir exists with rwx access."""
    for dir in self.workdirs:
      if os.path.exists(dir):
        if not os.access(dir, os.F_OK | os.R_OK | os.W_OK | os.X_OK):
          # call-form raise: same exception as the old statement form,
          # also valid on Python 3
          raise ValueError("Workdir %s does not allow rwx permission." % (dir))
        continue
      try:
        os.makedirs(dir)
      except:
        # best-effort: a makedirs failure will surface when the dir is used
        pass

  def getFilledInKeyValues(self):
    """Return the 'k=v' strings recorded while generating hadoop-site.xml."""
    return self.filledInKeyVals

  def createXML(self, doc, attr, topElement, final):
    """Append one <property> per (k, v) in *attr* to *topElement*.

    Magic values are substituted before emission:
      'fillinport'     -> a fresh random port number
      'fillinhostport' -> local_fqdn():<random port>
      'fillindir'      -> the mapred system dir for this allocation
    Each resulting pair is also recorded in self.filledInKeyVals so the
    hodring can report the chosen values back.
    """
    for k,v in attr.iteritems():
      self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
      if ( v == "fillinport" ):
        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))

      keyvalpair = ''
      if isinstance(v, (tuple, list)):
        # multi-valued attr: record as 'k=v1,k=v2,...' (trailing comma cut)
        for item in v:
          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
        keyvalpair = keyvalpair[:-1]
      else:
        keyvalpair = k + '=' + v

      self.filledInKeyVals.append(keyvalpair)
      if(k == "mapred.job.tracker"): # total hack for time's sake
        keyvalpair = k + "=" + v
        self.filledInKeyVals.append(keyvalpair)

      if ( v == "fillinhostport"):
        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
        self.log.debug('Setting hostname to: %s' % local_fqdn())
        v = local_fqdn() + ':' + port

        # re-record the pair now that the host:port has been filled in
        keyvalpair = ''
        if isinstance(v, (tuple, list)):
          for item in v:
            keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
          keyvalpair = keyvalpair[:-1]
        else:
          keyvalpair = k + '=' + v

        self.filledInKeyVals.append(keyvalpair)
      if ( v == "fillindir"):
        v = self.__mrSysDir
        pass

      prop = None
      if isinstance(v, (tuple, list)):
        for item in v:
          prop = self._createXmlElement(doc, k, item, "No description", final)
          topElement.appendChild(prop)
      else:
        if k == 'fs.default.name':
          # namenode address is stored bare; prepend the scheme here
          prop = self._createXmlElement(doc, k, "hdfs://" + v, "No description", final)
        else:
          prop = self._createXmlElement(doc, k, v, "No description", final)
        topElement.appendChild(prop)

  def _createHadoopSiteXml(self):
    """Generate confdir/hadoop-site.xml from the command's attributes.

    Final attributes are written first (marked <final>true</final>);
    non-final attributes are written only when not overridden by a
    final one of the same name.
    """
    if self.restart:
      if not os.path.exists(self.confdir):
        os.makedirs(self.confdir)
    else:
      # fresh launch: the per-command confdir must not already exist
      assert os.path.exists(self.confdir) == False
      os.makedirs(self.confdir)

    implementation = getDOMImplementation()
    doc = implementation.createDocument('', 'configuration', None)
    comment = doc.createComment("This is an auto generated hadoop-site.xml, do not modify")
    topElement = doc.documentElement
    topElement.appendChild(comment)

    finalAttr = self.desc.getfinalAttrs()
    self.createXML(doc, finalAttr, topElement, True)
    attr = {}
    attr1 = self.desc.getAttrs()
    for k,v in attr1.iteritems():
      if not finalAttr.has_key(k):
        attr[k] = v
    self.createXML(doc, attr, topElement, False)

    siteName = os.path.join(self.confdir, "hadoop-site.xml")
    sitefile = file(siteName, 'w')
    print >> sitefile, topElement.toxml()
    sitefile.close()
    self.log.debug('created %s' % (siteName))

  def _createHadoopLogDir(self):
    """Create the per-command log dir; on a fresh launch it must be new."""
    if self.restart:
      if not os.path.exists(self.logdir):
        os.makedirs(self.logdir)
    else:
      assert os.path.exists(self.logdir) == False
      os.makedirs(self.logdir)

  def _createXmlElement(self, doc, name, value, description, final):
    """Build a hadoop-site <property> element (name/value/description,
    plus <final>true</final> when *final* is set)."""
    prop = doc.createElement("property")
    nameP = doc.createElement("name")
    string = doc.createTextNode(name)
    nameP.appendChild(string)
    valueP = doc.createElement("value")
    string = doc.createTextNode(value)
    valueP.appendChild(string)
    desc = doc.createElement("description")
    string = doc.createTextNode(description)
    desc.appendChild(string)
    prop.appendChild(nameP)
    prop.appendChild(valueP)
    prop.appendChild(desc)
    if (final):
      felement = doc.createElement("final")
      string = doc.createTextNode("true")
      felement.appendChild(string)
      prop.appendChild(felement)
    pass

    return prop

  def getMRSystemDirectoryManager(self):
    """Return a cleanup manager for this command's mapred system dir.

    NOTE: self.path is only assigned inside run(); callers must invoke
    this after run() has been called.
    """
    return MRSystemDirectoryManager(self.__hadoopThread.getPid(), self.__mrSysDir, \
                                    self.desc.getfinalAttrs()['fs.default.name'], \
                                    self.path, self.log)

  def run(self, dir):
    """Launch the hadoop program found under *dir* and return success.

    Builds the command line from desc, merges desc's envs over
    os.environ (also exporting JAVA_HOME, HADOOP_CONF_DIR and
    HADOOP_LOG_DIR), redirects output to self.out/self.err, feeds
    desc's stdin text if any, and — for foreground commands — waits
    for completion. Returns False on failure unless the command is
    marked ignorefailures.
    """
    status = True
    args = []
    desc = self.desc

    self.log.debug(pprint.pformat(desc.dict))

    self.log.debug("Got package dir of %s" % dir)

    self.path = os.path.join(dir, self.program)

    self.log.debug("path: %s" % self.path)
    args.append(self.path)
    args.extend(desc.getArgv())
    envs = desc.getEnvs()
    fenvs = os.environ

    for k, v in envs.iteritems():
      fenvs[k] = v

    if envs.has_key('HADOOP_OPTS'):
      fenvs['HADOOP_OPTS'] = envs['HADOOP_OPTS']
      self.log.debug("HADOOP_OPTS : %s" % fenvs['HADOOP_OPTS'])

    fenvs['JAVA_HOME'] = self.javahome
    fenvs['HADOOP_CONF_DIR'] = self.confdir
    fenvs['HADOOP_LOG_DIR'] = self.logdir

    self.log.info(pprint.pformat(fenvs))

    hadoopCommand = ''
    for item in args:
      hadoopCommand = "%s%s " % (hadoopCommand, item)

    # Redirecting output and error to self.out and self.err
    hadoopCommand = hadoopCommand + ' 1>%s 2>%s ' % (self.out, self.err)

    self.log.debug('running command: %s' % (hadoopCommand))
    self.log.debug('hadoop env: %s' % fenvs)
    self.log.debug('Command stdout will be redirected to %s ' % self.out + \
                   'and command stderr to %s' % self.err)

    self.__hadoopThread = simpleCommand('hadoop', hadoopCommand, env=fenvs)
    self.__hadoopThread.start()

    # busy-wait until the child's stdin pipe is available
    while self.__hadoopThread.stdin == None:
      time.sleep(.2)
      self.log.debug("hadoopThread still == None ...")

    input = desc.getStdin()
    self.log.debug("hadoop input: %s" % input)
    if input:
      if self.__hadoopThread.is_running():
        print >>self.__hadoopThread.stdin, input
      else:
        self.log.error("hadoop command failed to start")

    self.__hadoopThread.stdin.close()

    self.log.debug("isForground: %s" % desc.isForeground())
    if desc.isForeground():
      self.log.debug("Waiting on hadoop to finish...")
      self.__hadoopThread.wait()

      self.log.debug("Joining hadoop thread...")
      self.__hadoopThread.join()
      if self.__hadoopThread.exit_code() != 0:
        status = False
    else:
      status = self.getCommandStatus()

    self.log.debug("hadoop run status: %s" % status)

    if status == False:
      self.handleFailedCommand()

    if (status == True) or (not desc.isIgnoreFailures()):
      return status
    else:
      self.log.error("Ignoring Failure")
      return True

  def kill(self):
    """Terminate the hadoop process, if one was ever started, and reap it."""
    # Guard first: the original called kill() before its own None check,
    # so invoking kill() before run() raised AttributeError.
    if self.__hadoopThread:
      self.__hadoopThread.kill()
      self.__hadoopThread.join()

  def addCleanup(self, list):
    """Append this command's work dirs and confdir to *list* for cleanup."""
    list.extend(self.workdirs)
    list.append(self.confdir)

  def getCommandStatus(self):
    """True while the command has not exited with a non-zero code
    (a still-running command reports None and counts as success)."""
    status = True
    ec = self.__hadoopThread.exit_code()
    if (ec != 0) and (ec != None):
      status = False
    return status

  def handleFailedCommand(self):
    """Log the failure and capture the command's stderr for the user."""
    self.log.error('hadoop error: %s' % (
                     self.__hadoopThread.exit_status_string()))
    # read the contents of redirected stderr to print information back to user
    if os.path.exists(self.err):
      f = None
      try:
        f = open(self.err)
        lines = f.readlines()
        # format
        for line in lines:
          self.stdErrContents = "%s%s" % (self.stdErrContents, line)
      finally:
        if f is not None:
          f.close()
    self.log.error('See %s.out and/or %s.err for details. They are ' % \
                   (self.name, self.name) + \
                   'located at subdirectories under either ' + \
                   'hodring.work-dirs or hodring.log-destination-uri.')
---|
| 486 | |
---|
| 487 | class HodRing(hodBaseService): |
---|
| 488 | """The main class for hodring that |
---|
| 489 | polls the commands it runs""" |
---|
  def __init__(self, config):
    """Create the hodring service.

    config: parsed configuration; the 'hodring' section is passed to
    hodBaseService, which (per the usage below) provides self._cfg,
    self.logs and self.hostname.
    """
    hodBaseService.__init__(self, 'hodring', config['hodring'])
    self.log = self.logs['main']
    self._http = None            # tarball http server, created in start()
    self.__pkg = None            # root dir name inside the hadoop tarball
    self.__pkgDir = None         # absolute path of the unpacked tarball
    self.__running = {}          # id -> HadoopCommand currently tracked
    self.__hadoopLogDirs = []    # log dirs of every launched command
    self.__init_temp_dir()       # also chdirs into the temp dir
---|
| 500 | |
---|
  def __init_temp_dir(self):
    """Create '<temp-dir>/<userid>.<service-id>.hodring' and chdir into it.

    Later relative paths (tarball download and unpacking) land here.
    """
    self.__tempDir = os.path.join(self._cfg['temp-dir'],
                                  "%s.%s.hodring" % (self._cfg['userid'],
                                                     self._cfg['service-id']))
    if not os.path.exists(self.__tempDir):
      os.makedirs(self.__tempDir)
    # side effect: changes the process working directory for the service
    os.chdir(self.__tempDir)
---|
| 508 | |
---|
| 509 | def __fetch(self, url, spath): |
---|
| 510 | retry = 3 |
---|
| 511 | success = False |
---|
| 512 | while (retry != 0 and success != True): |
---|
| 513 | try: |
---|
| 514 | input = urllib.urlopen(url) |
---|
| 515 | bufsz = 81920 |
---|
| 516 | buf = input.read(bufsz) |
---|
| 517 | out = open(spath, 'w') |
---|
| 518 | while len(buf) > 0: |
---|
| 519 | out.write(buf) |
---|
| 520 | buf = input.read(bufsz) |
---|
| 521 | input.close() |
---|
| 522 | out.close() |
---|
| 523 | success = True |
---|
| 524 | except: |
---|
| 525 | self.log.debug("Failed to copy file") |
---|
| 526 | retry = retry - 1 |
---|
| 527 | if (retry == 0 and success != True): |
---|
| 528 | raise IOError, "Failed to copy the files" |
---|
| 529 | |
---|
| 530 | |
---|
| 531 | def __get_name(self, addr): |
---|
| 532 | parsedUrl = urlparse(addr) |
---|
| 533 | path = parsedUrl[2] |
---|
| 534 | split = path.split('/', 1) |
---|
| 535 | return split[1] |
---|
| 536 | |
---|
| 537 | def __get_dir(self, name): |
---|
| 538 | """Return the root directory inside the tarball |
---|
| 539 | specified by name. Assumes that the tarball begins |
---|
| 540 | with a root directory.""" |
---|
| 541 | import tarfile |
---|
| 542 | myTarFile = tarfile.open(name) |
---|
| 543 | hadoopPackage = myTarFile.getnames()[0] |
---|
| 544 | self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage)) |
---|
| 545 | return hadoopPackage |
---|
| 546 | |
---|
  def getRunningValues(self):
    """Return the HadoopCommand objects currently tracked as running."""
    return self.__running.values()
---|
| 549 | |
---|
  def getTempDir(self):
    """Return this hodring's private scratch directory."""
    return self.__tempDir
---|
| 552 | |
---|
  def getHadoopLogDirs(self):
    """Return the log directories of all commands launched so far."""
    return self.__hadoopLogDirs
---|
| 555 | |
---|
  def __download_package(self, ringClient):
    """Obtain and unpack the hadoop tarball for this node.

    Asks the ringmaster (via *ringClient*) for a tarball source address,
    backing off randomly while none is available; downloads it into the
    current directory; registers this hodring's own http server as a new
    source for other nodes; then untars it and records the package dir
    in self.__pkg / self.__pkgDir. Any failure is logged and re-raised.
    """
    self.log.debug("Found download address: %s" %
                   self._cfg['download-addr'])
    try:
      addr = 'none'
      # NOTE(review): downloadTime is assigned but never used below
      downloadTime = self._cfg['tarball-retry-initial-time']    # download time depends on tarball size and network bandwidth

      increment = 0

      addr = ringClient.getTarList(self.hostname)

      while(addr == 'none'):
        # no source yet: sleep a randomized, linearly growing interval
        rand = self._cfg['tarball-retry-initial-time'] + increment + \
                random.uniform(0,self._cfg['tarball-retry-interval'])
        increment = increment + 1
        self.log.debug("got no tarball. Retrying again in %s seconds." % rand)
        time.sleep(rand)
        addr = ringClient.getTarList(self.hostname)

      self.log.debug("got this address %s" % addr)

      tarName = self.__get_name(addr)
      self.log.debug("tar package name: %s" % tarName)

      # cwd is the hodring temp dir (set by __init_temp_dir)
      fetchPath = os.path.join(os.getcwd(), tarName)
      self.log.debug("fetch path: %s" % fetchPath)

      self.__fetch(addr, fetchPath)
      self.log.debug("done fetching")

      # advertise our own http server as a download source for peers
      tarUrl = "http://%s:%d/%s" % (self._http.server_address[0],
                                    self._http.server_address[1],
                                    tarName)
      try:
        ringClient.registerTarSource(self.hostname, tarUrl,addr)
        #ringClient.tarDone(addr)
      except KeyError, e:
        self.log.error("registerTarSource and tarDone failed: ", e)
        raise KeyError(e)

      check = untar(fetchPath, os.getcwd())

      if (check == False):
        raise IOError, "Untarring failed."

      self.__pkg = self.__get_dir(tarName)
      self.__pkgDir = os.path.join(os.getcwd(), self.__pkg)
    except Exception, e:
      self.log.error("Failed download tar package: %s" %
                     get_exception_error_string())
      raise Exception(e)
---|
| 608 | |
---|
| 609 | def __run_hadoop_commands(self, restart=True): |
---|
| 610 | id = 0 |
---|
| 611 | for desc in self._cfg['commanddesc']: |
---|
| 612 | self.log.debug(pprint.pformat(desc.dict)) |
---|
| 613 | mrSysDir = getMapredSystemDirectory(self._cfg['mapred-system-dir-root'], |
---|
| 614 | self._cfg['userid'], self._cfg['service-id']) |
---|
| 615 | self.log.debug('mrsysdir is %s' % mrSysDir) |
---|
| 616 | cmd = HadoopCommand(id, desc, self.__tempDir, self.__pkgDir, self.log, |
---|
| 617 | self._cfg['java-home'], mrSysDir, restart) |
---|
| 618 | |
---|
| 619 | self.__hadoopLogDirs.append(cmd.logdir) |
---|
| 620 | self.log.debug("hadoop log directory: %s" % self.__hadoopLogDirs) |
---|
| 621 | |
---|
| 622 | try: |
---|
| 623 | # if the tarball isn't there, we use the pkgs dir given. |
---|
| 624 | if self.__pkgDir == None: |
---|
| 625 | pkgdir = desc.getPkgDirs() |
---|
| 626 | else: |
---|
| 627 | pkgdir = self.__pkgDir |
---|
| 628 | |
---|
| 629 | self.log.debug('This is the packcage dir %s ' % (pkgdir)) |
---|
| 630 | if not cmd.run(pkgdir): |
---|
| 631 | addnInfo = "" |
---|
| 632 | if cmd.stdErrContents is not "": |
---|
| 633 | addnInfo = " Information from stderr of the command:\n%s" % (cmd.stdErrContents) |
---|
| 634 | raise Exception("Could not launch the %s using %s/bin/hadoop.%s" % (desc.getName(), pkgdir, addnInfo)) |
---|
| 635 | except Exception, e: |
---|
| 636 | self.log.debug("Exception running hadoop command: %s\n%s" % (get_exception_error_string(), get_exception_string())) |
---|
| 637 | self.__running[id] = cmd |
---|
| 638 | raise Exception(e) |
---|
| 639 | |
---|
| 640 | id += 1 |
---|
| 641 | if desc.isForeground(): |
---|
| 642 | continue |
---|
| 643 | self.__running[id-1] = cmd |
---|
| 644 | |
---|
| 645 | # ok.. now command is running. If this HodRing got jobtracker, |
---|
| 646 | # Check if it is ready for accepting jobs, and then only return |
---|
| 647 | self.__check_jobtracker(desc, id-1, pkgdir) |
---|
| 648 | |
---|
| 649 | def __check_jobtracker(self, desc, id, pkgdir): |
---|
| 650 | # Check jobtracker status. Return properly if it is ready to accept jobs. |
---|
| 651 | # Currently Checks for Jetty to come up, the last thing that can be checked |
---|
| 652 | # before JT completes initialisation. To be perfectly reliable, we need |
---|
| 653 | # hadoop support |
---|
| 654 | name = desc.getName() |
---|
| 655 | if name == 'jobtracker': |
---|
| 656 | # Yes I am the Jobtracker |
---|
| 657 | self.log.debug("Waiting for jobtracker to initialise") |
---|
| 658 | version = desc.getVersion() |
---|
| 659 | self.log.debug("jobtracker version : %s" % version) |
---|
| 660 | hadoopCmd = self.getRunningValues()[id] |
---|
| 661 | attrs = hadoopCmd.getFilledInKeyValues() |
---|
| 662 | attrs = parseEquals(attrs) |
---|
| 663 | jobTrackerAddr = attrs['mapred.job.tracker'] |
---|
| 664 | self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr) |
---|
| 665 | if version < 16: |
---|
| 666 | jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \ |
---|
| 667 | attrs['mapred.job.tracker.info.port'] |
---|
| 668 | else: |
---|
| 669 | jettyAddr = attrs['mapred.job.tracker.http.address'] |
---|
| 670 | self.log.debug("Jobtracker jetty : %s" % jettyAddr) |
---|
| 671 | |
---|
| 672 | # Check for Jetty to come up |
---|
| 673 | # For this do a http head, and then look at the status |
---|
| 674 | defaultTimeout = socket.getdefaulttimeout() |
---|
| 675 | # socket timeout isn`t exposed at httplib level. Setting explicitly. |
---|
| 676 | socket.setdefaulttimeout(1) |
---|
| 677 | sleepTime = 0.5 |
---|
| 678 | jettyStatus = False |
---|
| 679 | jettyStatusmsg = "" |
---|
| 680 | while sleepTime <= 32: |
---|
| 681 | # There is a possibility that the command might fail after a while. |
---|
| 682 | # This code will check if the command failed so that a better |
---|
| 683 | # error message can be returned to the user. |
---|
| 684 | if not hadoopCmd.getCommandStatus(): |
---|
| 685 | self.log.critical('Hadoop command found to have failed when ' \ |
---|
| 686 | 'checking for jobtracker status') |
---|
| 687 | hadoopCmd.handleFailedCommand() |
---|
| 688 | addnInfo = "" |
---|
| 689 | if hadoopCmd.stdErrContents is not "": |
---|
| 690 | addnInfo = " Information from stderr of the command:\n%s" \ |
---|
| 691 | % (hadoopCmd.stdErrContents) |
---|
| 692 | raise Exception("Could not launch the %s using %s/bin/hadoop.%s" \ |
---|
| 693 | % (desc.getName(), pkgdir, addnInfo)) |
---|
| 694 | |
---|
| 695 | try: |
---|
| 696 | jettyConn = httplib.HTTPConnection(jettyAddr) |
---|
| 697 | jettyConn.request("HEAD", "/jobtracker.jsp") |
---|
| 698 | # httplib inherently retries the following till socket timeout |
---|
| 699 | resp = jettyConn.getresponse() |
---|
| 700 | if resp.status != 200: |
---|
| 701 | # Some problem? |
---|
| 702 | jettyStatus = False |
---|
| 703 | jettyStatusmsg = "Jetty gave a non-200 response to a HTTP-HEAD" +\ |
---|
| 704 | " request. HTTP Status (Code, Msg): (%s, %s)" % \ |
---|
| 705 | ( resp.status, resp.reason ) |
---|
| 706 | break |
---|
| 707 | else: |
---|
| 708 | self.log.info("Jetty returned a 200 status (%s)" % resp.reason) |
---|
| 709 | self.log.info("JobTracker successfully initialised") |
---|
| 710 | return |
---|
| 711 | except socket.error: |
---|
| 712 | self.log.debug("Jetty gave a socket error. Sleeping for %s" \ |
---|
| 713 | % sleepTime) |
---|
| 714 | time.sleep(sleepTime) |
---|
| 715 | sleepTime = sleepTime * 2 |
---|
| 716 | except Exception, e: |
---|
| 717 | jettyStatus = False |
---|
| 718 | jettyStatusmsg = ("Process(possibly other than jetty) running on" + \ |
---|
| 719 | " port assigned to jetty is returning invalid http response") |
---|
| 720 | break |
---|
| 721 | socket.setdefaulttimeout(defaultTimeout) |
---|
| 722 | if not jettyStatus: |
---|
| 723 | self.log.critical("Jobtracker failed to initialise.") |
---|
| 724 | if jettyStatusmsg: |
---|
| 725 | self.log.critical( "Reason: %s" % jettyStatusmsg ) |
---|
| 726 | else: self.log.critical( "Reason: Jetty failed to give response") |
---|
| 727 | raise Exception("JobTracker failed to initialise") |
---|
| 728 | |
---|
  def stop(self):
    """Stop the tarball http server (if running) and the base service."""
    self.log.debug("Entered hodring stop.")
    if self._http:
      self.log.debug("stopping http server...")
      self._http.stop()

    self.log.debug("call hodsvcrgy stop...")
    hodBaseService.stop(self)
---|
| 737 | |
---|
| 738 | def _xr_method_clusterStart(self, initialize=True): |
---|
| 739 | return self.clusterStart(initialize) |
---|
| 740 | |
---|
| 741 | def _xr_method_clusterStop(self): |
---|
| 742 | return self.clusterStop() |
---|
| 743 | |
---|
| 744 | def start(self): |
---|
| 745 | """Run and maintain hodring commands""" |
---|
| 746 | |
---|
| 747 | try: |
---|
| 748 | if self._cfg.has_key('download-addr'): |
---|
| 749 | self._http = threadedHTTPServer('', self._cfg['http-port-range']) |
---|
| 750 | self.log.info("Starting http server...") |
---|
| 751 | self._http.serve_forever() |
---|
| 752 | self.log.debug("http://%s:%d" % (self._http.server_address[0], |
---|
| 753 | self._http.server_address[1])) |
---|
| 754 | |
---|
| 755 | hodBaseService.start(self) |
---|
| 756 | |
---|
| 757 | ringXRAddress = None |
---|
| 758 | if self._cfg.has_key('ringmaster-xrs-addr'): |
---|
| 759 | ringXRAddress = "http://%s:%s/" % (self._cfg['ringmaster-xrs-addr'][0], |
---|
| 760 | self._cfg['ringmaster-xrs-addr'][1]) |
---|
| 761 | self.log.debug("Ringmaster at %s" % ringXRAddress) |
---|
| 762 | |
---|
| 763 | self.log.debug("Creating service registry XML-RPC client.") |
---|
| 764 | serviceClient = hodXRClient(to_http_url( |
---|
| 765 | self._cfg['svcrgy-addr'])) |
---|
| 766 | if ringXRAddress == None: |
---|
| 767 | self.log.info("Did not get ringmaster XML-RPC address. Fetching information from service registry.") |
---|
| 768 | ringList = serviceClient.getServiceInfo(self._cfg['userid'], |
---|
| 769 | self._cfg['service-id'], 'ringmaster', 'hod') |
---|
| 770 | |
---|
| 771 | self.log.debug(pprint.pformat(ringList)) |
---|
| 772 | |
---|
| 773 | if len(ringList): |
---|
| 774 | if isinstance(ringList, list): |
---|
| 775 | ringXRAddress = ringList[0]['xrs'] |
---|
| 776 | |
---|
| 777 | count = 0 |
---|
| 778 | while (ringXRAddress == None and count < 3000): |
---|
| 779 | ringList = serviceClient.getServiceInfo(self._cfg['userid'], |
---|
| 780 | self._cfg['service-id'], 'ringmaster', 'hod') |
---|
| 781 | |
---|
| 782 | if len(ringList): |
---|
| 783 | if isinstance(ringList, list): |
---|
| 784 | ringXRAddress = ringList[0]['xrs'] |
---|
| 785 | |
---|
| 786 | count = count + 1 |
---|
| 787 | time.sleep(.2) |
---|
| 788 | |
---|
| 789 | if ringXRAddress == None: |
---|
| 790 | raise Exception("Could not get ringmaster XML-RPC server address.") |
---|
| 791 | |
---|
| 792 | self.log.debug("Creating ringmaster XML-RPC client.") |
---|
| 793 | ringClient = hodXRClient(ringXRAddress) |
---|
| 794 | |
---|
| 795 | id = self.hostname + "_" + str(os.getpid()) |
---|
| 796 | |
---|
| 797 | if 'download-addr' in self._cfg: |
---|
| 798 | self.__download_package(ringClient) |
---|
| 799 | else: |
---|
| 800 | self.log.debug("Did not find a download address.") |
---|
| 801 | |
---|
| 802 | cmdlist = [] |
---|
| 803 | firstTime = True |
---|
| 804 | increment = 0 |
---|
| 805 | hadoopStartupTime = 2 |
---|
| 806 | |
---|
| 807 | cmdlist = ringClient.getCommand(id) |
---|
| 808 | |
---|
| 809 | while (cmdlist == []): |
---|
| 810 | if firstTime: |
---|
| 811 | sleepTime = increment + self._cfg['cmd-retry-initial-time'] + hadoopStartupTime\ |
---|
| 812 | + random.uniform(0,self._cfg['cmd-retry-interval']) |
---|
| 813 | firstTime = False |
---|
| 814 | else: |
---|
| 815 | sleepTime = increment + self._cfg['cmd-retry-initial-time'] + \ |
---|
| 816 | + random.uniform(0,self._cfg['cmd-retry-interval']) |
---|
| 817 | self.log.debug("Did not get command list. Waiting for %s seconds." % (sleepTime)) |
---|
| 818 | time.sleep(sleepTime) |
---|
| 819 | increment = increment + 1 |
---|
| 820 | cmdlist = ringClient.getCommand(id) |
---|
| 821 | |
---|
| 822 | self.log.debug(pformat(cmdlist)) |
---|
| 823 | cmdDescs = [] |
---|
| 824 | for cmds in cmdlist: |
---|
| 825 | cmdDescs.append(CommandDesc(cmds['dict'], self.log)) |
---|
| 826 | |
---|
| 827 | self._cfg['commanddesc'] = cmdDescs |
---|
| 828 | |
---|
| 829 | self.log.info("Running hadoop commands...") |
---|
| 830 | |
---|
| 831 | self.__run_hadoop_commands(False) |
---|
| 832 | |
---|
| 833 | masterParams = [] |
---|
| 834 | for k, cmd in self.__running.iteritems(): |
---|
| 835 | masterParams.extend(cmd.filledInKeyVals) |
---|
| 836 | |
---|
| 837 | self.log.debug("printing getparams") |
---|
| 838 | self.log.debug(pformat(id)) |
---|
| 839 | self.log.debug(pformat(masterParams)) |
---|
| 840 | # when this is on a required host, the ringMaster already has our masterParams |
---|
| 841 | if(len(masterParams) > 0): |
---|
| 842 | ringClient.addMasterParams(id, masterParams) |
---|
| 843 | except Exception, e: |
---|
| 844 | raise Exception(e) |
---|
| 845 | |
---|
| 846 | def clusterStart(self, initialize=True): |
---|
| 847 | """Start a stopped mapreduce/dfs cluster""" |
---|
| 848 | if initialize: |
---|
| 849 | self.log.debug('clusterStart Method Invoked - Initialize') |
---|
| 850 | else: |
---|
| 851 | self.log.debug('clusterStart Method Invoked - No Initialize') |
---|
| 852 | try: |
---|
| 853 | self.log.debug("Creating service registry XML-RPC client.") |
---|
| 854 | serviceClient = hodXRClient(to_http_url(self._cfg['svcrgy-addr']), |
---|
| 855 | None, None, 0, 0, 0) |
---|
| 856 | |
---|
| 857 | self.log.info("Fetching ringmaster information from service registry.") |
---|
| 858 | count = 0 |
---|
| 859 | ringXRAddress = None |
---|
| 860 | while (ringXRAddress == None and count < 3000): |
---|
| 861 | ringList = serviceClient.getServiceInfo(self._cfg['userid'], |
---|
| 862 | self._cfg['service-id'], 'ringmaster', 'hod') |
---|
| 863 | if len(ringList): |
---|
| 864 | if isinstance(ringList, list): |
---|
| 865 | ringXRAddress = ringList[0]['xrs'] |
---|
| 866 | count = count + 1 |
---|
| 867 | |
---|
| 868 | if ringXRAddress == None: |
---|
| 869 | raise Exception("Could not get ringmaster XML-RPC server address.") |
---|
| 870 | |
---|
| 871 | self.log.debug("Creating ringmaster XML-RPC client.") |
---|
| 872 | ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0) |
---|
| 873 | |
---|
| 874 | id = self.hostname + "_" + str(os.getpid()) |
---|
| 875 | |
---|
| 876 | cmdlist = [] |
---|
| 877 | if initialize: |
---|
| 878 | if 'download-addr' in self._cfg: |
---|
| 879 | self.__download_package(ringClient) |
---|
| 880 | else: |
---|
| 881 | self.log.debug("Did not find a download address.") |
---|
| 882 | while (cmdlist == []): |
---|
| 883 | cmdlist = ringClient.getCommand(id) |
---|
| 884 | else: |
---|
| 885 | while (cmdlist == []): |
---|
| 886 | cmdlist = ringClient.getAdminCommand(id) |
---|
| 887 | |
---|
| 888 | self.log.debug(pformat(cmdlist)) |
---|
| 889 | cmdDescs = [] |
---|
| 890 | for cmds in cmdlist: |
---|
| 891 | cmdDescs.append(CommandDesc(cmds['dict'], self.log)) |
---|
| 892 | |
---|
| 893 | self._cfg['commanddesc'] = cmdDescs |
---|
| 894 | |
---|
| 895 | if initialize: |
---|
| 896 | self.log.info("Running hadoop commands again... - Initialize") |
---|
| 897 | self.__run_hadoop_commands() |
---|
| 898 | masterParams = [] |
---|
| 899 | for k, cmd in self.__running.iteritems(): |
---|
| 900 | self.log.debug(cmd) |
---|
| 901 | masterParams.extend(cmd.filledInKeyVals) |
---|
| 902 | |
---|
| 903 | self.log.debug("printing getparams") |
---|
| 904 | self.log.debug(pformat(id)) |
---|
| 905 | self.log.debug(pformat(masterParams)) |
---|
| 906 | # when this is on a required host, the ringMaster already has our masterParams |
---|
| 907 | if(len(masterParams) > 0): |
---|
| 908 | ringClient.addMasterParams(id, masterParams) |
---|
| 909 | else: |
---|
| 910 | self.log.info("Running hadoop commands again... - No Initialize") |
---|
| 911 | self.__run_hadoop_commands() |
---|
| 912 | |
---|
| 913 | except: |
---|
| 914 | self.log.error(get_exception_string()) |
---|
| 915 | |
---|
| 916 | return True |
---|
| 917 | |
---|
| 918 | def clusterStop(self): |
---|
| 919 | """Stop a running mapreduce/dfs cluster without stopping the hodring""" |
---|
| 920 | self.log.debug('clusterStop Method Invoked') |
---|
| 921 | try: |
---|
| 922 | for cmd in self.__running.values(): |
---|
| 923 | cmd.kill() |
---|
| 924 | self.__running = {} |
---|
| 925 | except: |
---|
| 926 | self.log.error(get_exception_string()) |
---|
| 927 | |
---|
| 928 | return True |
---|