source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/ServiceRegistry/serviceRegistry.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 4.9 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16import sys, time, socket, threading, copy, pprint
17
18from hodlib.Common.hodsvc import hodBaseService
19from hodlib.Common.threads import loop
20from hodlib.Common.tcp import tcpSocket
21from hodlib.Common.util import get_exception_string
22import logging
23
24class svcrgy(hodBaseService):
25    def __init__(self, config, log=None):
26        hodBaseService.__init__(self, 'serviceRegistry', config)
27       
28        self.__serviceDict = {}
29        self.__failCount = {}
30        self.__released = {}
31        self.__locked = {}
32       
33        self.__serviceDictLock = threading.Lock()
34        self.RMErrorMsgs = None # Ringmaster error messages
35        self.log = log
36        if self.log is None:
37          self.log = logging.getLogger()
38   
39    def __get_job_key(self, userid, job):
40        return "%s-%s" % (userid, job)
41   
42    def _xr_method_registerService(self, userid, job, host, name, type, dict):
43       return self.registerService(userid, job, host, name, type, dict)
44   
45    def _xr_method_getServiceInfo(self, userid=None, job=None, name=None, 
46                                  type=None):
47        return self.getServiceInfo(userid, job, name, type)
48
49    def _xr_method_setRMError(self, args):
50        self.log.debug("setRMError called with %s" % args)
51        self.RMErrorMsgs = args
52        return True
53
54    def _xr_method_getRMError(self):
55        self.log.debug("getRMError called")
56        if self.RMErrorMsgs is not None:
57          return self.RMErrorMsgs
58        else:
59          self.log.debug("no Ringmaster error messages")
60          return False
61
62    def registerService(self, userid, job, host, name, type, dict):
63        """Method thats called upon by
64        the ringmaster to register to the
65        the service registry"""
66        lock = self.__serviceDictLock
67        lock.acquire()
68        try:
69            self.logs['main'].debug("Registering %s.%s.%s.%s.%s..." % (
70                                    userid, job, host, name, type))   
71            id = "%s.%s" % (name, type) 
72   
73            if userid in self.__serviceDict:
74                if job in self.__serviceDict[userid]:
75                     if host in self.__serviceDict[userid][job]:
76                          self.__serviceDict[userid][job][host].append(
77                              {id : dict,})
78                     else:
79                        self.__serviceDict[userid][job][host] = [
80                            {id : dict,},] 
81                else:
82                    self.__serviceDict[userid][job] = {host : [
83                                                       { id : dict,},]}
84            else:   
85                self.__serviceDict[userid] = {job : {host : [
86                                                     { id : dict,},]}}
87
88        finally:
89            lock.release()
90           
91        return True
92   
93    def getXMLRPCAddr(self):
94      """return the xml rpc server address"""
95      return self._xrc.server_address
96   
97    def getServiceInfo(self, userid=None, job=None, name=None, type=None):
98        """This method is called upon by others
99        to query for a particular service returns
100        a dictionary of elements"""
101       
102        self.logs['main'].debug("inside getServiceInfo: %s.%s.%s" % (userid, job, name))
103        retdict = {}
104        lock = self.__serviceDictLock
105        lock.acquire()
106        try:
107            if userid in self.__serviceDict:
108                if job in self.__serviceDict[userid]:
109                    if name and type:
110                        retdict = []
111                        id = "%s.%s" % (name, type)
112                        for host in self.__serviceDict[userid][job]:
113                            for dict in self.__serviceDict[userid][job][host]:
114                                [loopID, ] = dict.keys()
115                                if loopID == id:
116                                    retdict.append(dict[id])
117                    else:
118                        retdict = copy.deepcopy(
119                            self.__serviceDict[userid][job])
120                elif not job:
121                    retdict = copy.deepcopy(self.__serviceDict[userid])
122            elif not userid:
123                retdict = copy.deepcopy(self.__serviceDict)
124        finally:
125            lock.release()
126       
127        return retdict
Note: See TracBrowser for help on using the repository browser.