1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
2 | #or more contributor license agreements. See the NOTICE file |
---|
3 | #distributed with this work for additional information |
---|
4 | #regarding copyright ownership. The ASF licenses this file |
---|
5 | #to you under the Apache License, Version 2.0 (the |
---|
6 | #"License"); you may not use this file except in compliance |
---|
7 | #with the License. You may obtain a copy of the License at |
---|
8 | |
---|
9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
10 | |
---|
11 | #Unless required by applicable law or agreed to in writing, software |
---|
12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
14 | #See the License for the specific language governing permissions and |
---|
15 | #limitations under the License. |
---|
16 | #!/usr/bin/env python |
---|
17 | """manages services and nodepool""" |
---|
18 | # -*- python -*- |
---|
19 | |
---|
20 | import os, sys, random, time, sets, shutil, threading |
---|
21 | import urllib, urlparse, re, getpass, pprint, signal, shutil |
---|
22 | |
---|
23 | from pprint import pformat |
---|
24 | from HTMLParser import HTMLParser |
---|
25 | |
---|
26 | binfile = sys.path[0] |
---|
27 | libdir = os.path.dirname(binfile) |
---|
28 | sys.path.append(libdir) |
---|
29 | |
---|
30 | import hodlib.Common.logger |
---|
31 | from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus |
---|
32 | |
---|
33 | from hodlib.Common.threads import func |
---|
34 | |
---|
35 | from hodlib.Hod.nodePool import * |
---|
36 | from hodlib.Common.util import * |
---|
37 | from hodlib.Common.nodepoolutil import NodePoolUtil |
---|
38 | from hodlib.Common.socketServers import hodXMLRPCServer |
---|
39 | from hodlib.Common.socketServers import threadedHTTPServer |
---|
40 | from hodlib.NodePools import * |
---|
41 | from hodlib.NodePools.torque import * |
---|
42 | from hodlib.GridServices import * |
---|
43 | from hodlib.Common.descGenerator import * |
---|
44 | from hodlib.Common.xmlrpc import hodXRClient |
---|
45 | from hodlib.Common.miniHTMLParser import miniHTMLParser |
---|
46 | from hodlib.Common.threads import simpleCommand |
---|
47 | |
---|
class ringMasterServer:
    """The RPC server that exposes all the master config changes.

    Also, one of these RPC servers runs as a proxy and all the hodring
    instances register with this proxy.

    The class is used as a process-wide holder: the running XML-RPC server
    is kept in the class attribute ``xmlrpc`` and the wrapper object created
    by startService() in the class attribute ``instance``.
    """
    instance = None  # ringMasterServer object created by startService()
    xmlrpc = None    # underlying XML-RPC server object

    def __init__(self, cfg, log, logMasterSources, retry=5):
        """Create the XML-RPC server, register the handlers and start it.

        cfg              -- configuration; cfg['ringmaster']['xrs-port-range']
                            is handed to the server constructor.
        log              -- logger used for status messages.
        logMasterSources -- object whose methods are exposed over XML-RPC.
        retry            -- kept for backward compatibility; not used here.
        """
        portRange = cfg['ringmaster']['xrs-port-range']
        try:
            # Prefer the Twisted based server whenever it can be imported.
            from hodlib.Common.socketServers import twistedXMLRPCServer
            ringMasterServer.xmlrpc = twistedXMLRPCServer("", portRange)
        except ImportError:
            log.info("Twisted interface not found. Using hodXMLRPCServer.")
            ringMasterServer.xmlrpc = hodXMLRPCServer("", portRange)

        ringMasterServer.xmlrpc.register_instance(logMasterSources)
        self.logMasterSources = logMasterSources
        # NOTE(review): serve_forever() has to return without blocking for
        # the polling loop below to be reached -- presumably the hodlib
        # servers run in their own thread; confirm.
        ringMasterServer.xmlrpc.serve_forever()

        while not ringMasterServer.xmlrpc.is_alive():
            time.sleep(.5)

        # Record the bound port on the instance; previously nothing ever
        # assigned it although getPort() tried to read it.
        self.port = ringMasterServer.xmlrpc.server_address[1]
        log.debug('Ringmaster RPC Server at %d' % self.port)

    @staticmethod
    def startService(ss, cfg, np, log, rm):
        """Build the RPC handler object and start the server around it.

        ss is the service dictionary, np the node pool and rm the owning
        ringmaster; all are passed through to _LogMasterSources.
        """
        logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
        ringMasterServer.instance = ringMasterServer(cfg, log,
                                                     logMasterSources)

    @staticmethod
    def stopService():
        """Stop the XML-RPC server."""
        ringMasterServer.xmlrpc.stop()

    @staticmethod
    def getPort():
        """Return the port the XML-RPC server is bound to.

        Read from the server itself (the same source getAddress() uses)
        instead of from an instance attribute that was never set.
        """
        return ringMasterServer.xmlrpc.server_address[1]

    @staticmethod
    def getAddress():
        """Return the server URL as 'http://<local hostname>:<port>/'."""
        return 'http://%s:%d/' % (socket.gethostname(),
                                  ringMasterServer.xmlrpc.server_address[1])
