source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/RingMaster/ringMaster.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 34.7 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16#!/usr/bin/env python
17"""manages services and nodepool"""
18# -*- python -*-
19
20import os, sys, random, time, sets, shutil, threading
21import urllib, urlparse, re, getpass, pprint, signal, shutil
22
23from pprint import pformat
24from HTMLParser import HTMLParser
25
26binfile = sys.path[0]
27libdir = os.path.dirname(binfile)
28sys.path.append(libdir)
29
30import hodlib.Common.logger
31from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
32
33from hodlib.Common.threads import func
34
35from hodlib.Hod.nodePool import *
36from hodlib.Common.util import *
37from hodlib.Common.nodepoolutil import NodePoolUtil
38from hodlib.Common.socketServers import hodXMLRPCServer
39from hodlib.Common.socketServers import threadedHTTPServer
40from hodlib.NodePools import *
41from hodlib.NodePools.torque import *
42from hodlib.GridServices import *
43from hodlib.Common.descGenerator import *
44from hodlib.Common.xmlrpc import hodXRClient
45from hodlib.Common.miniHTMLParser import miniHTMLParser
46from hodlib.Common.threads import simpleCommand
47
class ringMasterServer:
  """The RPC server that exposes all the master config
  changes. Also, one of these RPC servers runs as a proxy
  and all the hodring instances register with this proxy.

  The class keeps a single server and a single instance as class
  attributes; the static methods below operate on those."""
  # The ringMasterServer object created by startService().
  instance = None
  # The underlying XML-RPC server object (twisted or hodXMLRPCServer).
  xmlrpc = None

  def __init__(self, cfg, log, logMasterSources, retry=5):
    """Create the XML-RPC server, register the handler object and start it.

    cfg              -- configuration; cfg['ringmaster']['xrs-port-range']
                        is passed to the server constructor.
    log              -- logger used for info/debug messages.
    logMasterSources -- object whose methods are exposed over XML-RPC.
    retry            -- accepted for interface compatibility; not used here.
    """
    try:
      from hodlib.Common.socketServers import twistedXMLRPCServer
      ringMasterServer.xmlrpc = twistedXMLRPCServer("",
        cfg['ringmaster']['xrs-port-range'])
    except ImportError:
      log.info("Twisted interface not found. Using hodXMLRPCServer.")
      ringMasterServer.xmlrpc = hodXMLRPCServer("",
        cfg['ringmaster']['xrs-port-range'])

    ringMasterServer.xmlrpc.register_instance(logMasterSources)
    self.logMasterSources = logMasterSources
    # NOTE(review): serve_forever() is presumably non-blocking for these
    # server classes, since the code polls is_alive() right after - confirm.
    ringMasterServer.xmlrpc.serve_forever()

    while not ringMasterServer.xmlrpc.is_alive():
      time.sleep(.5)

    log.debug('Ringmaster RPC Server at %d' %
                 ringMasterServer.xmlrpc.server_address[1])

  @staticmethod
  def startService(ss, cfg, np, log, rm):
    """Build the RPC handler object and start the singleton server."""
    logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
    ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)

  @staticmethod
  def stopService():
    """Stop the XML-RPC server."""
    ringMasterServer.xmlrpc.stop()

  @staticmethod
  def getPort():
    """Return the port the XML-RPC server is bound to.

    The instance never had a 'port' attribute, so the port is read from
    the server address, the same source getAddress() uses."""
    return ringMasterServer.xmlrpc.server_address[1]

  @staticmethod
  def getAddress():
    """Return the server URL as 'http://<hostname>:<port>/'."""
    return 'http://%s:%d/' % (socket.gethostname(),
                              ringMasterServer.xmlrpc.server_address[1])
