source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/NodePools/torque.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 10.1 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16"""Maui/Torque implementation of NodePool"""
17# -*- python -*-
18
19import os, sys, csv, socket, time, re, pprint
20
21from hodlib.Hod.nodePool import *
22from hodlib.Schedulers.torque import torqueInterface
23from hodlib.Common.threads import simpleCommand
24from hodlib.Common.util import get_exception_string, args_to_string, local_fqdn, \
25                        TORQUE_USER_LIMITS_COMMENT_FIELD
26
class TorqueNodeSet(NodeSet):
  """NodeSet that also records a qsub job id and a list of node addresses."""

  def __init__(self, id, numNodes, preferredList, isPreemptee):
    """Initialise the base NodeSet; start with no job id and no addresses."""
    NodeSet.__init__(self, id, numNodes, preferredList, isPreemptee)
    # Both fields start empty and are filled in later through the setters
    # below (or by direct attribute assignment).
    self.qsubId, self.addrList = None, []

  def _setQsubId(self, qsubId):
    """Store the qsub job id associated with this node set."""
    self.qsubId = qsubId

  def _setAddrList(self, addrList):
    """Replace the stored address list with addrList."""
    self.addrList = addrList

  def getAddrList(self):
    """Return the address list last stored via _setAddrList ([] initially)."""
    return self.addrList
