#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""Wrapper around the Torque command-line tools found under <torqueDir>/bin
(qsub, qstat, qdel, pbsdsh, qalter)."""

import os
import pprint
import re
import time

from hodlib.Common.threads import simpleCommand
from hodlib.Common.util import args_to_string
from hodlib.Common.logger import hodDummyLogger

# Matches "key = value" lines of `qstat -f` output.  The key is limited to
# word characters, so lines whose key contains other characters (for example
# a dot) do not match and are ignored by qstat() below.
reQstatLine = re.compile(r"^\s*(\w+)\s*=\s*(.*)\s*$")


class torqueInterface:
    """Runs Torque commands through simpleCommand and reports their results."""

    def __init__(self, torqueDir, environment, log=None):
        """Record the command paths, environment and logger.

        torqueDir   -- directory whose 'bin' sub-directory holds the Torque
                       executables.
        environment -- passed as env= to every simpleCommand that is run.
        log         -- logger object; a hodDummyLogger is used when this is
                       None (or otherwise false).
        """
        self.__qsub = os.path.join(torqueDir, 'bin', 'qsub')
        self.__qdel = os.path.join(torqueDir, 'bin', 'qdel')
        self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')
        self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')
        self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')
        self.__qalter = os.path.join(torqueDir, 'bin', 'qalter')

        self.__env = environment

        self.__log = log
        if not self.__log:
            self.__log = hodDummyLogger()

    def __runAndWait(self, name, command):
        """Start `command` via simpleCommand, wait for it to finish and
        return the finished simpleCommand object."""
        process = simpleCommand(name, command, env=self.__env)
        process.start()
        process.wait()
        process.join()
        return process

    def qsub(self, argList, stdinList):
        """Submit a job with qsub.

        argList   -- qsub arguments, converted with args_to_string().
        stdinList -- lines written, one per line, to qsub's standard input.

        Returns a (jobID, exitCode) tuple:
          * (first output line without its newline, 0) on success;
          * (False, exitCode) when qsub exits non-zero, or exits zero
            without printing anything;
          * (None, exitCode) when writing to qsub's stdin raised IOError.
        """
        jobID = False

        qsubCommand = "%s %s" % (self.__qsub, args_to_string(argList))
        self.__log.debug("qsub -> %s" % qsubCommand)

        qsubProcess = simpleCommand('qsub', qsubCommand, env=self.__env)
        qsubProcess.start()

        # stdin is not available immediately after start(); poll until it is.
        # NOTE(review): there is no timeout here, so this presumably relies
        # on simpleCommand always publishing stdin -- confirm.
        while qsubProcess.stdin is None:
            time.sleep(.2)

        try:
            for line in stdinList:
                self.__log.debug("qsub stdin: %s" % line)
                # Same bytes as the former `print >>stdin, line`, but valid
                # syntax on every Python version.
                qsubProcess.stdin.write("%s\n" % (line,))
            qsubProcess.stdin.close()
        except IOError:
            # If torque's qsub is given invalid params, it fails & returns
            # immediately.  Check for such errors here.
            # Wait for command execution to finish.
            qsubProcess.wait()
            qsubProcess.join()
            output = qsubProcess.output()
            if output != []:
                self.__log.critical("qsub Failure : %s " % output[0].strip())
                self.__log.critical("qsub Command : %s" % qsubCommand)
            return None, qsubProcess.exit_code()

        qsubProcess.wait()
        qsubProcess.join()

        exitCode = qsubProcess.exit_code()
        if exitCode == 0:
            output = qsubProcess.output()
            if output:
                jobID = output[0].rstrip('\n')
                self.__log.debug("qsub jobid: %s" % jobID)
            else:
                # Previously an IndexError; report it as a failed submission
                # instead (jobID stays False).
                self.__log.critical("qsub error: %s"
                                    % qsubProcess.exit_status_string())
        else:
            self.__log.critical("qsub error: %s"
                                % qsubProcess.exit_status_string())

        return jobID, exitCode

    def qstat(self, jobID):
        """Query a job with `qstat -f -1 <jobID>`.

        Returns a (qstatInfo, exitCode) tuple.  qstatInfo is None when the
        exit code is greater than zero; otherwise it is a dict built from the
        output lines that match reQstatLine.  If an 'exec_host' entry is
        present, its value is replaced by a list holding, for each
        '+'-separated item, the text before the first '/'.
        """
        qstatInfo = None

        qstatCommand = "%s -f -1 %s" % (self.__qstat, jobID)
        self.__log.debug(qstatCommand)

        qstatProcess = self.__runAndWait('qstat', qstatCommand)

        exitCode = qstatProcess.exit_code()
        if exitCode > 0:
            self.__log.warn('qstat error: %s'
                            % qstatProcess.exit_status_string())
        else:
            qstatInfo = {}
            for line in qstatProcess.output():
                line = line.rstrip()
                if '=' not in line:
                    continue
                qstatMatch = reQstatLine.match(line)
                if qstatMatch:
                    qstatInfo[qstatMatch.group(1)] = qstatMatch.group(2)

            if 'exec_host' in qstatInfo:
                # Keep only the part before the first '/'.  Taking [0] also
                # tolerates an item without '/', which used to raise
                # ValueError.
                qstatInfo['exec_host'] = [
                    item.split('/', 1)[0]
                    for item in qstatInfo['exec_host'].split('+')]

        return qstatInfo, exitCode

    def pbs_nodes(self, argString):
        """Placeholder: does nothing and returns None."""
        pass

    def qdel(self, jobId, force=False):
        """Delete a job with qdel and return qdel's exit code.

        jobId -- identifier passed to qdel.
        force -- when true, the '-p' option is added to the command line.
        """
        if force:
            qdelCommand = "%s -p %s" % (self.__qdel, jobId)
        else:
            qdelCommand = "%s %s" % (self.__qdel, jobId)

        self.__log.debug(qdelCommand)

        qdelProcess = self.__runAndWait('qdel', qdelCommand)
        return qdelProcess.exit_code()

    def pbsdsh(self, arguments):
        """Start pbsdsh and report whether it failed straight away.

        arguments -- pbsdsh arguments, converted with args_to_string().

        The command is started but not waited for.  Its exit code is polled
        30 times; the first true (non-zero) value is logged and returned.
        If none is seen, 0 is returned.
        """
        status = None

        pbsdshCommand = "%s %s" % (self.__pbsdsh, args_to_string(arguments))
        self.__log.debug("pbsdsh command: %s" % pbsdshCommand)

        pbsdshProcess = simpleCommand('pbsdsh', pbsdshCommand, env=self.__env)
        pbsdshProcess.start()

        # NOTE(review): there is no delay between polls, so this only catches
        # failures that are visible almost immediately -- confirm whether a
        # sleep was intended.
        for _ in range(30):
            status = pbsdshProcess.exit_code()
            if status:
                self.__log.error("pbsdsh failed: %s"
                                 % pbsdshProcess.exit_status_string())
                break

        if not status:
            status = 0

        return status

    def qalter(self, fieldName, fieldValue, jobId):
        """Update the job field with fieldName with the fieldValue.
        The fieldValue must be modifiable after the job is submitted.

        Returns qalter's exit code.
        """
        # E.g. to alter comment: qalter -W notes="value" jobId
        # NOTE(review): fieldValue is placed between double quotes without
        # escaping -- confirm callers never pass a value containing '"'.
        qalterCmd = '%s -W %s="%s" %s' % (self.__qalter, fieldName,
                                          fieldValue, jobId)
        self.__log.debug("qalter command: %s" % qalterCmd)

        qalterProcess = self.__runAndWait('qalter', qalterCmd)
        return qalterProcess.exit_code()