source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/Hod/hod.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 28.7 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16# -*- python -*-
17
18import sys, os, getpass, pprint, re, cPickle, random, shutil, time, errno
19
20import hodlib.Common.logger
21
22from hodlib.ServiceRegistry.serviceRegistry import svcrgy
23from hodlib.Common.xmlrpc import hodXRClient
24from hodlib.Common.util import to_http_url, get_exception_string
25from hodlib.Common.util import get_exception_error_string
26from hodlib.Common.util import hodInterrupt, HodInterruptException
27from hodlib.Common.util import HOD_INTERRUPTED_CODE
28
29from hodlib.Common.nodepoolutil import NodePoolUtil
30from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
31
32CLUSTER_DATA_FILE = 'clusters'
33INVALID_STATE_FILE_MSGS = \
34              [
35
36                "Requested operation cannot be performed. Cannot read %s: " + \
37                "Permission denied.",
38
39                "Requested operation cannot be performed. " + \
40                "Cannot write to %s: Permission denied.",
41
42                "Requested operation cannot be performed. " + \
43                "Cannot read/write to %s: Permission denied.",
44
45                "Cannot update %s: Permission denied. " + \
46                "Cluster is deallocated, but info and list " + \
47                "operations might show incorrect information.",
48
49              ]
50
51class hodState:
52  def __init__(self, store):
53    self.__store = store
54    self.__stateFile = None
55    self.__init_store()
56    self.__STORE_EXT = ".state"
57   
58  def __init_store(self):
59    if not os.path.exists(self.__store):
60      os.mkdir(self.__store)
61 
62  def __set_state_file(self, id=None):
63    if id:
64      self.__stateFile = os.path.join(self.__store, "%s%s" % (id, 
65                                      self.__STORE_EXT))
66    else:
67      for item in os.listdir(self.__store):
68        if item.endswith(self.__STORE_EXT): 
69          self.__stateFile = os.path.join(self.__store, item)         
70
71  def get_state_file(self):
72    return self.__stateFile
73         
74  def checkStateFile(self, id=None, modes=(os.R_OK,)):
75    # is state file exists/readable/writable/both?
76    self.__set_state_file(id)
77
78    # return true if file doesn't exist, because HOD CAN create
79    # state file and so WILL have permissions to read and/or write
80    try:
81      os.stat(self.__stateFile)
82    except OSError, err:
83      if err.errno == errno.ENOENT: # error 2 (no such file)
84        return True
85
86    # file exists
87    ret = True
88    for mode in modes:
89      ret = ret and os.access(self.__stateFile, mode)
90    return ret
91
92  def read(self, id=None):
93    info = {}
94   
95    self.__set_state_file(id)
96 
97    if self.__stateFile:
98      if os.path.isfile(self.__stateFile):
99        stateFile = open(self.__stateFile, 'r')
100        try:
101          info = cPickle.load(stateFile)
102        except EOFError:
103          pass
104       
105        stateFile.close()
106   
107    return info
108           
109  def write(self, id, info):
110    self.__set_state_file(id)
111    if not os.path.exists(self.__stateFile):
112      self.clear(id)
113 
114    stateFile = open(self.__stateFile, 'w')
115    cPickle.dump(info, stateFile)
116    stateFile.close()
117 
118  def clear(self, id=None):
119    self.__set_state_file(id)
120    if self.__stateFile and os.path.exists(self.__stateFile):
121      os.remove(self.__stateFile)
122    else:
123      for item in os.listdir(self.__store):
124        if item.endswith(self.__STORE_EXT):
125          os.remove(item)
126       
127class hodRunner:
128
129  def __init__(self, cfg, log=None, cluster=None):
130    self.__hodhelp = hodHelp()
131    self.__ops = self.__hodhelp.ops
132    self.__cfg = cfg 
133    self.__npd = self.__cfg['nodepooldesc']
134    self.__opCode = 0
135    self.__user = getpass.getuser()
136    self.__registry = None
137    self.__baseLogger = None
138    # Allowing to pass in log object to help testing - a stub can be passed in
139    if log is None:
140      self.__setup_logger()
141    else:
142      self.__log = log
143   
144    self.__userState = hodState(self.__cfg['hod']['user_state']) 
145   
146    self.__clusterState = None
147    self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
148   
149    # Allowing to pass in log object to help testing - a stib can be passed in
150    if cluster is None:
151      self.__cluster = hadoopCluster(self.__cfg, self.__log)
152    else:
153      self.__cluster = cluster
154 
155  def __setup_logger(self):
156    self.__baseLogger = hodlib.Common.logger.hodLog('hod')
157    self.__log = self.__baseLogger.add_logger(self.__user )
158 
159    if self.__cfg['hod']['stream']:
160      self.__baseLogger.add_stream(level=self.__cfg['hod']['debug'], 
161                            addToLoggerNames=(self.__user ,))
162 
163    if self.__cfg['hod'].has_key('syslog-address'):
164      self.__baseLogger.add_syslog(self.__cfg['hod']['syslog-address'], 
165                                   level=self.__cfg['hod']['debug'], 
166                                   addToLoggerNames=(self.__user ,))
167
168  def get_logger(self):
169    return self.__log
170
171  def __setup_cluster_logger(self, directory):
172    self.__baseLogger.add_file(logDirectory=directory, level=4,
173                          backupCount=self.__cfg['hod']['log-rollover-count'],
174                          addToLoggerNames=(self.__user ,))
175
176  def __setup_cluster_state(self, directory):
177    self.__clusterState = hodState(directory)
178
179  def __norm_cluster_dir(self, directory):
180    directory = os.path.expanduser(directory)
181    if not os.path.isabs(directory):
182      directory = os.path.join(self.__cfg['hod']['original-dir'], directory)
183    directory = os.path.abspath(directory)
184   
185    return directory
186 
187  def __setup_service_registry(self):
188    cfg = self.__cfg['hod'].copy()
189    cfg['debug'] = 0
190    self.__registry = svcrgy(cfg, self.__log)
191    self.__registry.start()
192    self.__log.debug(self.__registry.getXMLRPCAddr())
193    self.__cfg['hod']['xrs-address'] = self.__registry.getXMLRPCAddr()
194    self.__cfg['ringmaster']['svcrgy-addr'] = self.__cfg['hod']['xrs-address']
195
196  def __set_cluster_state_info(self, env, hdfs, mapred, ring, jobid, min, max):
197    self.__clusterStateInfo['env'] = env
198    self.__clusterStateInfo['hdfs'] = "http://%s" % hdfs
199    self.__clusterStateInfo['mapred'] = "http://%s" % mapred
200    self.__clusterStateInfo['ring'] = ring
201    self.__clusterStateInfo['jobid'] = jobid
202    self.__clusterStateInfo['min'] = min
203    self.__clusterStateInfo['max'] = max
204   
205  def __set_user_state_info(self, info):
206    userState = self.__userState.read(CLUSTER_DATA_FILE)
207    for key in info.keys():
208      userState[key] = info[key]
209     
210    self.__userState.write(CLUSTER_DATA_FILE, userState) 
211
212  def __remove_cluster(self, clusterDir):
213    clusterInfo = self.__userState.read(CLUSTER_DATA_FILE)
214    if clusterDir in clusterInfo:
215      del(clusterInfo[clusterDir])
216      self.__userState.write(CLUSTER_DATA_FILE, clusterInfo)
217     
218  def __cleanup(self):
219    if self.__registry: self.__registry.stop()
220   
221  def __check_operation(self, operation):   
222    opList = operation.split()
223   
224    if not opList[0] in self.__ops:
225      self.__log.critical("Invalid hod operation specified: %s" % operation)
226      self._op_help(None)
227      self.__opCode = 2
228         
229    return opList
230 
231  def __adjustMasterFailureCountConfig(self, nodeCount):
232    # This method adjusts the ringmaster.max-master-failures variable
233    # to a value that is bounded by the a function of the number of
234    # nodes.
235
236    maxFailures = self.__cfg['ringmaster']['max-master-failures']
237    # Count number of masters required - depends on which services
238    # are external
239    masters = 0
240    if not self.__cfg['gridservice-hdfs']['external']:
241      masters += 1
242    if not self.__cfg['gridservice-mapred']['external']:
243      masters += 1
244
245    # So, if there are n nodes and m masters, we look atleast for
246    # all masters to come up. Therefore, atleast m nodes should be
247    # good, which means a maximum of n-m master nodes can fail.
248    maxFailedNodes = nodeCount - masters
249
250    # The configured max number of failures is now bounded by this
251    # number.
252    self.__cfg['ringmaster']['max-master-failures'] = \
253                              min(maxFailures, maxFailedNodes)
254
255  def _op_allocate(self, args):
256    operation = "allocate"
257    argLength = len(args)
258    min = 0
259    max = 0
260    errorFlag = False
261    errorMsgs = []
262
263    if argLength == 3:
264      nodes = args[2]
265      clusterDir = self.__norm_cluster_dir(args[1])
266
267      if not os.path.exists(clusterDir):
268        try:
269          os.makedirs(clusterDir)
270        except OSError, err:
271          errorFlag = True
272          errorMsgs.append("Could not create cluster directory. %s" \
273                            % (str(err)))
274      elif not os.path.isdir(clusterDir):
275        errorFlag = True
276        errorMsgs.append( \
277                    "Invalid cluster directory (--hod.clusterdir or -d) : " + \
278                         clusterDir + " : Not a directory")
279       
280      if int(nodes) < 3 :
281        errorFlag = True
282        errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \
283                         "Must be >= 3. Given nodes: %s" % nodes)
284      if errorFlag:
285        for msg in errorMsgs:
286          self.__log.critical(msg)
287        self.__opCode = 3
288        return
289
290      if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, \
291                                              (os.R_OK, os.W_OK)):
292        self.__log.critical(INVALID_STATE_FILE_MSGS[2] % \
293                         self.__userState.get_state_file())
294        self.__opCode = 1
295        return
296
297      clusterList = self.__userState.read(CLUSTER_DATA_FILE)
298      if clusterDir in clusterList.keys():
299        self.__setup_cluster_state(clusterDir)
300        clusterInfo = self.__clusterState.read()
301        # Check if the job is not running. Only then can we safely
302        # allocate another cluster. Otherwise the user would need
303        # to deallocate and free up resources himself.
304        if clusterInfo.has_key('jobid') and \
305            self.__cluster.is_cluster_deallocated(clusterInfo['jobid']):
306          self.__log.warn("Found a dead cluster at cluster directory '%s'. Deallocating it to allocate a new one." % (clusterDir))
307          self.__remove_cluster(clusterDir)
308          self.__clusterState.clear()
309        else:
310          self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. HOD cannot determine if this cluster can be automatically deallocated. Deallocate the cluster if it is unused." % (clusterDir))
311          self.__opCode = 12
312          return
313 
314      self.__setup_cluster_logger(clusterDir)
315
316      (status, message) = self.__cluster.is_valid_account()
317      if status is not 0:
318        if message:
319          for line in message:
320            self.__log.critical("verify-account output: %s" % line)
321        self.__log.critical("Cluster cannot be allocated because account verification failed. " \
322                              + "verify-account returned exit code: %s." % status)
323        self.__opCode = 4
324        return
325      else:
326        self.__log.debug("verify-account returned zero exit code.")
327        if message:
328          self.__log.debug("verify-account output: %s" % message)
329
330      if re.match('\d+-\d+', nodes):
331        (min, max) = nodes.split("-")
332        min = int(min)
333        max = int(max)
334      else:
335        try:
336          nodes = int(nodes)
337          min = nodes
338          max = nodes
339        except ValueError:
340          print self.__hodhelp.help(operation)
341          self.__log.critical(
342          "%s operation requires a pos_int value for n(nodecount)." % 
343          operation)
344          self.__opCode = 3
345        else:
346          self.__setup_cluster_state(clusterDir)
347          clusterInfo = self.__clusterState.read()
348          self.__opCode = self.__cluster.check_cluster(clusterInfo)
349          if self.__opCode == 0 or self.__opCode == 15:
350            self.__setup_service_registry()   
351            if hodInterrupt.isSet(): 
352              self.__cleanup()
353              raise HodInterruptException()
354            self.__log.debug("Service Registry started.")
355
356            self.__adjustMasterFailureCountConfig(nodes)
357           
358            try:
359              allocateStatus = self.__cluster.allocate(clusterDir, min, max)   
360            except HodInterruptException, h:
361              self.__cleanup()
362              raise h
363            # Allocation has gone through.
364            # Don't care about interrupts any more
365
366            try:
367              if allocateStatus == 0:
368                self.__set_cluster_state_info(os.environ, 
369                                              self.__cluster.hdfsInfo, 
370                                              self.__cluster.mapredInfo, 
371                                              self.__cluster.ringmasterXRS,
372                                              self.__cluster.jobId,
373                                              min, max)
374                self.__setup_cluster_state(clusterDir)
375                self.__clusterState.write(self.__cluster.jobId, 
376                                          self.__clusterStateInfo)
377                #  Do we need to check for interrupts here ??
378 
379                self.__set_user_state_info( 
380                  { clusterDir : self.__cluster.jobId, } )
381              self.__opCode = allocateStatus
382            except Exception, e:
383              # Some unknown problem.
384              self.__cleanup()
385              self.__cluster.deallocate(clusterDir, self.__clusterStateInfo)
386              self.__opCode = 1
387              raise Exception(e)
388          elif self.__opCode == 12:
389            self.__log.critical("Cluster %s already allocated." % clusterDir)
390          elif self.__opCode == 10:
391            self.__log.critical("dead\t%s\t%s" % (clusterInfo['jobid'], 
392                                                  clusterDir))
393          elif self.__opCode == 13:
394            self.__log.warn("hdfs dead\t%s\t%s" % (clusterInfo['jobid'], 
395                                                       clusterDir))
396          elif self.__opCode == 14:
397            self.__log.warn("mapred dead\t%s\t%s" % (clusterInfo['jobid'], 
398                                                     clusterDir))   
399         
400          if self.__opCode > 0 and self.__opCode != 15:
401            self.__log.critical("Cannot allocate cluster %s" % clusterDir)
402    else:
403      print self.__hodhelp.help(operation)
404      self.__log.critical("%s operation requires two arguments. "  % operation
405                        + "A cluster directory and a nodecount.")
406      self.__opCode = 3
407 
408  def _is_cluster_allocated(self, clusterDir):
409    if os.path.isdir(clusterDir):
410      self.__setup_cluster_state(clusterDir)
411      clusterInfo = self.__clusterState.read()
412      if clusterInfo != {}:
413        return True
414    return False
415
416  def _op_deallocate(self, args):
417    operation = "deallocate"
418    argLength = len(args)
419    if argLength == 2:
420      clusterDir = self.__norm_cluster_dir(args[1])
421      if os.path.isdir(clusterDir):
422        self.__setup_cluster_state(clusterDir)
423        clusterInfo = self.__clusterState.read()
424        if clusterInfo == {}:
425          self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
426        else:
427          self.__opCode = \
428            self.__cluster.deallocate(clusterDir, clusterInfo)
429          # irrespective of whether deallocate failed or not\
430          # remove the cluster state.
431          self.__clusterState.clear()
432          if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
433            self.__log.critical(INVALID_STATE_FILE_MSGS[3] % \
434                               self.__userState.get_state_file())
435            self.__opCode = 1
436            return
437          self.__remove_cluster(clusterDir)
438      else:
439        self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
440    else:
441      print self.__hodhelp.help(operation)
442      self.__log.critical("%s operation requires one argument. "  % operation
443                        + "A cluster path.")
444      self.__opCode = 3
445           
446  def _op_list(self, args):
447    operation = 'list'
448    clusterList = self.__userState.read(CLUSTER_DATA_FILE)
449    for path in clusterList.keys():
450      if not os.path.isdir(path):
451        self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
452        continue
453      self.__setup_cluster_state(path)
454      clusterInfo = self.__clusterState.read()
455      if clusterInfo == {}:
456        # something wrong with the cluster directory.
457        self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
458        continue
459      clusterStatus = self.__cluster.check_cluster(clusterInfo)
460      if clusterStatus == 12:
461        self.__log.info("alive\t%s\t%s" % (clusterList[path], path))
462      elif clusterStatus == 10:
463        self.__log.info("dead\t%s\t%s" % (clusterList[path], path))
464      elif clusterStatus == 13:
465        self.__log.info("hdfs dead\t%s\t%s" % (clusterList[path], path))
466      elif clusterStatus == 14:
467        self.__log.info("mapred dead\t%s\t%s" % (clusterList[path], path))   
468         
469  def _op_info(self, args):
470    operation = 'info'
471    argLength = len(args) 
472    if argLength == 2:
473      clusterDir = self.__norm_cluster_dir(args[1])
474      if os.path.isdir(clusterDir):
475        self.__setup_cluster_state(clusterDir)
476        clusterInfo = self.__clusterState.read()
477        if clusterInfo == {}:
478          # something wrong with the cluster directory.
479          self.__handle_invalid_cluster_directory(clusterDir)
480        else:
481          clusterStatus = self.__cluster.check_cluster(clusterInfo)
482          if clusterStatus == 12:
483            self.__print_cluster_info(clusterInfo)
484            self.__log.info("hadoop-site.xml at %s" % clusterDir)
485          elif clusterStatus == 10:
486            self.__log.critical("%s cluster is dead" % clusterDir)
487          elif clusterStatus == 13:
488            self.__log.warn("%s cluster hdfs is dead" % clusterDir)
489          elif clusterStatus == 14:
490            self.__log.warn("%s cluster mapred is dead" % clusterDir)
491
492          if clusterStatus != 12:
493            if clusterStatus == 15:
494              self.__log.critical("Cluster %s not allocated." % clusterDir)
495            else:
496              self.__print_cluster_info(clusterInfo)
497              self.__log.info("hadoop-site.xml at %s" % clusterDir)
498           
499            self.__opCode = clusterStatus
500      else:
501        self.__handle_invalid_cluster_directory(clusterDir)
502    else:
503      print self.__hodhelp.help(operation)
504      self.__log.critical("%s operation requires one argument. "  % operation
505                        + "A cluster path.")
506      self.__opCode = 3     
507
508  def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
509    if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
510      self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \
511                           self.__userState.get_state_file())
512      self.__opCode = 1
513      return
514
515    clusterList = self.__userState.read(CLUSTER_DATA_FILE)
516    if clusterDir in clusterList.keys():
517      # previously allocated cluster.
518      self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
519      if cleanUp:
520        self.__cluster.delete_job(clusterList[clusterDir])
521        self.__log.critical("Freeing resources allocated to the cluster.")
522        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
523          self.__log.critical(INVALID_STATE_FILE_MSGS[1] % \
524                              self.__userState.get_state_file())
525          self.__opCode = 1
526          return
527        self.__remove_cluster(clusterDir)
528      self.__opCode = 3
529    else:
530      if not os.path.exists(clusterDir):
531        self.__log.critical(  \
532                  "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \
533                  clusterDir + " : No such directory")
534      elif not os.path.isdir(clusterDir):
535        self.__log.critical( \
536                  "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \
537                  clusterDir + " : Not a directory")
538      else:
539        self.__log.critical( \
540                  "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \
541                  clusterDir + " : Not tied to any allocated cluster.")
542      self.__opCode = 15
543   
544  def __print_cluster_info(self, clusterInfo):
545    keys = clusterInfo.keys()
546
547    _dict = { 
548              'jobid' : 'Cluster Id', 'min' : 'Nodecount',
549              'hdfs' : 'HDFS UI at' , 'mapred' : 'Mapred UI at'
550            }
551
552    for key in _dict.keys():
553      if clusterInfo.has_key(key):
554        self.__log.info("%s %s" % (_dict[key], clusterInfo[key]))
555
556    if clusterInfo.has_key('ring'):
557      self.__log.debug("%s\t%s" % ('Ringmaster at ', clusterInfo['ring']))
558   
559    if self.__cfg['hod']['debug'] == 4:
560      for var in clusterInfo['env'].keys():
561        self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
562
563  def _op_help(self, arg):
564    if arg == None or arg.__len__() != 2:
565      print "hod commands:\n"
566      for op in self.__ops:
567        print self.__hodhelp.help(op)
568    else:
569      if arg[1] not in self.__ops:
570        print self.__hodhelp.help('help')
571        self.__log.critical("Help requested for invalid operation : %s"%arg[1])
572        self.__opCode = 3
573      else: print self.__hodhelp.help(arg[1])
574
575  def operation(self): 
576    operation = self.__cfg['hod']['operation']
577    try:
578      opList = self.__check_operation(operation)
579      if self.__opCode == 0:
580        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
581           self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \
582                         self.__userState.get_state_file())
583           self.__opCode = 1
584           return self.__opCode
585        getattr(self, "_op_%s" % opList[0])(opList)
586    except HodInterruptException, h:
587      self.__log.critical("op: %s failed because of a process interrupt." \
588                                                                % operation)
589      self.__opCode = HOD_INTERRUPTED_CODE
590    except:
591      self.__log.critical("op: %s failed: %s" % (operation,
592                          get_exception_error_string()))
593      self.__log.debug(get_exception_string())
594   
595    self.__cleanup()
596   
597    self.__log.debug("return code: %s" % self.__opCode)
598   
599    return self.__opCode
600 
601  def script(self):
602    errorFlag = False
603    errorMsgs = []
604    scriptRet = 0 # return from the script, if run
605   
606    script = self.__cfg['hod']['script']
607    nodes = self.__cfg['hod']['nodecount']
608    clusterDir = self.__cfg['hod']['clusterdir']
609   
610    if not os.path.exists(script):
611      errorFlag = True
612      errorMsgs.append("Invalid script file (--hod.script or -s) : " + \
613                       script + " : No such file")
614    elif not os.path.isfile(script):
615      errorFlag = True
616      errorMsgs.append("Invalid script file (--hod.script or -s) : " + \
617                       script + " : Not a file.")
618    else:
619      isExecutable = os.access(script, os.X_OK)
620      if not isExecutable:
621        errorFlag = True
622        errorMsgs.append("Invalid script file (--hod.script or -s) : " + \
623                         script + " : Not an executable.")
624
625    if not os.path.exists(clusterDir):
626      try:
627        os.makedirs(clusterDir)
628      except OSError, err:
629        errorFlag = True
630        errorMsgs.append("Could not create cluster directory. %s" % (str(err)))
631    elif not os.path.isdir(clusterDir):
632      errorFlag = True
633      errorMsgs.append( \
634                  "Invalid cluster directory (--hod.clusterdir or -d) : " + \
635                       clusterDir + " : Not a directory")
636
637    if int(self.__cfg['hod']['nodecount']) < 3 :
638      errorFlag = True
639      errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \
640                       "Must be >= 3. Given nodes: %s" % nodes)
641
642    if errorFlag:
643      for msg in errorMsgs:
644        self.__log.critical(msg)
645      self.handle_script_exit_code(scriptRet, clusterDir)
646      sys.exit(3)
647
648    try:
649      self._op_allocate(('allocate', clusterDir, str(nodes)))
650      if self.__opCode == 0:
651        if self.__cfg['hod'].has_key('script-wait-time'):
652          time.sleep(self.__cfg['hod']['script-wait-time'])
653          self.__log.debug('Slept for %d time. Now going to run the script' % self.__cfg['hod']['script-wait-time'])
654        if hodInterrupt.isSet():
655          self.__log.debug('Hod interrupted - not executing script')
656        else:
657          scriptRunner = hadoopScript(clusterDir, 
658                                  self.__cfg['hod']['original-dir'])
659          self.__opCode = scriptRunner.run(script)
660          scriptRet = self.__opCode
661          self.__log.info("Exit code from running the script: %d" % self.__opCode)
662      else:
663        self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode)
664
665      if hodInterrupt.isSet():
666        # Got interrupt while executing script. Unsetting it for deallocating
667        hodInterrupt.setFlag(False)
668      if self._is_cluster_allocated(clusterDir):
669        self._op_deallocate(('deallocate', clusterDir))
670    except HodInterruptException, h:
671      self.__log.critical("Script failed because of a process interrupt.")
672      self.__opCode = HOD_INTERRUPTED_CODE
673    except:
674      self.__log.critical("script: %s failed: %s" % (script,
675                          get_exception_error_string()))
676      self.__log.debug(get_exception_string())
677   
678    self.__cleanup()
679
680    self.handle_script_exit_code(scriptRet, clusterDir)
681   
682    return self.__opCode
683
684  def handle_script_exit_code(self, scriptRet, clusterDir):
685    # We want to give importance to a failed script's exit code, and write out exit code to a file separately
686    # so users can easily get it if required. This way they can differentiate between the script's exit code
687    # and hod's exit code.
688    if os.path.exists(clusterDir):
689      exit_code_file_name = (os.path.join(clusterDir, 'script.exitcode'))
690      if scriptRet != 0:
691        exit_code_file = open(exit_code_file_name, 'w')
692        print >>exit_code_file, scriptRet
693        exit_code_file.close()
694        self.__opCode = scriptRet
695      else:
696        #ensure script exit code file is not there:
697        if (os.path.exists(exit_code_file_name)):
698          os.remove(exit_code_file_name)
699
700class hodHelp:
701  def __init__(self):
702    self.ops = ['allocate', 'deallocate', 'info', 'list','script',  'help']
703
704    self.usage_strings = \
705      {
706        'allocate'   : 'hod allocate -d <clusterdir> -n <nodecount> [OPTIONS]',
707        'deallocate' : 'hod deallocate -d <clusterdir> [OPTIONS]',
708        'list'       : 'hod list [OPTIONS]',
709        'info'       : 'hod info -d <clusterdir> [OPTIONS]',
710        'script'     :
711              'hod script -d <clusterdir> -n <nodecount> -s <script> [OPTIONS]',
712        'help'       : 'hod help <OPERATION>',
713        }
714
715    self.description_strings = \
716      {
717       'allocate' : "Allocates a cluster of n nodes using the specified \n" + \
718      "              cluster directory to store cluster state \n" + \
719      "              information. The Hadoop site XML is also stored \n" + \
720      "              in this location.\n",
721
722       'deallocate' : "Deallocates a cluster using the specified \n" + \
723      "             cluster directory.  This operation is also \n" + \
724      "             required to clean up a dead cluster.\n",
725
726       'list' : "List all clusters currently allocated by a user, \n" + \
727      "              along with limited status information and the \n" + \
728      "              cluster ID.\n",
729
730       'info' : "Provide detailed information on an allocated cluster.\n",
731
732       'script' : "Allocates a cluster of n nodes with the given \n" +\
733           "              cluster directory, runs the specified script \n" + \
734           "              using the allocated cluster, and then \n" + \
735           "              deallocates the cluster.\n",
736 
737       'help' : "Print help for the operation and exit.\n" + \
738                "Available operations : %s.\n" % self.ops,
739       }
740
741  def usage(self, op):
742    return "Usage       : " + self.usage_strings[op] + "\n" + \
743           "For full description: hod help " + op + ".\n"
744
745  def help(self, op=None):
746    if op is None:
747      return "hod <operation> [ARGS] [OPTIONS]\n" + \
748             "Available operations : %s\n" % self.ops + \
749             "For help on a particular operation : hod help <operation>.\n" + \
750             "For all options : hod help options."
751    else:
752      return "Usage       : " + self.usage_strings[op] + "\n" + \
753             "Description : " + self.description_strings[op] + \
754             "For all options : hod help options.\n"
Note: See TracBrowser for help on using the repository browser.