41
class TorquePool(NodePool):
  """Maui/Torque implementation of NodePool.

  Builds qsub command lines from the HOD configuration and the node pool
  description, and drives the job through a torqueInterface instance
  (qsub / qstat / qdel / qalter / pbsdsh).
  """

  # Exit code that getJobInfo() interprets as "the job does not exist any
  # more"; finalize() does not count it as a failure either.
  __JOB_NON_EXISTENT_EXIT_CODE = 153

  def __init__(self, nodePoolDesc, cfg, log):
    """Set up the Torque interface from the 'resource_manager' config section.

    If 'pbs-server' is configured it is exported to the Torque commands via
    the PBS_DEFAULT environment variable.
    """
    NodePool.__init__(self, nodePoolDesc, cfg, log)

    # Initialised here (and not only in getJobInfo) so that isJobFeasible()
    # can be called safely before the first getJobInfo() call.
    self.__jobInfo = { 'job_state' : False }

    rmCfg = self._cfg['resource_manager']

    environ = os.environ.copy()
    if 'pbs-server' in rmCfg:
      environ['PBS_DEFAULT'] = rmCfg['pbs-server']

    self.__torque = torqueInterface(rmCfg['batch-home'], environ, self._log)

  def getAccountString(self):
    """Return the configured 'pbs-account', or '' when none is configured."""
    rmCfg = self._cfg['resource_manager']
    if 'pbs-account' in rmCfg:
      return rmCfg['pbs-account']
    return ''

  def __gen_submit_params(self, nodeSet, walltime = None, qosLevel = None,
                          account = None):
    """Return (argList, stdinList) for submitting nodeSet through qsub.

    argList holds the qsub command line arguments, stdinList the lines of the
    script fed to qsub on standard input.

    NOTE: 'account' is accepted for interface compatibility but is not used;
    the account passed to qsub is always taken from the 'pbs-account'
    configuration entry.
    """
    argList = self.__gen_qsub_args(nodeSet, walltime, qosLevel)
    stdinList = self.__gen_ringmaster_script()

    return argList, stdinList

  def __gen_ringmaster_script(self):
    """Return the lines of the shell script that execs ringmaster."""
    ringBin = os.path.join(self._cfg['hod']['base-dir'], 'bin', 'ringmaster')
    ringArgs = [ringBin,]
    # NOTE(review): ('hod') is a plain string, not a 1-tuple. It is kept
    # unchanged because get_args() is not visible here -- confirm whether
    # ('hod',) was intended.
    ringArgs.extend(self._cfg.get_args(exclude=('hod')))

    ringMasterCommand = args_to_string(ringArgs)

    self._log.debug("ringmaster cmd: %s" % ringMasterCommand)

    return ['#!/bin/sh', ringMasterCommand]

  def __process_qsub_attributes(self, nodeSet, walltime, qosLevel):
    """Turn the node pool attributes into a flat list of qsub options.

    Attribute keys have the form '<option>:<subOption>' (for example
    'l:nodes' or 'W:x'). All sub-options of one option are joined with ','
    and emitted once as ['-<option>', 'sub1=val1,sub2=val2', ...]. A
    sub-option with an empty name contributes only its value.

    Raises ValueError if an attribute key contains no colon.
    """
    # Work on a copy: the values added below are specific to this submission
    # and must not accumulate in the node pool description (otherwise a
    # second submit would append the QOS entry to 'W:x' again).
    rawAttributes = dict(self.nodePoolDesc.getAttrs())

    # 'W:x' is used to specify torque management extensions ie -W x= ...
    resourceManagementExtensions = ''
    if 'W:x' in rawAttributes:
      resourceManagementExtensions = rawAttributes['W:x']

    if qosLevel:
      if len(resourceManagementExtensions) > 0:
        resourceManagementExtensions += ';'
      resourceManagementExtensions += 'QOS:%s' % (qosLevel)

    rawAttributes['W:x'] = resourceManagementExtensions

    # key values are expected to have string values.
    rawAttributes['l:nodes'] = "%s" % nodeSet._getNumNodes()

    if walltime:
      rawAttributes['l:walltime'] = "%s" % walltime

    # Group the attributes into a dict of dicts: option -> {subOption: value}
    cmds = {}
    for key in rawAttributes:
      value = rawAttributes[key]

      if key.find(':') == -1:
        raise ValueError('Syntax error: missing colon after %s in %s=%s' % (
          key, key, value))

      option, subOption = key.split(':', 1)
      if option not in cmds:
        cmds[option] = {}
      cmds[option][subOption] = value

    # Flatten the grouped attributes; every option is emitted exactly once.
    opts = []
    for option in cmds:
      commandList = []
      for subOption in cmds[option]:
        value = cmds[option][subOption]
        if len(subOption) == 0:
          commandList.append(value)
        else:
          commandList.append("%s=%s" % (subOption, value))
      opts.append('-%s' % option)
      opts.append(','.join(commandList))

    return opts

  def __gen_qsub_args(self, nodeSet, walltime, qosLevel):
    """Return the complete qsub argument list for nodeSet."""
    rmCfg = self._cfg['resource_manager']

    argList = self.__process_qsub_attributes(nodeSet, walltime, qosLevel)

    # The job name is wrapped in literal double quotes.
    argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))
    argList.extend(('-r','n'))

    if 'pbs-user' in rmCfg:
      argList.extend(('-u', rmCfg['pbs-user']))

    argList.extend(('-d','/tmp/'))
    if 'queue' in rmCfg:
      argList.extend(('-q', rmCfg['queue']))

    # In HOD 0.4, we pass in an account string only if it is mentioned.
    # Also, we don't append userid to the account string, as HOD jobs run as
    # the user running them, not as 'HOD' user.
    if 'pbs-account' in rmCfg:
      argList.extend(('-A', rmCfg['pbs-account']))

    if 'env-vars' in rmCfg:
      argList.extend(('-v', self.__keyValToString(rmCfg['env-vars'])))

    return argList

  def __keyValToString(self, keyValList):
    """Render a mapping as 'k1=v1,k2=v2' ('' for an empty mapping)."""
    return ','.join(['%s=%s' % (key, keyValList[key]) for key in keyValList])

  def newNodeSet(self, numNodes, preferred=None, isPreemptee=True, id=None):
    """Create a TorqueNodeSet, register it in nodeSetDict and return it.

    When no id is given, the next id from getNextNodeSetId() is used.
    """
    # A fresh list per call; a shared mutable default would be aliased by
    # every node set created without an explicit 'preferred' argument.
    if preferred is None:
      preferred = []

    if not id:
      id = self.getNextNodeSetId()

    nodeSet = TorqueNodeSet(id, numNodes, preferred, isPreemptee)

    self.nodeSetDict[nodeSet.getId()] = nodeSet

    return nodeSet

  def submitNodeSet(self, nodeSet, walltime = None, qosLevel = None,
                    account = None):
    """Submit nodeSet through qsub and return (jobId, exitCode)."""
    argList, stdinList = self.__gen_submit_params(nodeSet, walltime, qosLevel,
                                                  account)

    jobId, exitCode = self.__torque.qsub(argList, stdinList)

    # Remembered so that getServiceId() can report the job id later.
    nodeSet.qsubId = jobId

    return jobId, exitCode

  def freeNodeSet(self, nodeSet):
    """qdel the job identified by nodeSet's id and forget the node set.

    Returns the exit code of the delete operation.
    """
    exitCode = self.deleteJob(nodeSet.getId())

    del self.nodeSetDict[nodeSet.getId()]

    return exitCode

  def finalize(self):
    """Free every registered node set.

    Returns 4 if freeing any node set failed with an exit code other than
    the "job does not exist" code, and 0 otherwise.
    """
    status = 0
    # Iterate over a snapshot: freeNodeSet() removes entries from nodeSetDict.
    for nodeSet in list(self.nodeSetDict.values()):
      exitCode = self.freeNodeSet(nodeSet)
      if exitCode > 0 and exitCode != self.__JOB_NON_EXISTENT_EXIT_CODE:
        status = 4

    return status

  ## UNUSED METHOD ?? ##
  def getWorkers(self):
    """Return the 'exec_host' entry of the job's qstat info, or []."""
    hosts = []

    qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
    if exitCode == 0 and qstatInfo and 'exec_host' in qstatInfo:
      hosts = qstatInfo['exec_host']

    return hosts

  ## UNUSED METHOD ?? ##
  def pollNodeSet(self, nodeSet):
    """Query qstat and map the job state to a NodeSet status constant.

    Returns NodeSet.PENDING while the job state is 'Q', NodeSet.COMMITTED
    when no 'exec_host' is reported yet, and NodeSet.COMPLETE otherwise
    (including when no qstat information is available). When 'exec_host'
    is reported it is stored on nodeSet via _setAddrList().
    """
    status = NodeSet.COMPLETE

    qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())

    if exitCode == 0 and qstatInfo:
      jobstate = qstatInfo.get('job_state')
      exechost = qstatInfo.get('exec_host')

      if jobstate == 'Q':
        status = NodeSet.PENDING
      elif exechost is None:
        status = NodeSet.COMMITTED
      else:
        nodeSet._setAddrList(exechost)

    return status

  def getServiceId(self):
    """Return the job id of this pool.

    This is the qsubId of the first registered node set or, when that is not
    available, the value of the PBS_JOBID environment variable (None if unset).
    """
    jobId = None

    nodeSets = list(self.nodeSetDict.values())
    if len(nodeSets):
      jobId = nodeSets[0].qsubId

    if jobId is None:
      jobId = os.getenv('PBS_JOBID')

    return jobId

  def getJobInfo(self, jobId=None):
    """Fetch, cache and return the qstat information for jobId.

    jobId defaults to getServiceId(). The result is {'job_state': 'C'} when
    qstat reports that the job does not exist, and {'job_state': False} on
    any other qstat failure.
    """
    self.__jobInfo = { 'job_state' : False }

    if jobId is None:
      jobId = self.getServiceId()

    qstatInfo, exitCode = self.__torque.qstat(jobId)
    if exitCode == 0:
      self.__jobInfo = qstatInfo
    elif exitCode == self.__JOB_NON_EXISTENT_EXIT_CODE:
      # This really means that the job completed
      # However, setting only job_state for now, not
      # any other attributes, as none seem required.
      self.__jobInfo = { 'job_state' : 'C' }

    return self.__jobInfo

  def deleteJob(self, jobId):
    """qdel jobId and return the exit code."""
    return self.__torque.qdel(jobId)

  def isJobFeasible(self):
    """Check the cached job info's 'comment' field against usage limits.

    The comment is matched against the regular expression configured as
    hod/job-feasibility-attr, whose groups 1, 2 and 3 are read as the
    requested, current and maximum usage.

    Returns (verdict, msg): verdict is "Never" when the request alone exceeds
    the maximum, False when request plus current usage exceeds it, and True
    otherwise. msg is None unless the comment matched.

    Raises Exception (after logging) if evaluating the comment fails.
    """
    comment = None
    msg = None
    if 'comment' in self.__jobInfo:
      comment = self.__jobInfo['comment']
    try:
      if comment:
        commentField = re.compile(self._cfg['hod']['job-feasibility-attr'])
        match = commentField.search(comment)
        if match:
          reqUsage = int(match.group(1))
          currentUsage = int(match.group(2))
          maxUsage = int(match.group(3))
          msg = "Current Usage:%s, Requested:%s, Maximum Limit:%s " % \
                                  (currentUsage, reqUsage, maxUsage)
          if reqUsage > maxUsage:
            return "Never", msg
          if reqUsage + currentUsage > maxUsage:
            return False, msg
    except Exception:
      # sys.exc_info() instead of "except Exception, e" / "as e": this form
      # is valid syntax on every Python version.
      e = sys.exc_info()[1]
      self._log.error("Error in isJobFeasible : %s" %e)
      raise Exception(e)
    return True, msg

  def runWorkers(self, args):
    """Run pbsdsh with args and return its result."""
    return self.__torque.pbsdsh(args)

  def updateWorkerInfo(self, workerInfoMap, jobId):
    """Store workerInfoMap as 'k1:v1,k2:v2' in the job's "notes" via qalter.

    Returns the qalter exit code.
    """
    workerInfoStr = ','.join(['%s:%s' % (key, workerInfoMap[key])
                              for key in workerInfoMap.keys()])
    exitCode = self.__torque.qalter("notes", workerInfoStr, jobId)
    return exitCode
334
Note: See TracBrowser for help on using the repository browser.