---|
93 | |
---|
94 | class _LogMasterSources: |
---|
95 | """All the methods that are run by the RPC server are |
---|
96 | added into this class """ |
---|
97 | |
---|
98 | def __init__(self, serviceDict, cfg, np, log, rm): |
---|
99 | self.serviceDict = serviceDict |
---|
100 | self.tarSource = [] |
---|
101 | self.tarSourceLock = threading.Lock() |
---|
102 | self.dict = {} |
---|
103 | self.count = {} |
---|
104 | self.logsourceList = [] |
---|
105 | self.logsourceListLock = threading.Lock() |
---|
106 | self.masterParam = [] |
---|
107 | self.masterParamLock = threading.Lock() |
---|
108 | self.verify = 'none' |
---|
109 | self.cmdLock = threading.Lock() |
---|
110 | self.cfg = cfg |
---|
111 | self.log = log |
---|
112 | self.np = np |
---|
113 | self.rm = rm |
---|
114 | self.hdfsHost = None |
---|
115 | self.mapredHost = None |
---|
116 | self.maxconnect = self.cfg['ringmaster']['max-connect'] |
---|
117 | self.log.debug("Using max-connect value %s"%self.maxconnect) |
---|
118 | |
---|
119 | |
---|
def registerTarSource(self, hostname, url, addr=None):
    """Register url as a location the tarball can be downloaded from.

    hostname -- name of the registering host (not used by this method).
    url      -- URL to register; its connection count is (re)set to 0.
    addr     -- source the caller has just finished downloading from, or
                None when the ringmaster registers its own copy.  The
                connection count of addr is decremented by one.

    Returns url.  Raises KeyError when addr was never registered; the lock
    is released in that case too, so later callers are not blocked.
    """
    self.log.debug("registering: " + url)
    lock = self.tarSourceLock
    lock.acquire()
    try:
        self.dict[url] = url
        self.count[url] = 0
        # addr is None when ringMaster himself invokes this method
        if addr:
            self.count[addr] -= 1
    finally:
        lock.release()
    if addr:
        self.log.debug("%s is done" % (addr))
    return url
---|
135 | |
---|
def getTarList(self,hodring): # this looks useful
    """Hand out the registered tarball URL with the fewest connections.

    hodring -- identifier of the requesting hodring; only used in the log
               message, where it is concatenated to a string.

    Only sources whose count is below self.maxconnect qualify.  Among
    those the smallest count wins; on a tie the entry iterated last is
    used.  The count of the chosen source is incremented.

    Returns the URL, or the string 'none' when nothing is registered or
    every source is at its connection limit.
    """
    lock = self.tarSourceLock
    lock.acquire()
    try:
        leastkey = None
        # None means "nothing chosen yet".  A numeric sentinel such as -1
        # is unsafe because counts are decremented elsewhere and can
        # legitimately reach that value.
        leastval = None
        for k, v in self.count.items():
            if v < self.maxconnect and (leastval is None or v <= leastval):
                leastkey = k
                leastval = v
        if leastkey is None:
            url = 'none'
        else:
            url = self.dict[leastkey]
            self.count[leastkey] = leastval + 1
            self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
    finally:
        # Release even if a lookup above fails, so that other callers
        # are not blocked forever.
        lock.release()
    self.log.debug('sending url ' + url+" to "+hodring) # this looks useful
    return url
---|
157 | |
---|
def tarDone(self, uri):
    """Record that one download from uri has finished.

    Decrements the connection count of uri and returns uri.  Raises
    KeyError when uri was never registered; the lock is released in that
    case as well.
    """
    self.log.debug("%s is done" % (uri))
    lock = self.tarSourceLock
    lock.acquire()
    try:
        self.count[uri] -= 1
    finally:
        lock.release()
    return uri
---|
167 | |
---|
def status(self):
    """Always return True.

    Presumably lets RPC clients check that the server answers at all.
    """
    return True
