[120] | 1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
| 2 | #or more contributor license agreements. See the NOTICE file |
---|
| 3 | #distributed with this work for additional information |
---|
| 4 | #regarding copyright ownership. The ASF licenses this file |
---|
| 5 | #to you under the Apache License, Version 2.0 (the |
---|
| 6 | #"License"); you may not use this file except in compliance |
---|
| 7 | #with the License. You may obtain a copy of the License at |
---|
| 8 | |
---|
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 10 | |
---|
| 11 | #Unless required by applicable law or agreed to in writing, software |
---|
| 12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 14 | #See the License for the specific language governing permissions and |
---|
| 15 | #limitations under the License. |
---|
| 16 | #!/usr/bin/env python |
---|
| 17 | """manages services and nodepool""" |
---|
| 18 | # -*- python -*- |
---|
| 19 | |
---|
| 20 | import os, sys, random, time, sets, shutil, threading |
---|
| 21 | import urllib, urlparse, re, getpass, pprint, signal, shutil |
---|
| 22 | |
---|
| 23 | from pprint import pformat |
---|
| 24 | from HTMLParser import HTMLParser |
---|
| 25 | |
---|
# Put the parent of the running script's directory on sys.path (presumably
# where the hodlib package lives) so the hodlib.* imports below resolve.
# sys.path[0] is the directory of the script being run, so despite its name
# binfile holds a directory, not a file.
binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)
---|
| 29 | |
---|
| 30 | import hodlib.Common.logger |
---|
| 31 | from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus |
---|
| 32 | |
---|
| 33 | from hodlib.Common.threads import func |
---|
| 34 | |
---|
| 35 | from hodlib.Hod.nodePool import * |
---|
| 36 | from hodlib.Common.util import * |
---|
| 37 | from hodlib.Common.nodepoolutil import NodePoolUtil |
---|
| 38 | from hodlib.Common.socketServers import hodXMLRPCServer |
---|
| 39 | from hodlib.Common.socketServers import threadedHTTPServer |
---|
| 40 | from hodlib.NodePools import * |
---|
| 41 | from hodlib.NodePools.torque import * |
---|
| 42 | from hodlib.GridServices import * |
---|
| 43 | from hodlib.Common.descGenerator import * |
---|
| 44 | from hodlib.Common.xmlrpc import hodXRClient |
---|
| 45 | from hodlib.Common.miniHTMLParser import miniHTMLParser |
---|
| 46 | from hodlib.Common.threads import simpleCommand |
---|
| 47 | |
---|
class ringMasterServer:
    """The RPC server that exposes all the master config
    changes. Also, one of these RPC servers runs as a proxy
    and all the hodring instances register with this proxy.

    Only one server is kept per process: the live wrapper and the
    underlying XML-RPC server are stored as class attributes and are
    reached through the static helpers startService / stopService /
    getPort / getAddress.
    """
    # The ringMasterServer created by startService(); None until then.
    instance = None
    # The underlying XML-RPC server object; None until startService().
    xmlrpc = None

    def __init__(self, cfg, log, logMasterSources, retry=5):
        """Create the XML-RPC server, publish logMasterSources on it and
        start serving.

        cfg              -- configuration; the server binds to a port taken
                            from cfg['ringmaster']['xrs-port-range'].
        log              -- logger for status messages.
        logMasterSources -- object whose methods are exposed over RPC.
        retry            -- unused; kept for backward compatibility.
        """
        try:
            from hodlib.Common.socketServers import twistedXMLRPCServer
            ringMasterServer.xmlrpc = twistedXMLRPCServer("",
                cfg['ringmaster']['xrs-port-range'])
        except ImportError:
            log.info("Twisted interface not found. Using hodXMLRPCServer.")
            ringMasterServer.xmlrpc = hodXMLRPCServer("",
                cfg['ringmaster']['xrs-port-range'])

        ringMasterServer.xmlrpc.register_instance(logMasterSources)
        self.logMasterSources = logMasterSources
        ringMasterServer.xmlrpc.serve_forever()

        # serve_forever() returns here (the loop below relies on it), so
        # poll until the server reports that it is up.
        while not ringMasterServer.xmlrpc.is_alive():
            time.sleep(.5)

        log.debug('Ringmaster RPC Server at %d' %
                  ringMasterServer.xmlrpc.server_address[1])

    def startService(ss, cfg, np, log, rm):
        """Create the RPC-exposed _LogMasterSources and start the server."""
        logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
        ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)

    def stopService():
        """Stop the XML-RPC server started by startService()."""
        ringMasterServer.xmlrpc.stop()

    def getPort():
        """Return the port the ringmaster XML-RPC server is bound to."""
        # The ringMasterServer instance never has a 'port' attribute; the
        # bound port lives on the underlying server, as in getAddress().
        return ringMasterServer.xmlrpc.server_address[1]

    def getAddress():
        """Return the server URL as 'http://<hostname>:<port>/'."""
        # Local import: 'socket' is not among this module's explicit imports.
        import socket
        return 'http://%s:%d/' % (socket.gethostname(),
                                  ringMasterServer.xmlrpc.server_address[1])

    startService = staticmethod(startService)
    stopService = staticmethod(stopService)
    getPort = staticmethod(getPort)
    getAddress = staticmethod(getAddress)