93 
94class _LogMasterSources:
95  """All the methods that are run by the RPC server are
96  added into this class """
97 
98  def __init__(self, serviceDict, cfg, np, log, rm):
99    self.serviceDict = serviceDict
100    self.tarSource = []
101    self.tarSourceLock = threading.Lock()
102    self.dict = {}
103    self.count = {}
104    self.logsourceList = []
105    self.logsourceListLock = threading.Lock()
106    self.masterParam = []
107    self.masterParamLock = threading.Lock()
108    self.verify = 'none'
109    self.cmdLock = threading.Lock()
110    self.cfg = cfg
111    self.log = log
112    self.np = np
113    self.rm = rm
114    self.hdfsHost = None
115    self.mapredHost = None
116    self.maxconnect = self.cfg['ringmaster']['max-connect']
117    self.log.debug("Using max-connect value %s"%self.maxconnect)
118
119   
120  def registerTarSource(self, hostname, url, addr=None):
121    self.log.debug("registering: " + url)
122    lock = self.tarSourceLock
123    lock.acquire()
124    self.dict[url] = url
125    self.count[url] = 0
126    # addr is None when ringMaster himself invokes this method
127    if addr:
128      c = self.count[addr]
129      self.count[addr] = c - 1
130    lock.release()
131    if addr:
132      str = "%s is done" % (addr)
133      self.log.debug(str)
134    return url
135
136  def getTarList(self,hodring):   # this looks useful
137    lock = self.tarSourceLock
138    lock.acquire()
139    leastkey = None
140    leastval = -1
141    for k, v in self.count.iteritems():
142      if (leastval  == -1):
143        leastval = v
144        pass
145      if (v <= leastval and v < self.maxconnect):
146        leastkey = k
147        leastval = v
148    if (leastkey == None):
149      url  = 'none'
150    else:
151      url = self.dict[leastkey]
152      self.count[leastkey] = leastval + 1
153      self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
154    lock.release()
155    self.log.debug('sending url ' + url+" to "+hodring)  # this looks useful
156    return url
157
158  def tarDone(self, uri):
159    str = "%s is done" % (uri)
160    self.log.debug(str)
161    lock = self.tarSourceLock
162    lock.acquire()
163    c = self.count[uri]
164    self.count[uri] = c - 1
165    lock.release()
166    return uri
167
  def status(self):
    """Always return True; takes no input and reads no state."""
    return True
