[120] | 1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
| 2 | #or more contributor license agreements. See the NOTICE file |
---|
| 3 | #distributed with this work for additional information |
---|
| 4 | #regarding copyright ownership. The ASF licenses this file |
---|
| 5 | #to you under the Apache License, Version 2.0 (the |
---|
| 6 | #"License"); you may not use this file except in compliance |
---|
| 7 | #with the License. You may obtain a copy of the License at |
---|
| 8 | |
---|
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 10 | |
---|
| 11 | #Unless required by applicable law or agreed to in writing, software |
---|
| 12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 14 | #See the License for the specific language governing permissions and |
---|
| 15 | #limitations under the License. |
---|
| 16 | import sys, time, socket, threading, copy, pprint |
---|
| 17 | |
---|
| 18 | from hodlib.Common.hodsvc import hodBaseService |
---|
| 19 | from hodlib.Common.threads import loop |
---|
| 20 | from hodlib.Common.tcp import tcpSocket |
---|
| 21 | from hodlib.Common.util import get_exception_string |
---|
| 22 | import logging |
---|
| 23 | |
---|
class svcrgy(hodBaseService):
    """In-memory service registry.

    Registered services are kept in a nested dictionary of the form
    ``{userid: {job: {host: [{"<name>.<type>": info}, ...]}}}``; each list
    element is a single-key dictionary.  All access to that dictionary is
    serialized through one ``threading.Lock``.

    The ``_xr_method_*`` methods are thin wrappers around the public
    methods (presumably the names the base service exposes over XML-RPC --
    confirm against hodBaseService).
    """

    def __init__(self, config, log=None):
        """Create an empty registry.

        config -- passed unchanged to ``hodBaseService.__init__``.
        log    -- logger used by the RMError methods; when None, the root
                  logger from ``logging.getLogger()`` is used.
        """
        hodBaseService.__init__(self, 'serviceRegistry', config)

        self.__serviceDict = {}
        self.__failCount = {}
        self.__released = {}
        self.__locked = {}

        self.__serviceDictLock = threading.Lock()
        self.RMErrorMsgs = None  # Ringmaster error messages
        self.log = log
        if self.log is None:
            self.log = logging.getLogger()

    def __get_job_key(self, userid, job):
        """Return the string "<userid>-<job>"."""
        return "%s-%s" % (userid, job)

    def _xr_method_registerService(self, userid, job, host, name, type, dict):
        """Wrapper that forwards to registerService()."""
        return self.registerService(userid, job, host, name, type, dict)

    def _xr_method_getServiceInfo(self, userid=None, job=None, name=None,
                                  type=None):
        """Wrapper that forwards to getServiceInfo()."""
        return self.getServiceInfo(userid, job, name, type)

    def _xr_method_setRMError(self, args):
        """Store the Ringmaster error messages and return True."""
        # Wrap args in a 1-tuple: "%s" % args would raise TypeError for a
        # tuple whose length is not 1, and would unwrap a 1-tuple.
        self.log.debug("setRMError called with %s" % (args,))
        self.RMErrorMsgs = args
        return True

    def _xr_method_getRMError(self):
        """Return the stored Ringmaster error messages, or False if none
        have been set."""
        self.log.debug("getRMError called")
        if self.RMErrorMsgs is not None:
            return self.RMErrorMsgs

        self.log.debug("no Ringmaster error messages")
        return False

    def registerService(self, userid, job, host, name, type, dict):
        """Method that is called upon by the ringmaster to register
        with the service registry.

        Appends ``{"<name>.<type>": dict}`` to the list of services kept
        for (userid, job, host), creating any missing level of the nested
        registry on the way.  Always returns True.
        """
        lock = self.__serviceDictLock
        lock.acquire()
        try:
            self.logs['main'].debug("Registering %s.%s.%s.%s.%s..." % (
                userid, job, host, name, type))
            serviceId = "%s.%s" % (name, type)

            # setdefault() creates each missing level exactly once and
            # otherwise returns the existing container.
            jobs = self.__serviceDict.setdefault(userid, {})
            hosts = jobs.setdefault(job, {})
            hosts.setdefault(host, []).append({serviceId: dict})
        finally:
            lock.release()

        return True

    def getXMLRPCAddr(self):
        """return the xml rpc server address"""
        # _xrc is not set in this class; presumably provided by
        # hodBaseService -- confirm.
        return self._xrc.server_address

    def getServiceInfo(self, userid=None, job=None, name=None, type=None):
        """This method is called upon by others to query for a
        particular service.

        Returns, as a deep copy of the registry data:
          * userid and job both registered, name and type both given:
            a list of the info objects registered as "<name>.<type>"
            on any host of that job (possibly empty);
          * userid and job both registered, otherwise: the
            ``{host: [...]}`` dictionary of that job;
          * userid registered and job not given: the ``{job: ...}``
            dictionary of that user;
          * userid not given: the entire registry;
          * anything else (unknown userid or unknown job): ``{}``.
        """
        self.logs['main'].debug("inside getServiceInfo: %s.%s.%s" % (userid, job, name))
        retdict = {}
        lock = self.__serviceDictLock
        lock.acquire()
        try:
            if userid in self.__serviceDict:
                jobs = self.__serviceDict[userid]
                if job in jobs:
                    hosts = jobs[job]
                    if name and type:
                        retdict = []
                        serviceId = "%s.%s" % (name, type)
                        for host in hosts:
                            for entry in hosts[host]:
                                # Every entry is a single-key dict (see
                                # registerService), so a membership test
                                # identifies it.
                                if serviceId in entry:
                                    # Copy, like the other branches, so the
                                    # caller never holds a live reference to
                                    # registry state once the lock is gone.
                                    retdict.append(
                                        copy.deepcopy(entry[serviceId]))
                    else:
                        retdict = copy.deepcopy(hosts)
                elif not job:
                    retdict = copy.deepcopy(jobs)
            elif not userid:
                retdict = copy.deepcopy(self.__serviceDict)
        finally:
            lock.release()

        return retdict
---|