source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/HodRing/hodRing.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 31.7 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16#!/usr/bin/env python
17"""hodring launches hadoop commands on work node and
18 cleans up all the work dirs afterward
19"""
20# -*- python -*-
21import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
22import socket, sets, urllib, csv, signal, pprint, random, re, httplib
23
24from xml.dom import getDOMImplementation
25from pprint import pformat
26from optparse import OptionParser
27from urlparse import urlparse
28from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory, isProcessRunning
29from hodlib.Common.tcp import tcpSocket, tcpError
30
31binfile = sys.path[0]
32libdir = os.path.dirname(binfile)
33sys.path.append(libdir)
34
35import hodlib.Common.logger
36
37from hodlib.GridServices.service import *
38from hodlib.Common.util import *
39from hodlib.Common.socketServers import threadedHTTPServer
40from hodlib.Common.hodsvc import hodBaseService
41from hodlib.Common.threads import simpleCommand
42from hodlib.Common.xmlrpc import hodXRClient
43
44mswindows = (sys.platform == "win32")
45originalcwd = os.getcwd()
46
47reHdfsURI = re.compile("hdfs://(.*?:\d+)(.*)")
48
49class CommandDesc:
50  """A class that represents the commands that
51  are run by hodring"""
52  def __init__(self, dict, log):
53    self.log = log
54    self.log.debug("In command desc")
55    self.log.debug("Done in command desc")
56    dict.setdefault('argv', [])
57    dict.setdefault('version', None)
58    dict.setdefault('envs', {})
59    dict.setdefault('workdirs', [])
60    dict.setdefault('attrs', {})
61    dict.setdefault('final-attrs', {})
62    dict.setdefault('fg', False)
63    dict.setdefault('ignorefailures', False)
64    dict.setdefault('stdin', None)
65
66    self.log.debug("Printing dict")
67    self._checkRequired(dict)
68    self.dict = dict
69
70  def _checkRequired(self, dict):
71    if 'name' not in dict:
72      raise ValueError, "Command description lacks 'name'"
73    if 'program' not in dict:
74      raise ValueError, "Command description lacks 'program'"
75    if 'pkgdirs' not in dict:
76      raise ValueError, "Command description lacks 'pkgdirs'"
77
78  def getName(self):
79    return self.dict['name']
80
81  def getProgram(self):
82    return self.dict['program']
83
84  def getArgv(self):
85    return self.dict['argv']
86
87  def getVersion(self):
88    return self.dict['version']
89
90  def getEnvs(self):
91    return self.dict['envs']
92
93  def getPkgDirs(self):
94    return self.dict['pkgdirs']
95
96  def getWorkDirs(self):
97    return self.dict['workdirs']
98
99  def getAttrs(self):
100    return self.dict['attrs']
101
102  def getfinalAttrs(self):
103    return self.dict['final-attrs']
104 
105  def isForeground(self):
106    return self.dict['fg']
107
108  def isIgnoreFailures(self):
109    return self.dict['ignorefailures']
110
111  def getStdin(self):
112    return self.dict['stdin']
113
114  def parseDesc(str):
115
116    dict = CommandDesc._parseMap(str)
117
118    dict['argv'] = CommandDesc._parseList(dict['argv'])
119    dict['envs'] = CommandDesc._parseMap(dict['envs'])
120    dict['pkgdirs'] = CommandDesc._parseList(dict['pkgdirs'], ':')
121    dict['workdirs'] = CommandDesc._parseList(dict['workdirs'], ':')
122    dict['attrs'] = CommandDesc._parseMap(dict['attrs'])
123    dict['final-attrs'] = CommandDesc._parseMap(dict['final-attrs'])
124                                               
125    return CommandDesc(dict)
126
127  parseDesc = staticmethod(parseDesc)
128
129  def _parseList(str, delim = ','):
130    list = []
131    for row in csv.reader([str], delimiter=delim, escapechar='\\', 
132                          quoting=csv.QUOTE_NONE, doublequote=False):
133      list.extend(row)
134    return list
135
136  _parseList = staticmethod(_parseList)
137
138  def _parseMap(str):
139    """Parses key value pairs"""
140    dict = {}
141    for row in csv.reader([str], escapechar='\\', quoting=csv.QUOTE_NONE, doublequote=False):
142      for f in row:
143        [k, v] = f.split('=', 1)
144        dict[k] = v
145    return dict
146
147  _parseMap = staticmethod(_parseMap)
148
149class MRSystemDirectoryManager:
150  """Class that is responsible for managing the MapReduce system directory"""
151
152  def __init__(self, jtPid, mrSysDir, fsName, hadoopPath, log, retries=120):
153    self.__jtPid = jtPid
154    self.__mrSysDir = mrSysDir
155    self.__fsName = fsName
156    self.__hadoopPath = hadoopPath
157    self.__log = log
158    self.__retries = retries
159
160  def toCleanupArgs(self):
161    return " --jt-pid %s --mr-sys-dir %s --fs-name %s --hadoop-path %s " \
162              % (self.__jtPid, self.__mrSysDir, self.__fsName, self.__hadoopPath)
163
164  def removeMRSystemDirectory(self):
165   
166    jtActive = isProcessRunning(self.__jtPid)
167    count = 0 # try for a max of a minute for the process to end
168    while jtActive and (count<self.__retries):
169      time.sleep(0.5)
170      jtActive = isProcessRunning(self.__jtPid)
171      count += 1
172   
173    if count == self.__retries:
174      self.__log.warn('Job Tracker did not exit even after a minute. Not going to try and cleanup the system directory')
175      return
176
177    self.__log.debug('jt is now inactive')
178
179    cmd = "%s dfs -fs hdfs://%s -rmr %s" % (self.__hadoopPath, self.__fsName, \
180                                            self.__mrSysDir)
181    self.__log.debug('Command to run to remove system directory: %s' % (cmd))
182    try:
183      hadoopCommand = simpleCommand('mr-sys-dir-cleaner', cmd)
184      hadoopCommand.start()
185      hadoopCommand.wait()
186      hadoopCommand.join()
187      ret = hadoopCommand.exit_code()
188      if ret != 0:
189        self.__log.warn("Error in removing MapReduce system directory '%s' from '%s' using path '%s'" \
190                          % (self.__mrSysDir, self.__fsName, self.__hadoopPath))
191        self.__log.warn(pprint.pformat(hadoopCommand.output()))
192      else:
193        self.__log.info("Removed MapReduce system directory successfully.")
194    except:
195      self.__log.error('Exception while cleaning up MapReduce system directory. May not be cleaned up. %s', \
196                          get_exception_error_string())
197      self.__log.debug(get_exception_string())
198
199
200def createMRSystemDirectoryManager(dict, log):
201  keys = [ 'jt-pid', 'mr-sys-dir', 'fs-name', 'hadoop-path' ]
202  for key in keys:
203    if (not dict.has_key(key)) or (dict[key] is None):
204      return None
205
206  mrSysDirManager = MRSystemDirectoryManager(int(dict['jt-pid']), dict['mr-sys-dir'], \
207                                              dict['fs-name'], dict['hadoop-path'], log)
208  return mrSysDirManager
209
210class HadoopCommand:
211  """Runs a single hadoop command"""
212   
213  def __init__(self, id, desc, tempdir, tardir, log, javahome, 
214                mrSysDir, restart=False):
215    self.desc = desc
216    self.log = log
217    self.javahome = javahome
218    self.__mrSysDir = mrSysDir
219    self.program = desc.getProgram()
220    self.name = desc.getName()
221    self.workdirs = desc.getWorkDirs()
222    self.hadoopdir = tempdir
223    self.confdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name), 
224                                "confdir")
225    self.logdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name), 
226                               "logdir")
227    self.out = os.path.join(self.logdir, '%s.out' % self.name)
228    self.err = os.path.join(self.logdir, '%s.err' % self.name)
229
230    self.child = None
231    self.restart = restart
232    self.filledInKeyVals = []
233    self._createWorkDirs()
234    self._createHadoopSiteXml()
235    self._createHadoopLogDir()
236    self.__hadoopThread = None
237    self.stdErrContents = "" # store list of contents for returning to user
238
239  def _createWorkDirs(self):
240    for dir in self.workdirs:
241      if os.path.exists(dir):
242        if not os.access(dir, os.F_OK | os.R_OK | os.W_OK | os.X_OK):
243          raise ValueError, "Workdir %s does not allow rwx permission." % (dir)
244        continue
245      try:
246        os.makedirs(dir)
247      except:
248        pass
249
250  def getFilledInKeyValues(self):
251    return self.filledInKeyVals
252
253  def createXML(self, doc, attr, topElement, final):
254    for k,v in attr.iteritems():
255      self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
256      if ( v == "fillinport" ):
257        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
258
259      keyvalpair = ''
260      if isinstance(v, (tuple, list)):
261        for item in v:
262          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
263        keyvalpair = keyvalpair[:-1]
264      else:
265        keyvalpair = k + '=' + v
266
267      self.filledInKeyVals.append(keyvalpair)
268      if(k == "mapred.job.tracker"): # total hack for time's sake
269        keyvalpair = k + "=" + v
270        self.filledInKeyVals.append(keyvalpair)
271       
272      if ( v == "fillinhostport"):
273        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
274        self.log.debug('Setting hostname to: %s' % local_fqdn())
275        v = local_fqdn() + ':' + port
276     
277      keyvalpair = ''
278      if isinstance(v, (tuple, list)):
279        for item in v:
280          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
281        keyvalpair = keyvalpair[:-1]
282      else:
283        keyvalpair = k + '=' + v
284     
285      self.filledInKeyVals.append(keyvalpair)
286      if ( v == "fillindir"):
287        v = self.__mrSysDir
288        pass
289     
290      prop = None
291      if isinstance(v, (tuple, list)):
292        for item in v:
293          prop = self._createXmlElement(doc, k, item, "No description", final)
294          topElement.appendChild(prop)
295      else:
296        if k == 'fs.default.name':
297          prop = self._createXmlElement(doc, k, "hdfs://" + v, "No description", final)
298        else:
299          prop = self._createXmlElement(doc, k, v, "No description", final)
300        topElement.appendChild(prop)
301       
302  def _createHadoopSiteXml(self):
303    if self.restart:
304      if not os.path.exists(self.confdir):
305        os.makedirs(self.confdir)
306    else:
307      assert os.path.exists(self.confdir) == False
308      os.makedirs(self.confdir)
309
310    implementation = getDOMImplementation()
311    doc = implementation.createDocument('', 'configuration', None)
312    comment = doc.createComment("This is an auto generated hadoop-site.xml, do not modify")
313    topElement = doc.documentElement
314    topElement.appendChild(comment)
315   
316    finalAttr = self.desc.getfinalAttrs()
317    self.createXML(doc, finalAttr, topElement, True)
318    attr = {}
319    attr1 = self.desc.getAttrs()
320    for k,v in attr1.iteritems():
321      if not finalAttr.has_key(k):
322        attr[k] = v
323    self.createXML(doc, attr, topElement, False)
324             
325   
326    siteName = os.path.join(self.confdir, "hadoop-site.xml")
327    sitefile = file(siteName, 'w')
328    print >> sitefile, topElement.toxml()
329    sitefile.close()
330    self.log.debug('created %s' % (siteName))
331
332  def _createHadoopLogDir(self):
333    if self.restart:
334      if not os.path.exists(self.logdir):
335        os.makedirs(self.logdir)
336    else:
337      assert os.path.exists(self.logdir) == False
338      os.makedirs(self.logdir)
339
340  def _createXmlElement(self, doc, name, value, description, final):
341    prop = doc.createElement("property")
342    nameP = doc.createElement("name")
343    string = doc.createTextNode(name)
344    nameP.appendChild(string)
345    valueP = doc.createElement("value")
346    string = doc.createTextNode(value)
347    valueP.appendChild(string)
348    desc = doc.createElement("description")
349    string = doc.createTextNode(description)
350    desc.appendChild(string)
351    prop.appendChild(nameP)
352    prop.appendChild(valueP)
353    prop.appendChild(desc)
354    if (final):
355      felement = doc.createElement("final")
356      string = doc.createTextNode("true")
357      felement.appendChild(string)
358      prop.appendChild(felement)
359      pass
360   
361    return prop
362
363  def getMRSystemDirectoryManager(self):
364    return MRSystemDirectoryManager(self.__hadoopThread.getPid(), self.__mrSysDir, \
365                                    self.desc.getfinalAttrs()['fs.default.name'], \
366                                    self.path, self.log)
367
368  def run(self, dir):
369    status = True
370    args = []
371    desc = self.desc
372   
373    self.log.debug(pprint.pformat(desc.dict))
374   
375   
376    self.log.debug("Got package dir of %s" % dir)
377   
378    self.path = os.path.join(dir, self.program)
379   
380    self.log.debug("path: %s" % self.path)
381    args.append(self.path)
382    args.extend(desc.getArgv())
383    envs = desc.getEnvs()
384    fenvs = os.environ
385   
386    for k, v in envs.iteritems():
387      fenvs[k] = v
388   
389    if envs.has_key('HADOOP_OPTS'):
390      fenvs['HADOOP_OPTS'] = envs['HADOOP_OPTS']
391      self.log.debug("HADOOP_OPTS : %s" % fenvs['HADOOP_OPTS'])
392   
393    fenvs['JAVA_HOME'] = self.javahome
394    fenvs['HADOOP_CONF_DIR'] = self.confdir
395    fenvs['HADOOP_LOG_DIR'] = self.logdir
396
397    self.log.info(pprint.pformat(fenvs))
398
399    hadoopCommand = ''
400    for item in args:
401        hadoopCommand = "%s%s " % (hadoopCommand, item)
402
403    # Redirecting output and error to self.out and self.err
404    hadoopCommand = hadoopCommand + ' 1>%s 2>%s ' % (self.out, self.err)
405       
406    self.log.debug('running command: %s' % (hadoopCommand)) 
407    self.log.debug('hadoop env: %s' % fenvs)
408    self.log.debug('Command stdout will be redirected to %s ' % self.out + \
409                   'and command stderr to %s' % self.err)
410
411    self.__hadoopThread = simpleCommand('hadoop', hadoopCommand, env=fenvs)
412    self.__hadoopThread.start()
413   
414    while self.__hadoopThread.stdin == None:
415      time.sleep(.2)
416      self.log.debug("hadoopThread still == None ...")
417   
418    input = desc.getStdin()
419    self.log.debug("hadoop input: %s" % input)
420    if input:
421      if self.__hadoopThread.is_running():
422        print >>self.__hadoopThread.stdin, input
423      else:
424        self.log.error("hadoop command failed to start")
425   
426    self.__hadoopThread.stdin.close() 
427   
428    self.log.debug("isForground: %s" % desc.isForeground())
429    if desc.isForeground():
430      self.log.debug("Waiting on hadoop to finish...")
431      self.__hadoopThread.wait()
432     
433      self.log.debug("Joining hadoop thread...")
434      self.__hadoopThread.join()
435      if self.__hadoopThread.exit_code() != 0:
436        status = False
437    else:
438      status = self.getCommandStatus()
439       
440    self.log.debug("hadoop run status: %s" % status)   
441   
442    if status == False:
443      self.handleFailedCommand()
444   
445    if (status == True) or (not desc.isIgnoreFailures()):
446      return status
447    else:
448      self.log.error("Ignoring Failure")
449      return True
450
451  def kill(self):
452    self.__hadoopThread.kill()
453    if self.__hadoopThread:
454      self.__hadoopThread.join()
455
456  def addCleanup(self, list):
457    list.extend(self.workdirs)
458    list.append(self.confdir)
459
460  def getCommandStatus(self):
461    status = True
462    ec = self.__hadoopThread.exit_code()
463    if (ec != 0) and (ec != None):
464      status = False
465    return status
466
467  def handleFailedCommand(self):
468    self.log.error('hadoop error: %s' % (
469                     self.__hadoopThread.exit_status_string()))
470    # read the contents of redirected stderr to print information back to user
471    if os.path.exists(self.err):
472      f = None
473      try:
474        f = open(self.err)
475        lines = f.readlines()
476        # format
477        for line in lines:
478          self.stdErrContents = "%s%s" % (self.stdErrContents, line)
479      finally:
480        if f is not None:
481          f.close()
482    self.log.error('See %s.out and/or %s.err for details. They are ' % \
483                   (self.name, self.name) + \
484                   'located at subdirectories under either ' + \
485                   'hodring.work-dirs or hodring.log-destination-uri.')
486
487class HodRing(hodBaseService):
488  """The main class for hodring that
489  polls the commands it runs"""
490  def __init__(self, config):
491    hodBaseService.__init__(self, 'hodring', config['hodring'])
492    self.log = self.logs['main']
493    self._http = None
494    self.__pkg = None
495    self.__pkgDir = None 
496    self.__tempDir = None
497    self.__running = {}
498    self.__hadoopLogDirs = []
499    self.__init_temp_dir()
500
501  def __init_temp_dir(self):
502    self.__tempDir = os.path.join(self._cfg['temp-dir'], 
503                                  "%s.%s.hodring" % (self._cfg['userid'], 
504                                                      self._cfg['service-id']))
505    if not os.path.exists(self.__tempDir):
506      os.makedirs(self.__tempDir)
507    os.chdir(self.__tempDir) 
508
509  def __fetch(self, url, spath):
510    retry = 3
511    success = False
512    while (retry != 0 and success != True):
513      try:
514        input = urllib.urlopen(url)
515        bufsz = 81920
516        buf = input.read(bufsz)
517        out = open(spath, 'w')
518        while len(buf) > 0:
519          out.write(buf)
520          buf = input.read(bufsz)
521        input.close()
522        out.close()
523        success = True
524      except:
525        self.log.debug("Failed to copy file")
526        retry = retry - 1
527    if (retry == 0 and success != True):
528      raise IOError, "Failed to copy the files"
529
530     
531  def __get_name(self, addr):
532    parsedUrl = urlparse(addr)
533    path = parsedUrl[2]
534    split = path.split('/', 1)
535    return split[1]
536
537  def __get_dir(self, name):
538    """Return the root directory inside the tarball
539    specified by name. Assumes that the tarball begins
540    with a root directory."""
541    import tarfile
542    myTarFile = tarfile.open(name)
543    hadoopPackage = myTarFile.getnames()[0]
544    self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
545    return hadoopPackage
546
547  def getRunningValues(self):
548    return self.__running.values()
549
550  def getTempDir(self):
551    return self.__tempDir
552
553  def getHadoopLogDirs(self):
554    return self.__hadoopLogDirs
555 
556  def __download_package(self, ringClient):
557    self.log.debug("Found download address: %s" % 
558                   self._cfg['download-addr'])
559    try:
560      addr = 'none'
561      downloadTime = self._cfg['tarball-retry-initial-time']           # download time depends on tarball size and network bandwidth
562     
563      increment = 0
564     
565      addr = ringClient.getTarList(self.hostname)
566
567      while(addr == 'none'):
568        rand = self._cfg['tarball-retry-initial-time'] + increment + \
569                        random.uniform(0,self._cfg['tarball-retry-interval'])
570        increment = increment + 1
571        self.log.debug("got no tarball. Retrying again in %s seconds." % rand)
572        time.sleep(rand)
573        addr = ringClient.getTarList(self.hostname)
574
575   
576      self.log.debug("got this address %s" % addr)
577     
578      tarName = self.__get_name(addr)
579      self.log.debug("tar package name: %s" % tarName)
580     
581      fetchPath = os.path.join(os.getcwd(), tarName) 
582      self.log.debug("fetch path: %s" % fetchPath)
583     
584      self.__fetch(addr, fetchPath)
585      self.log.debug("done fetching")
586   
587      tarUrl = "http://%s:%d/%s" % (self._http.server_address[0], 
588                                    self._http.server_address[1], 
589                                    tarName)
590      try: 
591        ringClient.registerTarSource(self.hostname, tarUrl,addr)
592        #ringClient.tarDone(addr)
593      except KeyError, e:
594        self.log.error("registerTarSource and tarDone failed: ", e)
595        raise KeyError(e)
596     
597      check = untar(fetchPath, os.getcwd())
598     
599      if (check == False):
600        raise IOError, "Untarring failed."
601     
602      self.__pkg = self.__get_dir(tarName)
603      self.__pkgDir = os.path.join(os.getcwd(), self.__pkg)     
604    except Exception, e:
605      self.log.error("Failed download tar package: %s" % 
606                     get_exception_error_string())
607      raise Exception(e)
608     
609  def __run_hadoop_commands(self, restart=True):
610    id = 0
611    for desc in self._cfg['commanddesc']:
612      self.log.debug(pprint.pformat(desc.dict))
613      mrSysDir = getMapredSystemDirectory(self._cfg['mapred-system-dir-root'],
614                          self._cfg['userid'], self._cfg['service-id'])
615      self.log.debug('mrsysdir is %s' % mrSysDir)
616      cmd = HadoopCommand(id, desc, self.__tempDir, self.__pkgDir, self.log, 
617                          self._cfg['java-home'], mrSysDir, restart)
618   
619      self.__hadoopLogDirs.append(cmd.logdir)
620      self.log.debug("hadoop log directory: %s" % self.__hadoopLogDirs)
621     
622      try:
623        # if the tarball isn't there, we use the pkgs dir given.
624        if self.__pkgDir == None:
625          pkgdir = desc.getPkgDirs()
626        else:
627          pkgdir = self.__pkgDir
628
629        self.log.debug('This is the packcage dir %s ' % (pkgdir))
630        if not cmd.run(pkgdir):
631          addnInfo = ""
632          if cmd.stdErrContents is not "":
633            addnInfo = " Information from stderr of the command:\n%s" % (cmd.stdErrContents)
634          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" % (desc.getName(), pkgdir, addnInfo))
635      except Exception, e:
636        self.log.debug("Exception running hadoop command: %s\n%s" % (get_exception_error_string(), get_exception_string()))
637        self.__running[id] = cmd
638        raise Exception(e)
639
640      id += 1
641      if desc.isForeground():
642        continue
643      self.__running[id-1] = cmd
644
645      # ok.. now command is running. If this HodRing got jobtracker,
646      # Check if it is ready for accepting jobs, and then only return
647      self.__check_jobtracker(desc, id-1, pkgdir)
648     
649  def __check_jobtracker(self, desc, id, pkgdir):
650    # Check jobtracker status. Return properly if it is ready to accept jobs.
651    # Currently Checks for Jetty to come up, the last thing that can be checked
652    # before JT completes initialisation. To be perfectly reliable, we need
653    # hadoop support
654    name = desc.getName()
655    if name == 'jobtracker':
656      # Yes I am the Jobtracker
657      self.log.debug("Waiting for jobtracker to initialise")
658      version = desc.getVersion()
659      self.log.debug("jobtracker version : %s" % version)
660      hadoopCmd = self.getRunningValues()[id]
661      attrs = hadoopCmd.getFilledInKeyValues()
662      attrs = parseEquals(attrs)
663      jobTrackerAddr = attrs['mapred.job.tracker']
664      self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)
665      if version < 16:
666        jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \
667                              attrs['mapred.job.tracker.info.port']
668      else:
669        jettyAddr = attrs['mapred.job.tracker.http.address']
670      self.log.debug("Jobtracker jetty : %s" % jettyAddr)
671
672      # Check for Jetty to come up
673      # For this do a http head, and then look at the status
674      defaultTimeout = socket.getdefaulttimeout()
675      # socket timeout isn`t exposed at httplib level. Setting explicitly.
676      socket.setdefaulttimeout(1)
677      sleepTime = 0.5
678      jettyStatus = False
679      jettyStatusmsg = ""
680      while sleepTime <= 32:
681        # There is a possibility that the command might fail after a while.
682        # This code will check if the command failed so that a better
683        # error message can be returned to the user.
684        if not hadoopCmd.getCommandStatus():
685          self.log.critical('Hadoop command found to have failed when ' \
686                            'checking for jobtracker status')
687          hadoopCmd.handleFailedCommand()
688          addnInfo = ""
689          if hadoopCmd.stdErrContents is not "":
690            addnInfo = " Information from stderr of the command:\n%s" \
691                                        % (hadoopCmd.stdErrContents)
692          raise Exception("Could not launch the %s using %s/bin/hadoop.%s" \
693                                        % (desc.getName(), pkgdir, addnInfo))
694         
695        try:
696          jettyConn = httplib.HTTPConnection(jettyAddr)
697          jettyConn.request("HEAD", "/jobtracker.jsp")
698          # httplib inherently retries the following till socket timeout
699          resp = jettyConn.getresponse()
700          if resp.status != 200:
701            # Some problem?
702            jettyStatus = False
703            jettyStatusmsg = "Jetty gave a non-200 response to a HTTP-HEAD" +\
704                             " request. HTTP Status (Code, Msg): (%s, %s)" % \
705                             ( resp.status, resp.reason )
706            break
707          else:
708            self.log.info("Jetty returned a 200 status (%s)" % resp.reason)
709            self.log.info("JobTracker successfully initialised")
710            return
711        except socket.error:
712          self.log.debug("Jetty gave a socket error. Sleeping for %s" \
713                                                                  % sleepTime)
714          time.sleep(sleepTime)
715          sleepTime = sleepTime * 2
716        except Exception, e:
717          jettyStatus = False
718          jettyStatusmsg = ("Process(possibly other than jetty) running on" + \
719                  " port assigned to jetty is returning invalid http response")
720          break
721      socket.setdefaulttimeout(defaultTimeout)
722      if not jettyStatus:
723        self.log.critical("Jobtracker failed to initialise.")
724        if jettyStatusmsg:
725          self.log.critical( "Reason: %s" % jettyStatusmsg )
726        else: self.log.critical( "Reason: Jetty failed to give response")
727        raise Exception("JobTracker failed to initialise")
728
729  def stop(self):
730    self.log.debug("Entered hodring stop.")
731    if self._http: 
732      self.log.debug("stopping http server...")
733      self._http.stop()
734   
735    self.log.debug("call hodsvcrgy stop...")
736    hodBaseService.stop(self)
737   
738  def _xr_method_clusterStart(self, initialize=True):
739    return self.clusterStart(initialize)
740
741  def _xr_method_clusterStop(self):
742    return self.clusterStop()
743 
744  def start(self):
745    """Run and maintain hodring commands"""
746   
747    try:
748      if self._cfg.has_key('download-addr'):
749        self._http = threadedHTTPServer('', self._cfg['http-port-range'])
750        self.log.info("Starting http server...")
751        self._http.serve_forever()
752        self.log.debug("http://%s:%d" % (self._http.server_address[0],
753                     self._http.server_address[1]))
754     
755      hodBaseService.start(self)
756     
757      ringXRAddress = None
758      if self._cfg.has_key('ringmaster-xrs-addr'):
759        ringXRAddress = "http://%s:%s/" % (self._cfg['ringmaster-xrs-addr'][0],
760                          self._cfg['ringmaster-xrs-addr'][1])
761        self.log.debug("Ringmaster at %s" % ringXRAddress)
762
763      self.log.debug("Creating service registry XML-RPC client.")
764      serviceClient = hodXRClient(to_http_url(
765                                  self._cfg['svcrgy-addr']))
766      if ringXRAddress == None:
767        self.log.info("Did not get ringmaster XML-RPC address. Fetching information from service registry.")
768        ringList = serviceClient.getServiceInfo(self._cfg['userid'], 
769            self._cfg['service-id'], 'ringmaster', 'hod')
770     
771        self.log.debug(pprint.pformat(ringList))
772     
773        if len(ringList):
774          if isinstance(ringList, list):
775            ringXRAddress = ringList[0]['xrs']
776     
777        count = 0
778        while (ringXRAddress == None and count < 3000):
779          ringList = serviceClient.getServiceInfo(self._cfg['userid'], 
780            self._cfg['service-id'], 'ringmaster', 'hod')
781       
782          if len(ringList):
783            if isinstance(ringList, list):
784              ringXRAddress = ringList[0]['xrs']
785       
786          count = count + 1
787          time.sleep(.2)
788     
789      if ringXRAddress == None:
790        raise Exception("Could not get ringmaster XML-RPC server address.")
791       
792      self.log.debug("Creating ringmaster XML-RPC client.")
793      ringClient = hodXRClient(ringXRAddress)   
794     
795      id = self.hostname + "_" + str(os.getpid())
796     
797      if 'download-addr' in self._cfg:
798        self.__download_package(ringClient)
799      else:
800        self.log.debug("Did not find a download address.")
801         
802      cmdlist = []
803      firstTime = True
804      increment = 0
805      hadoopStartupTime = 2
806       
807      cmdlist = ringClient.getCommand(id)
808
809      while (cmdlist == []):
810        if firstTime:
811          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + hadoopStartupTime\
812                        + random.uniform(0,self._cfg['cmd-retry-interval'])
813          firstTime = False
814        else:
815          sleepTime = increment + self._cfg['cmd-retry-initial-time'] + \
816                        + random.uniform(0,self._cfg['cmd-retry-interval'])
817        self.log.debug("Did not get command list. Waiting for %s seconds." % (sleepTime))
818        time.sleep(sleepTime)
819        increment = increment + 1
820        cmdlist = ringClient.getCommand(id)
821
822      self.log.debug(pformat(cmdlist)) 
823      cmdDescs = []
824      for cmds in cmdlist:
825        cmdDescs.append(CommandDesc(cmds['dict'], self.log))
826 
827      self._cfg['commanddesc'] = cmdDescs
828     
829      self.log.info("Running hadoop commands...")
830
831      self.__run_hadoop_commands(False)
832       
833      masterParams = []
834      for k, cmd in self.__running.iteritems():
835        masterParams.extend(cmd.filledInKeyVals)
836 
837      self.log.debug("printing getparams")
838      self.log.debug(pformat(id))
839      self.log.debug(pformat(masterParams))
840      # when this is on a required host, the ringMaster already has our masterParams
841      if(len(masterParams) > 0):
842        ringClient.addMasterParams(id, masterParams)
843    except Exception, e:
844      raise Exception(e)
845
846  def clusterStart(self, initialize=True):
847    """Start a stopped mapreduce/dfs cluster"""
848    if initialize:
849      self.log.debug('clusterStart Method Invoked - Initialize')
850    else:
851      self.log.debug('clusterStart Method Invoked - No Initialize')
852    try:
853      self.log.debug("Creating service registry XML-RPC client.")
854      serviceClient = hodXRClient(to_http_url(self._cfg['svcrgy-addr']),
855                                  None, None, 0, 0, 0)
856
857      self.log.info("Fetching ringmaster information from service registry.")
858      count = 0
859      ringXRAddress = None
860      while (ringXRAddress == None and count < 3000):
861        ringList = serviceClient.getServiceInfo(self._cfg['userid'],
862          self._cfg['service-id'], 'ringmaster', 'hod')
863        if len(ringList):
864          if isinstance(ringList, list):
865            ringXRAddress = ringList[0]['xrs']
866        count = count + 1
867
868      if ringXRAddress == None:
869        raise Exception("Could not get ringmaster XML-RPC server address.")
870
871      self.log.debug("Creating ringmaster XML-RPC client.")
872      ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0)
873
874      id = self.hostname + "_" + str(os.getpid())
875
876      cmdlist = []
877      if initialize:
878        if 'download-addr' in self._cfg:
879          self.__download_package(ringClient)
880        else:
881          self.log.debug("Did not find a download address.")
882        while (cmdlist == []):
883          cmdlist = ringClient.getCommand(id)
884      else:
885        while (cmdlist == []):
886          cmdlist = ringClient.getAdminCommand(id)
887
888      self.log.debug(pformat(cmdlist))
889      cmdDescs = []
890      for cmds in cmdlist:
891        cmdDescs.append(CommandDesc(cmds['dict'], self.log))
892
893      self._cfg['commanddesc'] = cmdDescs
894
895      if initialize:
896        self.log.info("Running hadoop commands again... - Initialize")
897        self.__run_hadoop_commands()
898        masterParams = []
899        for k, cmd in self.__running.iteritems():
900          self.log.debug(cmd)
901          masterParams.extend(cmd.filledInKeyVals)
902
903        self.log.debug("printing getparams")
904        self.log.debug(pformat(id))
905        self.log.debug(pformat(masterParams))
906        # when this is on a required host, the ringMaster already has our masterParams
907        if(len(masterParams) > 0):
908          ringClient.addMasterParams(id, masterParams)
909      else:
910        self.log.info("Running hadoop commands again... - No Initialize")
911        self.__run_hadoop_commands()
912
913    except:
914      self.log.error(get_exception_string())
915
916    return True
917
918  def clusterStop(self):
919    """Stop a running mapreduce/dfs cluster without stopping the hodring"""
920    self.log.debug('clusterStop Method Invoked')
921    try:
922      for cmd in self.__running.values():
923        cmd.kill()
924      self.__running = {}
925    except:
926      self.log.error(get_exception_string())
927
928    return True
Note: See TracBrowser for help on using the repository browser.