source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/Schedulers/torque.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 5.4 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16import os, pprint, re, time
17
18from hodlib.Common.threads import simpleCommand
19from hodlib.Common.util import args_to_string
20from hodlib.Common.logger import hodDummyLogger
21
# Matches one "key = value" attribute line of `qstat -f` output; group(1) is
# the key and group(2) the value. Keys may contain dots (Torque reports
# resource attributes as e.g. Resource_List.nodes), which a plain \w+ key
# pattern would reject. Raw string so the regex escapes reach `re` intact.
reQstatLine = re.compile(r"^\s*([\w.]+)\s*=\s*(.*)\s*$")
23
class torqueInterface:
  """Wrapper around the Torque command-line tools.

  Every command is looked up in <torqueDir>/bin and run through
  simpleCommand with the environment given at construction time.
  """

  def __init__(self, torqueDir, environment, log=None):
    """Resolve the paths of the Torque binaries.

    torqueDir   -- Torque installation prefix; the binaries are expected in
                   its 'bin' subdirectory.
    environment -- environment passed to every spawned command.
    log         -- logger to use; a hodDummyLogger is created when not given.
    """
    binDir = os.path.join(torqueDir, 'bin')
    self.__qsub = os.path.join(binDir, 'qsub')
    self.__qdel = os.path.join(binDir, 'qdel')
    self.__qstat = os.path.join(binDir, 'qstat')
    self.__pbsNodes = os.path.join(binDir, 'pbsnodes')
    self.__pbsdsh = os.path.join(binDir, 'pbsdsh')
    self.__qalter = os.path.join(binDir, 'qalter')
    self.__env = environment

    self.__log = log
    if not self.__log:
      self.__log = hodDummyLogger()

  def qsub(self, argList, stdinList):
    """Submit a job with qsub, feeding stdinList to qsub's standard input.

    argList   -- qsub arguments, rendered with args_to_string.
    stdinList -- lines written to qsub's stdin, one per line.

    Returns (jobID, exitCode):
      * jobID is the first line of qsub's output when qsub exits 0 and
        prints something;
      * jobID is False when qsub exits non-zero or prints nothing;
      * jobID is None when writing to qsub's stdin fails (qsub exited early,
        e.g. on invalid parameters).
    """
    jobID = False

    qsubCommand = "%s %s" % (self.__qsub, args_to_string(argList))

    self.__log.debug("qsub -> %s" % qsubCommand)

    qsubProcess = simpleCommand('qsub', qsubCommand, env=self.__env)
    qsubProcess.start()

    # The stdin pipe only becomes available once the command has started;
    # wait for it before writing.
    # NOTE(review): this never terminates if the pipe is never set up.
    while qsubProcess.stdin is None:
      time.sleep(.2)

    try:
      for line in stdinList:
        self.__log.debug("qsub stdin: %s" % line)
        # Same output as the former `print >>stdin, line`, but valid on
        # both Python 2 and 3.
        qsubProcess.stdin.write("%s\n" % (line,))
      qsubProcess.stdin.close()
    except IOError:
      # If torque's qsub is given invalid params, it fails & returns
      # immediately, so writing to its stdin raises. Wait for the command
      # to finish and report what it printed.
      qsubProcess.wait()
      qsubProcess.join()
      output = qsubProcess.output()
      if output:
        self.__log.critical("qsub Failure : %s " % output[0].strip())
        self.__log.critical("qsub Command : %s" % qsubCommand)
      return None, qsubProcess.exit_code()

    qsubProcess.wait()
    qsubProcess.join()

    exitCode = qsubProcess.exit_code()
    if exitCode == 0:
      output = qsubProcess.output()
      if output:
        jobID = output[0].rstrip('\n')
        self.__log.debug("qsub jobid: %s" % jobID)
      else:
        # Previously this raised IndexError; report it as a failed submit.
        self.__log.critical("qsub error: no job id in qsub output")
    else:
      self.__log.critical("qsub error: %s" % qsubProcess.exit_status_string())

    return jobID, exitCode

  def qstat(self, jobID):
    """Run `qstat -f -1 <jobID>` and parse its "key = value" lines.

    Returns (qstatInfo, exitCode):
      * qstatInfo is None when qstat's exit code is greater than 0;
      * otherwise it is a dict mapping attribute names to values.
    If 'exec_host' is present, its value is replaced by the list of host
    names it contains.
    """
    qstatInfo = None

    qstatCommand = "%s -f -1 %s" % (self.__qstat, jobID)

    self.__log.debug(qstatCommand)

    qstatProcess = simpleCommand('qstat', qstatCommand, env=self.__env)
    qstatProcess.start()
    qstatProcess.wait()
    qstatProcess.join()

    exitCode = qstatProcess.exit_code()
    if exitCode > 0:
      self.__log.warn('qstat error: %s' % qstatProcess.exit_status_string())
    else:
      qstatInfo = {}
      for line in qstatProcess.output():
        line = line.rstrip()
        if '=' in line:
          qstatMatch = reQstatLine.match(line)
          if qstatMatch:
            qstatInfo[qstatMatch.group(1)] = qstatMatch.group(2)

      if 'exec_host' in qstatInfo:
        qstatInfo['exec_host'] = self._parse_exec_hosts(qstatInfo['exec_host'])

    return qstatInfo, exitCode

  @staticmethod
  def _parse_exec_hosts(execHost):
    """Return the host names from an exec_host value.

    The value is a '+'-separated list of 'host/N' entries, for example
    'n1/1+n1/0+n2/0' -> ['n1', 'n1', 'n2']. An entry without '/' is kept
    whole, and empty entries are skipped. Previously both cases raised
    ValueError.
    """
    return [item.split('/', 1)[0] for item in execHost.split('+') if item]

  def pbs_nodes(self, argString):
    """Not implemented; always returns None."""
    pass

  def qdel(self, jobId, force=False):
    """Delete job jobId with qdel and return qdel's exit code.

    With force=True, qdel is passed '-p' (Torque's purge option).
    """
    if force:
      qdel = "%s -p %s" % (self.__qdel, jobId)
    else:
      qdel = "%s %s" % (self.__qdel, jobId)

    self.__log.debug(qdel)

    qdelProcess = simpleCommand('qdel', qdel, env=self.__env)
    qdelProcess.start()
    qdelProcess.wait()
    qdelProcess.join()

    return qdelProcess.exit_code()

  def pbsdsh(self, arguments):
    """Launch pbsdsh with the given arguments without waiting for it.

    Returns a truthy exit code if pbsdsh was already seen to fail during the
    quick check below, otherwise 0.
    """
    status = None

    pbsdshCommand = "%s %s" % (self.__pbsdsh, args_to_string(arguments))

    self.__log.debug("pbsdsh command: %s" % pbsdshCommand)

    pbsdsh = simpleCommand('pbsdsh', pbsdshCommand, env=self.__env)
    pbsdsh.start()

    # Look for an early failure. The command is never waited for, so a
    # failure that happens later is not reported here.
    # NOTE(review): there is no delay between checks, so these 30 polls
    # finish almost immediately -- confirm whether a sleep was intended.
    for _ in range(30):
      status = pbsdsh.exit_code()
      if status:
        self.__log.error("pbsdsh failed: %s" % pbsdsh.exit_status_string())
        break

    if not status:
      status = 0

    return status

  @staticmethod
  def _shell_dquote_escape(value):
    """Escape value for use inside a double-quoted POSIX shell word.

    Backslash, double quote, dollar and backtick remain special inside
    double quotes, so each one is prefixed with a backslash. The backslash
    itself is handled first so that added escapes are not escaped again.
    """
    value = '%s' % (value,)
    for special in ('\\', '"', '$', '`'):
      value = value.replace(special, '\\' + special)
    return value

  def qalter(self, fieldName, fieldValue, jobId):
    """Update the job field with fieldName with the fieldValue.
       The fieldValue must be modifiable after the job is submitted.
       Returns qalter's exit code."""

    # E.g. to alter comment: qalter -W notes="value" jobId
    # The value is embedded in double quotes, which implies the command line
    # is interpreted by a shell. Escape the characters that stay special
    # there so arbitrary text (e.g. messages containing quotes) is passed
    # literally.
    # NOTE(review): assumes simpleCommand runs the string via /bin/sh --
    # confirm.
    qalterCmd = '%s -W %s="%s" %s' % (self.__qalter, fieldName,
                                      self._shell_dquote_escape(fieldValue),
                                      jobId)
    self.__log.debug("qalter command: %s" % qalterCmd)
    qalterProcess = simpleCommand('qalter', qalterCmd, env=self.__env)
    qalterProcess.start()
    qalterProcess.wait()
    qalterProcess.join()

    return qalterProcess.exit_code()
Note: See TracBrowser for help on using the repository browser.