---|
170 | |
---|
171 | # FIXME: this code is broken, it relies on a central service registry |
---|
172 | # |
---|
173 | # def clusterStart(self, changedClusterParams=[]): |
---|
174 | # self.log.debug("clusterStart method invoked.") |
---|
175 | # self.dict = {} |
---|
176 | # self.count = {} |
---|
177 | # try: |
---|
178 | # if (len(changedClusterParams) > 0): |
---|
179 | # self.log.debug("Updating config.") |
---|
180 | # for param in changedClusterParams: |
---|
181 | # (key, sep1, val) = param.partition('=') |
---|
182 | # (i1, sep2, i2) = key.partition('.') |
---|
183 | # try: |
---|
184 | # prev = self.cfg[i1][i2] |
---|
185 | # self.rm.cfg[i1][i2] = val |
---|
186 | # self.cfg[i1][i2] = val |
---|
187 | # self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val)) |
---|
188 | # except KeyError, e: |
---|
189 | # self.log.info("Skipping %s as no such config parameter found in ringmaster" % param) |
---|
190 | # self.log.debug("Regenerating Service Description.") |
---|
191 | # dGen = DescGenerator(self.rm.cfg) |
---|
192 | # self.rm.cfg['servicedesc'] = dGen.createServiceDescDict() |
---|
193 | # self.cfg['servicedesc'] = self.rm.cfg['servicedesc'] |
---|
194 | # |
---|
195 | # self.rm.tar = None |
---|
196 | # if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'): |
---|
197 | # self.rm.download = True |
---|
198 | # self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball'] |
---|
199 | # self.log.debug("self.rm.tar=%s" % self.rm.tar) |
---|
200 | # |
---|
201 | # self.rm.cd_to_tempdir() |
---|
202 | # |
---|
203 | # self.rm.tarAddress = None |
---|
204 | # hostname = socket.gethostname() |
---|
205 | # if (self.rm.download): |
---|
206 | # self.rm.basename = os.path.basename(self.rm.tar) |
---|
207 | # dest = os.path.join(os.getcwd(), self.rm.basename) |
---|
208 | # src = self.rm.tar |
---|
209 | # self.log.debug("cp %s -> %s" % (src, dest)) |
---|
210 | # shutil.copy(src, dest) |
---|
211 | # self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename) |
---|
212 | # self.registerTarSource(hostname, self.rm.tarAddress) |
---|
213 | # self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress) |
---|
214 | # else: |
---|
215 | # self.log.debug("Download not set.") |
---|
216 | # |
---|
217 | # if (self.rm.tar != None): |
---|
218 | # self.cfg['hodring']['download-addr'] = self.rm.tarAddress |
---|
219 | # self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress |
---|
220 | # |
---|
221 | # sdl = self.rm.cfg['servicedesc'] |
---|
222 | # workDirs = self.rm.getWorkDirs(self.rm.cfg, True) |
---|
223 | # hdfsDesc = sdl['hdfs'] |
---|
224 | # hdfs = None |
---|
225 | # if hdfsDesc.isExternal(): |
---|
226 | # hdfs = HdfsExternal(hdfsDesc, workDirs) |
---|
227 | # else: |
---|
228 | # hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True) |
---|
229 | # |
---|
230 | # self.rm.serviceDict[hdfs.getName()] = hdfs |
---|
231 | # mrDesc = sdl['mapred'] |
---|
232 | # mr = None |
---|
233 | # if mrDesc.isExternal(): |
---|
234 | # mr = MapReduceExternal(mrDesc, workDirs) |
---|
235 | # else: |
---|
236 | # mr = MapReduce(mrDesc, workDirs, 1) |
---|
237 | # self.rm.serviceDict[mr.getName()] = mr |
---|
238 | # |
---|
239 | # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'], |
---|
240 | # self.np.getServiceId(), 'hodring', 'hod') |
---|
241 | # |
---|
242 | # slaveList = ringList |
---|
243 | # hdfsringXRAddress = None |
---|
244 | # # Start HDFS Master - Step 1 |
---|
245 | # if not hdfsDesc.isExternal(): |
---|
246 | # masterFound = False |
---|
247 | # for ring in ringList: |
---|
248 | # ringXRAddress = ring['xrs'] |
---|
249 | # if ringXRAddress == None: |
---|
250 | # raise Exception("Could not get hodring XML-RPC server address.") |
---|
251 | # if (ringXRAddress.find(self.hdfsHost) != -1): |
---|
252 | # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0) |
---|
253 | # hdfsringXRAddress = ringXRAddress |
---|
254 | # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)") |
---|
255 | # ringClient.clusterStart() |
---|
256 | # masterFound = True |
---|
257 | # slaveList.remove(ring) |
---|
258 | # break |
---|
259 | # if not masterFound: |
---|
260 | # raise Exception("HDFS Master host not found") |
---|
261 | # while hdfs.getInfoAddrs() == None: |
---|
262 | # self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port") |
---|
263 | # time.sleep(1) |
---|
264 | # |
---|
265 | # # Start MAPRED Master - Step 2 |
---|
266 | # if not mrDesc.isExternal(): |
---|
267 | # masterFound = False |
---|
268 | # for ring in ringList: |
---|
269 | # ringXRAddress = ring['xrs'] |
---|
270 | # if ringXRAddress == None: |
---|
271 | # raise Exception("Could not get hodring XML-RPC server address.") |
---|
272 | # if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1): |
---|
273 | # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0) |
---|
274 | # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)") |
---|
275 | # ringClient.clusterStart() |
---|
276 | # masterFound = True |
---|
277 | # slaveList.remove(ring) |
---|
278 | # break |
---|
279 | # if not masterFound: |
---|
280 | # raise Exception("MAPRED Master host not found") |
---|
281 | # while mr.getInfoAddrs() == None: |
---|
282 | # self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \ |
---|
283 | # mapred.job.tracker.info.port") |
---|
284 | # time.sleep(1) |
---|
285 | # |
---|
286 | # # Start Slaves - Step 3 |
---|
287 | # for ring in slaveList: |
---|
288 | # ringXRAddress = ring['xrs'] |
---|
289 | # if ringXRAddress == None: |
---|
290 | # raise Exception("Could not get hodring XML-RPC server address.") |
---|
291 | # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0) |
---|
292 | # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)") |
---|
293 | # ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart()) |
---|
294 | # ring['thread'] = ringThread |
---|
295 | # ringThread.start() |
---|
296 | # |
---|
297 | # for ring in slaveList: |
---|
298 | # ringThread = ring['thread'] |
---|
299 | # if ringThread == None: |
---|
300 | # raise Exception("Could not get hodring thread (Slave).") |
---|
301 | # ringThread.join() |
---|
302 | # self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)") |
---|
303 | # |
---|
304 | # # Run Admin Commands on HDFS Master - Step 4 |
---|
305 | # if not hdfsDesc.isExternal(): |
---|
306 | # if hdfsringXRAddress == None: |
---|
307 | # raise Exception("HDFS Master host not found (to Run Admin Commands)") |
---|
308 | # ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0) |
---|
309 | # self.log.debug("Invoking clusterStart(False) - Admin on " |
---|
310 | # + hdfsringXRAddress + " (HDFS Master)") |
---|
311 | # ringClient.clusterStart(False) |
---|
312 | # |
---|
313 | # except: |
---|
314 | # self.log.debug(get_exception_string()) |
---|
315 | # return False |
---|
316 | # |
---|
317 | # self.log.debug("Successfully started cluster.") |
---|
318 | # return True |
---|
319 | # |
---|
320 | # def clusterStop(self): |
---|
321 | # self.log.debug("clusterStop method invoked.") |
---|
322 | # try: |
---|
323 | # hdfsAddr = self.getServiceAddr('hdfs') |
---|
324 | # if hdfsAddr.find(':') != -1: |
---|
325 | # h, p = hdfsAddr.split(':', 1) |
---|
326 | # self.hdfsHost = h |
---|
327 | # self.log.debug("hdfsHost: " + self.hdfsHost) |
---|
328 | # mapredAddr = self.getServiceAddr('mapred') |
---|
329 | # if mapredAddr.find(':') != -1: |
---|
330 | # h, p = mapredAddr.split(':', 1) |
---|
331 | # self.mapredHost = h |
---|
332 | # self.log.debug("mapredHost: " + self.mapredHost) |
---|
333 | # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'], |
---|
334 | # self.np.getServiceId(), |
---|
335 | # 'hodring', 'hod') |
---|
336 | # for ring in ringList: |
---|
337 | # ringXRAddress = ring['xrs'] |
---|
338 | # if ringXRAddress == None: |
---|
339 | # raise Exception("Could not get hodring XML-RPC server address.") |
---|
340 | # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False) |
---|
341 | # self.log.debug("Invoking clusterStop on " + ringXRAddress) |
---|
342 | # ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop()) |
---|
343 | # ring['thread'] = ringThread |
---|
344 | # ringThread.start() |
---|
345 | # |
---|
346 | # for ring in ringList: |
---|
347 | # ringThread = ring['thread'] |
---|
348 | # if ringThread == None: |
---|
349 | # raise Exception("Could not get hodring thread.") |
---|
350 | # ringThread.join() |
---|
351 | # self.log.debug("Completed clusterStop on " + ring['xrs']) |
---|
352 | # |
---|
353 | # except: |
---|
354 | # self.log.debug(get_exception_string()) |
---|
355 | # return False |
---|
356 | # |
---|
357 | # self.log.debug("Successfully stopped cluster.") |
---|
358 | # |
---|
359 | # return True |
---|
360 | |
---|
def getCommand(self, addr):
    """This method is called by the
    hodrings to get commands from
    the ringmaster

    addr -- address of the calling hodring; it is recorded as the master
            address when master commands are handed out, and it is
            concatenated to a string for the log message.

    First pass: the first non-external, launchable service whose master
    is not yet launched (and whose failure count is within
    cfg['ringmaster']['max-master-failures']) gets its master commands
    handed to this caller.

    Second pass, only when the first produced nothing: the worker
    commands of every non-external service are collected.  If any of
    those masters is not initialized yet, an empty list is returned.

    Errors are logged at debug level and whatever was collected so far is
    returned.
    """
    lock = self.cmdLock
    cmdList = []
    lock.acquire()
    try:
        try:
            for v in self.serviceDict.values():
                if v.isExternal() or not v.isLaunchable(self.serviceDict):
                    continue
                # If a master is still not launched, or the number of
                # retries for launching master is not reached,
                # launch master
                if not v.isMasterLaunched() and \
                   (v.getMasterFailureCount() <= \
                    self.cfg['ringmaster']['max-master-failures']):
                    cmdList = v.getMasterCommands(self.serviceDict)
                    v.setlaunchedMaster()
                    v.setMasterAddress(addr)
                    break
            if cmdList == []:
                for s in self.serviceDict.values():
                    # Test the service being visited.  The previous code
                    # tested the stale variable left over from the first
                    # loop, so one external service could suppress (or
                    # wrongly enable) the commands of all the others.
                    if s.isExternal():
                        continue
                    if s.isMasterInitialized():
                        cmdList.extend(s.getWorkerCommands(self.serviceDict))
                    else:
                        cmdList = []
                        break
        except:
            # Deliberately broad: the caller polls this method, so a
            # failure here is logged and answered with what we have.
            self.log.debug(get_exception_string())
    finally:
        lock.release()

    cmd = addr + pformat(cmdList)
    self.log.debug("getCommand returning " + cmd)
    return cmdList
