source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/Common/threads.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 12.0 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16
17import threading, time, os, sys, pprint
18
19from popen2 import Popen4, Popen3, MAXFD
20from signal import SIGTERM, SIGKILL
21
22class baseThread(threading.Thread):
23    """Base CAM threading class.  The run method should be overridden."""
24
25    def __init__(self, name):
26        threading.Thread.__init__(self, name=name)
27        self.stopFlag = threading.Event()
28        self.stopFlag.clear()
29        self.running = threading.Event()
30        self.running.set()
31        self.isFinished = threading.Event()
32        self.isFinished.clear()
33
34    def join(self, timeout=None):
35        self.stopFlag.set()
36        threading.Thread.join(self, timeout)
37
38    def pause(self):
39        """Pause thread."""
40
41        self.running.clear()
42
43    def cont(self):
44        """Resume thread operation."""
45
46        self.running.set()
47
48class simpleCommand(baseThread):
49    """Command execution object.  Command output and exit status are captured.
50
51       Public class attributes:
52
53       cmdString    - command to be executed
54       outputBuffer - command output, stdout + stderr
55       status       - exit status, as returned by wait
56       
57       stdin        - standard input for command
58       stdout       - standard output of command when buffer == False
59       stderr       - standard error of command when mode == 3 and buffer == False
60       
61       """
62
63    def __init__(self, name, cmdString, env=os.environ, mode=4, buffer=True, 
64                 wait=True, chdir=None):
65        """Class initialization.
66
67           name        - thread name to use when running the command
68           cmdString   - command string to execute
69           inputString - string to print to command's stdin
70           env         - shell environment dictionary
71           mode        - 3 for popen3 and 4 for popen4
72           buffer      - out put to be retrieved with output() method
73           wait        - return immediately after start() is called and output
74                         command results as they come to stdout"""
75
76        baseThread.__init__(self, name=name)
77
78        self.cmdString = cmdString
79        self.__mode = mode
80        self.__buffer = buffer
81        self.__wait = wait
82        self.__chdir = chdir
83        self.__outputBuffer = []
84        self.__status = None
85        self.__pid = None
86        self.__isFinished = threading.Event()
87        self.__isFinished.clear()
88       
89        self.stdin = None
90        self.stdout = None
91        self.stderr = None
92
93        self.__env = env
94   
95    def run(self):
96        """ Overridden run method.  Most of the work happens here.  start()
97            should be called in place of this method."""
98           
99        oldDir = None
100        if self.__chdir:
101            if os.path.exists(self.__chdir):
102                oldDir = os.getcwd() 
103                os.chdir(self.__chdir)
104            else:
105                raise Exception(
106                    "simpleCommand: invalid chdir specified: %s" % 
107                    self.__chdir)
108           
109        cmd = None
110        if self.__mode == 3:
111            cmd = _Popen3Env(self.cmdString, env=self.__env)
112        else:
113            cmd = _Popen4Env(self.cmdString, env=self.__env)
114        self.__pid = cmd.pid
115
116        self.stdin = cmd.tochild
117       
118        if self.__mode == 3:
119            self.stderr = cmd.childerr
120
121        while cmd.fromchild == None:
122            time.sleep(1)
123       
124        if self.__buffer == True:
125            output = cmd.fromchild.readline()
126            while output != '':
127                while not self.running.isSet():
128                    if self.stopFlag.isSet():
129                        break
130                    time.sleep(1)
131                self.__outputBuffer.append(output)
132                output = cmd.fromchild.readline()
133
134        elif self.__wait == False:
135            output = cmd.fromchild.readline()
136            while output != '':
137                while not self.running.isSet():
138                    if self.stopFlag.isSet():
139                        break
140                    time.sleep(1)
141                print output,
142                if self.stopFlag.isSet():
143                    break
144                output = cmd.fromchild.readline()
145        else:
146            self.stdout = cmd.fromchild
147
148        self.__status = cmd.poll()
149        while self.__status == -1:
150            while not self.running.isSet():
151                if self.stopFlag.isSet():
152                    break
153                time.sleep(1)
154
155            self.__status = cmd.poll()
156            time.sleep(1)
157
158        if oldDir:
159            os.chdir(oldDir)
160
161        self.__isFinished.set()
162       
163        sys.exit(0)
164
165    def getPid(self):
166        """return pid of the launches process"""
167        return self.__pid
168
169    def output(self):
170        return self.__outputBuffer[:]
171
172    def wait(self):
173        """Wait blocking until command execution completes."""
174
175        self.__isFinished.wait()
176
177        return os.WEXITSTATUS(self.__status)
178
179    def is_running(self):
180        """Returns boolean, are we running?"""
181       
182        status = True
183        if self.__isFinished.isSet():
184            status = False
185           
186        return status
187
188    def exit_code(self):
189        """ Returns process exit code."""
190       
191        if self.__status != None:
192            return os.WEXITSTATUS(self.__status)
193        else:
194            return None
195       
196    def exit_status_string(self):
197        """Return a string representation of the command's exit status."""
198
199        statusString = None
200        if self.__status:
201            exitStatus = os.WEXITSTATUS(self.__status)
202            exitSignal = os.WIFSIGNALED(self.__status)
203            coreDump   = os.WCOREDUMP(self.__status)
204
205            statusString = "exit code: %s | signal: %s | core %s" % \
206                (exitStatus, exitSignal, coreDump)
207
208        return(statusString)
209
210    def stop(self):
211        """Stop the running command and join it's execution thread."""
212
213        self.join()
214
215    def kill(self):
216        count = 0
217        while self.is_running():
218          try:
219            if count > 20:
220              os.kill(self.__pid, SIGKILL)
221              break
222            else: 
223              os.kill(self.__pid, SIGTERM)
224          except:
225            break
226         
227          time.sleep(.1)
228          count = count + 1
229       
230        self.stop()
231       
232class _Popen3Env(Popen3):
233    def __init__(self, cmd, capturestderr=False, bufsize=-1, env=os.environ):
234        self._env = env
235        Popen3.__init__(self, cmd, capturestderr, bufsize)
236   
237    def _run_child(self, cmd):
238        if isinstance(cmd, basestring):
239            cmd = ['/bin/sh', '-c', cmd]
240        for i in xrange(3, MAXFD):
241            try:
242                os.close(i)
243            except OSError:
244                pass
245
246        try:
247            os.execvpe(cmd[0], cmd, self._env)
248        finally:
249            os._exit(1)
250           
251class _Popen4Env(_Popen3Env, Popen4):
252    childerr = None
253
254    def __init__(self, cmd, bufsize=-1, env=os.environ):
255        self._env = env
256        Popen4.__init__(self, cmd, bufsize)
257       
258class loop(baseThread):
259    """ A simple extension of the threading.Thread class which continuously
260        executes a block of code until join().
261    """
262
263    def __init__(self, name, functionRef, functionArgs=None, sleep=1, wait=0,
264        offset=False):
265        """Initialize a loop object.
266
267           name         - thread name
268           functionRef  - a function reference
269           functionArgs - function arguments in the form of a tuple,
270           sleep        - time to wait between function execs
271           wait         - time to wait before executing the first time
272           offset       - set true to sleep as an offset of the start of the
273                          last func exec instead of the end of the last func
274                          exec
275        """
276
277        self.__functionRef  = functionRef
278        self.__functionArgs = functionArgs
279        self.__sleep        = sleep
280        self.__wait         = wait
281        self.__offset       = offset
282
283        baseThread.__init__(self, name=name)
284
285    def run(self):
286        """Do not call this directly.  Call self.start()."""
287
288        startTime = None
289        while not self.stopFlag.isSet():
290            sleep = self.__sleep
291            if self.__wait > 0:
292                startWaitCount = 0
293                while not self.stopFlag.isSet():
294                    while not self.running.isSet():
295                        if self.stopFlag.isSet():
296                            break
297                        time.sleep(1)
298                    time.sleep(0.5)
299                    startWaitCount = startWaitCount + .5
300                    if startWaitCount >= self.__wait:
301                        self.__wait = 0
302                        break
303            startTime = time.time()
304
305            if not self.stopFlag.isSet():
306                if self.running.isSet():
307                    if self.__functionArgs:
308                        self.__functionRef(self.__functionArgs)
309                    else:
310                        self.__functionRef()
311            endTime = time.time()
312
313            while not self.running.isSet():
314                time.sleep(1)
315
316            while not self.stopFlag.isSet():
317                while not self.running.isSet():
318                    if self.stopFlag.isSet():
319                        break
320                    time.sleep(1)
321
322                currentTime = time.time()
323                if self.__offset:
324                    elapsed = time.time() - startTime
325                else:
326                    elapsed = time.time() - endTime
327
328                if elapsed >= self.__sleep:
329                    break
330
331                time.sleep(0.5)
332       
333        self.isFinished.set()
334
335    def set_sleep(self, sleep, wait=None, offset=None):
336        """Modify loop frequency paramaters.
337
338           sleep        - time to wait between function execs
339           wait         - time to wait before executing the first time
340           offset       - set true to sleep as an offset of the start of the
341                          last func exec instead of the end of the last func
342                          exec
343        """
344
345        self.__sleep = sleep
346        if wait != None:
347            self.__wait = wait
348        if offset != None:
349            self.__offset = offset
350
351    def get_sleep(self):
352        """Get loop frequency paramaters.
353        Returns a dictionary with sleep, wait, offset.
354        """
355
356        return {
357            'sleep'  : self.__sleep,
358            'wait'   : self.__wait,
359            'offset' : self.__offset,
360            }
361       
362class func(baseThread):
363    """ A simple extension of the threading.Thread class which executes
364        a function in a separate thread.
365    """
366
367    def __init__(self, name, functionRef, functionArgs=None):
368        """Initialize a func object.
369
370           name         - thread name
371           functionRef  - a function reference
372           functionArgs - function arguments in the form of a tuple,
373        """
374
375        self.__functionRef  = functionRef
376        self.__functionArgs = functionArgs
377
378        baseThread.__init__(self, name=name)
379
380    def run(self):
381        """Do not call this directly.  Call self.start()."""
382
383        if not self.stopFlag.isSet():
384            if self.running.isSet():
385                if self.__functionArgs:
386                    self.__functionRef(self.__functionArgs)
387                else:
388                    self.__functionRef()
389        sys.exit(0)
Note: See TracBrowser for help on using the repository browser.