source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/GridServices/service.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.0 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16"""defines Service as abstract interface"""
17
18# -*- python -*-
19import random, socket
20
class Service:
  """Abstract base for the grid services.

  Subclasses provide the real behavior by overriding the hooks below that
  raise NotImplementedError.
  """

  def __init__(self, serviceDesc, workDirs):
    # Keep the descriptor and work directories exactly as handed in.
    self.workDirs = workDirs
    self.serviceDesc = serviceDesc

  def getName(self):
    """Return the name reported by this service's descriptor."""
    desc = self.serviceDesc
    return desc.getName()

  def getInfoAddrs(self):
    """Return the addresses that provide information about the service.

    The base implementation knows of none and returns an empty list.
    """
    return list()

  def isLost(self):
    """Report whether the service is down; subclasses must override."""
    raise NotImplementedError()

  def addNodes(self, nodeList):
    """Add the given node set to the service; subclasses must override."""
    raise NotImplementedError()

  def removeNodes(self, nodeList):
    """Remove the given node set from the service; subclasses must override."""
    raise NotImplementedError()

  def getWorkers(self):
    """Return the service's workers; subclasses must override."""
    raise NotImplementedError()

  def needsMore(self):
    """Return how many nodes the service wants added; subclasses override."""
    raise NotImplementedError()

  def needsLess(self):
    """Return how many nodes the service wants removed; subclasses override."""
    raise NotImplementedError()
58
class MasterSlave(Service):
  """Base class for services that run as one master plus many slaves.

  On top of Service it keeps the bookkeeping for the master: whether it was
  launched, whether it finished initializing, where it runs, and how often
  (and why) it failed.
  """

  def __init__(self, serviceDesc, workDirs, requiredNode):
    Service.__init__(self, serviceDesc, workDirs)
    self.requiredNode = requiredNode
    # Master lifecycle flags; both start out False.
    self.masterInitialized = False
    self.launchedMaster = False
    # 'none' is the placeholder until setMasterAddress() stores a real value.
    self.masterAddress = 'none'
    # Failure bookkeeping; setMasterInitialized() clears it again.
    self.masterFailureCount = 0
    self.failedMsg = None

  def getRequiredNode(self):
    """Return the requiredNode value given to the constructor."""
    return self.requiredNode

  def getMasterRequest(self):
    """Return how many masters this service needs; subclasses override."""
    raise NotImplementedError()

  def isLaunchable(self, serviceDict):
    """Tell whether the service can be launched given serviceDict.

    Services that do not depend on other services can keep this default,
    which always answers True.
    """
    return True

  def getMasterCommands(self, serviceDict):
    """Return the list of master commands to run; subclasses override."""
    raise NotImplementedError()

  def getAdminCommands(self, serviceDict):
    """Return the list of admin commands to run; subclasses override."""
    raise NotImplementedError()

  def getWorkerCommands(self, serviceDict):
    """Return the list of worker commands to run; subclasses override."""
    raise NotImplementedError()

  def setMasterNodes(self, list):
    """Record master-node status once they run on the cluster.

    Subclasses must override.
    """
    raise NotImplementedError()

  def addNodes(self, list):
    """Add nodes to the service; currently unimplemented."""
    raise NotImplementedError()

  def getMasterAddrs(self):
    """Return the master's hostname:port addresses for workers to connect to.

    Subclasses must override.
    """
    raise NotImplementedError()

  def setMasterParams(self, list):
    """Apply the master params as set by each hodring; subclasses override."""
    raise NotImplementedError()

  def setlaunchedMaster(self):
    """Mark the master as launched."""
    self.launchedMaster = True

  def isMasterLaunched(self):
    """Return whether the master has been marked launched."""
    return self.launchedMaster

  def isMasterInitialized(self):
    """Return whether the launched master has been marked initialized."""
    return self.masterInitialized

  def setMasterInitialized(self):
    """Mark the master initialized and clear any recorded failure."""
    # A successful initialization wipes out earlier failure state.
    self.failedMsg = None
    self.masterFailureCount = 0
    self.masterInitialized = True

  def getMasterAddress(self):
    """Return the address stored for the master ('none' until set).

    Only a single master address is tracked; supporting several masters
    would require changing this.
    """
    return self.masterAddress

  def setMasterAddress(self, addr):
    """Store addr as the master's address."""
    self.masterAddress = addr

  def isExternal(self):
    """Delegate to the service descriptor's isExternal()."""
    desc = self.serviceDesc
    return desc.isExternal()

  def setMasterFailed(self, err):
    """Record one more master failure, remembering err as its message."""
    self.failedMsg = err
    self.masterFailureCount = self.masterFailureCount + 1
    # Sending the command to the HodRings set this flag to True; undo it so
    # the status reflects the failure.
    self.launchedMaster = False

  def getMasterFailed(self):
    """Return the most recent failure message, or None if there is none."""
    return self.failedMsg

  def getMasterFailureCount(self):
    """Return how many failures were recorded since the last success."""
    return self.masterFailureCount