---|
401 | |
---|
def getAdminCommand(self, addr):
    """This method is called by the
    hodrings to get admin commands from
    the ringmaster

    Asks each service in turn for its admin commands and returns the
    first non-empty list (or the last, empty answer).  addr is only used
    in the log message.  Errors are logged at debug level.
    """
    guard = self.cmdLock
    cmdList = []
    guard.acquire()
    try:
        try:
            services = self.serviceDict.itervalues()
            for svc in services:
                cmdList = svc.getAdminCommands(self.serviceDict)
                if cmdList == []:
                    continue
                break
        except Exception:
            self.log.debug(get_exception_string())
    finally:
        guard.release()
    self.log.debug("getAdminCommand returning " + (addr + pformat(cmdList)))
    return cmdList
---|
423 | |
---|
def addMasterParams(self, addr, vals):
    """Called by a hodring to report the parameters it changed for the
    commands it was running.

    Every launched service whose master address equals addr receives vals
    through setMasterParams() and is marked as initialized.  Errors are
    logged at debug level.  Returns addr.
    """
    self.log.debug('Comment: adding master params from %s' % addr)
    self.log.debug(pformat(vals))
    paramLock = self.masterParamLock
    paramLock.acquire()
    try:
        for svc in self.serviceDict.itervalues():
            if svc.isMasterLaunched() and svc.getMasterAddress() == addr:
                svc.setMasterParams(vals)
                svc.setMasterInitialized()
    except:
        self.log.debug(get_exception_string())
    paramLock.release()

    return addr
---|
445 | |
---|
def setHodRingErrors(self, addr, errors):
    """This method is called by the hodrings to update errors
    it encountered while starting up

    addr   -- address of the failing hodring; everything from the last
              '_' on (the PID part) is dropped in the stored message.
    errors -- error text reported by the hodring.

    Every launched service whose master address equals addr is marked as
    failed.  Problems while doing so are logged at debug level.  Always
    returns True.
    """
    self.log.critical("Hodring at %s failed with following errors:\n%s" \
                      % (addr, errors))
    lock = self.masterParamLock
    lock.acquire()
    try:
        try:
            # strip the PID part.  This is done on a copy: addr itself
            # must stay intact because it is compared against every
            # service below.
            shortAddr = addr
            idx = addr.rfind('_')
            if idx != -1:
                shortAddr = addr[:idx]
            message = "Hodring at %s failed with following" \
                      " errors:\n%s" % (shortAddr, errors)
            for v in self.serviceDict.values():
                if v.isMasterLaunched() and v.getMasterAddress() == addr:
                    v.setMasterFailed(message)
        except:
            # Deliberately broad: reporting an error must not itself fail.
            self.log.debug(get_exception_string())
    finally:
        lock.release()
    return True
---|
468 | |
---|
def getKeys(self):
    """Return the keys of the service dictionary, read under the lock."""
    guard = self.masterParamLock
    guard.acquire()
    names = self.serviceDict.keys()
    guard.release()
    return names
