1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
2 | #or more contributor license agreements. See the NOTICE file |
---|
3 | #distributed with this work for additional information |
---|
4 | #regarding copyright ownership. The ASF licenses this file |
---|
5 | #to you under the Apache License, Version 2.0 (the |
---|
6 | #"License"); you may not use this file except in compliance |
---|
7 | #with the License. You may obtain a copy of the License at |
---|
8 | |
---|
9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
10 | |
---|
11 | #Unless required by applicable law or agreed to in writing, software |
---|
12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
14 | #See the License for the specific language governing permissions and |
---|
15 | #limitations under the License. |
---|
16 | import sys, time, socket, threading, copy, pprint |
---|
17 | |
---|
18 | from hodlib.Common.hodsvc import hodBaseService |
---|
19 | from hodlib.Common.threads import loop |
---|
20 | from hodlib.Common.tcp import tcpSocket |
---|
21 | from hodlib.Common.util import get_exception_string |
---|
22 | import logging |
---|
23 | |
---|
24 | class svcrgy(hodBaseService): |
---|
25 | def __init__(self, config, log=None): |
---|
26 | hodBaseService.__init__(self, 'serviceRegistry', config) |
---|
27 | |
---|
28 | self.__serviceDict = {} |
---|
29 | self.__failCount = {} |
---|
30 | self.__released = {} |
---|
31 | self.__locked = {} |
---|
32 | |
---|
33 | self.__serviceDictLock = threading.Lock() |
---|
34 | self.RMErrorMsgs = None # Ringmaster error messages |
---|
35 | self.log = log |
---|
36 | if self.log is None: |
---|
37 | self.log = logging.getLogger() |
---|
38 | |
---|
39 | def __get_job_key(self, userid, job): |
---|
40 | return "%s-%s" % (userid, job) |
---|
41 | |
---|
42 | def _xr_method_registerService(self, userid, job, host, name, type, dict): |
---|
43 | return self.registerService(userid, job, host, name, type, dict) |
---|
44 | |
---|
45 | def _xr_method_getServiceInfo(self, userid=None, job=None, name=None, |
---|
46 | type=None): |
---|
47 | return self.getServiceInfo(userid, job, name, type) |
---|
48 | |
---|
49 | def _xr_method_setRMError(self, args): |
---|
50 | self.log.debug("setRMError called with %s" % args) |
---|
51 | self.RMErrorMsgs = args |
---|
52 | return True |
---|
53 | |
---|
54 | def _xr_method_getRMError(self): |
---|
55 | self.log.debug("getRMError called") |
---|
56 | if self.RMErrorMsgs is not None: |
---|
57 | return self.RMErrorMsgs |
---|
58 | else: |
---|
59 | self.log.debug("no Ringmaster error messages") |
---|
60 | return False |
---|
61 | |
---|
62 | def registerService(self, userid, job, host, name, type, dict): |
---|
63 | """Method thats called upon by |
---|
64 | the ringmaster to register to the |
---|
65 | the service registry""" |
---|
66 | lock = self.__serviceDictLock |
---|
67 | lock.acquire() |
---|
68 | try: |
---|
69 | self.logs['main'].debug("Registering %s.%s.%s.%s.%s..." % ( |
---|
70 | userid, job, host, name, type)) |
---|
71 | id = "%s.%s" % (name, type) |
---|
72 | |
---|
73 | if userid in self.__serviceDict: |
---|
74 | if job in self.__serviceDict[userid]: |
---|
75 | if host in self.__serviceDict[userid][job]: |
---|
76 | self.__serviceDict[userid][job][host].append( |
---|
77 | {id : dict,}) |
---|
78 | else: |
---|
79 | self.__serviceDict[userid][job][host] = [ |
---|
80 | {id : dict,},] |
---|
81 | else: |
---|
82 | self.__serviceDict[userid][job] = {host : [ |
---|
83 | { id : dict,},]} |
---|
84 | else: |
---|
85 | self.__serviceDict[userid] = {job : {host : [ |
---|
86 | { id : dict,},]}} |
---|
87 | |
---|
88 | finally: |
---|
89 | lock.release() |
---|
90 | |
---|
91 | return True |
---|
92 | |
---|
93 | def getXMLRPCAddr(self): |
---|
94 | """return the xml rpc server address""" |
---|
95 | return self._xrc.server_address |
---|
96 | |
---|
97 | def getServiceInfo(self, userid=None, job=None, name=None, type=None): |
---|
98 | """This method is called upon by others |
---|
99 | to query for a particular service returns |
---|
100 | a dictionary of elements""" |
---|
101 | |
---|
102 | self.logs['main'].debug("inside getServiceInfo: %s.%s.%s" % (userid, job, name)) |
---|
103 | retdict = {} |
---|
104 | lock = self.__serviceDictLock |
---|
105 | lock.acquire() |
---|
106 | try: |
---|
107 | if userid in self.__serviceDict: |
---|
108 | if job in self.__serviceDict[userid]: |
---|
109 | if name and type: |
---|
110 | retdict = [] |
---|
111 | id = "%s.%s" % (name, type) |
---|
112 | for host in self.__serviceDict[userid][job]: |
---|
113 | for dict in self.__serviceDict[userid][job][host]: |
---|
114 | [loopID, ] = dict.keys() |
---|
115 | if loopID == id: |
---|
116 | retdict.append(dict[id]) |
---|
117 | else: |
---|
118 | retdict = copy.deepcopy( |
---|
119 | self.__serviceDict[userid][job]) |
---|
120 | elif not job: |
---|
121 | retdict = copy.deepcopy(self.__serviceDict[userid]) |
---|
122 | elif not userid: |
---|
123 | retdict = copy.deepcopy(self.__serviceDict) |
---|
124 | finally: |
---|
125 | lock.release() |
---|
126 | |
---|
127 | return retdict |
---|