#Licensed to the Apache Software Foundation (ASF) under one #or more contributor license agreements. See the NOTICE file #distributed with this work for additional information #regarding copyright ownership. The ASF licenses this file #to you under the Apache License, Version 2.0 (the #"License"); you may not use this file except in compliance #with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 #Unless required by applicable law or agreed to in writing, software #distributed under the License is distributed on an "AS IS" BASIS, #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #See the License for the specific language governing permissions and #limitations under the License. # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $ # #------------------------------------------------------------------------------ import os, time, shutil, xmlrpclib, socket, pprint from signal import * from hodlib.Common.logger import hodLog, hodDummyLogger from hodlib.Common.socketServers import hodXMLRPCServer from hodlib.Common.util import local_fqdn from hodlib.Common.xmlrpc import hodXRClient class hodBaseService: """hodBaseService class - This class provides service registration, logging, and configuration access methods. It also provides an XML-RPC server. This class should be extended to create hod services. Methods beginning with _xr_method will automatically be added to instances of this class. """ def __init__(self, name, config, xrtype='threaded'): """ Initialization requires a name string and a config object of type hodlib.Common.setup.options or hodlib.Common.setup.config.""" self.name = name self.hostname = local_fqdn() self._cfg = config self._xrc = None self.logs = {} self._baseLogger = None self._serviceID = os.getenv('PBS_JOBID') self.__logDir = None self.__svcrgy = None self.__stop = False self.__xrtype = xrtype self._init_logging() if name != 'serviceRegistry': self._init_signals() self._init_xrc_server() def __set_logging_level(self, level): self.logs['main'].info("Setting log level to %s." % level) for loggerName in self.loggers.keys(): self.logs['main'].set_logger_level(loggerName, level) def __get_logging_level(self): if self._cfg.has_key('stream'): return self.loggers['main'].get_level('stream', 'main') elif self._cfg.has_key('log-dir'): return self.loggers['main'].get_level('file', 'main') else: return 0 def _xr_method_stop(self, *args): """XML-RPC method, calls stop() on ourselves.""" return self.stop() def _xr_method_status(self, *args): """XML-RPC method, calls status() on ourselves.""" return self.status() def _init_logging(self): if self._cfg.has_key('debug'): if self._cfg['debug'] > 0: self._baseLogger = hodLog(self.name) self.logs['main'] = self._baseLogger.add_logger('main') if self._cfg.has_key('stream'): if self._cfg['stream']: self._baseLogger.add_stream(level=self._cfg['debug'], addToLoggerNames=('main',)) if self._cfg.has_key('log-dir'): if self._serviceID: self.__logDir = os.path.join(self._cfg['log-dir'], "%s.%s" % ( self._cfg['userid'], self._serviceID)) else: self.__logDir = os.path.join(self._cfg['log-dir'], self._cfg['userid']) if not os.path.exists(self.__logDir): os.mkdir(self.__logDir) self._baseLogger.add_file(logDirectory=self.__logDir, level=self._cfg['debug'], addToLoggerNames=('main',)) if self._cfg.has_key('syslog-address'): self._baseLogger.add_syslog(self._cfg['syslog-address'], level=self._cfg['debug'], addToLoggerNames=('main',)) if not self.logs.has_key('main'): self.logs['main'] = hodDummyLogger() else: self.logs['main'] = hodDummyLogger() else: self.logs['main'] = hodDummyLogger() def _init_signals(self): def sigStop(sigNum, handler): self.sig_wrapper(sigNum, self.stop) def toggleLevel(): currentLevel = self.__get_logging_level() if currentLevel == 4: self.__set_logging_level(1) else: self.__set_logging_level(currentLevel + 1) def sigStop(sigNum, handler): self._sig_wrapper(sigNum, self.stop) def sigDebug(sigNum, handler): self.sig_wrapper(sigNum, toggleLevel) signal(SIGTERM, sigStop) signal(SIGQUIT, sigStop) signal(SIGINT, sigStop) signal(SIGUSR2, sigDebug) def _sig_wrapper(self, sigNum, handler, *args): self.logs['main'].info("Caught signal %s." % sigNum) if args: handler(args) else: handler() def _init_xrc_server(self): host = None ports = None if self._cfg.has_key('xrs-address'): (host, port) = (self._cfg['xrs-address'][0], self._cfg['xrs-address'][1]) ports = (port,) elif self._cfg.has_key('xrs-port-range'): host = '' ports = self._cfg['xrs-port-range'] if host != None: if self.__xrtype == 'threaded': self._xrc = hodXMLRPCServer(host, ports) elif self.__xrtype == 'twisted': try: from socketServers import twistedXMLRPCServer self._xrc = twistedXMLRPCServer(host, ports, self.logs['main']) except ImportError: self.logs['main'].error("Twisted XML-RPC server not available, " + "falling back on threaded server.") self._xrc = hodXMLRPCServer(host, ports) for attr in dir(self): if attr.startswith('_xr_method_'): self._xrc.register_function(getattr(self, attr), attr[11:]) self._xrc.register_introspection_functions() def _register_service(self, port=None, installSignalHandlers=1): if self.__svcrgy: self.logs['main'].info( "Registering service with service registery %s... " % self.__svcrgy) svcrgy = hodXRClient(self.__svcrgy, None, None, 0, 0, installSignalHandlers) if self._xrc and self._http: svcrgy.registerService(self._cfg['userid'], self._serviceID, self.hostname, self.name, 'hod', { 'xrs' : "http://%s:%s" % ( self._xrc.server_address[0], self._xrc.server_address[1]),'http' : "http://%s:%s" % (self._http.server_address[0], self._http.server_address[1])}) elif self._xrc: svcrgy.registerService(self._cfg['userid'], self._serviceID, self.hostname, self.name, 'hod', { 'xrs' : "http://%s:%s" % ( self._xrc.server_address[0], self._xrc.server_address[1]),}) elif self._http: svcrgy.registerService(self._cfg['userid'], self._serviceID, self.hostname, self.name, 'hod', {'http' : "http://%s:%s" % (self._http.server_address[0], self._http.server_address[1]),}) else: svcrgy.registerService(self._cfg['userid'], self._serviceID, self.hostname, name, 'hod', {} ) def start(self): """ Start XML-RPC server and register service.""" self.logs['main'].info("Starting HOD service: %s ..." % self.name) if self._xrc: self._xrc.serve_forever() if self._cfg.has_key('register') and self._cfg['register']: self._register_service() def stop(self): """ Stop XML-RPC server, unregister service and set stop flag. """ self.logs['main'].info("Stopping service...") if self._xrc: self._xrc.stop() self.__stop = True return True def status(self): """Returns true, should be overriden.""" return True def wait(self): """Wait until stop method is called.""" while not self.__stop: time.sleep(.1)