---|
476 | |
---|
def getServiceAddr(self, name):
    """Return the master address of the service called name.

    Returns
      * 'Error: <message>' when the service recorded a master failure and
        its failure count exceeds cfg['ringmaster']['max-master-failures'];
      * the first entry of getMasterAddrs() when the master is initialized;
      * the string 'not found' otherwise, including for unknown names.

    Exceptions raised by the service object propagate to the caller; the
    lock is released in every case.
    """
    addr = 'not found'
    self.log.debug("getServiceAddr name: %s" % name)
    lock = self.masterParamLock
    lock.acquire()
    try:
        try:
            service = self.serviceDict[name]
        except KeyError:
            service = None
        if service is not None:
            self.log.debug("getServiceAddr service: %s" % service)
            # Check if we should give up ! If the limit on max failures
            # is hit, give up.
            err = service.getMasterFailed()
            if (err is not None) and \
               (service.getMasterFailureCount() > \
                self.cfg['ringmaster']['max-master-failures']):
                self.log.critical("Detected errors (%s) beyond allowed number"\
                                  " of failures (%s). Flagging error to client" \
                                  % (service.getMasterFailureCount(), \
                                     self.cfg['ringmaster']['max-master-failures']))
                addr = "Error: " + err
            elif (service.isMasterInitialized()):
                addr = service.getMasterAddrs()[0]
    finally:
        # Without this, an exception above would leave masterParamLock
        # held and block every other method that takes it.
        lock.release()
    self.log.debug("getServiceAddr addr %s: %s" % (name, addr))

    return addr
---|
507 | |
---|
def getURLs(self, name):
    """Return the first info address of the service called name.

    Returns the string 'none' when the service is unknown or its master
    is not initialized yet.  Exceptions raised by the service object
    propagate to the caller; the lock is released in every case.
    """
    addr = 'none'
    lock = self.masterParamLock
    lock.acquire()
    try:
        try:
            service = self.serviceDict[name]
        except KeyError:
            service = None
        if service is not None and service.isMasterInitialized():
            addr = service.getInfoAddrs()[0]
    finally:
        # Without this, an exception above would leave masterParamLock
        # held and block every other method that takes it.
        lock.release()

    return addr
---|
524 | |
---|
def stopRM(self):
    """An XMLRPC call which will spawn a thread to stop the Ringmaster program.

    Returns True once the stopper thread has been started, False when
    starting it failed (the failure is logged at debug level).
    """
    # The stop runs in its own thread so that this XMLRPC call can return:
    # calling stop directly from here would also stop the XMLRPC server.
    try:
        self.log.debug("inside xml-rpc call to stop ringmaster")
        func('RMStopper', self.rm.stop).start()
        self.log.debug("returning from xml-rpc call to stop ringmaster")
    except:
        self.log.debug("Exception in stop: %s" % get_exception_string())
        return False
    return True
---|
538 | |
---|
539 | class RingMaster: |
---|
def __init__(self, cfg, log, **kwds):
    """starts nodepool and services

    cfg  -- configuration; the sections 'ringmaster', 'hodring',
            'servicedesc', 'nodepooldesc', 'gridservice-hdfs' and
            'gridservice-mapred' are read, and cfg['hodring'] is updated.
    log  -- logger.
    kwds -- accepted but not referenced in this method.

    In order, the constructor: installs the signal handlers, obtains the
    node pool and its service id, optionally copies the hadoop tarball
    into the working directory, builds the hdfs and mapred service
    objects, starts the XML-RPC server (and an HTTP server when a tarball
    is served), optionally registers with the service registry, fills in
    the hodring configuration and finally starts the job tracker monitor.

    Raises Exception when the copied tarball cannot be found, and
    re-raises anything that goes wrong while the service objects are
    being created.
    """
    self.download = False
    self.httpServer = None
    self.cfg = cfg
    self.log = log
    self.__hostname = local_fqdn()
    self.workDirs = None

    # ref to the idle job tracker object.
    self.__jtMonitor = None
    self.__idlenessDetected = False
    self.__stopInProgress = False
    self.__isStopped = False # to let main exit
    self.__exitCode = 0 # exit code with which the ringmaster main method should return

    self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']

    self.__initialize_signal_handlers()

    # NOTE(review): gsvc ends up holding the first service description but
    # is not read again in this method.
    sdd = self.cfg['servicedesc']
    gsvc = None
    for key in sdd:
        gsvc = sdd[key]
        break

    npd = self.cfg['nodepooldesc']
    self.np = NodePoolUtil.getNodePool(npd, cfg, log)

    self.log.debug("Getting service ID.")

    self.serviceId = self.np.getServiceId()

    self.log.debug("Got service ID: %s" % self.serviceId)

    # A configured tarball switches the ringmaster into "download" mode.
    self.tarSrcLoc = None
    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
        self.download = True
        self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']

    self.cd_to_tempdir()

    if (self.download):
        # Copy the tarball into the current directory and look up the
        # file name it was stored under.
        self.__copy_tarball(os.getcwd())
        self.basename = self.__find_tarball_in_dir(os.getcwd())
        if self.basename is None:
            raise Exception('Did not find tarball copied from %s in %s.'
                            % (self.tarSrcLoc, os.getcwd()))

    self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])

    self.log.debug("Service registry @ %s" % self.serviceAddr)

    self.serviceClient = hodXRClient(self.serviceAddr)
    self.serviceDict = {}
    try:
        sdl = self.cfg['servicedesc']

        workDirs = self.getWorkDirs(cfg)

        hdfsDesc = sdl['hdfs']
        hdfs = None

        # Determine hadoop Version
        hadoopVers = hadoopVersion(self.__getHadoopDir(), \
                                   self.cfg['hodring']['java-home'], self.log)

        if (hadoopVers['major']==None) or (hadoopVers['minor']==None):
            raise Exception('Could not retrive the version of Hadoop.'
                            + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
        # External services get their master parameters straight from the
        # configuration; managed ones are created with a numeric argument
        # (0 for hdfs, 1 for mapred) and the workers-per-ring setting.
        if hdfsDesc.isExternal():
            hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
            hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
        else:
            hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']),
                        workers_per_ring = self.workers_per_ring)

        self.serviceDict[hdfs.getName()] = hdfs

        mrDesc = sdl['mapred']
        mr = None
        if mrDesc.isExternal():
            mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
            mr.setMasterParams( self.cfg['gridservice-mapred'] )
        else:
            mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']),
                           workers_per_ring = self.workers_per_ring)

        self.serviceDict[mr.getName()] = mr
    except:
        # NOTE(review): the backslash continues the string literal, so any
        # leading whitespace on the following line becomes part of the
        # logged message.
        self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \
%s." % get_exception_error_string())
        self.log.debug(get_exception_string())
        raise

    # should not be starting these in a constructor
    ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)

    self.rpcserver = ringMasterServer.getAddress()

    self.httpAddress = None
    self.tarAddress = None
    hostname = socket.gethostname()
    if (self.download):
        # Serve the copied tarball over HTTP and register that URL as the
        # first tarball source.
        self.httpServer = threadedHTTPServer(hostname,
            self.cfg['ringmaster']['http-port-range'])

        self.httpServer.serve_forever()
        self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0],
                                              self.httpServer.server_address[1])
        self.tarAddress = "%s%s" % (self.httpAddress, self.basename)

        ringMasterServer.instance.logMasterSources.registerTarSource(hostname,
                                                                     self.tarAddress)
    else:
        self.log.debug("Download not set.")

    self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'],
        self.serviceId, self.__hostname, 'ringmaster', 'hod'))

    # The 'http' entry is only sent when an HTTP server was started above.
    if self.cfg['ringmaster']['register']:
        if self.httpAddress:
            self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                self.serviceId, self.__hostname, 'ringmaster', 'hod', {
                'xrs' : self.rpcserver, 'http' : self.httpAddress })
        else:
            self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                self.serviceId, self.__hostname, 'ringmaster', 'hod', {
                'xrs' : self.rpcserver, })

    # NOTE(review): this is logged even when the 'register' option is off.
    self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)

    # NOTE(review): hodRingPath is computed but not read again in this method.
    hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring')
    hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_'
                                  + getpass.getuser())

    # Settings stored in the 'hodring' section of the configuration.
    self.cfg['hodring']['hodring'] = [hodRingWorkDir,]
    self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
    self.cfg['hodring']['service-id'] = self.np.getServiceId()

    self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)

    if (self.tarSrcLoc != None):
        cfg['hodring']['download-addr'] = self.tarAddress

    self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)
---|
686 | |
---|
def __init_job_tracker_monitor(self, logMasterSources):
    """Create and start the job tracker monitor.

    logMasterSources -- RPC handler object passed on to JobTrackerMonitor.

    The monitor is created from the poll interval and idleness limit in
    cfg['ringmaster'], the hadoop directory and the configured java home.
    Failure is not fatal: it is logged and the ringmaster carries on
    without idleness detection.
    """
    hadoopDir = self.__getHadoopDir()
    self.log.debug('hadoopdir=%s, java-home=%s' % \
                   (hadoopDir, self.cfg['hodring']['java-home']))
    try:
        self.__jtMonitor = JobTrackerMonitor(self.log, self,
                             self.cfg['ringmaster']['jt-poll-interval'],
                             self.cfg['ringmaster']['idleness-limit'],
                             hadoopDir, self.cfg['hodring']['java-home'],
                             logMasterSources)
        self.log.debug('starting jt monitor')
        self.__jtMonitor.start()
    except:
        # Deliberately broad: a failing monitor must not stop the
        # ringmaster.  The message is built from adjacent literals; the
        # previous backslash continuation inside the literal ran the two
        # sentences together (or pulled source indentation into the text).
        self.log.critical('Exception in running idle job tracker. '
                          'This cluster cannot be deallocated if idle. '
                          'Exception message: %s' % get_exception_error_string())
        self.log.debug('Exception details: %s' % get_exception_string())
---|
703 | |
---|
704 | |
---|
705 | def __getHadoopDir(self): |
---|
706 | hadoopDir = None |
---|
707 | if self.cfg['ringmaster'].has_key('hadoop-tar-ball'): |
---|
708 | tarFile = os.path.join(os.getcwd(), self.basename) |
---|
709 | ret = untar(tarFile, os.getcwd()) |
---|
710 | if not ret: |
---|
711 | raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \ |
---|
712 | % (tarFile, os.getcwd())) |
---|
713 | hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile)) |
---|
714 | else: |
---|
715 | hadoopDir = self.cfg['gridservice-mapred']['pkgs'] |
---|
716 | self.log.debug('Returning Hadoop directory as: %s' % hadoopDir) |
---|
717 | return hadoopDir |
---|
718 | |
---|
719 | def __get_dir(self, name): |
---|
720 | """Return the root directory inside the tarball |
---|
721 | specified by name. Assumes that the tarball begins |
---|
722 | with a root directory.""" |
---|
723 | import tarfile |
---|
724 | myTarFile = tarfile.open(name) |
---|
725 | hadoopPackage = myTarFile.getnames()[0] |
---|
726 | self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage)) |
---|
727 | return hadoopPackage |
---|
728 | |
---|
729 | def __find_tarball_in_dir(self, dir): |
---|
730 | """Find the tarball among files specified in the given |
---|
731 | directory. We need this method because how the tarball |
---|
732 | source URI is given depends on the method of copy and |
---|
733 | we can't get the tarball name from that. |
---|
734 | This method will fail if there are multiple tarballs |
---|
735 | in the directory with the same suffix.""" |
---|
736 | files = os.listdir(dir) |
---|
737 | for file in files: |
---|
738 | if self.tarSrcLoc.endswith(file): |
---|
739 | return file |
---|
740 | return None |
---|
741 | |
---|
def __copy_tarball(self, destDir):
    """Copy the hadoop tar ball from self.tarSrcLoc into destDir.

    Only 'file://' locations are handled; a location starting with '/'
    is treated as a 'file://' URL for backwards compatibility. The copy
    is done by running /bin/cp through simpleCommand.

    Raises Exception if the location is not a file URL, or if the copy
    command returns a non-zero exit code.
    """
    src = self.tarSrcLoc
    if src.startswith('/'):
        # for backwards compatibility, treat the default case as file://
        src = 'file:/%s' % src

    if not src.startswith('file://'):
        raise Exception('Unsupported URL for file: %s' % src)

    # Strip the scheme but keep one '/' so the path stays absolute.
    src = src[len('file://')-1:]
    cmd = '%s %s %s' % ('/bin/cp', src, destDir)
    self.log.debug('Command to execute: %s' % cmd)

    copyProc = simpleCommand('remote copy', cmd)
    copyProc.start()
    copyProc.wait()
    copyProc.join()
    exitCode = copyProc.exit_code()
    self.log.debug('Completed command execution. Exit Code: %s.' % exitCode)

    if exitCode != 0:
        output = copyProc.output()
        raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s'
                        % (cmd, exitCode, output))
