#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
# Various socket server and helper classes.
#
#
import os, sys, socket, threading, pprint, re, xmlrpclib, time

from select import select
from SocketServer import ThreadingMixIn, ForkingMixIn
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from random import Random
from urlparse import urlparse

Fault = xmlrpclib.Fault

from hodlib.Common.util import local_fqdn
from hodlib.Common.logger import hodDummyLogger

class hodHTTPHandler(BaseHTTPRequestHandler):
  """Request handler that answers GET requests with an HTML status page.

  Every GET is answered with a 200 response of type text/html whose body is
  produced by writeData().
  """

  # Not referenced by the methods visible in this file.
  port = -1

  def __init__(self, request, client_address, server, registerService):
    """Remember the service object, then let the base handler run.

    registerService is stored before the base-class constructor is called
    because BaseHTTPRequestHandler.__init__ is what drives request handling.
    """
    self.registerService = registerService
    BaseHTTPRequestHandler.__init__(self, request, client_address, server)

  def log_message(self, *args):
    """Forget logging for now."""
    pass

  def do_GET(self):
    """Serve a GET request: send the headers, then the page body."""
    serverHost = self.server.server_address[0]
    serverPort = self.server.server_address[1]
    # Rebuild an absolute URL so urlparse() can split path and query.
    self.fullUrl = "http://%s:%s%s" % (serverHost, serverPort, self.path)

    urlParts = urlparse(self.fullUrl)

    self.writeHeaders()
    self.writeData(urlParts)

  def w(self, string):
    """Write string to the client followed by a newline."""
    line = "%s\n" % string
    self.wfile.write(line)

  def writeHeaders(self):
    """Send a '200 OK' status line and a text/html Content-type header."""
    self.send_response(200, 'OK')
    self.send_header('Content-type', 'text/html')
    self.end_headers()