171 
class NodeRequest:
  """A request for a number of nodes.

  Attributes:
    numNodes    -- the number of nodes requested.
    required    -- list of nodes marked as required (interpretation is up
                   to the consumer of the request).
    preferred   -- list of nodes marked as preferred.
    isPreemptee -- boolean flag carried with the request; presumably marks
                   the request as preemptible -- confirm with the consumer.
  """
  def __init__(self, n, required = None, preferred = None, isPreemptee = True):
    # Build fresh lists per instance. The old `[]` defaults were evaluated
    # once, so every default-constructed request shared the same list
    # objects and mutations through one request leaked into all others.
    if required is None:
      required = []
    if preferred is None:
      preferred = []
    self.numNodes = n
    self.preferred = preferred
    self.isPreemptee = isPreemptee
    self.required = required

  def setNumNodes(self, n):
    """Replace the requested node count."""
    self.numNodes = n

  def setPreferredList(self, list):
    """Replace the preferred-node list (stored by reference)."""
    self.preferred = list

  def setIsPreemptee(self, flag):
    """Replace the isPreemptee flag."""
    self.isPreemptee = flag
189
190
class ServiceUtil:
  """Port-allocation helpers shared by the services.

  This class should be moved out of service.py to a util file.
  """
  # Ports already handed out by this process (port number -> True). Both
  # allocators skip these so the same port is never returned twice.
  localPortUsed = {}

  @staticmethod
  def _tryBind(h, n, log=None):
    """Return True if a TCP socket can be bound to (h, n), else False.

    The probe socket is always closed before returning, even when bind()
    raises something other than socket.error (e.g. OverflowError for an
    out-of-range port), in which case that exception propagates.
    """
    # sys.exc_info() reads the exception in a way that is valid syntax on
    # both Python 2 and Python 3.
    import sys
    if log: log.debug("Trying to see if port %s is available"% n)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      try:
        s.bind((h, n))
      except socket.error:
        if log:
          log.debug("Could not bind to the port %s. Reason %s"
                    % (n, sys.exc_info()[1]))
        return False
      if log: log.debug("Yes, port %s is available" % n)
      return True
    finally:
      s.close()

  @staticmethod
  def getUniqRandomPort(h=None, low=50000, high=60000, retry=900, log=None):
    """Allocate a random free TCP port in [low, high] on host h.

    h     -- host/address to probe; defaults to socket.gethostname().
    retry -- maximum number of random candidates to draw. Every draw
             counts, including ports this process already handed out, so
             the loop always terminates.
    log   -- optional object with a debug(msg) method.

    Returns the port and records it in ServiceUtil.localPortUsed.
    Raises ValueError if no free port was found within `retry` draws.
    """
    # We use a default value of 900 retries, which takes an agreeable
    # time limit of ~ 6.2 seconds to check 900 ports, in the worse case
    # of no available port in those 900.
    if not h:
      h = socket.gethostname()
    while retry > 0:
      retry -= 1
      n = random.randint(low, high)
      if n in ServiceUtil.localPortUsed:
        continue
      if ServiceUtil._tryBind(h, n, log):
        ServiceUtil.localPortUsed[n] = True
        return n
    raise ValueError("Can't find unique local port between %d and %d" % (low, high))

  @staticmethod
  def getUniqPort(h=None, low=40000, high=60000, retry=900, log=None):
    """Return the lowest free TCP port in [low, high] on host h.

    Ports are probed in ascending order starting at `low`. Ports already
    recorded in ServiceUtil.localPortUsed are skipped without consuming a
    retry. At most `retry` failed binds are attempted, and the scan never
    goes past `high`.

    This and its consumer code should disappear when master nodes get
    allocated by nodepool.

    h     -- host/address to probe; defaults to socket.gethostname().
    log   -- optional object with a debug(msg) method.

    Returns the port and records it in ServiceUtil.localPortUsed.
    Raises ValueError if no free port is found.
    """
    # We use a default value of 900 retries, which takes an agreeable
    # time limit of ~ 6.2 seconds to check 900 ports, in the worse case
    # of no available port in those 900.
    if not h:
      h = socket.gethostname()
    n = low
    while retry > 0 and n <= high:
      if n not in ServiceUtil.localPortUsed:
        if ServiceUtil._tryBind(h, n, log):
          ServiceUtil.localPortUsed[n] = True
          return n
        retry -= 1
      n += 1
    raise ValueError("Can't find unique local port between %d and %d" % (low, high))
Note: See TracBrowser for help on using the repository browser.