#Licensed to the Apache Software Foundation (ASF) under one #or more contributor license agreements. See the NOTICE file #distributed with this work for additional information #regarding copyright ownership. The ASF licenses this file #to you under the Apache License, Version 2.0 (the #"License"); you may not use this file except in compliance #with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 #Unless required by applicable law or agreed to in writing, software #distributed under the License is distributed on an "AS IS" BASIS, #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #See the License for the specific language governing permissions and #limitations under the License. import threading, time, os, sys, pprint from popen2 import Popen4, Popen3, MAXFD from signal import SIGTERM, SIGKILL class baseThread(threading.Thread): """Base CAM threading class. The run method should be overridden.""" def __init__(self, name): threading.Thread.__init__(self, name=name) self.stopFlag = threading.Event() self.stopFlag.clear() self.running = threading.Event() self.running.set() self.isFinished = threading.Event() self.isFinished.clear() def join(self, timeout=None): self.stopFlag.set() threading.Thread.join(self, timeout) def pause(self): """Pause thread.""" self.running.clear() def cont(self): """Resume thread operation.""" self.running.set() class simpleCommand(baseThread): """Command execution object. Command output and exit status are captured. Public class attributes: cmdString - command to be executed outputBuffer - command output, stdout + stderr status - exit status, as returned by wait stdin - standard input for command stdout - standard output of command when buffer == False stderr - standard error of command when mode == 3 and buffer == False """ def __init__(self, name, cmdString, env=os.environ, mode=4, buffer=True, wait=True, chdir=None): """Class initialization. name - thread name to use when running the command cmdString - command string to execute inputString - string to print to command's stdin env - shell environment dictionary mode - 3 for popen3 and 4 for popen4 buffer - out put to be retrieved with output() method wait - return immediately after start() is called and output command results as they come to stdout""" baseThread.__init__(self, name=name) self.cmdString = cmdString self.__mode = mode self.__buffer = buffer self.__wait = wait self.__chdir = chdir self.__outputBuffer = [] self.__status = None self.__pid = None self.__isFinished = threading.Event() self.__isFinished.clear() self.stdin = None self.stdout = None self.stderr = None self.__env = env def run(self): """ Overridden run method. Most of the work happens here. start() should be called in place of this method.""" oldDir = None if self.__chdir: if os.path.exists(self.__chdir): oldDir = os.getcwd() os.chdir(self.__chdir) else: raise Exception( "simpleCommand: invalid chdir specified: %s" % self.__chdir) cmd = None if self.__mode == 3: cmd = _Popen3Env(self.cmdString, env=self.__env) else: cmd = _Popen4Env(self.cmdString, env=self.__env) self.__pid = cmd.pid self.stdin = cmd.tochild if self.__mode == 3: self.stderr = cmd.childerr while cmd.fromchild == None: time.sleep(1) if self.__buffer == True: output = cmd.fromchild.readline() while output != '': while not self.running.isSet(): if self.stopFlag.isSet(): break time.sleep(1) self.__outputBuffer.append(output) output = cmd.fromchild.readline() elif self.__wait == False: output = cmd.fromchild.readline() while output != '': while not self.running.isSet(): if self.stopFlag.isSet(): break time.sleep(1) print output, if self.stopFlag.isSet(): break output = cmd.fromchild.readline() else: self.stdout = cmd.fromchild self.__status = cmd.poll() while self.__status == -1: while not self.running.isSet(): if self.stopFlag.isSet(): break time.sleep(1) self.__status = cmd.poll() time.sleep(1) if oldDir: os.chdir(oldDir) self.__isFinished.set() sys.exit(0) def getPid(self): """return pid of the launches process""" return self.__pid def output(self): return self.__outputBuffer[:] def wait(self): """Wait blocking until command execution completes.""" self.__isFinished.wait() return os.WEXITSTATUS(self.__status) def is_running(self): """Returns boolean, are we running?""" status = True if self.__isFinished.isSet(): status = False return status def exit_code(self): """ Returns process exit code.""" if self.__status != None: return os.WEXITSTATUS(self.__status) else: return None def exit_status_string(self): """Return a string representation of the command's exit status.""" statusString = None if self.__status: exitStatus = os.WEXITSTATUS(self.__status) exitSignal = os.WIFSIGNALED(self.__status) coreDump = os.WCOREDUMP(self.__status) statusString = "exit code: %s | signal: %s | core %s" % \ (exitStatus, exitSignal, coreDump) return(statusString) def stop(self): """Stop the running command and join it's execution thread.""" self.join() def kill(self): count = 0 while self.is_running(): try: if count > 20: os.kill(self.__pid, SIGKILL) break else: os.kill(self.__pid, SIGTERM) except: break time.sleep(.1) count = count + 1 self.stop() class _Popen3Env(Popen3): def __init__(self, cmd, capturestderr=False, bufsize=-1, env=os.environ): self._env = env Popen3.__init__(self, cmd, capturestderr, bufsize) def _run_child(self, cmd): if isinstance(cmd, basestring): cmd = ['/bin/sh', '-c', cmd] for i in xrange(3, MAXFD): try: os.close(i) except OSError: pass try: os.execvpe(cmd[0], cmd, self._env) finally: os._exit(1) class _Popen4Env(_Popen3Env, Popen4): childerr = None def __init__(self, cmd, bufsize=-1, env=os.environ): self._env = env Popen4.__init__(self, cmd, bufsize) class loop(baseThread): """ A simple extension of the threading.Thread class which continuously executes a block of code until join(). """ def __init__(self, name, functionRef, functionArgs=None, sleep=1, wait=0, offset=False): """Initialize a loop object. name - thread name functionRef - a function reference functionArgs - function arguments in the form of a tuple, sleep - time to wait between function execs wait - time to wait before executing the first time offset - set true to sleep as an offset of the start of the last func exec instead of the end of the last func exec """ self.__functionRef = functionRef self.__functionArgs = functionArgs self.__sleep = sleep self.__wait = wait self.__offset = offset baseThread.__init__(self, name=name) def run(self): """Do not call this directly. Call self.start().""" startTime = None while not self.stopFlag.isSet(): sleep = self.__sleep if self.__wait > 0: startWaitCount = 0 while not self.stopFlag.isSet(): while not self.running.isSet(): if self.stopFlag.isSet(): break time.sleep(1) time.sleep(0.5) startWaitCount = startWaitCount + .5 if startWaitCount >= self.__wait: self.__wait = 0 break startTime = time.time() if not self.stopFlag.isSet(): if self.running.isSet(): if self.__functionArgs: self.__functionRef(self.__functionArgs) else: self.__functionRef() endTime = time.time() while not self.running.isSet(): time.sleep(1) while not self.stopFlag.isSet(): while not self.running.isSet(): if self.stopFlag.isSet(): break time.sleep(1) currentTime = time.time() if self.__offset: elapsed = time.time() - startTime else: elapsed = time.time() - endTime if elapsed >= self.__sleep: break time.sleep(0.5) self.isFinished.set() def set_sleep(self, sleep, wait=None, offset=None): """Modify loop frequency paramaters. sleep - time to wait between function execs wait - time to wait before executing the first time offset - set true to sleep as an offset of the start of the last func exec instead of the end of the last func exec """ self.__sleep = sleep if wait != None: self.__wait = wait if offset != None: self.__offset = offset def get_sleep(self): """Get loop frequency paramaters. Returns a dictionary with sleep, wait, offset. """ return { 'sleep' : self.__sleep, 'wait' : self.__wait, 'offset' : self.__offset, } class func(baseThread): """ A simple extension of the threading.Thread class which executes a function in a separate thread. """ def __init__(self, name, functionRef, functionArgs=None): """Initialize a func object. name - thread name functionRef - a function reference functionArgs - function arguments in the form of a tuple, """ self.__functionRef = functionRef self.__functionArgs = functionArgs baseThread.__init__(self, name=name) def run(self): """Do not call this directly. Call self.start().""" if not self.stopFlag.isSet(): if self.running.isSet(): if self.__functionArgs: self.__functionRef(self.__functionArgs) else: self.__functionRef() sys.exit(0)