def sendWrongPage(self, userJob): self.w('') if userJob == False: self.w('invalid URL specified') elif re.match("^\d+$", userJob): self.w('invalid URL specified, job %s does not exist' % userJob) elif re.match("^\w+$", userJob): self.w('invalid URL specified, user %s does not exist' % userJob) self.w('') def getServiceHosts(self, serviceInfo): hostInfo = { 'long' : {}, 'short' : {} } for user in serviceInfo: for job in serviceInfo[user]: for host in serviceInfo[user][job]: for serviceItem in serviceInfo[user][job][host]: serviceName = serviceItem.keys() serviceName = serviceName[0] if isinstance(serviceItem[serviceName], str): hostInfo['short'][self.getJobKey(user, job, host)] = True hostInfo['long'][self.getJobKey(user, job, host)] = True return hostInfo def getJobInfo(self, job, serviceInfo): jobInfo = {} for user in serviceInfo.keys(): for someJob in serviceInfo[user].keys(): if job == someJob: jobInfo[user] = { job : serviceInfo[user][job] } return jobInfo def getJobKey(self, user, job, host): return "%s-%s-%s" % (user, job, host) def writeData(self, parsedUrl): options = parsedUrl[4] serviceInfo = self.server.service.getServiceInfo() users = serviceInfo.keys() users.sort() self.w("") self.w("") self.w("") self.writeCSS() self.w("") self.w('HOD Service Registry Information') if serviceInfo == {}: self.w('

  No HOD clusters configured.') else: if parsedUrl[2] == '/': self.w('   ') count = 0 for user in users: self.writeUserData(user, options, serviceInfo, count) count = count + 1 elif parsedUrl[2][1:] in serviceInfo: self.w('   
') self.writeUserData(parsedUrl[2][1:], options, serviceInfo, 0) elif re.match("^\d+$", parsedUrl[2][1:]): jobInfo = self.getJobInfo(parsedUrl[2][1:], serviceInfo) if jobInfo.keys(): self.w('   
') for user in jobInfo.keys(): self.writeUserData(user, options, jobInfo, 0) else: self.sendWrongPage(parsedUrl[2][1:]) self.w('   
') count = 0 for user in users: self.writeUserData(user, options, serviceInfo, count) count = count + 1 elif re.match("^\w+$", parsedUrl[2][1:]): self.sendWrongPage(parsedUrl[2][1:]) self.w('   
') count = 0 for user in users: self.writeUserData(user, options, serviceInfo, count) count = count + 1 else: self.sendWrongPage(False) self.w('   
') count = 0 for user in users: self.writeUserData(user, options, serviceInfo, count) count = count + 1 self.w('
') self.w("") self.w("") self.w("") def writeCSS(self): self.w('") def writeUserData(self, user, options, serviceInfo, count): hostInfo = self.getServiceHosts(serviceInfo) hostKey = 'short' if options == 'display=long': hostKey = 'long' if count == 0: self.w('') self.w('') self.w('Active Users') self.w('') self.w('') self.w('') self.w('%s' % user) self.w('') jobIDs = serviceInfo[user].keys() jobIDs.sort() for jobID in jobIDs: self.w('') if count == 0: self.w('') self.w('') self.w('') self.w('') self.w('' % jobID) self.w('') self.w('') self.w('
') self.w('PBS Job Identifiers') self.w('
%s') hosts = serviceInfo[user][jobID].keys() hosts.sort() for host in hosts: if hostInfo[hostKey].has_key(self.getJobKey(user, jobID, host)): self.w('') if count == 0: self.w('') self.w('') self.w('') self.w('') self.w('' % host) self.w('') self.w('') self.w('
') self.w('Hosts Running Services') self.w('
%s') self.w('') self.w('') self.w('') self.w('') for serviceItem in serviceInfo[user][jobID][host]: serviceName = serviceItem.keys() serviceName = serviceName[0] if isinstance(serviceItem[serviceName], dict) and \ options == 'display=long': self.w('') self.w('' % serviceName) self.w('') self.w('') elif isinstance(serviceItem[serviceName], str): self.w('') self.w('' % serviceName) self.w('') self.w('') self.w('
') self.w('Service Information') self.w('
%s') self.w('') for key in serviceItem[serviceName]: self.w('') self.w('' % key) self.w('' % serviceItem[serviceName][key]) self.w('') self.w('
%s%s
') self.w('
%s') (host, port) = serviceItem[serviceName].split(':') hostnameInfo = socket.gethostbyname_ex(host) if serviceName.startswith('mapred'): self.w('Hadoop Job Tracker' % (hostnameInfo[0], port)) elif serviceName.startswith('hdfs'): self.w('HDFS Name Node ' % (hostnameInfo[0], port)) else: self.w('%s' % serviceItem[serviceName]) self.w('
') self.w('
') count = count + 1 self.w('
') count = count + 1 self.w('') self.w('') # self.w("
")
#    self.w(pprint.pformat(serviceInfo))
#    self.w("
") class baseSocketServer: def __init__(self, host, ports): self.host = host self.ports = ports self.__stopForever = threading.Event() self.__stopForever.clear() self.__run = threading.Event() self.__run.set() self.server_address = () self.mThread = None def server_bind(self): """server_bind() method binds to a random range of ports.""" self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if len(self.ports) > 1: randomPort = Random(os.getpid()) portSequence = range(self.ports[0], self.ports[1]) maxTryCount = abs(self.ports[0] - self.ports[1]) tryCount = 0 while True: somePort = randomPort.choice(portSequence) self.server_address = (self.host, somePort) try: self.socket.bind(self.server_address) except socket.gaierror, errData: raise socket.gaierror, errData except: tryCount = tryCount + 1 if tryCount > maxTryCount: bindError = "bind failure for port range %s:%d" % ( self.ports) raise socket.error, bindError else: break else: self.server_address = (self.host, int(self.ports[0])) self.socket.bind(self.server_address) if self.host == '': self.server_address = (local_fqdn(), self.server_address[1]) def _serve_forever(self): """Replacement for serve_forever loop. All baseSocketServers run within a master thread; that thread imitates serve_forever, but checks an event (self.__stopForever) before processing new connections. 
""" while not self.__stopForever.isSet(): (rlist, wlist, xlist) = select([self.socket], [], [], 1) if (len(rlist) > 0 and self.socket == rlist[0]): self.handle_request() while not self.__run.isSet(): if self.__stopForever.isSet(): break time.sleep(1) self.server_close() return True def serve_forever(self): """Handle requests until stopForever event flag indicates stop.""" self.mThread = threading.Thread(name="baseSocketServer", target=self._serve_forever) self.mThread.start() return self.mThread def pause(self): """Temporarily stop servicing requests.""" self.__run.clear() def cont(self): """Resume servicing requests.""" self.__run.set() def stop(self): """Set the stopForever flag to tell serve_forever() to exit.""" self.__stopForever.set() if self.mThread: self.mThread.join() return True def is_alive(self): if self.mThread != None: return self.mThread.isAlive() else: return False class threadedHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer): def __init__(self, host, ports): baseSocketServer.__init__(self, host, ports) HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler) class forkingHTTPServer(baseSocketServer, ForkingMixIn, HTTPServer): def __init__(self, host, ports): baseSocketServer.__init__(self, host, ports) HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler) class hodHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer): service = None def __init__(self, host, ports, serviceobj = None): self.service = serviceobj baseSocketServer.__init__(self, host, ports) HTTPServer.__init__(self, self.server_address, hodHTTPHandler) def finish_request(self, request, client_address): self.RequestHandlerClass(request, client_address, self, self.service) class hodXMLRPCServer(baseSocketServer, ThreadingMixIn, SimpleXMLRPCServer): def __init__(self, host, ports, requestHandler=SimpleXMLRPCRequestHandler, logRequests=False, allow_none=False, encoding=None): baseSocketServer.__init__(self, host, ports) 
SimpleXMLRPCServer.__init__(self, self.server_address, requestHandler, logRequests) self.register_function(self.stop, 'stop') try: from twisted.web import server, xmlrpc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread from twisted.python import log class twistedXMLRPC(xmlrpc.XMLRPC): def __init__(self, logger): xmlrpc.XMLRPC.__init__(self) self.__XRMethods = {} self.__numRequests = 0 self.__logger = logger self.__pause = False def render(self, request): request.content.seek(0, 0) args, functionPath = xmlrpclib.loads(request.content.read()) try: function = self._getFunction(functionPath) except Fault, f: self._cbRender(f, request) else: request.setHeader("content-type", "text/xml") defer.maybeDeferred(function, *args).addErrback( self._ebRender).addCallback(self._cbRender, request) return server.NOT_DONE_YET def _cbRender(self, result, request): if isinstance(result, xmlrpc.Handler): result = result.result if not isinstance(result, Fault): result = (result,) try: s = xmlrpclib.dumps(result, methodresponse=1) except: f = Fault(self.FAILURE, "can't serialize output") s = xmlrpclib.dumps(f, methodresponse=1) request.setHeader("content-length", str(len(s))) request.write(s) request.finish() def _ebRender(self, failure): if isinstance(failure.value, Fault): return failure.value log.err(failure) return Fault(self.FAILURE, "error") def _getFunction(self, methodName): while self.__pause: time.sleep(1) self.__numRequests = self.__numRequests + 1 function = None try: def defer_function(*args): return deferToThread(self.__XRMethods[methodName], *args) function = defer_function self.__logger.info( "[%s] processing defered XML-RPC call to: %s ..." % (self.__numRequests, methodName)) except KeyError: self.__logger.warn( "[%s] fault %s on XML-RPC call to %s, method not found." 
% ( self.__numRequests, self.NOT_FOUND, methodName)) raise xmlrpc.NoSuchFunction(self.NOT_FOUND, "method %s not found" % methodName) return function def register_function(self, functionRef, methodName): self.__XRMethods[methodName] = functionRef def list_methods(self): return self.__XRMethods.keys() def num_requests(self): return self.__numRequests def pause(self): self.__pause = True def cont(self): self.__pause = False class twistedXMLRPCServer: def __init__(self, host, ports, logger=None, threadPoolSize=100): self.__host = host self.__ports = ports if logger == None: logger = hodDummyLogger() self.__logger = logger self.server_address = ['', ''] reactor.suggestThreadPoolSize(threadPoolSize) self.__stopForever = threading.Event() self.__stopForever.clear() self.__mThread = None self.__xmlrpc = twistedXMLRPC(self.__logger) def _serve_forever(self): if len(self.__ports) > 1: randomPort = Random(os.getpid()) portSequence = range(self.__ports[0], self.__ports[1]) maxTryCount = abs(self.__ports[0] - self.__ports[1]) tryCount = 0 while True: somePort = randomPort.choice(portSequence) self.server_address = (self.__host, int(somePort)) if self.__host == '': self.server_address = (local_fqdn(), self.server_address[1]) try: reactor.listenTCP(int(somePort), server.Site( self.__xmlrpc), interface=self.__host) reactor.run(installSignalHandlers=0) except: self.__logger.debug("Failed to bind to: %s:%s." 
% ( self.__host, somePort)) tryCount = tryCount + 1 if tryCount > maxTryCount: self.__logger.warn("Failed to bind to: %s:%s" % ( self.__host, self.__ports)) sys.exit(1) else: break else: try: self.server_address = (self.__host, int(self.__ports[0])) if self.__host == '': self.server_address = (local_fqdn(), self.server_address[1]) reactor.listenTCP(int(self.__ports[0]), server.Site(self.__xmlrpc), interface=self.__host) reactor.run(installSignalHandlers=0) except: self.__logger.warn("Failed to bind to: %s:%s."% ( self.__host, self.__ports[0])) sys.exit(1) def serve_forever(self): """Handle requests until stopForever event flag indicates stop.""" self.__mThread = threading.Thread(name="XRServer", target=self._serve_forever) self.__mThread.start() if not self.__mThread.isAlive(): raise Exception("Twisted XMLRPC server thread dead.") def register_function(self, functionRef, methodName): self.__xmlrpc.register_function(functionRef, methodName) def register_introspection_functions(self): pass def register_instance(self, instance): for method in dir(instance): if not method.startswith('_'): self.register_function(getattr(instance, method), method) def pause(self): self.__xmlrpc.pause() def cont(self): self.__xmlrpc.cont() def stop(self): def stop_thread(): time.sleep(2) reactor.stop() self.__stopForever.set() stopThread = threading.Thread(name='XRStop', target=stop_thread) stopThread.start() return True def is_alive(self): status = False if reactor.running == 1: status = True return status def status(self): """Return status information on running XMLRPC Server.""" stat = { 'XR server address' : self.server_address, 'XR methods' : self.system_listMethods(), 'XR server alive' : self.is_alive(), 'XR requests processed' : self.__xmlrpc.num_requests(), 'XR server stop flag' : self.__stopForever.isSet()} return(stat) def system_listMethods(self): return self.__xmlrpc.list_methods() def get_server_address(self): waitCount = 0 while self.server_address == '': if waitCount == 9: 
break time.sleep(1) waitCount = waitCount + 1 return self.server_address except ImportError: pass