---|
| 93 | |
---|
class _LogMasterSources:
    """All the methods that are run by the RPC server are
    added into this class.

    Besides handing service commands to hodrings, this object tracks the
    URLs from which the hadoop tarball can be downloaded and how many
    downloads are currently outstanding from each of them.
    """

    def __init__(self, serviceDict, cfg, np, log, rm):
        """serviceDict -- mapping of service name to service object.
        cfg -- configuration; cfg['ringmaster']['max-connect'] caps the
               number of concurrent downloads handed out per tarball URL.
        np  -- the node pool.
        log -- logger.
        rm  -- the owning RingMaster; stopRM() calls its stop().
        """
        self.serviceDict = serviceDict
        self.tarSource = []
        self.tarSourceLock = threading.Lock()
        # url -> url for every registered tarball source.
        self.dict = {}
        # url -> number of outstanding downloads from that url.
        self.count = {}
        self.logsourceList = []
        self.logsourceListLock = threading.Lock()
        self.masterParam = []
        self.masterParamLock = threading.Lock()
        self.verify = 'none'
        self.cmdLock = threading.Lock()
        self.cfg = cfg
        self.log = log
        self.np = np
        self.rm = rm
        self.hdfsHost = None
        self.mapredHost = None
        self.maxconnect = self.cfg['ringmaster']['max-connect']
        self.log.debug("Using max-connect value %s"%self.maxconnect)

    def registerTarSource(self, hostname, url, addr=None):
        """Register url as a source of the hadoop tarball; returns url.

        addr, when given, must be an already registered url; its download
        count is decremented (as tarDone() does). An unknown addr raises
        KeyError.
        """
        self.log.debug("registering: " + url)
        lock = self.tarSourceLock
        lock.acquire()
        try:
            self.dict[url] = url
            self.count[url] = 0
            # addr is None when ringMaster himself invokes this method
            if addr:
                self.count[addr] = self.count[addr] - 1
        finally:
            # Release even on KeyError so one bad call cannot dead-lock
            # every later tarball request.
            lock.release()
        if addr:
            self.log.debug("%s is done" % (addr))
        return url

    def getTarList(self, hodring):
        """Pick a tarball URL for the hodring named hodring.

        Chooses the source with the fewest outstanding downloads among those
        below max-connect and increments its count. Returns the string
        'none' if no source is registered or all are saturated.
        """
        lock = self.tarSourceLock
        lock.acquire()
        try:
            leastkey = None
            leastval = None
            for k, v in self.count.items():
                if v >= self.maxconnect:
                    continue
                # '<=' keeps the original tie-break (last candidate wins).
                if leastkey is None or v <= leastval:
                    leastkey = k
                    leastval = v
            if leastkey is None:
                url = 'none'
            else:
                url = self.dict[leastkey]
                self.count[leastkey] = leastval + 1
                self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
        finally:
            lock.release()
        self.log.debug('sending url ' + url + " to " + hodring)
        return url

    def tarDone(self, uri):
        """Record that one download from uri finished; returns uri.

        Raises KeyError if uri was never registered.
        """
        self.log.debug("%s is done" % (uri))
        lock = self.tarSourceLock
        lock.acquire()
        try:
            self.count[uri] = self.count[uri] - 1
        finally:
            lock.release()
        return uri

    def status(self):
        """Liveness probe for RPC clients; always True."""
        return True

    # FIXME: this code is broken, it relies on a central service registry
    #
    # def clusterStart(self, changedClusterParams=[]):
    #     self.log.debug("clusterStart method invoked.")
    #     self.dict = {}
    #     self.count = {}
    #     try:
    #         if (len(changedClusterParams) > 0):
    #             self.log.debug("Updating config.")
    #             for param in changedClusterParams:
    #                 (key, sep1, val) = param.partition('=')
    #                 (i1, sep2, i2) = key.partition('.')
    #                 try:
    #                     prev = self.cfg[i1][i2]
    #                     self.rm.cfg[i1][i2] = val
    #                     self.cfg[i1][i2] = val
    #                     self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
    #                 except KeyError, e:
    #                     self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
    #             self.log.debug("Regenerating Service Description.")
    #             dGen = DescGenerator(self.rm.cfg)
    #             self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
    #             self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
    #
    #         self.rm.tar = None
    #         if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
    #             self.rm.download = True
    #             self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
    #             self.log.debug("self.rm.tar=%s" % self.rm.tar)
    #
    #         self.rm.cd_to_tempdir()
    #
    #         self.rm.tarAddress = None
    #         hostname = socket.gethostname()
    #         if (self.rm.download):
    #             self.rm.basename = os.path.basename(self.rm.tar)
    #             dest = os.path.join(os.getcwd(), self.rm.basename)
    #             src = self.rm.tar
    #             self.log.debug("cp %s -> %s" % (src, dest))
    #             shutil.copy(src, dest)
    #             self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
    #             self.registerTarSource(hostname, self.rm.tarAddress)
    #             self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
    #         else:
    #             self.log.debug("Download not set.")
    #
    #         if (self.rm.tar != None):
    #             self.cfg['hodring']['download-addr'] = self.rm.tarAddress
    #             self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
    #
    #         sdl = self.rm.cfg['servicedesc']
    #         workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
    #         hdfsDesc = sdl['hdfs']
    #         hdfs = None
    #         if hdfsDesc.isExternal():
    #             hdfs = HdfsExternal(hdfsDesc, workDirs)
    #         else:
    #             hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
    #
    #         self.rm.serviceDict[hdfs.getName()] = hdfs
    #         mrDesc = sdl['mapred']
    #         mr = None
    #         if mrDesc.isExternal():
    #             mr = MapReduceExternal(mrDesc, workDirs)
    #         else:
    #             mr = MapReduce(mrDesc, workDirs, 1)
    #         self.rm.serviceDict[mr.getName()] = mr
    #
    #         ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
    #             self.np.getServiceId(), 'hodring', 'hod')
    #
    #         slaveList = ringList
    #         hdfsringXRAddress = None
    #         # Start HDFS Master - Step 1
    #         if not hdfsDesc.isExternal():
    #             masterFound = False
    #             for ring in ringList:
    #                 ringXRAddress = ring['xrs']
    #                 if ringXRAddress == None:
    #                     raise Exception("Could not get hodring XML-RPC server address.")
    #                 if (ringXRAddress.find(self.hdfsHost) != -1):
    #                     ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
    #                     hdfsringXRAddress = ringXRAddress
    #                     self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
    #                     ringClient.clusterStart()
    #                     masterFound = True
    #                     slaveList.remove(ring)
    #                     break
    #             if not masterFound:
    #                 raise Exception("HDFS Master host not found")
    #             while hdfs.getInfoAddrs() == None:
    #                 self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
    #                 time.sleep(1)
    #
    #         # Start MAPRED Master - Step 2
    #         if not mrDesc.isExternal():
    #             masterFound = False
    #             for ring in ringList:
    #                 ringXRAddress = ring['xrs']
    #                 if ringXRAddress == None:
    #                     raise Exception("Could not get hodring XML-RPC server address.")
    #                 if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
    #                     ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
    #                     self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
    #                     ringClient.clusterStart()
    #                     masterFound = True
    #                     slaveList.remove(ring)
    #                     break
    #             if not masterFound:
    #                 raise Exception("MAPRED Master host not found")
    #             while mr.getInfoAddrs() == None:
    #                 self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \
    #                     mapred.job.tracker.info.port")
    #                 time.sleep(1)
    #
    #         # Start Slaves - Step 3
    #         for ring in slaveList:
    #             ringXRAddress = ring['xrs']
    #             if ringXRAddress == None:
    #                 raise Exception("Could not get hodring XML-RPC server address.")
    #             ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
    #             self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
    #             ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
    #             ring['thread'] = ringThread
    #             ringThread.start()
    #
    #         for ring in slaveList:
    #             ringThread = ring['thread']
    #             if ringThread == None:
    #                 raise Exception("Could not get hodring thread (Slave).")
    #             ringThread.join()
    #             self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
    #
    #         # Run Admin Commands on HDFS Master - Step 4
    #         if not hdfsDesc.isExternal():
    #             if hdfsringXRAddress == None:
    #                 raise Exception("HDFS Master host not found (to Run Admin Commands)")
    #             ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
    #             self.log.debug("Invoking clusterStart(False) - Admin on "
    #                            + hdfsringXRAddress + " (HDFS Master)")
    #             ringClient.clusterStart(False)
    #
    #     except:
    #         self.log.debug(get_exception_string())
    #         return False
    #
    #     self.log.debug("Successfully started cluster.")
    #     return True
    #
    # def clusterStop(self):
    #     self.log.debug("clusterStop method invoked.")
    #     try:
    #         hdfsAddr = self.getServiceAddr('hdfs')
    #         if hdfsAddr.find(':') != -1:
    #             h, p = hdfsAddr.split(':', 1)
    #             self.hdfsHost = h
    #             self.log.debug("hdfsHost: " + self.hdfsHost)
    #         mapredAddr = self.getServiceAddr('mapred')
    #         if mapredAddr.find(':') != -1:
    #             h, p = mapredAddr.split(':', 1)
    #             self.mapredHost = h
    #             self.log.debug("mapredHost: " + self.mapredHost)
    #         ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
    #                                                         self.np.getServiceId(),
    #                                                         'hodring', 'hod')
    #         for ring in ringList:
    #             ringXRAddress = ring['xrs']
    #             if ringXRAddress == None:
    #                 raise Exception("Could not get hodring XML-RPC server address.")
    #             ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)
    #             self.log.debug("Invoking clusterStop on " + ringXRAddress)
    #             ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())
    #             ring['thread'] = ringThread
    #             ringThread.start()
    #
    #         for ring in ringList:
    #             ringThread = ring['thread']
    #             if ringThread == None:
    #                 raise Exception("Could not get hodring thread.")
    #             ringThread.join()
    #             self.log.debug("Completed clusterStop on " + ring['xrs'])
    #
    #     except:
    #         self.log.debug(get_exception_string())
    #         return False
    #
    #     self.log.debug("Successfully stopped cluster.")
    #
    #     return True

    def getCommand(self, addr):
        """This method is called by the
        hodrings to get commands from
        the ringmaster.

        If a non-external, launchable service has not launched its master
        yet (and its master failure count is within max-master-failures),
        the caller at addr becomes that master and receives its commands.
        Otherwise, once every non-external service's master is initialized,
        the worker commands of all of them are returned. An empty list means
        there is nothing to run yet. Errors are logged, not raised.
        """
        lock = self.cmdLock
        cmdList = []
        lock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    if (not v.isExternal()):
                        if v.isLaunchable(self.serviceDict):
                            # If a master is still not launched, or the number of
                            # retries for launching master is not reached,
                            # launch master
                            if not v.isMasterLaunched() and \
                                (v.getMasterFailureCount() <= \
                                 self.cfg['ringmaster']['max-master-failures']):
                                cmdList = v.getMasterCommands(self.serviceDict)
                                v.setlaunchedMaster()
                                v.setMasterAddress(addr)
                                break
                if cmdList == []:
                    for s in self.serviceDict.values():
                        # Test the service being iterated ('s'). Testing the
                        # stale 'v' made the result depend on whichever
                        # service happened to be last in the dict.
                        if (not s.isExternal()):
                            if s.isMasterInitialized():
                                cl = s.getWorkerCommands(self.serviceDict)
                                cmdList.extend(cl)
                            else:
                                # Some master is not ready: no workers yet.
                                cmdList = []
                                break
            except Exception:
                # Best effort: log and return whatever was gathered.
                self.log.debug(get_exception_string())
        finally:
            lock.release()

        cmd = addr + pformat(cmdList)
        self.log.debug("getCommand returning " + cmd)
        return cmdList

    def getAdminCommand(self, addr):
        """This method is called by the
        hodrings to get admin commands from
        the ringmaster.

        Returns the first non-empty admin command list offered by any
        service, or []. Errors are logged, not raised.
        """
        lock = self.cmdLock
        cmdList = []
        lock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    cmdList = v.getAdminCommands(self.serviceDict)
                    if cmdList != []:
                        break
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            lock.release()
        cmd = addr + pformat(cmdList)
        self.log.debug("getAdminCommand returning " + cmd)
        return cmdList

    def addMasterParams(self, addr, vals):
        """This method is called by
        hodring to update any parameters
        its changed for the commands it was
        running.

        Every launched service whose master address equals addr gets vals
        as its master params and is marked initialized. Returns addr.
        """
        self.log.debug('Comment: adding master params from %s' % addr)
        self.log.debug(pformat(vals))
        lock = self.masterParamLock
        lock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    if v.isMasterLaunched():
                        if (v.getMasterAddress() == addr):
                            v.setMasterParams(vals)
                            v.setMasterInitialized()
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            lock.release()

        return addr

    def setHodRingErrors(self, addr, errors):
        """This method is called by the hodrings to update errors
        it encountered while starting up.

        Every launched service whose master address equals addr is marked
        as failed with a message naming addr minus its trailing '_...' part.
        Always returns True.
        """
        self.log.critical("Hodring at %s failed with following errors:\n%s" \
                          % (addr, errors))
        lock = self.masterParamLock
        lock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    if v.isMasterLaunched():
                        if (v.getMasterAddress() == addr):
                            # strip the PID part. Use a separate name so that
                            # later services are still compared against the
                            # full addr.
                            host = addr
                            idx = host.rfind('_')
                            if idx != -1:
                                host = host[:idx]
                            v.setMasterFailed("Hodring at %s failed with following" \
                                              " errors:\n%s" % (host, errors))
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            lock.release()
        return True

    def getKeys(self):
        """Return the list of service names."""
        lock = self.masterParamLock
        lock.acquire()
        try:
            keys = list(self.serviceDict.keys())
        finally:
            lock.release()

        return keys

    def getServiceAddr(self, name):
        """Return the master address of service name.

        Returns 'Error: <message>' when the master has failed more than
        max-master-failures times, the first master address once the master
        is initialized, and 'not found' otherwise (including unknown names).
        """
        addr = 'not found'
        self.log.debug("getServiceAddr name: %s" % name)
        lock = self.masterParamLock
        lock.acquire()
        try:
            try:
                service = self.serviceDict[name]
            except KeyError:
                pass
            else:
                self.log.debug("getServiceAddr service: %s" % service)
                # Check if we should give up ! If the limit on max failures is hit,
                # give up.
                err = service.getMasterFailed()
                if (err is not None) and \
                   (service.getMasterFailureCount() > \
                    self.cfg['ringmaster']['max-master-failures']):
                    self.log.critical("Detected errors (%s) beyond allowed number"\
                                      " of failures (%s). Flagging error to client" \
                                      % (service.getMasterFailureCount(), \
                                         self.cfg['ringmaster']['max-master-failures']))
                    addr = "Error: " + err
                elif (service.isMasterInitialized()):
                    addr = service.getMasterAddrs()[0]
                else:
                    addr = 'not found'
        finally:
            lock.release()
        self.log.debug("getServiceAddr addr %s: %s" % (name, addr))

        return addr

    def getURLs(self, name):
        """Return the first info address of service name once its master is
        initialized, else 'none'."""
        addr = 'none'
        lock = self.masterParamLock
        lock.acquire()
        try:
            try:
                service = self.serviceDict[name]
            except KeyError:
                pass
            else:
                if (service.isMasterInitialized()):
                    addr = service.getInfoAddrs()[0]
        finally:
            lock.release()

        return addr

    def stopRM(self):
        """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""
        # We spawn a thread here because we want the XMLRPC call to return. Calling
        # stop directly from here will also stop the XMLRPC server.
        try:
            self.log.debug("inside xml-rpc call to stop ringmaster")
            rmStopperThread = func('RMStopper', self.rm.stop)
            rmStopperThread.start()
            self.log.debug("returning from xml-rpc call to stop ringmaster")
            return True
        except Exception:
            self.log.debug("Exception in stop: %s" % get_exception_string())
            return False
---|
| 538 | |
---|
| 539 | class RingMaster: |
---|
    def __init__(self, cfg, log, **kwds):
        """starts nodepool and services

        Builds the node pool and the HDFS / Map-Reduce service objects,
        starts the ringmaster XML-RPC server (and, when a hadoop tarball is
        configured, an HTTP server for it), optionally registers with the
        service registry, fills in the 'hodring' section of cfg and finally
        starts the idle job tracker monitor.

        cfg  -- configuration mapping; modified in place (notably the
                'hodring' section).
        log  -- logger.
        kwds -- accepted but not used by this method.

        Raises Exception if the copied tarball cannot be found. Errors while
        building the service objects are logged critically and re-raised.
        """
        self.download = False
        self.httpServer = None
        self.cfg = cfg
        self.log = log
        self.__hostname = local_fqdn()
        self.workDirs = None

        # ref to the idle job tracker object.
        self.__jtMonitor = None
        self.__idlenessDetected = False
        self.__stopInProgress = False
        self.__isStopped = False # to let main exit
        self.__exitCode = 0 # exit code with which the ringmaster main method should return

        self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']

        self.__initialize_signal_handlers()

        sdd = self.cfg['servicedesc']
        # NOTE(review): gsvc is set to an arbitrary service description and
        # is not used again in this method.
        gsvc = None
        for key in sdd:
            gsvc = sdd[key]
            break

        npd = self.cfg['nodepooldesc']
        self.np = NodePoolUtil.getNodePool(npd, cfg, log)

        self.log.debug("Getting service ID.")

        self.serviceId = self.np.getServiceId()

        self.log.debug("Got service ID: %s" % self.serviceId)

        # With ringmaster.hadoop-tar-ball set, the tarball is copied into the
        # working directory, published over HTTP below and advertised to the
        # hodrings as 'download-addr'.
        self.tarSrcLoc = None
        if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
            self.download = True
            self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']

        # NOTE: later steps use os.getcwd(), so cd_to_tempdir() (defined
        # elsewhere) is presumably expected to switch the working directory.
        self.cd_to_tempdir()

        if (self.download):
            self.__copy_tarball(os.getcwd())
            # The copied file's name depends on the copy method, so look it up.
            self.basename = self.__find_tarball_in_dir(os.getcwd())
            if self.basename is None:
                raise Exception('Did not find tarball copied from %s in %s.'
                                % (self.tarSrcLoc, os.getcwd()))

        self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])

        self.log.debug("Service registry @ %s" % self.serviceAddr)

        self.serviceClient = hodXRClient(self.serviceAddr)
        self.serviceDict = {}
        try:
            sdl = self.cfg['servicedesc']

            workDirs = self.getWorkDirs(cfg)

            hdfsDesc = sdl['hdfs']
            hdfs = None

            # Determine hadoop Version
            hadoopVers = hadoopVersion(self.__getHadoopDir(), \
                                       self.cfg['hodring']['java-home'], self.log)

            if (hadoopVers['major']==None) or (hadoopVers['minor']==None):
                raise Exception('Could not retrive the version of Hadoop.'
                                + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
            # External services get their master params straight from the
            # gridservice-* config sections.
            if hdfsDesc.isExternal():
                hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
                hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
            else:
                hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']),
                            workers_per_ring = self.workers_per_ring)

            self.serviceDict[hdfs.getName()] = hdfs

            mrDesc = sdl['mapred']
            mr = None
            if mrDesc.isExternal():
                mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
                mr.setMasterParams( self.cfg['gridservice-mapred'] )
            else:
                mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']),
                               workers_per_ring = self.workers_per_ring)

            self.serviceDict[mr.getName()] = mr
        except:
            self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \
%s." % get_exception_error_string())
            self.log.debug(get_exception_string())
            raise

        # should not be starting these in a constructor
        ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)

        self.rpcserver = ringMasterServer.getAddress()

        self.httpAddress = None
        self.tarAddress = None
        # NOTE(review): 'socket' is not among this module's explicit imports;
        # presumably it arrives through one of the star imports.
        hostname = socket.gethostname()
        if (self.download):
            # The tarball URL is the HTTP server root plus the copied file
            # name; it becomes the first registered tarball source.
            self.httpServer = threadedHTTPServer(hostname,
                self.cfg['ringmaster']['http-port-range'])

            self.httpServer.serve_forever()
            self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0],
                                                 self.httpServer.server_address[1])
            self.tarAddress = "%s%s" % (self.httpAddress, self.basename)

            ringMasterServer.instance.logMasterSources.registerTarSource(hostname,
                                                                         self.tarAddress)
        else:
            self.log.debug("Download not set.")

        self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'],
                       self.serviceId, self.__hostname, 'ringmaster', 'hod'))

        if self.cfg['ringmaster']['register']:
            if self.httpAddress:
                self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                    self.serviceId, self.__hostname, 'ringmaster', 'hod', {
                    'xrs' : self.rpcserver, 'http' : self.httpAddress })
            else:
                self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                    self.serviceId, self.__hostname, 'ringmaster', 'hod', {
                    'xrs' : self.rpcserver, })

        # NOTE(review): this is logged even when registration is disabled.
        self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)

        # NOTE(review): hodRingPath is computed but not used in this method.
        hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring')
        hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_'
                                      + getpass.getuser())

        # Settings the hodrings need in order to find this ringmaster.
        self.cfg['hodring']['hodring'] = [hodRingWorkDir,]
        self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
        self.cfg['hodring']['service-id'] = self.np.getServiceId()

        self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)

        if (self.tarSrcLoc != None):
            cfg['hodring']['download-addr'] = self.tarAddress

        self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)
---|
| 686 | |
---|
| 687 | def __init_job_tracker_monitor(self, logMasterSources): |
---|
| 688 | hadoopDir = self.__getHadoopDir() |
---|
| 689 | self.log.debug('hadoopdir=%s, java-home=%s' % \ |
---|
| 690 | (hadoopDir, self.cfg['hodring']['java-home'])) |
---|
| 691 | try: |
---|
| 692 | self.__jtMonitor = JobTrackerMonitor(self.log, self, |
---|
| 693 | self.cfg['ringmaster']['jt-poll-interval'], |
---|
| 694 | self.cfg['ringmaster']['idleness-limit'], |
---|
| 695 | hadoopDir, self.cfg['hodring']['java-home'], |
---|
| 696 | logMasterSources) |
---|
| 697 | self.log.debug('starting jt monitor') |
---|
| 698 | self.__jtMonitor.start() |
---|
| 699 | except: |
---|
| 700 | self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle.\ |
---|
| 701 | Exception message: %s' % get_exception_error_string()) |
---|
| 702 | self.log.debug('Exception details: %s' % get_exception_string()) |
---|
| 703 | |
---|
| 704 | |
---|
def __getHadoopDir(self):
  """Return the Hadoop directory handed to the JobTracker monitor.

  If the ringmaster config has a 'hadoop-tar-ball' entry, the tarball
  (self.basename in the current directory) is untarred into the current
  directory and the result is that directory joined with the tarball's root
  entry. Otherwise the configured gridservice-mapred 'pkgs' value is returned.

  Raises Exception if untarring fails.
  """
  # `in` instead of dict.has_key, which is deprecated and gone in Python 3.
  if 'hadoop-tar-ball' in self.cfg['ringmaster']:
    tarFile = os.path.join(os.getcwd(), self.basename)
    ret = untar(tarFile, os.getcwd())
    if not ret:
      raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \
                      % (tarFile, os.getcwd()))
    hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
  else:
    hadoopDir = self.cfg['gridservice-mapred']['pkgs']
  self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
  return hadoopDir

def __get_dir(self, name):
  """Return the root directory inside the tarball at path `name`.

  Assumes the archive's first member is its root directory; that first
  member name is returned unchanged.

  Raises Exception if the archive contains no members.
  """
  import tarfile
  myTarFile = tarfile.open(name)
  try:
    names = myTarFile.getnames()
  finally:
    # Close explicitly so the archive's file handle is not leaked.
    myTarFile.close()
  if not names:
    raise Exception('Tarball %s is empty. Cannot find hadoop directory.' % name)
  hadoopPackage = names[0]
  self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
  return hadoopPackage

def __find_tarball_in_dir(self, dir):
  """Return the entry of `dir` that self.tarSrcLoc ends with, or None.

  The copied tarball's file name cannot be derived reliably from the source
  URI (its form depends on how the copy was made), so the directory listing
  is matched against the tail of the source location instead. When several
  entries match (e.g. two tarballs sharing a suffix), whichever os.listdir()
  yields first is returned.
  """
  candidates = (entry for entry in os.listdir(dir)
                if self.tarSrcLoc.endswith(entry))
  return next(candidates, None)

def __copy_tarball(self, destDir):
  """Copy the hadoop tarball named by self.tarSrcLoc into destDir.

  Only local files are supported: a bare absolute path (treated as file://
  for backwards compatibility) or a file:// URL. The copy runs /bin/cp via
  simpleCommand.

  Raises Exception if the source is not a file:// location, or if the copy
  command exits with a non-zero code.
  """
  # for backwards compatibility, treat the default case as file://
  url = ''
  if self.tarSrcLoc.startswith('/'):
    url = 'file:/'
  src = '%s%s' % (url, self.tarSrcLoc)
  # The "unsupported" error belongs to the scheme check. Written as a guard
  # so that a successful copy can never fall into it.
  if not src.startswith('file://'):
    raise Exception('Unsupported URL for file: %s' % src)

  # Strip 'file:/' but keep the following '/', leaving an absolute path.
  src = src[len('file://')-1:]
  cpCmd = '/bin/cp'
  # NOTE(review): paths are not shell-quoted; confirm how simpleCommand runs
  # the command before allowing paths with spaces/metacharacters.
  cmd = '%s %s %s' % (cpCmd, src, destDir)
  self.log.debug('Command to execute: %s' % cmd)
  copyProc = simpleCommand('remote copy', cmd)
  copyProc.start()
  copyProc.wait()
  copyProc.join()
  ret = copyProc.exit_code()
  self.log.debug('Completed command execution. Exit Code: %s.' % ret)

  if ret != 0:
    output = copyProc.output()
    raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s'
                    % (cmd, ret, output))

def __url_to_addr(self, url):
  """Split a URL of the form 'http://hostname:port/' into [hostname, port].

  The 'http://' prefix and trailing slashes are optional; the port is
  returned as an int. Raises IndexError when there is no ':port' part and
  ValueError when the port is not numeric.
  """
  hostPort = url.rstrip('/')
  scheme = 'http://'
  if hostPort.startswith(scheme):
    hostPort = hostPort[len(scheme):]
  fields = hostPort.split(':')
  host, port = fields[0], fields[1]
  return [host, int(port)]

def __initialize_signal_handlers(self):
  """Route SIGTERM, SIGINT and SIGQUIT to sig_wrapper with self.stop."""
  def sigStop(sigNum, handler):
    # The second argument is the interrupted stack frame; it is unused.
    sig_wrapper(sigNum, self.stop)

  for stopSignal in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
    signal.signal(stopSignal, sigStop)

def __clean_up(self):
  """Delete this ringmaster's temporary directory, ignoring rmtree errors.

  cd_to_tempdir() makes the temp dir the working directory, so this first
  changes into its parent before removing it.
  """
  tempDir = self.__get_tempdir()
  parentDir, _ = os.path.split(tempDir)
  os.chdir(parentDir)
  if os.path.exists(tempDir):
    shutil.rmtree(tempDir, ignore_errors=True)

  self.log.debug("Cleaned up temporary dir: %s" % tempDir)

def __get_tempdir(self):
  """Return <temp-dir>/<userid>.<service-id>.ringmaster for this ringmaster.

  Only builds the path; nothing is created on disk here.
  """
  ringmasterCfg = self.cfg['ringmaster']
  baseDir = ringmasterCfg['temp-dir']
  leafName = "%s.%s.ringmaster" % (ringmasterCfg['userid'],
                                   self.np.getServiceId())
  return os.path.join(baseDir, leafName)

def getWorkDirs(self, cfg, reUse=False):
  """Return the per-allocation work directory paths.

  Builds <parent>/<hostname>-<pid>-<random int> for every parent listed in
  cfg['ringmaster']['work-dirs']; all entries share the same suffix. The
  list is cached on self.workDirs. When reUse is true and a list was already
  built, the cached list is returned unchanged. Paths are only computed,
  not created.
  """
  if reUse and self.workDirs is not None:
    return self.workDirs

  # socket is not imported in this module's visible header; import it here
  # rather than relying on it leaking in through a star import.
  import socket
  # Random integer suffix to keep directory names distinct between runs.
  irand = random.randint(0, 10 ** 16 - 1)
  uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand)
  self.workDirs = [os.path.join(p, uniq) for p in cfg['ringmaster']['work-dirs']]
  return self.workDirs

def _fetchLink(self, link, parentDir):
  """Crawl from `link`, saving every non-HTML resource under parentDir.

  HTML pages are fed to a miniHTMLParser, whose getNextLink() supplies the
  next link to visit; the crawl ends when it returns a false value. Non-HTML
  resources are written to parentDir/<host>[/<port>]/<url path components>.
  HTML pages themselves are not saved.
  """
  parser = miniHTMLParser()
  self.log.debug("Checking link %s" %link)
  bufSz = 8192
  while link:
    # Get the file from the site and link
    response = urllib.urlopen(link)
    out = None
    try:
      isHtml = response.info().gettype() == 'text/html'

      if isHtml:
        parser.setBaseUrl(response.geturl())
      else:
        parsed = urlparse.urlparse(link)
        host, port = parsed[1], None
        if ':' in host:
          host, port = host.split(':', 1)
        # A URL without an explicit port gets no port directory level
        # (passing None to os.path.join would raise).
        parts = [host]
        if port is not None:
          parts.append(port)
        parts.extend([c for c in parsed[2].split('/') if c != ''])
        localFile = os.path.join(parentDir, *parts)

        try:
          self.log.debug('Creating %s' % localFile)
          localDir = os.path.dirname(localFile)
          if not os.path.exists(localDir):
            os.makedirs(localDir)
        except Exception:
          # Best effort: a failure here surfaces when the file is opened.
          self.log.debug(get_exception_string())

        # Binary mode: the fetched resource is not necessarily text.
        out = open(localFile, 'wb')

      buf = response.read(bufSz)
      while len(buf) > 0:
        if isHtml:
          # Feed the file into the HTML parser
          parser.feed(buf)
        if out:
          out.write(buf)
        buf = response.read(bufSz)
    finally:
      # Release both handles even if reading or writing fails.
      response.close()
      if out:
        out.close()

    # Get the next link in level traversal order
    link = parser.getNextLink()

  parser.close()

def _finalize(self):
  """Finalize the node pool by calling self.np.finalize()."""
  # The unused log-directory name that used to be computed here (and its
  # exception guard) was dead code and has been dropped.
  self.np.finalize()

def handleIdleJobTracker(self):
  """Log that the JobTracker has been idle and flag the ringmaster to stop.

  Presumably invoked by the JobTrackerMonitor, which is given this object;
  verify against idleJobTracker. Sets the idleness flag that shouldStop()
  reports.
  """
  idleLimit = self.cfg['ringmaster']['idleness-limit']
  message = "Detected idle job tracker for %s seconds. The allocation will be cleaned up." % idleLimit
  self.log.critical(message)
  self.__idlenessDetected = True

def cd_to_tempdir(self):
  """Create (if needed) and change into this ringmaster's temp dir.

  Missing parent directories are created as well. Returns the directory path.
  """
  workingDir = self.__get_tempdir()
  alreadyThere = os.path.exists(workingDir)
  if not alreadyThere:
    os.makedirs(workingDir)
  os.chdir(workingDir)
  return workingDir

def getWorkload(self):
  """Return the workload object stored on this ringmaster."""
  currentWorkload = self.workload
  return currentWorkload

def getHostName(self):
  """Return the host name recorded on this ringmaster."""
  hostName = self.__hostname
  return hostName

def start(self):
  """Launch the hodring workers through the node pool, then finalize it.

  The worker command is <base-dir>/bin/hodring followed by the 'hodring'
  section arguments from the config. A positive return from
  self.np.runWorkers() is logged as critical (nothing is raised);
  self._finalize() is called afterwards.
  """
  self.log.debug("Entered start method.")
  hodringBin = os.path.join(self.cfg['ringmaster']['base-dir'], 'bin', 'hodring')
  largs = [hodringBin]
  largs.extend(self.cfg.get_args(section='hodring'))

  # Each argument followed by one space (a trailing space included).
  self.log.debug(''.join(['%s ' % (arg,) for arg in largs]))

  workersFailed = self.np.runWorkers(largs) > 0
  if workersFailed:
    self.log.critical("Failed to start worker.")

  self.log.debug("Returned from runWorkers.")
  self._finalize()

def __findExitCode(self):
  """Determine the exit code from the state of the cluster or the jobs run on it.

  Stores the result in self.__exitCode:
    7 -- the 'hdfs' service address is 'not found' or starts with "Error: "
    8 -- the same for the 'mapred' service address
    otherwise the non-zero result of get_cluster_status(), or, when that is
    0, the consolidated hadoop job exit code.
  """
  xmlrpcServer = ringMasterServer.instance.logMasterSources
  # Fetch each address once and reuse it, so every check sees the same value
  # and no redundant lookups are made.
  hdfsAddr = xmlrpcServer.getServiceAddr('hdfs')
  if hdfsAddr == 'not found' or hdfsAddr.startswith("Error: "):
    self.__exitCode = 7
  else:
    mapredAddr = xmlrpcServer.getServiceAddr('mapred')
    if mapredAddr == 'not found' or mapredAddr.startswith("Error: "):
      self.__exitCode = 8
    else:
      clusterStatus = get_cluster_status(hdfsAddr, mapredAddr)
      if clusterStatus != 0:
        self.__exitCode = clusterStatus
      else:
        self.__exitCode = self.__findHadoopJobsExitCode()
  self.log.debug('exit code %s' % self.__exitCode)

def __findHadoopJobsExitCode(self):
  """Return the consolidated exit code of the hadoop jobs run on this cluster.

  16 -- every job reported by the JobTracker monitor has the failure status
  17 -- some, but not all, reported jobs have the failure status
  0  -- no job failed, no jobs were reported, there is no monitor, or the
        job statuses could not be obtained (the error is logged at debug)
  """
  ret = 0
  failureStatus = 3  # job status value counted as a failure by this code
  if self.__jtMonitor:
    try:
      # Fetch inside the try so a monitor error also yields 0 instead of
      # propagating out of the exit-code computation.
      jobStatusList = self.__jtMonitor.getJobsStatus()
      if len(jobStatusList) > 0:
        failureCount = 0
        for jobStatus in jobStatusList:
          self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),
                                                    jobStatus.getStatus()))
          if jobStatus.getStatus() == failureStatus:
            failureCount = failureCount + 1
        if failureCount > 0:
          if failureCount == len(jobStatusList):  # all jobs failed
            ret = 16
          else:
            ret = 17
    except Exception:
      # The format string needs a placeholder; without one the '%' itself
      # raised TypeError inside this handler.
      self.log.debug('exception in finding hadoop jobs exit code: %s' % get_exception_string())
  return ret

def stop(self):
  """Shut the ringmaster down.

  Determines the exit code and stops the ringmaster service. The exit code
  is 6 when ringMasterServer.instance is None. Then stops the JobTracker
  monitor and the HTTP server, and removes the temporary directory. Calls
  made while a stop is in progress, or after one has finished, return
  immediately.
  """
  self.log.debug("RingMaster stop method invoked.")
  if self.__stopInProgress or self.__isStopped:
    return
  self.__stopInProgress = True
  try:
    if ringMasterServer.instance is not None:
      self.log.debug('finding exit code')
      self.__findExitCode()
      self.log.debug('stopping ringmaster instance')
      ringMasterServer.stopService()
    else:
      self.__exitCode = 6
    if self.__jtMonitor is not None:
      self.__jtMonitor.stop()
    if self.httpServer:
      self.httpServer.stop()

    self.__clean_up()
  finally:
    # __stopInProgress is never reset. If a step above raised and this flag
    # were left unset, every later stop() would return early and
    # shouldStop() would never become true.
    self.__isStopped = True

def shouldStop(self):
  """Report whether the main loop should exit.

  Returns the idleness flag when it is set, otherwise the stopped flag. That
  is, the result is true after idleness was detected or after stop() ran.
  """
  if self.__idlenessDetected:
    return self.__idlenessDetected
  return self.__isStopped

def getExitCode(self):
  """Return the exit code recorded for this ringmaster."""
  exitCode = self.__exitCode
  return exitCode

def main(cfg,log):
  """Run a ringmaster for the given configuration and return its exit code.

  Builds the service description with DescGenerator, starts a RingMaster,
  polls once a second until rm.shouldStop() is true, stops it and returns
  rm.getExitCode(). Any exception is logged as critical (when a logger is
  given) and re-raised unchanged.
  """
  try:
    dGen = DescGenerator(cfg)
    cfg = dGen.initializeDesc()
    rm = RingMaster(cfg, log)
    rm.start()
    while not rm.shouldStop():
      time.sleep(1)
    rm.stop()
    log.debug('returning from main')
    return rm.getExitCode()
  except Exception:
    if log:
      log.critical(get_exception_string())
    # A bare raise keeps the original exception type and traceback, rather
    # than wrapping them in a generic Exception.
    raise