170
171# FIXME: this code is broken, it relies on a central service registry
172#
173#  def clusterStart(self, changedClusterParams=[]):
174#    self.log.debug("clusterStart method invoked.")
175#    self.dict = {}
176#    self.count = {}
177#    try:
178#      if (len(changedClusterParams) > 0):
179#        self.log.debug("Updating config.")
180#        for param in changedClusterParams:
181#          (key, sep1, val) = param.partition('=')
182#          (i1, sep2, i2) = key.partition('.')
183#          try:
184#            prev = self.cfg[i1][i2]
185#            self.rm.cfg[i1][i2] = val
186#            self.cfg[i1][i2] = val
187#            self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
188#          except KeyError, e:
189#            self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
190#        self.log.debug("Regenerating Service Description.")
191#        dGen = DescGenerator(self.rm.cfg)
192#        self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
193#        self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
194
195#      self.rm.tar = None
196#      if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
197#        self.rm.download = True
198#        self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
199#        self.log.debug("self.rm.tar=%s" % self.rm.tar)
200#
201#      self.rm.cd_to_tempdir()
202#
203#      self.rm.tarAddress = None
204#      hostname = socket.gethostname()
205#      if (self.rm.download):
206#        self.rm.basename = os.path.basename(self.rm.tar)
207#        dest = os.path.join(os.getcwd(), self.rm.basename)
208#        src =  self.rm.tar 
209#        self.log.debug("cp %s -> %s" % (src, dest))
210#        shutil.copy(src, dest)
211#        self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
212#        self.registerTarSource(hostname, self.rm.tarAddress)
213#        self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
214#      else:
215#        self.log.debug("Download not set.")
216#     
217#      if (self.rm.tar != None):
218#        self.cfg['hodring']['download-addr'] = self.rm.tarAddress
219#        self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
220#
221#      sdl = self.rm.cfg['servicedesc']
222#      workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
223#      hdfsDesc = sdl['hdfs']
224#      hdfs = None
225#      if hdfsDesc.isExternal():
226#        hdfs = HdfsExternal(hdfsDesc, workDirs)
227#      else:
228#        hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
229#   
230#      self.rm.serviceDict[hdfs.getName()] = hdfs
231#      mrDesc = sdl['mapred']
232#      mr = None
233#      if mrDesc.isExternal():
234#        mr = MapReduceExternal(mrDesc, workDirs)
235#      else:
236#        mr = MapReduce(mrDesc, workDirs, 1)
237#      self.rm.serviceDict[mr.getName()] = mr
238#
239#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
240#        self.np.getServiceId(), 'hodring', 'hod')
241#   
242#      slaveList = ringList
243#      hdfsringXRAddress = None
244#      # Start HDFS Master - Step 1
245#      if not hdfsDesc.isExternal():
246#        masterFound = False
247#        for ring in ringList:
248#          ringXRAddress = ring['xrs']
249#          if ringXRAddress == None:
250#            raise Exception("Could not get hodring XML-RPC server address.")
251#          if  (ringXRAddress.find(self.hdfsHost) != -1):
252#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
253#            hdfsringXRAddress = ringXRAddress
254#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
255#            ringClient.clusterStart()
256#            masterFound = True
257#            slaveList.remove(ring)
258#            break
259#        if not masterFound:
260#          raise Exception("HDFS Master host not found")
261#        while hdfs.getInfoAddrs() == None:
262#          self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
263#          time.sleep(1)
264#
265#      # Start MAPRED Master - Step 2
266#      if not mrDesc.isExternal():
267#        masterFound = False
268#        for ring in ringList:
269#          ringXRAddress = ring['xrs']
270#          if ringXRAddress == None:
271#            raise Exception("Could not get hodring XML-RPC server address.")
272#          if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
273#            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
274#            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
275#            ringClient.clusterStart()
276#            masterFound = True
277#            slaveList.remove(ring)
278#            break
279#        if not masterFound:
280#          raise Excpetion("MAPRED Master host not found")
281#        while mr.getInfoAddrs() == None:
282#          self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \
283# mapred.job.tracker.info.port")
284#          time.sleep(1)
285#
286#      # Start Slaves - Step 3
287#      for ring in slaveList:
288#          ringXRAddress = ring['xrs']
289#          if ringXRAddress == None:
290#            raise Exception("Could not get hodring XML-RPC server address.")
291#          ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
292#          self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
293#          ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
294#          ring['thread'] = ringThread
295#          ringThread.start()
296#
297#      for ring in slaveList:
298#        ringThread = ring['thread']
299#        if ringThread == None:
300#          raise Exception("Could not get hodring thread (Slave).")
301#        ringThread.join()
302#        self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
303#
304#      # Run Admin Commands on HDFS Master - Step 4
305#      if not hdfsDesc.isExternal():
306#        if hdfsringXRAddress == None:
307#          raise Exception("HDFS Master host not found (to Run Admin Commands)")
308#        ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
309#        self.log.debug("Invoking clusterStart(False) - Admin on "
310#                       + hdfsringXRAddress + " (HDFS Master)")
311#        ringClient.clusterStart(False)
312#
313#    except:
314#      self.log.debug(get_exception_string())
315#      return False
316#
317#    self.log.debug("Successfully started cluster.")
318#    return True
319#
320#  def clusterStop(self):
321#    self.log.debug("clusterStop method invoked.")
322#    try:
323#      hdfsAddr = self.getServiceAddr('hdfs')
324#      if hdfsAddr.find(':') != -1:
325#        h, p = hdfsAddr.split(':', 1)
326#        self.hdfsHost = h
327#        self.log.debug("hdfsHost: " + self.hdfsHost)
328#      mapredAddr = self.getServiceAddr('mapred')
329#      if mapredAddr.find(':') != -1:
330#        h, p = mapredAddr.split(':', 1)
331#        self.mapredHost = h
332#        self.log.debug("mapredHost: " + self.mapredHost)
333#      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
334#                                                      self.np.getServiceId(),
335#                                                      'hodring', 'hod')
336#      for ring in ringList:
337#        ringXRAddress = ring['xrs']
338#        if ringXRAddress == None:
339#          raise Exception("Could not get hodring XML-RPC server address.")
340#        ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)
341#        self.log.debug("Invoking clusterStop on " + ringXRAddress)
342#        ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())
343#        ring['thread'] = ringThread
344#        ringThread.start()
345#
346#      for ring in ringList:
347#        ringThread = ring['thread']
348#        if ringThread == None:
349#          raise Exception("Could not get hodring thread.")
350#        ringThread.join()
351#        self.log.debug("Completed clusterStop on " + ring['xrs'])
352#
353#    except:
354#      self.log.debug(get_exception_string())
355#      return False
356#
357#    self.log.debug("Successfully stopped cluster.")
358#   
359#    return True
360
361  def getCommand(self, addr):
362    """This method is called by the
363    hodrings to get commands from
364    the ringmaster"""
365    lock = self.cmdLock
366    cmdList = []
367    lock.acquire()
368    try:
369      try:
370        for v in self.serviceDict.itervalues():
371          if (not v.isExternal()):
372            if v.isLaunchable(self.serviceDict):
373              # If a master is still not launched, or the number of
374              # retries for launching master is not reached,
375              # launch master
376              if not v.isMasterLaunched() and \
377                  (v.getMasterFailureCount() <= \
378                      self.cfg['ringmaster']['max-master-failures']):
379                cmdList = v.getMasterCommands(self.serviceDict)
380                v.setlaunchedMaster()
381                v.setMasterAddress(addr)
382                break
383        if cmdList == []:
384          for s in self.serviceDict.itervalues():
385            if (not v.isExternal()):
386              if s.isMasterInitialized():
387                cl = s.getWorkerCommands(self.serviceDict)
388                cmdList.extend(cl)
389              else:
390                cmdList = []
391                break
392      except:
393        self.log.debug(get_exception_string())
394    finally:
395      lock.release()
396      pass
397   
398    cmd = addr + pformat(cmdList)
399    self.log.debug("getCommand returning " + cmd)
400    return cmdList
401 
402  def getAdminCommand(self, addr):
403    """This method is called by the
404    hodrings to get admin commands from
405    the ringmaster"""
406    lock = self.cmdLock
407    cmdList = []
408    lock.acquire()
409    try:
410      try:
411        for v in self.serviceDict.itervalues():
412          cmdList = v.getAdminCommands(self.serviceDict)
413          if cmdList != []:
414            break
415      except Exception, e:
416        self.log.debug(get_exception_string())
417    finally:
418      lock.release()
419      pass
420    cmd = addr + pformat(cmdList)
421    self.log.debug("getAdminCommand returning " + cmd)
422    return cmdList
423
424  def addMasterParams(self, addr, vals):
425    """This method is called by
426    hodring to update any parameters
427    its changed for the commands it was
428    running"""
429    self.log.debug('Comment: adding master params from %s' % addr)
430    self.log.debug(pformat(vals))
431    lock = self.masterParamLock
432    lock.acquire()
433    try:
434      for v in self.serviceDict.itervalues():
435        if v.isMasterLaunched():
436          if (v.getMasterAddress() == addr):
437            v.setMasterParams(vals)
438            v.setMasterInitialized()
439    except:
440      self.log.debug(get_exception_string())
441      pass
442    lock.release()
443           
444    return addr
445
446  def setHodRingErrors(self, addr, errors):
447    """This method is called by the hodrings to update errors
448      it encountered while starting up"""
449    self.log.critical("Hodring at %s failed with following errors:\n%s" \
450                        % (addr, errors))
451    lock = self.masterParamLock
452    lock.acquire()
453    try:
454      for v in self.serviceDict.itervalues():
455        if v.isMasterLaunched():
456          if (v.getMasterAddress() == addr):
457            # strip the PID part.
458            idx = addr.rfind('_')
459            if idx is not -1:
460              addr = addr[:idx]
461            v.setMasterFailed("Hodring at %s failed with following" \
462                                " errors:\n%s" % (addr, errors))
463    except:
464      self.log.debug(get_exception_string())
465      pass
466    lock.release()
467    return True
468
469  def getKeys(self):
470    lock= self.masterParamLock
471    lock.acquire()
472    keys = self.serviceDict.keys()
473    lock.release()   
474 
475    return keys
476 
477  def getServiceAddr(self, name):
478    addr = 'not found'
479    self.log.debug("getServiceAddr name: %s" % name)
480    lock= self.masterParamLock
481    lock.acquire()
482    try:
483      service = self.serviceDict[name]
484    except KeyError:
485      pass
486    else:
487      self.log.debug("getServiceAddr service: %s" % service)
488      # Check if we should give up ! If the limit on max failures is hit,
489      # give up.
490      err = service.getMasterFailed()
491      if (err is not None) and \
492            (service.getMasterFailureCount() > \
493                      self.cfg['ringmaster']['max-master-failures']):
494        self.log.critical("Detected errors (%s) beyond allowed number"\
495                            " of failures (%s). Flagging error to client" \
496                            % (service.getMasterFailureCount(), \
497                              self.cfg['ringmaster']['max-master-failures']))
498        addr = "Error: " + err
499      elif (service.isMasterInitialized()):
500        addr = service.getMasterAddrs()[0]
501      else:
502        addr = 'not found'
503    lock.release()
504    self.log.debug("getServiceAddr addr %s: %s" % (name, addr))
505   
506    return addr
507
508  def getURLs(self, name):
509    addr = 'none'
510    lock = self.masterParamLock
511    lock.acquire()
512   
513    try:
514      service = self.serviceDict[name]
515    except KeyError:
516      pass
517    else:
518      if (service.isMasterInitialized()):
519        addr = service.getInfoAddrs()[0]
520     
521    lock.release()
522   
523    return addr
524
525  def stopRM(self):
526    """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""
527    # We spawn a thread here because we want the XMLRPC call to return. Calling
528    # stop directly from here will also stop the XMLRPC server.
529    try:
530      self.log.debug("inside xml-rpc call to stop ringmaster")
531      rmStopperThread = func('RMStopper', self.rm.stop)
532      rmStopperThread.start()
533      self.log.debug("returning from xml-rpc call to stop ringmaster")
534      return True
535    except:
536      self.log.debug("Exception in stop: %s" % get_exception_string())
537      return False
538
539class RingMaster:
  def __init__(self, cfg, log, **kwds):
    """starts nodepool and services

    In order, the constructor: installs signal handlers, obtains the
    node pool and service id, optionally copies the hadoop tarball into
    the temp dir, builds the Hdfs and MapReduce service objects, starts
    the XML-RPC server (and an HTTP server when a tarball is served),
    optionally registers with the service registry, fills in the
    'hodring' config section and starts the job tracker monitor.

    cfg  -- configuration; the sections read here are 'ringmaster',
            'hodring', 'servicedesc', 'nodepooldesc' and the
            'gridservice-*' sections.
    log  -- logger.
    kwds -- accepted but not used in this method.
    """
    self.download = False
    self.httpServer = None
    self.cfg = cfg
    self.log = log
    self.__hostname = local_fqdn()
    self.workDirs = None 

    # ref to the idle job tracker object.
    self.__jtMonitor = None
    self.__idlenessDetected = False
    self.__stopInProgress = False
    self.__isStopped = False # to let main exit
    self.__exitCode = 0 # exit code with which the ringmaster main method should return

    self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']

    self.__initialize_signal_handlers()
   
    # NOTE(review): gsvc is assigned here but never read again in this
    # method - looks like dead code; confirm before removing.
    sdd = self.cfg['servicedesc']
    gsvc = None
    for key in sdd:
      gsvc = sdd[key]
      break
   
    npd = self.cfg['nodepooldesc']
    self.np = NodePoolUtil.getNodePool(npd, cfg, log)

    self.log.debug("Getting service ID.")
   
    self.serviceId = self.np.getServiceId()
   
    self.log.debug("Got service ID: %s" % self.serviceId)

    # A configured tarball switches on "download" mode: the tarball is
    # copied locally and later served to hodrings over HTTP.
    self.tarSrcLoc = None
    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
      self.download = True
      self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']
 
    self.cd_to_tempdir()

    if (self.download):
      self.__copy_tarball(os.getcwd())
      self.basename = self.__find_tarball_in_dir(os.getcwd())
      if self.basename is None:
        raise Exception('Did not find tarball copied from %s in %s.'
                          % (self.tarSrcLoc, os.getcwd()))
     
    self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])
   
    self.log.debug("Service registry @ %s" % self.serviceAddr)
   
    self.serviceClient = hodXRClient(self.serviceAddr)
    self.serviceDict  = {}
    try:
      sdl = self.cfg['servicedesc']

      workDirs = self.getWorkDirs(cfg)

      hdfsDesc = sdl['hdfs']
      hdfs = None
 
      # Determine hadoop Version
      hadoopVers = hadoopVersion(self.__getHadoopDir(), \
                                self.cfg['hodring']['java-home'], self.log)
     
      if (hadoopVers['major']==None) or (hadoopVers['minor']==None):
        raise Exception('Could not retrive the version of Hadoop.'
                        + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
      # External services get their master parameters straight from the
      # config; internal ones are created for launching by this ringmaster.
      if hdfsDesc.isExternal():
        hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
        hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
      else:
        hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']),
                    workers_per_ring = self.workers_per_ring)

      self.serviceDict[hdfs.getName()] = hdfs
     
      mrDesc = sdl['mapred']
      mr = None
      if mrDesc.isExternal():
        mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
        mr.setMasterParams( self.cfg['gridservice-mapred'] )
      else:
        mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']),
                       workers_per_ring = self.workers_per_ring)

      self.serviceDict[mr.getName()] = mr
    except:
      # Log the failure, then re-raise so construction is aborted.
      self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: \
                            %s." % get_exception_error_string())
      self.log.debug(get_exception_string())
      raise

    # should not be starting these in a constructor
    ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)
   
    self.rpcserver = ringMasterServer.getAddress()
   
    self.httpAddress = None   
    self.tarAddress = None 
    hostname = socket.gethostname()
    if (self.download):
      # Serve the copied tarball over HTTP and register it as the first
      # tarball source.
      self.httpServer = threadedHTTPServer(hostname, 
        self.cfg['ringmaster']['http-port-range'])
     
      self.httpServer.serve_forever()
      self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0], 
                                 self.httpServer.server_address[1])
      self.tarAddress = "%s%s" % (self.httpAddress, self.basename)
     
      ringMasterServer.instance.logMasterSources.registerTarSource(hostname, 
                                                                   self.tarAddress)
    else:
      self.log.debug("Download not set.")
   
    self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'], 
      self.serviceId, self.__hostname, 'ringmaster', 'hod'))
   
    if self.cfg['ringmaster']['register']:     
      if self.httpAddress:
        self.serviceClient.registerService(self.cfg['ringmaster']['userid'], 
          self.serviceId, self.__hostname, 'ringmaster', 'hod', {
          'xrs' : self.rpcserver, 'http' : self.httpAddress })
      else:
        self.serviceClient.registerService(self.cfg['ringmaster']['userid'], 
          self.serviceId, self.__hostname, 'ringmaster', 'hod', {
          'xrs' : self.rpcserver, })
   
    # NOTE(review): this message is logged even when the 'register'
    # option above is false.
    self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)
   
    # NOTE(review): hodRingPath is computed but not used in this method.
    hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring')
    hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_' 
                                  + getpass.getuser())
   
    # Fill in the 'hodring' config section with values derived above.
    self.cfg['hodring']['hodring'] = [hodRingWorkDir,]
    self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
    self.cfg['hodring']['service-id'] = self.np.getServiceId()

    self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)
   
    if (self.tarSrcLoc != None):
      cfg['hodring']['download-addr'] = self.tarAddress
 
    self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)
686
687  def __init_job_tracker_monitor(self, logMasterSources):
688    hadoopDir = self.__getHadoopDir()
689    self.log.debug('hadoopdir=%s, java-home=%s' % \
690                (hadoopDir, self.cfg['hodring']['java-home']))
691    try:
692      self.__jtMonitor = JobTrackerMonitor(self.log, self, 
693                            self.cfg['ringmaster']['jt-poll-interval'], 
694                            self.cfg['ringmaster']['idleness-limit'],
695                            hadoopDir, self.cfg['hodring']['java-home'],
696                            logMasterSources)
697      self.log.debug('starting jt monitor')
698      self.__jtMonitor.start()
699    except:
700      self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle.\
701                          Exception message: %s' % get_exception_error_string())
702      self.log.debug('Exception details: %s' % get_exception_string())
703
704
705  def __getHadoopDir(self):
706    hadoopDir = None
707    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
708      tarFile = os.path.join(os.getcwd(), self.basename)
709      ret = untar(tarFile, os.getcwd())
710      if not ret:
711        raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \
712                            % (tarFile, os.getcwd()))
713      hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
714    else:
715      hadoopDir = self.cfg['gridservice-mapred']['pkgs']
716    self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
717    return hadoopDir
718
719  def __get_dir(self, name):
720    """Return the root directory inside the tarball
721    specified by name. Assumes that the tarball begins
722    with a root directory."""
723    import tarfile
724    myTarFile = tarfile.open(name)
725    hadoopPackage = myTarFile.getnames()[0]
726    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
727    return hadoopPackage
728
729  def __find_tarball_in_dir(self, dir):
730    """Find the tarball among files specified in the given
731    directory. We need this method because how the tarball
732    source URI is given depends on the method of copy and
733    we can't get the tarball name from that.
734    This method will fail if there are multiple tarballs
735    in the directory with the same suffix."""
736    files = os.listdir(dir)
737    for file in files:
738      if self.tarSrcLoc.endswith(file):
739        return file
740    return None
741
742  def __copy_tarball(self, destDir):
743    """Copy the hadoop tar ball from a remote location to the
744    specified destination directory. Based on the URL it executes
745    an appropriate copy command. Throws an exception if the command
746    returns a non-zero exit code."""
747    # for backwards compatibility, treat the default case as file://
748    url = ''
749    if self.tarSrcLoc.startswith('/'):
750      url = 'file:/'
751    src = '%s%s' % (url, self.tarSrcLoc)
752    if src.startswith('file://'):
753      src = src[len('file://')-1:]
754      cpCmd = '/bin/cp'
755      cmd = '%s %s %s' % (cpCmd, src, destDir)
756      self.log.debug('Command to execute: %s' % cmd)
757      copyProc = simpleCommand('remote copy', cmd)
758      copyProc.start()
759      copyProc.wait()
760      copyProc.join()
761      ret = copyProc.exit_code()
762      self.log.debug('Completed command execution. Exit Code: %s.' % ret)
763
764      if ret != 0:
765        output = copyProc.output()
766        raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s' 
767                        % (cmd, ret, output))
768    else:
769      raise Exception('Unsupported URL for file: %s' % src)
770
771# input: http://hostname:port/. output: [hostname,port]
772  def __url_to_addr(self, url):
773    addr = url.rstrip('/')
774    if addr.startswith('http://'):
775      addr = addr.replace('http://', '', 1)
776    addr_parts = addr.split(':')
777    return [addr_parts[0], int(addr_parts[1])]
778
779  def __initialize_signal_handlers(self): 
780    def sigStop(sigNum, handler):
781      sig_wrapper(sigNum, self.stop)
782 
783    signal.signal(signal.SIGTERM, sigStop)
784    signal.signal(signal.SIGINT, sigStop)
785    signal.signal(signal.SIGQUIT, sigStop)
786
787  def __clean_up(self):
788    tempDir = self.__get_tempdir()
789    os.chdir(os.path.split(tempDir)[0])
790    if os.path.exists(tempDir):
791      shutil.rmtree(tempDir, True)
792     
793    self.log.debug("Cleaned up temporary dir: %s" % tempDir)
794
795  def __get_tempdir(self):
796    dir = os.path.join(self.cfg['ringmaster']['temp-dir'], 
797                          "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'], 
798                                                self.np.getServiceId()))
799    return dir
800
801  def getWorkDirs(self, cfg, reUse=False):
802
803    if (not reUse) or (self.workDirs == None):
804      import math
805      frand = random.random()
806      while math.ceil(frand) != math.floor(frand):
807        frand = frand * 100
808
809      irand = int(frand)
810      uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand)
811      dirs = []
812      parentDirs = cfg['ringmaster']['work-dirs']
813      for p in parentDirs:
814        dir = os.path.join(p, uniq)
815        dirs.append(dir)
816      self.workDirs = dirs
817
818    return self.workDirs
819
820  def _fetchLink(self, link, parentDir):
821    parser = miniHTMLParser()
822    self.log.debug("Checking link %s" %link)
823    while link:
824
825      # Get the file from the site and link
826      input = urllib.urlopen(link)
827      out = None
828      contentType = input.info().gettype()
829      isHtml = contentType == 'text/html'
830
831      #print contentType
832      if isHtml:
833        parser.setBaseUrl(input.geturl())
834      else:
835        parsed = urlparse.urlparse(link)
836        hp = parsed[1]
837        h = hp
838        p = None
839        if hp.find(':') != -1:
840          h, p = hp.split(':', 1)
841        path = parsed[2]
842        path = path.split('/')
843        file = os.path.join(parentDir, h, p)
844        for c in path:
845          if c == '':
846            continue
847          file = os.path.join(file, c)
848
849        try:
850          self.log.debug('Creating %s' % file)
851          dir, tail = os.path.split(file)
852          if not os.path.exists(dir):
853            os.makedirs(dir)
854        except:
855          self.log.debug(get_exception_string())
856
857        out = open(file, 'w')
858
859      bufSz = 8192
860      buf = input.read(bufSz)
861      while len(buf) > 0:
862        if isHtml:
863          # Feed the file into the HTML parser
864          parser.feed(buf)
865        if out:
866          out.write(buf)
867        buf = input.read(bufSz)
868
869      input.close()
870      if out:
871        out.close()
872
873      # Search the retfile here
874
875      # Get the next link in level traversal order
876      link = parser.getNextLink()
877     
878    parser.close()
879   
880  def _finalize(self):
881    try:
882      # FIXME: get dir from config
883      dir = 'HOD-log-P%d' % (os.getpid())
884      dir = os.path.join('.', dir)
885    except:
886      self.log.debug(get_exception_string())
887
888    self.np.finalize()
889
890  def handleIdleJobTracker(self):
891    self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up." \
892                          % self.cfg['ringmaster']['idleness-limit'])
893    self.__idlenessDetected = True
894
895  def cd_to_tempdir(self):
896    dir = self.__get_tempdir()
897   
898    if not os.path.exists(dir):
899      os.makedirs(dir)
900    os.chdir(dir)
901   
902    return dir
903 
904  def getWorkload(self):
905    return self.workload
906
907  def getHostName(self):
908    return self.__hostname
909
910  def start(self):
911    """run the thread main loop"""
912   
913    self.log.debug("Entered start method.")
914    hodring = os.path.join(self.cfg['ringmaster']['base-dir'], 
915                           'bin', 'hodring')
916    largs = [hodring]
917    targs = self.cfg.get_args(section='hodring')
918    largs.extend(targs) 
919   
920    hodringCmd = ""
921    for item in largs:
922      hodringCmd = "%s%s " % (hodringCmd, item)
923     
924    self.log.debug(hodringCmd)
925   
926    if self.np.runWorkers(largs) > 0:
927      self.log.critical("Failed to start worker.")
928   
929    self.log.debug("Returned from runWorkers.")
930   
931    self._finalize()
932
933  def __findExitCode(self):
934    """Determine the exit code based on the status of the cluster or jobs run on them"""
935    xmlrpcServer = ringMasterServer.instance.logMasterSources
936    if xmlrpcServer.getServiceAddr('hdfs') == 'not found' or \
937        xmlrpcServer.getServiceAddr('hdfs').startswith("Error: "):
938      self.__exitCode = 7
939    elif xmlrpcServer.getServiceAddr('mapred') == 'not found' or \
940        xmlrpcServer.getServiceAddr('mapred').startswith("Error: "):
941      self.__exitCode = 8
942    else:
943      clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),
944                                          xmlrpcServer.getServiceAddr('mapred'))
945      if clusterStatus != 0:
946        self.__exitCode = clusterStatus
947      else:
948        self.__exitCode = self.__findHadoopJobsExitCode()
949    self.log.debug('exit code %s' % self.__exitCode)
950
951  def __findHadoopJobsExitCode(self):
952    """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
953       this information is available. Return 0 otherwise"""
954    ret = 0
955    failureStatus = 3
956    failureCount = 0
957    if self.__jtMonitor:
958      jobStatusList = self.__jtMonitor.getJobsStatus()
959      try:
960        if len(jobStatusList) > 0:
961          for jobStatus in jobStatusList:
962            self.log.debug('job status for %s: %s' % (jobStatus.getJobId(), 
963                                                      jobStatus.getStatus()))
964            if jobStatus.getStatus() == failureStatus:
965              failureCount = failureCount+1
966        if failureCount > 0:
967          if failureCount == len(jobStatusList): # all jobs failed
968            ret = 16
969          else:
970            ret = 17
971      except:
972        self.log.debug('exception in finding hadoop jobs exit code' % get_exception_string())
973    return ret
974
975  def stop(self):
976    self.log.debug("RingMaster stop method invoked.")
977    if self.__stopInProgress or self.__isStopped:
978      return
979    self.__stopInProgress = True
980    if ringMasterServer.instance is not None:
981      self.log.debug('finding exit code')
982      self.__findExitCode()
983      self.log.debug('stopping ringmaster instance')
984      ringMasterServer.stopService()
985    else:
986      self.__exitCode = 6
987    if self.__jtMonitor is not None:
988      self.__jtMonitor.stop()
989    if self.httpServer:
990      self.httpServer.stop()
991     
992    self.__clean_up()
993    self.__isStopped = True
994
995  def shouldStop(self):
996    """Indicates whether the main loop should exit, either due to idleness condition,
997    or a stop signal was received"""
998    return self.__idlenessDetected or self.__isStopped
999
1000  def getExitCode(self):
1001    """return the exit code of the program"""
1002    return self.__exitCode
1003
def main(cfg,log):
  """Run a RingMaster until it should stop and return its exit code.

  cfg is replaced by the result of DescGenerator(cfg).initializeDesc()
  before the RingMaster is built.  After start() the loop polls
  shouldStop() once a second, then calls stop().  Any Exception is logged
  as critical (when log is set) and re-raised wrapped in a new Exception.
  """
  try:
    cfg = DescGenerator(cfg).initializeDesc()
    rm = RingMaster(cfg, log)
    rm.start()
    while not rm.shouldStop():
      time.sleep(1)
    rm.stop()
    log.debug('returning from main')
    return rm.getExitCode()
  except Exception:
    e = sys.exc_info()[1]
    if log:
      log.critical(get_exception_string())
    raise Exception(e)
Note: See TracBrowser for help on using the repository browser.