source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/Common/hodsvc.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 8.2 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16# $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
17#
18#------------------------------------------------------------------------------
19import os, time, shutil, xmlrpclib, socket, pprint
20
21from signal import *
22
23from hodlib.Common.logger import hodLog, hodDummyLogger
24from hodlib.Common.socketServers import hodXMLRPCServer
25from hodlib.Common.util import local_fqdn
26from hodlib.Common.xmlrpc import hodXRClient
27
28class hodBaseService:
29  """hodBaseService class - This class provides service registration, logging,
30     and configuration access methods.  It also provides an XML-RPC server.
31     This class should be extended to create hod services.  Methods beginning
32     with _xr_method will automatically be added to instances of this class.
33     """
34  def __init__(self, name, config, xrtype='threaded'):
35    """ Initialization requires a name string and a config object of type
36        hodlib.Common.setup.options or hodlib.Common.setup.config."""
37       
38    self.name = name
39    self.hostname = local_fqdn()
40    self._cfg = config
41    self._xrc = None
42    self.logs = {}
43    self._baseLogger = None
44    self._serviceID = os.getenv('PBS_JOBID')
45       
46    self.__logDir = None
47    self.__svcrgy = None
48    self.__stop = False
49    self.__xrtype = xrtype
50   
51    self._init_logging()
52       
53    if name != 'serviceRegistry': self._init_signals()
54    self._init_xrc_server()
55   
56  def __set_logging_level(self, level):
57    self.logs['main'].info("Setting log level to %s." % level)
58    for loggerName in self.loggers.keys():
59      self.logs['main'].set_logger_level(loggerName, level)
60
61  def __get_logging_level(self):
62    if self._cfg.has_key('stream'):
63      return self.loggers['main'].get_level('stream', 'main')
64    elif self._cfg.has_key('log-dir'):
65      return self.loggers['main'].get_level('file', 'main')
66    else:
67      return 0
68 
69  def _xr_method_stop(self, *args):
70    """XML-RPC method, calls stop() on ourselves."""
71   
72    return self.stop()
73 
74  def _xr_method_status(self, *args):
75    """XML-RPC method, calls status() on ourselves."""
76   
77    return self.status()
78 
79  def _init_logging(self):
80    if self._cfg.has_key('debug'):
81      if self._cfg['debug'] > 0:
82        self._baseLogger = hodLog(self.name)
83        self.logs['main'] = self._baseLogger.add_logger('main')
84       
85        if self._cfg.has_key('stream'):
86          if self._cfg['stream']:
87            self._baseLogger.add_stream(level=self._cfg['debug'], 
88                                 addToLoggerNames=('main',))
89           
90        if self._cfg.has_key('log-dir'):
91          if self._serviceID:
92              self.__logDir = os.path.join(self._cfg['log-dir'], "%s.%s" % (
93                                       self._cfg['userid'], self._serviceID))
94          else:
95              self.__logDir = os.path.join(self._cfg['log-dir'], 
96                                           self._cfg['userid'])
97          if not os.path.exists(self.__logDir):
98            os.mkdir(self.__logDir)
99           
100          self._baseLogger.add_file(logDirectory=self.__logDir, 
101            level=self._cfg['debug'], addToLoggerNames=('main',))
102         
103        if self._cfg.has_key('syslog-address'):
104          self._baseLogger.add_syslog(self._cfg['syslog-address'], 
105            level=self._cfg['debug'], addToLoggerNames=('main',))
106       
107        if not self.logs.has_key('main'):
108          self.logs['main'] = hodDummyLogger()
109      else:
110        self.logs['main'] = hodDummyLogger()
111    else:
112      self.logs['main'] = hodDummyLogger()
113 
114  def _init_signals(self):
115    def sigStop(sigNum, handler):
116      self.sig_wrapper(sigNum, self.stop)
117
118    def toggleLevel():
119      currentLevel = self.__get_logging_level()
120      if currentLevel == 4:
121        self.__set_logging_level(1)
122      else:
123        self.__set_logging_level(currentLevel + 1)
124
125    def sigStop(sigNum, handler):
126      self._sig_wrapper(sigNum, self.stop)
127
128    def sigDebug(sigNum, handler):
129      self.sig_wrapper(sigNum, toggleLevel)
130
131    signal(SIGTERM, sigStop)
132    signal(SIGQUIT, sigStop)
133    signal(SIGINT, sigStop)
134    signal(SIGUSR2, sigDebug)
135
136  def _sig_wrapper(self, sigNum, handler, *args):
137    self.logs['main'].info("Caught signal %s." % sigNum)
138
139    if args:
140        handler(args)
141    else:
142        handler()
143 
144  def _init_xrc_server(self):
145    host = None
146    ports = None
147    if self._cfg.has_key('xrs-address'):
148      (host, port) = (self._cfg['xrs-address'][0], self._cfg['xrs-address'][1])
149      ports = (port,)
150    elif self._cfg.has_key('xrs-port-range'):
151      host = ''
152      ports = self._cfg['xrs-port-range']
153   
154    if host != None: 
155      if self.__xrtype == 'threaded':
156        self._xrc = hodXMLRPCServer(host, ports)
157      elif self.__xrtype == 'twisted':
158        try:
159          from socketServers import twistedXMLRPCServer
160          self._xrc = twistedXMLRPCServer(host, ports, self.logs['main'])
161        except ImportError:
162          self.logs['main'].error("Twisted XML-RPC server not available, "
163                                  + "falling back on threaded server.")
164          self._xrc = hodXMLRPCServer(host, ports)
165      for attr in dir(self):
166        if attr.startswith('_xr_method_'):
167          self._xrc.register_function(getattr(self, attr),
168                                      attr[11:])
169   
170      self._xrc.register_introspection_functions()
171 
172  def _register_service(self, port=None, installSignalHandlers=1):
173    if self.__svcrgy:
174      self.logs['main'].info(
175          "Registering service with service registery %s... " % self.__svcrgy)
176      svcrgy = hodXRClient(self.__svcrgy, None, None, 0, 0, installSignalHandlers)
177     
178      if self._xrc and self._http:
179        svcrgy.registerService(self._cfg['userid'], self._serviceID, 
180                               self.hostname, self.name, 'hod', {
181                               'xrs' : "http://%s:%s" % (
182                               self._xrc.server_address[0], 
183                               self._xrc.server_address[1]),'http' : 
184                               "http://%s:%s" % (self._http.server_address[0], 
185                               self._http.server_address[1])})
186      elif self._xrc:
187        svcrgy.registerService(self._cfg['userid'], self._serviceID, 
188                               self.hostname, self.name, 'hod', {
189                               'xrs' : "http://%s:%s" % (
190                               self._xrc.server_address[0], 
191                               self._xrc.server_address[1]),})
192      elif self._http:
193        svcrgy.registerService(self._cfg['userid'], self._serviceID, 
194                               self.hostname, self.name, 'hod', {'http' : 
195                               "http://%s:%s" % (self._http.server_address[0], 
196                               self._http.server_address[1]),})       
197      else:
198        svcrgy.registerService(self._cfg['userid'], self._serviceID, 
199                               self.hostname, name, 'hod', {} )
200 
201  def start(self):
202    """ Start XML-RPC server and register service."""
203   
204    self.logs['main'].info("Starting HOD service: %s ..." % self.name)
205
206    if self._xrc: self._xrc.serve_forever()
207    if self._cfg.has_key('register') and self._cfg['register']:
208        self._register_service()
209 
210  def stop(self):
211    """ Stop XML-RPC server, unregister service and set stop flag. """
212   
213    self.logs['main'].info("Stopping service...")
214    if self._xrc: self._xrc.stop()
215    self.__stop = True
216 
217    return True
218 
219  def status(self):
220    """Returns true, should be overriden."""
221   
222    return True
223 
224  def wait(self):
225    """Wait until stop method is called."""
226   
227    while not self.__stop:
228      time.sleep(.1)
Note: See TracBrowser for help on using the repository browser.