source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/Hod/hadoop.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 27.0 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16"""define WorkLoad as abstract interface for user job"""
17# -*- python -*-
18
19import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math
20
21from HTMLParser import HTMLParser
22
23import xml.dom.minidom
24import xml.dom.pulldom
25from xml.dom import getDOMImplementation
26
27from hodlib.Common.util import *
28from hodlib.Common.xmlrpc import hodXRClient
29from hodlib.Common.miniHTMLParser import miniHTMLParser
30from hodlib.Common.nodepoolutil import NodePoolUtil
31from hodlib.Common.tcp import tcpError, tcpSocket
32
33reCommandDelimeterString = r"(?<!\\);"
34reCommandDelimeter = re.compile(reCommandDelimeterString)
35
36class hadoopConfig:
37  def __create_xml_element(self, doc, name, value, description, final = False):
38    prop = doc.createElement("property")
39    nameP = doc.createElement("name")
40    string = doc.createTextNode(name)
41    nameP.appendChild(string)
42    valueP = doc.createElement("value")
43    string = doc.createTextNode(value)
44    valueP.appendChild(string)
45    if final:
46      finalP = doc.createElement("final")
47      string = doc.createTextNode("true")
48      finalP.appendChild(string)
49    desc = doc.createElement("description")
50    string = doc.createTextNode(description)
51    desc.appendChild(string)
52    prop.appendChild(nameP)
53    prop.appendChild(valueP)
54    if final:
55      prop.appendChild(finalP)
56    prop.appendChild(desc)
57   
58    return prop
59
60  def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr, mrSysDir,\
61             mapredAddr=None, clientParams=None, serverParams=None,\
62             finalServerParams=None, clusterFactor=None):
63    if not mapredAddr:
64      mapredAddr = "dummy:8181"
65   
66    implementation = getDOMImplementation()
67    doc = implementation.createDocument('', 'configuration', None)
68    comment = doc.createComment(
69      "This is an auto generated hadoop-site.xml, do not modify")
70    topElement = doc.documentElement
71    topElement.appendChild(comment)
72
73    description = {}
74    paramsDict = {  'mapred.job.tracker'    : mapredAddr , \
75                    'fs.default.name'       : "hdfs://" + hdfsAddr, \
76                    'hadoop.tmp.dir'        : tempDir, \
77                 }
78
79    paramsDict['mapred.system.dir'] = mrSysDir
80   
81    # mapred-default.xml is no longer used now.
82    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
83    paramsDict['mapred.reduce.tasks'] = str(numred)
84    # end
85
86    # for all the above vars generated, set the description
87    for k, v in paramsDict.iteritems():
88      description[k] = 'Hod generated parameter'
89
90    # finalservelParams
91    if finalServerParams:
92      for k, v in finalServerParams.iteritems():
93        if not description.has_key(k):
94          description[k] = "final server parameter"
95          paramsDict[k] = v
96
97    # servelParams
98    if serverParams:
99      for k, v in serverParams.iteritems():
100        if not description.has_key(k):
101          # if no final value for same param is mentioned
102          description[k] = "server parameter"
103          paramsDict[k] = v
104
105    # clientParams
106    if clientParams:
107      for k, v in clientParams.iteritems():
108        if not description.has_key(k) or description[k] == "server parameter":
109          # Just add, if no final value for same param is mentioned.
110          # Replace even if server param is mentioned for same config variable
111          description[k] = "client-side parameter"
112          paramsDict[k] = v
113   
114    # generate the xml elements
115    for k,v in paramsDict.iteritems():
116      if ( description[k] == "final server parameter" or \
117                             description[k] == "Hod generated parameter" ): 
118         final = True
119      else: final = False
120      prop = self.__create_xml_element(doc, k, v, description[k], final)
121      topElement.appendChild(prop)
122
123    siteName = os.path.join(confDir, "hadoop-site.xml")
124    sitefile = file(siteName, 'w')
125    print >> sitefile, topElement.toxml()
126    sitefile.close()
127
128class hadoopCluster:
129  def __init__(self, cfg, log):
130    self.__cfg = cfg
131    self.__log = log
132    self.__changedClusterParams = []
133   
134    self.__hostname = local_fqdn()   
135    self.__svcrgyClient = None
136    self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'], 
137                                               self.__cfg, self.__log)       
138    self.__hadoopCfg = hadoopConfig()
139    self.jobId = None
140    self.mapredInfo = None
141    self.hdfsInfo = None
142    self.ringmasterXRS = None
143
144  def __get_svcrgy_client(self):
145    svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
146    return hodXRClient(svcrgyUrl)
147
148  def __get_service_status(self):
149    serviceData = self.__get_service_data()
150   
151    status = True
152    hdfs = False
153    mapred = False
154   
155    for host in serviceData.keys():
156      for item in serviceData[host]:
157        service = item.keys()
158        if service[0] == 'hdfs.grid' and \
159          self.__cfg['gridservice-hdfs']['external'] == False:
160          hdfs = True
161        elif service[0] == 'mapred.grid':
162          mapred = True
163   
164    if not mapred:
165      status = "mapred"
166   
167    if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
168      if status != True:
169        status = "mapred and hdfs"
170      else:
171        status = "hdfs"
172     
173    return status
174 
175  def __get_service_data(self):
176    registry = to_http_url(self.__cfg['hod']['xrs-address'])
177    serviceData = self.__svcrgyClient.getServiceInfo(
178      self.__cfg['hod']['userid'], self.__setup.np.getNodePoolId())
179   
180    return serviceData
181 
182  def __check_job_status(self):
183    failureCount = 0
184    status = False
185    state = 'Q'
186    userLimitsFirstFlag = True
187
188    while (state=='Q') or (state==False):
189      if hodInterrupt.isSet():
190        raise HodInterruptException()
191
192      jobInfo = self.__nodePool.getJobInfo()
193      state = jobInfo['job_state']
194      self.__log.debug('job state %s' % state)
195      if state == False:
196        failureCount += 1
197        if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
198          self.__log.debug('Number of retries reached max limit while querying job status')
199          break
200        time.sleep(self.__cfg['hod']['job-command-failure-interval'])
201      elif state!='Q':
202        break
203      else:
204        self.__log.debug('querying for job status after job-status-query-interval')
205        time.sleep(self.__cfg['hod']['job-status-query-interval'])
206
207      if self.__cfg['hod'].has_key('job-feasibility-attr') and \
208                      self.__cfg['hod']['job-feasibility-attr']:
209        (status, msg) = self.__isJobFeasible()
210        if status == "Never":
211          self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \
212                "This cluster cannot be allocated now.")
213          return -1
214        elif status == False:
215          if userLimitsFirstFlag:
216            self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \
217                "This cluster allocation will succeed only after other " + \
218                "clusters are deallocated.")
219            userLimitsFirstFlag = False
220   
221    if state and state != 'C':
222      status = True
223   
224    return status
225
226  def __isJobFeasible(self):
227    return self.__nodePool.isJobFeasible()
228 
229  def __get_ringmaster_client(self):
230    ringmasterXRS = None
231   
232    ringList = self.__svcrgyClient.getServiceInfo(
233      self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
234      'ringmaster', 'hod')
235
236    if ringList and len(ringList):
237      if isinstance(ringList, list):
238        ringmasterXRS = ringList[0]['xrs']
239    else:   
240      count = 0
241      waitTime = self.__cfg['hod']['allocate-wait-time']
242 
243      while count < waitTime:
244        if hodInterrupt.isSet():
245          raise HodInterruptException()
246
247        ringList = self.__svcrgyClient.getServiceInfo(
248          self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
249          'ringmaster', 
250          'hod')
251       
252        if ringList and len(ringList):
253          if isinstance(ringList, list):       
254            ringmasterXRS = ringList[0]['xrs']
255       
256        if ringmasterXRS is not None:
257          break
258        else:
259          time.sleep(1)
260          count = count + 1
261          # check to see if the job exited by any chance in that time:
262          if (count % self.__cfg['hod']['job-status-query-interval'] == 0):
263            if not self.__check_job_status():
264              break
265    return ringmasterXRS
266 
267  def __init_hadoop_service(self, serviceName, xmlrpcClient):
268    status = True
269    serviceAddress = None
270    serviceInfo = None
271 
272    for i in range(0, 250): 
273      try:
274        if hodInterrupt.isSet():
275            raise HodInterruptException()
276
277        serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
278        if serviceAddress:
279          if serviceAddress == 'not found':
280            time.sleep(1)
281          # check to see if the job exited by any chance in that time:
282            if ((i+1) % self.__cfg['hod']['job-status-query-interval'] == 0):
283              if not self.__check_job_status():
284                break
285          else:
286            serviceInfo = xmlrpcClient.getURLs(serviceName)           
287            break 
288      except HodInterruptException,h :
289        raise h
290      except:
291        self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
292        self.__log.debug(get_exception_string())
293        status = False
294        break
295   
296    if serviceAddress == 'not found' or not serviceAddress:
297      self.__log.critical("Failed to retrieve '%s' service address." % 
298                          serviceName)
299      status = False
300    elif serviceAddress.startswith("Error: "):
301      errs = serviceAddress[len("Error: "):]
302      self.__log.critical("Cluster could not be allocated because of the following errors.\n%s" % \
303                             errs)
304      status = False
305    else:
306      try:
307        self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'], 
308                                            self.jobId, self.__hostname, 
309                                            serviceName, 'grid', serviceInfo)
310       
311      except HodInterruptException, h:
312        raise h
313      except:
314        self.__log.critical("'%s': registry xmlrpc error." % serviceName)   
315        self.__log.debug(get_exception_string())
316        status = False
317       
318    return status, serviceAddress, serviceInfo
319
320  def __collect_jobtracker_ui(self, dir):
321
322     link = self.mapredInfo + "/jobtracker.jsp"
323     parser = miniHTMLParser()
324     parser.setBaseUrl(self.mapredInfo)
325     node_cache = {}
326
327     self.__log.debug("collect_jobtracker_ui seeded with " + link)
328
329     def alarm_handler(number, stack):
330         raise AlarmException("timeout")
331       
332     signal.signal(signal.SIGALRM, alarm_handler)
333
334     input = None
335     while link:
336       self.__log.debug("link: %s" % link)
337       # taskstats.jsp,taskdetails.jsp not included since too many to collect
338       if re.search(
339         "jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp", 
340         link):
341
342         for i in range(1,5):
343           if hodInterrupt.isSet():
344             raise HodInterruptException()
345           try:
346             input = urllib.urlopen(link)
347             break
348           except:
349             self.__log.debug(get_exception_string())
350             time.sleep(1)
351 
352         if input:
353           out = None
354   
355           self.__log.debug("collecting " + link + "...")
356           filename = re.sub(self.mapredInfo, "", link)
357           filename = dir + "/"  + filename
358           filename = re.sub("http://","", filename)
359           filename = re.sub("[\?\&=:]","_",filename)
360           filename = filename + ".html"
361   
362           try:
363             tempdir, tail = os.path.split(filename)
364             if not os.path.exists(tempdir):
365               os.makedirs(tempdir)
366           except:
367             self.__log.debug(get_exception_string())
368   
369           out = open(filename, 'w')
370           
371           bufSz = 8192
372           
373           signal.alarm(10)
374           
375           try:
376             self.__log.debug("Starting to grab: %s" % link)
377             buf = input.read(bufSz)
378     
379             while len(buf) > 0:
380               # Feed the file into the HTML parser
381               parser.feed(buf)
382       
383         # Re-write the hrefs in the file
384               p = re.compile("\?(.+?)=(.+?)")
385               buf = p.sub(r"_\1_\2",buf)
386               p= re.compile("&(.+?)=(.+?)")
387               buf = p.sub(r"_\1_\2",buf)
388               p = re.compile("http://(.+?):(\d+)?")
389               buf = p.sub(r"\1_\2/",buf)
390               buf = re.sub("href=\"/","href=\"",buf)
391               p = re.compile("href=\"(.+?)\"")
392               buf = p.sub(r"href=\1.html",buf)
393 
394               out.write(buf)
395               buf = input.read(bufSz)
396     
397             signal.alarm(0)
398             input.close()
399             if out:
400               out.close()
401               
402             self.__log.debug("Finished grabbing: %s" % link)
403           except AlarmException:
404             if hodInterrupt.isSet():
405               raise HodInterruptException()
406             if out: out.close()
407             if input: input.close()
408             
409             self.__log.debug("Failed to retrieve: %s" % link)
410         else:
411           self.__log.debug("Failed to retrieve: %s" % link)
412         
413       # Get the next link in level traversal order
414       link = parser.getNextLink()
415
416     parser.close()
417     
418  def check_cluster(self, clusterInfo):
419    status = 0
420
421    if 'mapred' in clusterInfo:
422      mapredAddress = clusterInfo['mapred'][7:]
423      hdfsAddress = clusterInfo['hdfs'][7:]
424      status = get_cluster_status(hdfsAddress, mapredAddress)
425      if status == 0:
426        status = 12
427    else:
428      status = 15
429
430    return status
431
432  def is_cluster_deallocated(self, jobId):
433    """Returns True if the JobId that represents this cluster
434       is in the Completed or exiting state."""
435    jobInfo = self.__nodePool.getJobInfo(jobId)
436    state = None
437    if jobInfo is not None and jobInfo.has_key('job_state'):
438      state = jobInfo['job_state']
439    return ((state == 'C') or (state == 'E'))
440
441  def cleanup(self):
442    if self.__nodePool: self.__nodePool.finalize()     
443
444  def get_job_id(self):
445    return self.jobId
446
447  def delete_job(self, jobId):
448    '''Delete a job given it's ID'''
449    ret = 0
450    if self.__nodePool: 
451      ret = self.__nodePool.deleteJob(jobId)
452    else:
453      raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
454    return ret
455         
456  def is_valid_account(self):
457    """Verify if the account being used to submit the job is a valid account.
458       This code looks for a file <install-dir>/bin/verify-account.
459       If the file is present, it executes the file, passing as argument
460       the account name. It returns the exit code and output from the
461       script on non-zero exit code."""
462
463    accountValidationScript = os.path.abspath('./verify-account')
464    if not os.path.exists(accountValidationScript):
465      return (0, None)
466
467    account = self.__nodePool.getAccountString()
468    exitCode = 0
469    errMsg = None
470    try:
471      accountValidationCmd = simpleCommand('Account Validation Command',\
472                                             '%s %s' % (accountValidationScript,
473                                                        account))
474      accountValidationCmd.start()
475      accountValidationCmd.wait()
476      accountValidationCmd.join()
477      exitCode = accountValidationCmd.exit_code()
478      self.__log.debug('account validation script is run %d' \
479                          % exitCode)
480      errMsg = None
481      if exitCode is not 0:
482        errMsg = accountValidationCmd.output()
483    except Exception, e:
484      exitCode = 0
485      self.__log.warn('Error executing account script: %s ' \
486                         'Accounting is disabled.' \
487                          % get_exception_error_string())
488      self.__log.debug(get_exception_string())
489    return (exitCode, errMsg)
490   
491  def allocate(self, clusterDir, min, max=None):
492    status = 0
493    failureCount = 0
494    self.__svcrgyClient = self.__get_svcrgy_client()
495       
496    self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
497   
498    if min < 3:
499      self.__log.critical("Minimum nodes must be greater than 2.")
500      status = 2
501    else:
502      nodeSet = self.__nodePool.newNodeSet(min)
503      walltime = None
504      if self.__cfg['hod'].has_key('walltime'):
505        walltime = self.__cfg['hod']['walltime']
506      self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
507      # if the job submission returned an error other than no resources
508      # retry a couple of times
509      while (self.jobId is False) and (exitCode != 188):
510        if hodInterrupt.isSet():
511          raise HodInterruptException()
512
513        failureCount += 1
514        if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
515          self.__log.debug("failed submitting job more than the retries. exiting")
516          break
517        else:
518          # wait a bit before retrying
519          time.sleep(self.__cfg['hod']['job-command-failure-interval'])
520          if hodInterrupt.isSet():
521            raise HodInterruptException()
522          self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
523
524      if self.jobId:
525        jobStatus = None
526        try:
527          jobStatus = self.__check_job_status()
528        except HodInterruptException, h:
529          self.__log.info(HOD_INTERRUPTED_MESG)
530          self.delete_job(self.jobId)
531          self.__log.info("Cluster %s removed from queue." % self.jobId)
532          raise h
533        else:
534          if jobStatus == -1:
535            self.delete_job(self.jobId);
536            status = 4
537            return status
538
539        if jobStatus:
540          self.__log.info("Cluster Id %s" \
541                                                              % self.jobId)
542          try:
543            self.ringmasterXRS = self.__get_ringmaster_client()
544           
545            self.__log.debug("Ringmaster at : %s" % self.ringmasterXRS )
546            ringClient = None
547            if self.ringmasterXRS:
548              ringClient =  hodXRClient(self.ringmasterXRS)
549               
550              hdfsStatus, hdfsAddr, self.hdfsInfo = \
551                self.__init_hadoop_service('hdfs', ringClient)
552               
553              if hdfsStatus:
554                self.__log.info("HDFS UI at http://%s" % self.hdfsInfo)
555 
556                mapredStatus, mapredAddr, self.mapredInfo = \
557                  self.__init_hadoop_service('mapred', ringClient)
558 
559                if mapredStatus:
560                  self.__log.info("Mapred UI at http://%s" % self.mapredInfo)
561 
562                  if self.__cfg['hod'].has_key('update-worker-info') \
563                    and self.__cfg['hod']['update-worker-info']:
564                    workerInfoMap = {}
565                    workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
566                    workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
567                    # Ringmaster URL sample format : http://hostname:port/
568                    workerInfoMap['RM RPC Port'] = '%s' % self.ringmasterXRS.split(":")[2].strip("/")
569                    if mapredAddr.find(':') != -1:
570                      workerInfoMap['Mapred RPC Port'] = mapredAddr.split(':')[1]
571                    ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
572                    if ret != 0:
573                      self.__log.warn('Could not update HDFS and Mapred information.' \
574                                      'User Portal may not show relevant information.' \
575                                      'Error code=%s' % ret)
576 
577                  self.__cfg.replace_escape_seqs()
578                   
579                  # Go generate the client side hadoop-site.xml now
580                  # adding final-params as well, just so that conf on
581                  # client-side and server-side are (almost) the same
582                  clientParams = None
583                  serverParams = {}
584                  finalServerParams = {}
585 
586                  # client-params
587                  if self.__cfg['hod'].has_key('client-params'):
588                    clientParams = self.__cfg['hod']['client-params']
589 
590                  # server-params
591                  if self.__cfg['gridservice-mapred'].has_key('server-params'):
592                    serverParams.update(\
593                      self.__cfg['gridservice-mapred']['server-params'])
594                  if self.__cfg['gridservice-hdfs'].has_key('server-params'):
595                    # note that if there are params in both mapred and hdfs
596                    # sections, the ones in hdfs overwirte the ones in mapred
597                    serverParams.update(\
598                        self.__cfg['gridservice-hdfs']['server-params'])
599                   
600                  # final-server-params
601                  if self.__cfg['gridservice-mapred'].has_key(\
602                                                    'final-server-params'):
603                    finalServerParams.update(\
604                      self.__cfg['gridservice-mapred']['final-server-params'])
605                  if self.__cfg['gridservice-hdfs'].has_key(
606                                                    'final-server-params'):
607                    finalServerParams.update(\
608                        self.__cfg['gridservice-hdfs']['final-server-params'])
609 
610                  clusterFactor = self.__cfg['hod']['cluster-factor']
611                  tempDir = self.__cfg['hod']['temp-dir']
612                  if not os.path.exists(tempDir):
613                    os.makedirs(tempDir)
614                  tempDir = os.path.join( tempDir, self.__cfg['hod']['userid']\
615                                  + "." + self.jobId )
616                  mrSysDir = getMapredSystemDirectory(self.__cfg['hodring']['mapred-system-dir-root'],\
617                                      self.__cfg['hod']['userid'], self.jobId)
618                  self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min,\
619                            hdfsAddr, mrSysDir, mapredAddr, clientParams,\
620                            serverParams, finalServerParams,\
621                            clusterFactor)
622                  self.__log.info("hadoop-site.xml at %s" % clusterDir)
623                  # end of hadoop-site.xml generation
624                else:
625                  status = 8
626              else:
627                status = 7 
628            else:
629              status = 6
630            if status != 0:
631              self.__log.debug("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
632              if ringClient is None:
633                self.delete_job(self.jobId)
634              else:
635                self.__log.debug("Calling rm.stop()")
636                ringClient.stopRM()
637                self.__log.debug("Returning from rm.stop()")
638          except HodInterruptException, h:
639            self.__log.info(HOD_INTERRUPTED_MESG)
640            if self.ringmasterXRS:
641              if ringClient is None:
642                ringClient =  hodXRClient(self.ringmasterXRS)
643              self.__log.debug("Calling rm.stop()")
644              ringClient.stopRM()
645              self.__log.debug("Returning from rm.stop()")
646              self.__log.info("Cluster Shutdown by informing ringmaster.")
647            else:
648              self.delete_job(self.jobId)
649              self.__log.info("Cluster %s removed from queue directly." % self.jobId)
650            raise h
651        else:
652          self.__log.critical("No cluster found, ringmaster failed to run.")
653          status = 5 
654
655      elif self.jobId == False:
656        if exitCode == 188:
657          self.__log.critical("Request execeeded maximum resource allocation.")
658        else:
659          self.__log.critical("Job submission failed with exit code %s" % exitCode)
660        status = 4
661      else:   
662        self.__log.critical("Scheduler failure, allocation failed.\n\n")       
663        status = 4
664   
665    if status == 5 or status == 6:
666      ringMasterErrors = self.__svcrgyClient.getRMError()
667      if ringMasterErrors:
668        self.__log.critical("Cluster could not be allocated because" \
669                            " of the following errors on the "\
670                            "ringmaster host %s.\n%s" % \
671                               (ringMasterErrors[0], ringMasterErrors[1]))
672        self.__log.debug("Stack trace on ringmaster: %s" % ringMasterErrors[2])
673    return status
674
675  def __isRingMasterAlive(self, rmAddr):
676    ret = True
677    rmSocket = tcpSocket(rmAddr)
678    try:
679      rmSocket.open()
680      rmSocket.close()
681    except tcpError:
682      ret = False
683
684    return ret
685
686  def deallocate(self, clusterDir, clusterInfo):
687    status = 0 
688   
689    nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'], 
690                                         id=clusterInfo['jobid'])
691    self.mapredInfo = clusterInfo['mapred']
692    self.hdfsInfo = clusterInfo['hdfs']
693
694    try:
695      if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
696        clusterStatus = self.check_cluster(clusterInfo)
697        if clusterStatus != 14 and clusterStatus != 10:   
698          # If JT is still alive
699          self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
700      else:
701        self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
702    except HodInterruptException, h:
703      # got an interrupt. just pass and proceed to qdel
704      pass 
705    except:
706      self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
707   
708    rmAddr = None
709    if clusterInfo.has_key('ring'):
710      # format is http://host:port/ We need host:port
711      rmAddr = clusterInfo['ring'][7:]
712      if rmAddr.endswith('/'):
713        rmAddr = rmAddr[:-1]
714
715    if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
716      # Cluster is already dead, don't try to contact ringmaster.
717      self.__nodePool.finalize()
718      status = 10 # As cluster is dead, we just set the status to 'cluster dead'.
719    else:
720      xrsAddr = clusterInfo['ring']
721      rmClient = hodXRClient(xrsAddr)
722      self.__log.debug('calling rm.stop')
723      rmClient.stopRM()
724      self.__log.debug('completed rm.stop')
725
726    # cleanup hod temp dirs
727    tempDir = os.path.join( self.__cfg['hod']['temp-dir'], \
728                    self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'] )
729    if os.path.exists(tempDir):
730      shutil.rmtree(tempDir)
731   
732    return status
733 
734class hadoopScript:
735  def __init__(self, conf, execDir):
736    self.__environ = os.environ.copy()
737    self.__environ['HADOOP_CONF_DIR'] = conf
738    self.__execDir = execDir
739   
740  def run(self, script):
741    scriptThread = simpleCommand(script, script, self.__environ, 4, False, 
742                                 False, self.__execDir)
743    scriptThread.start()
744    scriptThread.wait()
745    scriptThread.join()
746   
747    return scriptThread.exit_code()
Note: See TracBrowser for help on using the repository browser.