---|
770 | |
---|
771 | # input: http://hostname:port/. output: [hostname,port] |
---|
def __url_to_addr(self, url):
    """Convert 'http://hostname:port/' into [hostname, port].

    Trailing slashes and a leading 'http://' are removed; the port is
    returned as an int. A url without a ':port' part raises IndexError
    and a non-numeric port raises ValueError.
    """
    scheme = 'http://'
    addr = url.rstrip('/')
    if addr.startswith(scheme):
        addr = addr[len(scheme):]
    fields = addr.split(':')
    host = fields[0]
    port = int(fields[1])
    return [host, port]
---|
778 | |
---|
def __initialize_signal_handlers(self):
    """Install one handler for SIGTERM, SIGINT and SIGQUIT.

    The handler passes the signal number and self.stop to sig_wrapper.
    """
    def onStopSignal(sigNum, frame):
        sig_wrapper(sigNum, self.stop)

    for sigNum in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
        signal.signal(sigNum, onStopSignal)
---|
786 | |
---|
def __clean_up(self):
    """Delete the ringmaster temporary directory.

    The working directory is first changed to the parent of the temp
    directory (presumably so the process is not sitting inside the tree
    being removed -- confirm). Errors during removal are ignored.
    """
    tempDir = self.__get_tempdir()
    parentDir = os.path.split(tempDir)[0]
    os.chdir(parentDir)

    if os.path.exists(tempDir):
        # Second positional argument of rmtree is ignore_errors.
        shutil.rmtree(tempDir, True)

    self.log.debug("Cleaned up temporary dir: %s" % tempDir)
---|
794 | |
---|
def __get_tempdir(self):
    """Return the path of this ringmaster's temporary directory.

    The path is <temp-dir>/<userid>.<service id>.ringmaster, with
    temp-dir and userid read from the 'ringmaster' config section and
    the service id obtained from self.np.getServiceId().
    """
    tempRoot = self.cfg['ringmaster']['temp-dir']
    userId = self.cfg['ringmaster']['userid']
    leaf = "%s.%s.ringmaster" % (userId, self.np.getServiceId())
    return os.path.join(tempRoot, leaf)
---|
800 | |
---|
def getWorkDirs(self, cfg, reUse=False):
    """Return the list of work directories, one under each configured parent.

    cfg   -- configuration; cfg['ringmaster']['work-dirs'] lists the
             parent directories.
    reUse -- when true and a list was generated earlier, that cached
             list (self.workDirs) is returned unchanged.

    Otherwise a new list is built, stored in self.workDirs and returned.
    Every entry is <parent>/<hostname>-<pid>-<random integer>. The
    directories themselves are not created here.
    """
    if reUse and self.workDirs is not None:
        return self.workDirs

    import math
    frand = random.random()
    # Scale the random fraction up until it has no fractional part, which
    # yields a large integer used as the unique suffix.
    while math.ceil(frand) != math.floor(frand):
        frand = frand * 100

    uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), int(frand))
    self.workDirs = [os.path.join(parent, uniq)
                     for parent in cfg['ringmaster']['work-dirs']]
    return self.workDirs
---|
819 | |
---|
def _fetchLink(self, link, parentDir):
    """Fetch link and every further link the HTML parser hands back.

    Responses with content type 'text/html' are fed to a miniHTMLParser,
    which supplies the next link to visit; the walk stops when
    getNextLink() returns a false value. Any other response is saved to
    parentDir/<host>[/<port>]/<url path components>.

    Both the response and the output file are closed even when a
    transfer fails part way through.
    """
    parser = miniHTMLParser()
    self.log.debug("Checking link %s" %link)
    bufSz = 8192

    while link:
        # Get the file from the site and link
        response = urllib.urlopen(link)
        out = None
        try:
            isHtml = response.info().gettype() == 'text/html'

            if isHtml:
                parser.setBaseUrl(response.geturl())
            else:
                parsed = urlparse.urlparse(link)
                hostPort = parsed[1]
                if hostPort.find(':') != -1:
                    host, port = hostPort.split(':', 1)
                else:
                    host, port = hostPort, None

                localFile = os.path.join(parentDir, host)
                # A URL without an explicit port has no port component;
                # passing None to os.path.join would raise.
                if port is not None:
                    localFile = os.path.join(localFile, port)
                for component in parsed[2].split('/'):
                    if component != '':
                        localFile = os.path.join(localFile, component)

                try:
                    self.log.debug('Creating %s' % localFile)
                    localDir = os.path.split(localFile)[0]
                    if not os.path.exists(localDir):
                        os.makedirs(localDir)
                except Exception:
                    # Best effort: if the directory is unusable, open()
                    # below raises the error that matters.
                    self.log.debug(get_exception_string())

                # Binary mode: the payload is written exactly as received.
                out = open(localFile, 'wb')

            buf = response.read(bufSz)
            while len(buf) > 0:
                if isHtml:
                    # Feed the file into the HTML parser
                    parser.feed(buf)
                if out:
                    out.write(buf)
                buf = response.read(bufSz)
        finally:
            response.close()
            if out:
                out.close()

        # Get the next link from the parser
        link = parser.getNextLink()

    parser.close()
---|
879 | |
---|
def _finalize(self):
    """Finalize the node pool by calling self.np.finalize()."""
    # A 'HOD-log-P<pid>' directory name used to be computed here but was
    # never used; that dead code has been dropped.
    # FIXME: get the log dir from config if it is needed again.
    self.np.finalize()
---|
889 | |
---|
def handleIdleJobTracker(self):
    """Record that the job tracker has been found idle.

    Logs a critical message quoting the configured
    ['ringmaster']['idleness-limit'] and sets the idleness flag that
    shouldStop() reads.
    """
    idleLimit = self.cfg['ringmaster']['idleness-limit']
    message = "Detected idle job tracker for %s seconds. The allocation will be cleaned up." \
              % idleLimit
    self.log.critical(message)
    self.__idlenessDetected = True
---|
894 | |
---|
def cd_to_tempdir(self):
    """Make the ringmaster temp directory the current working directory.

    The directory is created first if it does not exist. Returns its path.
    """
    tempDir = self.__get_tempdir()

    missing = not os.path.exists(tempDir)
    if missing:
        os.makedirs(tempDir)

    os.chdir(tempDir)
    return tempDir
---|
903 | |
---|
def getWorkload(self):
    """Return the value stored in self.workload."""
    workload = self.workload
    return workload
---|
906 | |
---|
def getHostName(self):
    """Return the host name stored on this instance."""
    hostName = self.__hostname
    return hostName
---|
909 | |
---|
def start(self):
    """Run the hodring workers through the node pool, then finalize.

    The command is <base-dir>/bin/hodring followed by the arguments that
    self.cfg.get_args(section='hodring') returns. A positive return value
    from self.np.runWorkers() is logged as a critical failure;
    self._finalize() is called either way.
    """
    self.log.debug("Entered start method.")

    ringmasterCfg = self.cfg['ringmaster']
    largs = [os.path.join(ringmasterCfg['base-dir'], 'bin', 'hodring')]
    largs.extend(self.cfg.get_args(section='hodring'))

    # Each argument is followed by one space, the last one included.
    self.log.debug("".join(["%s " % item for item in largs]))

    workersFailed = self.np.runWorkers(largs) > 0
    if workersFailed:
        self.log.critical("Failed to start worker.")

    self.log.debug("Returned from runWorkers.")

    self._finalize()
---|
932 | |
---|
def __findExitCode(self):
    """Work out the exit code from the cluster state and the jobs run on it.

    7 -- the 'hdfs' service address is 'not found' or starts with "Error: "
    8 -- the same check fails for the 'mapred' service address
    otherwise the non-zero result of get_cluster_status(), or, when that
    is 0, the result of __findHadoopJobsExitCode().

    The code is stored on the instance and logged at debug level.
    """
    xmlrpcServer = ringMasterServer.instance.logMasterSources
    serviceAddr = xmlrpcServer.getServiceAddr

    if serviceAddr('hdfs') == 'not found' or \
       serviceAddr('hdfs').startswith("Error: "):
        exitCode = 7
    elif serviceAddr('mapred') == 'not found' or \
         serviceAddr('mapred').startswith("Error: "):
        exitCode = 8
    else:
        clusterStatus = get_cluster_status(serviceAddr('hdfs'),
                                           serviceAddr('mapred'))
        if clusterStatus == 0:
            exitCode = self.__findHadoopJobsExitCode()
        else:
            exitCode = clusterStatus

    self.__exitCode = exitCode
    self.log.debug('exit code %s' % self.__exitCode)
---|
950 | |
---|
def __findHadoopJobsExitCode(self):
    """Determine the consolidated exit code of hadoop jobs run on this cluster.

    Returns 16 when every job reported by the job tracker monitor has the
    failure status (3), 17 when only some of them do, and 0 otherwise.
    0 is also returned when there is no monitor, when no jobs are
    reported, or when inspecting the job list raises an exception.
    """
    failureStatus = 3
    if not self.__jtMonitor:
        return 0

    jobStatusList = self.__jtMonitor.getJobsStatus()
    ret = 0
    try:
        failureCount = 0
        for jobStatus in jobStatusList:
            self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),
                                                      jobStatus.getStatus()))
            if jobStatus.getStatus() == failureStatus:
                failureCount = failureCount + 1

        if failureCount > 0:
            if failureCount == len(jobStatusList):  # all jobs failed
                ret = 16
            else:
                ret = 17
    except Exception:
        # The format string needs a placeholder for the details; without
        # one the logging call itself raised TypeError from this handler.
        self.log.debug('exception in finding hadoop jobs exit code: %s' % get_exception_string())
    return ret
---|
974 | |
---|
def stop(self):
    """Shut the ringmaster down; a repeated or concurrent call is a no-op.

    With a live ringMasterServer instance the exit code is computed first
    and the service is stopped; without one the exit code is set to 6.
    The job tracker monitor and the HTTP server are then stopped if
    present, the temp directory is cleaned up and the instance is marked
    as stopped.
    """
    self.log.debug("RingMaster stop method invoked.")
    if self.__stopInProgress or self.__isStopped:
        return
    self.__stopInProgress = True

    if ringMasterServer.instance is None:
        self.__exitCode = 6
    else:
        self.log.debug('finding exit code')
        self.__findExitCode()
        self.log.debug('stopping ringmaster instance')
        ringMasterServer.stopService()

    if self.__jtMonitor is not None:
        self.__jtMonitor.stop()
    if self.httpServer:
        self.httpServer.stop()

    self.__clean_up()
    self.__isStopped = True
---|
994 | |
---|
def shouldStop(self):
    """Tell the main loop whether it should exit.

    True once idleness has been detected or the ringmaster has been
    stopped (for example after a stop signal).
    """
    idle = self.__idlenessDetected
    return idle or self.__isStopped
---|
999 | |
---|
def getExitCode(self):
    """Return the exit code recorded for the program."""
    exitCode = self.__exitCode
    return exitCode
---|
1003 | |
---|
def main(cfg,log):
    """Run a RingMaster until it should stop and return its exit code.

    cfg -- configuration, passed through DescGenerator.initializeDesc()
           before the RingMaster is built.
    log -- logger; when it is false the failure is not logged.

    The ring master is started, polled once a second until shouldStop()
    is true, then stopped. Any Exception is logged at critical level and
    re-raised unchanged, so the caller sees the original exception type
    and traceback.
    """
    try:
        dGen = DescGenerator(cfg)
        cfg = dGen.initializeDesc()
        rm = RingMaster(cfg, log)
        rm.start()
        while not rm.shouldStop():
            time.sleep(1)
        rm.stop()
        log.debug('returning from main')
        return rm.getExitCode()
    except Exception:
        if log:
            log.critical(get_exception_string())
        # Bare raise keeps the original exception instead of wrapping it
        # in a generic Exception.
        raise
---|