source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/Common/socketServers.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 23.6 KB
Line 
1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16# Various socket server and helper classes.
17#
18#
19import os, sys, socket, threading, pprint, re, xmlrpclib, time
20 
21from select import select
22from SocketServer import ThreadingMixIn, ForkingMixIn
23from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
24from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
25from SimpleHTTPServer import SimpleHTTPRequestHandler
26from random import Random
27from urlparse import urlparse
28
29Fault = xmlrpclib.Fault
30
31from hodlib.Common.util import local_fqdn
32from hodlib.Common.logger import hodDummyLogger
33
34class hodHTTPHandler(BaseHTTPRequestHandler):
35  port = -1
36
37  def __init__(self, request, client_address, server, registerService):
38    self.registerService = registerService
39    BaseHTTPRequestHandler.__init__(self, request, client_address, server)
40 
41  def log_message(self, *args):
42    """Forget logging for now."""
43   
44    pass
45     
46  def do_GET(self):
47    self.fullUrl = "http://%s:%s%s" % (self.server.server_address[0],
48                                       self.server.server_address[1], 
49                                       self.path)
50   
51    parsedUrl = urlparse(self.fullUrl)
52    self.writeHeaders()
53    self.writeData(parsedUrl)
54 
55  def w(self, string):
56    self.wfile.write("%s\n" % string)
57 
58  def writeHeaders(self):
59   self.send_response(200, 'OK')
60   self.send_header('Content-type', 'text/html')
61   self.end_headers()   
62     
63  def sendWrongPage(self, userJob):
64    self.w('<font class="alert">')
65    if userJob == False:
66      self.w('invalid URL specified')   
67    elif re.match("^\d+$", userJob):
68      self.w('invalid URL specified, job <b>%s</b> does not exist' % userJob)
69    elif re.match("^\w+$", userJob):
70      self.w('invalid URL specified, user <b>%s</b> does not exist' % userJob) 
71    self.w('</font>')
72   
73  def getServiceHosts(self, serviceInfo):
74    hostInfo = { 'long' : {}, 'short' : {} }
75    for user in serviceInfo:
76      for job in serviceInfo[user]:
77        for host in serviceInfo[user][job]:
78          for serviceItem in serviceInfo[user][job][host]:
79            serviceName = serviceItem.keys()
80            serviceName = serviceName[0]
81            if isinstance(serviceItem[serviceName], str):
82              hostInfo['short'][self.getJobKey(user, job, host)] = True
83            hostInfo['long'][self.getJobKey(user, job, host)] = True
84   
85    return hostInfo
86
87  def getJobInfo(self, job, serviceInfo):
88    jobInfo = {}
89   
90    for user in serviceInfo.keys():
91      for someJob in serviceInfo[user].keys():
92        if job == someJob:
93          jobInfo[user] = { job : serviceInfo[user][job] }
94   
95    return jobInfo
96 
97  def getJobKey(self, user, job, host):
98    return "%s-%s-%s" % (user, job, host)
99 
100  def writeData(self, parsedUrl):
101    options = parsedUrl[4]
102    serviceInfo = self.server.service.getServiceInfo()
103    users = serviceInfo.keys()
104    users.sort()
105
106    self.w("<html>")
107    self.w("<body>")
108    self.w("<head>")
109    self.writeCSS()
110    self.w("</head>")
111    self.w('<font class="header2">HOD Service Registry Information</font>')
112    if serviceInfo == {}:
113      self.w('<br><br><font class="header">&nbsp;&nbsp;No HOD clusters configured.</font>')
114    else:
115      if parsedUrl[2] == '/':
116        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
117        count = 0
118        for user in users:
119          self.writeUserData(user, options, serviceInfo, count)
120          count = count + 1
121      elif parsedUrl[2][1:] in serviceInfo:
122        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
123        self.writeUserData(parsedUrl[2][1:], options, serviceInfo, 0)
124      elif re.match("^\d+$", parsedUrl[2][1:]):
125        jobInfo = self.getJobInfo(parsedUrl[2][1:], serviceInfo)
126        if jobInfo.keys():
127          self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
128          for user in jobInfo.keys():
129            self.writeUserData(user, options, jobInfo, 0)   
130        else:
131          self.sendWrongPage(parsedUrl[2][1:]) 
132          self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
133          count = 0
134          for user in users:
135            self.writeUserData(user, options, serviceInfo, count)
136            count = count + 1
137      elif re.match("^\w+$", parsedUrl[2][1:]):
138        self.sendWrongPage(parsedUrl[2][1:]) 
139        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
140        count = 0
141        for user in users:
142          self.writeUserData(user, options, serviceInfo, count)
143          count = count + 1       
144      else:
145        self.sendWrongPage(False) 
146        self.w('&nbsp;&nbsp;&nbsp;<table class="main">')
147        count = 0
148        for user in users:
149          self.writeUserData(user, options, serviceInfo, count)
150          count = count + 1
151
152    self.w('</table>')
153    self.w("</pre>")
154    self.w("</body>")
155    self.w("</html>")
156
157  def writeCSS(self):
158    self.w('<style type="text/css">')
159   
160    self.w('table.main { border: 0px; padding: 1; background-color: #E1ECE0; width: 70%; margin: 10; }')
161    self.w('table.sub1 { background-color: #F1F1F1; padding: 0; }')
162    self.w('table.sub2 { background-color: #FFFFFF; padding: 0; }')
163    self.w('table.sub3 { border: 1px solid #EEEEEE; background-color: #FFFFFF; padding: 0; }')
164    self.w('td.header { border-bottom: 1px solid #CCCCCC; padding: 2;}')
165    self.w('td.service1 { border: 0px; background-color: #FFFFFF; padding: 2; width: 10%}')
166    self.w('td.service2 { border: 0px; background-color: #FFFFFF; padding: 2; width: 90%}')
167    self.w('td { vertical-align: top; padding: 0; }')
168    self.w('td.noborder { border-style: none; border-collapse: collapse; }')
169    self.w('tr.colored { background-color: #F1F1F1; }')
170    self.w('font { font-family: Helvetica, Arial, sans-serif; font-size: 10pt; color: #666666; }')
171    self.w('font.header { font-family: Helvetica, Arial, sans-serif;  font-size: 10pt; color: #333333; font-style: bold }')
172    self.w('font.header2 { font-family: Helvetica, Arial, sans-serif; font-size: 16pt; color: #333333; }')
173    self.w('font.sml { font-family: Helvetica, Arial, sans-serif; font-size: 8pt; color: #666666; }')
174    self.w('font.alert { font-family: Helvetica, Arial, sans-serif; font-size: 9pt; color: #FF7A22; }')
175    self.w('a { font-family: Helvetica, Arial, sans-serif; text-decoration:none; font-size: 10pt; color: #111111; }')
176    self.w('a:visited { font-family: Helvetica, Arial, sans-serif; color:#2D4628; text-decoration:none; font-size: 10pt; }')
177    self.w('a:hover { font-family: Helvetica, Arial, sans-serif; color:#00A033; text-decoration:none; font-size: 10pt; }')
178    self.w('a.small { font-family:  Helvetica, Arial, sans-serif; text-decoration:none; font-size: 8pt }')
179    self.w('a.small:hover { color:#822499; text-decoration:none; font-size: 8pt }')
180
181    self.w("</style>")
182
183  def writeUserData(self, user, options, serviceInfo, count):
184    hostInfo = self.getServiceHosts(serviceInfo)
185    hostKey = 'short'
186    if options == 'display=long':
187      hostKey = 'long'
188
189    if count == 0:
190      self.w('<tr>')
191      self.w('<td class="header" colspan="2">')
192      self.w('<font class="header">Active Users</font>')
193      self.w('</td>')
194      self.w('</tr>')
195    self.w('<tr>')
196    self.w('<td><font>%s</font></td>' % user)
197    self.w('<td>')
198    jobIDs = serviceInfo[user].keys()
199    jobIDs.sort()
200    for jobID in jobIDs: 
201      self.w('<table class="sub1" width="100%">')
202      if count == 0:
203        self.w('<tr>')
204        self.w('<td class="header" colspan="2">')
205        self.w('<font class="header">PBS Job Identifiers</font>')
206        self.w('</td>')
207        self.w('</tr>')       
208      self.w('<tr>')
209      self.w('<td><font>%s</font></td>' % jobID)
210      self.w('<td>')
211      hosts = serviceInfo[user][jobID].keys()
212      hosts.sort()
213      for host in hosts:
214        if hostInfo[hostKey].has_key(self.getJobKey(user, jobID, host)):
215          self.w('<table class="sub2" width="100%">')
216          if count == 0:
217            self.w('<tr>')
218            self.w('<td class="header" colspan="2">')
219            self.w('<font class="header">Hosts Running Services</font>')
220            self.w('</td>')
221            self.w('</tr>') 
222          self.w('<tr>')
223          self.w('<td><font>%s</font></td>' % host)
224          self.w('<td>')
225          self.w('<table class="sub3" width="100%">')
226          self.w('<tr>')
227          self.w('<td colspan="2">')
228          self.w('<font class="header">Service Information</font>')
229          self.w('</td>')
230          self.w('</tr>') 
231          for serviceItem in serviceInfo[user][jobID][host]:
232            serviceName = serviceItem.keys()
233            serviceName = serviceName[0]
234            if isinstance(serviceItem[serviceName], dict) and \
235              options == 'display=long':
236              self.w('<tr class="colored">')
237              self.w('<td><font>%s</font></td>' % serviceName)
238              self.w('<td>')
239              self.w('<table width="100%">')
240              for key in serviceItem[serviceName]:
241                self.w('<tr>')
242                self.w('<td class="service1"><font>%s</font></td>' % key)
243                self.w('<td class="service2"><font>%s</font></td>' % serviceItem[serviceName][key])
244                self.w('</tr>')
245              self.w('</table>')
246              self.w('</td>')
247              self.w('</tr>')
248            elif isinstance(serviceItem[serviceName], str):
249              self.w('<tr class="colored">')
250              self.w('<td><font class="service1">%s</font></td>' % serviceName)
251              self.w('<td>')
252              (host, port) = serviceItem[serviceName].split(':')
253              hostnameInfo = socket.gethostbyname_ex(host)
254              if serviceName.startswith('mapred'):
255                self.w('<a href="http://%s:%s">Hadoop Job Tracker</a>' % (hostnameInfo[0], port))
256              elif serviceName.startswith('hdfs'):
257                self.w('<a href="http://%s:%s">HDFS Name Node</a>&nbsp' % (hostnameInfo[0], port))
258              else:
259                self.w('<font class="service2">%s</font>' % serviceItem[serviceName])
260              self.w('</td>')
261              self.w('</tr>')
262          self.w('</table>')   
263          self.w('</td>')
264          self.w('</tr>')
265          self.w('</table>')
266          count = count + 1
267      self.w('</td>') 
268      self.w('</tr>')
269      self.w('</table>')
270      count = count + 1
271    self.w('</td>')
272    self.w('</tr>')
273#    self.w("<pre>")
274#    self.w(pprint.pformat(serviceInfo))
275#    self.w("</pre>")
276   
277class baseSocketServer:
278    def __init__(self, host, ports):
279        self.host = host
280        self.ports = ports
281        self.__stopForever = threading.Event()
282        self.__stopForever.clear()
283        self.__run = threading.Event()
284        self.__run.set()   
285        self.server_address = ()
286        self.mThread = None
287       
288    def server_bind(self):
289        """server_bind() method binds to a random range of ports."""
290
291        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
292
293        if len(self.ports) > 1:
294            randomPort = Random(os.getpid())
295            portSequence = range(self.ports[0], self.ports[1])
296
297            maxTryCount = abs(self.ports[0] - self.ports[1])
298            tryCount = 0
299            while True:
300                somePort = randomPort.choice(portSequence)
301                self.server_address = (self.host, somePort)
302                try:
303                    self.socket.bind(self.server_address)
304                except socket.gaierror, errData:
305                    raise socket.gaierror, errData
306                except:
307                    tryCount = tryCount + 1
308                    if tryCount > maxTryCount:
309                        bindError = "bind failure for port range %s:%d" % (
310                            self.ports)
311
312                        raise socket.error, bindError
313                else:
314                    break
315        else:
316            self.server_address = (self.host, int(self.ports[0]))
317            self.socket.bind(self.server_address)
318       
319        if self.host == '':
320            self.server_address = (local_fqdn(), self.server_address[1])
321
322    def _serve_forever(self):
323        """Replacement for serve_forever loop.
324       
325           All baseSocketServers run within a master thread; that thread
326           imitates serve_forever, but checks an event (self.__stopForever)
327           before processing new connections.
328        """
329       
330        while not self.__stopForever.isSet():
331            (rlist, wlist, xlist) = select([self.socket], [], [], 
332                                           1)
333           
334            if (len(rlist) > 0 and self.socket == rlist[0]):
335                self.handle_request()
336       
337            while not self.__run.isSet():
338                if self.__stopForever.isSet():
339                    break
340                time.sleep(1)
341       
342        self.server_close()
343       
344        return True
345
346    def serve_forever(self):
347        """Handle requests until stopForever event flag indicates stop."""
348
349        self.mThread = threading.Thread(name="baseSocketServer", 
350                                        target=self._serve_forever)
351        self.mThread.start()
352
353        return self.mThread
354
355    def pause(self):
356        """Temporarily stop servicing requests."""
357
358        self.__run.clear()
359
360    def cont(self):
361        """Resume servicing requests."""
362
363        self.__run.set()
364
365    def stop(self):
366        """Set the stopForever flag to tell serve_forever() to exit."""
367   
368        self.__stopForever.set()
369        if self.mThread: self.mThread.join()
370        return True
371
372    def is_alive(self):
373        if self.mThread != None:
374            return self.mThread.isAlive()
375        else:
376            return False
377
378class threadedHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer):
379    def __init__(self, host, ports):
380        baseSocketServer.__init__(self, host, ports)
381        HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)
382
383class forkingHTTPServer(baseSocketServer, ForkingMixIn, HTTPServer):
384    def __init__(self, host, ports):
385        baseSocketServer.__init__(self, host, ports)
386        HTTPServer.__init__(self, self.server_address, SimpleHTTPRequestHandler)
387
388class hodHTTPServer(baseSocketServer, ThreadingMixIn, HTTPServer):
389    service = None 
390    def __init__(self, host, ports, serviceobj = None):
391        self.service = serviceobj
392        baseSocketServer.__init__(self, host, ports)
393        HTTPServer.__init__(self, self.server_address, hodHTTPHandler)
394
395    def finish_request(self, request, client_address):
396        self.RequestHandlerClass(request, client_address, self, self.service)
397       
398class hodXMLRPCServer(baseSocketServer, ThreadingMixIn, SimpleXMLRPCServer):
399    def __init__(self, host, ports, 
400                 requestHandler=SimpleXMLRPCRequestHandler, 
401                 logRequests=False, allow_none=False, encoding=None):
402        baseSocketServer.__init__(self, host, ports)
403        SimpleXMLRPCServer.__init__(self, self.server_address, requestHandler, 
404                                    logRequests)
405       
406        self.register_function(self.stop, 'stop')
407
408try:
409    from twisted.web import server, xmlrpc
410    from twisted.internet import reactor, defer
411    from twisted.internet.threads import deferToThread
412    from twisted.python import log
413               
414    class twistedXMLRPC(xmlrpc.XMLRPC):
415        def __init__(self, logger):
416            xmlrpc.XMLRPC.__init__(self)
417           
418            self.__XRMethods = {}
419            self.__numRequests = 0
420            self.__logger = logger
421            self.__pause = False
422   
423        def render(self, request):
424            request.content.seek(0, 0)
425            args, functionPath = xmlrpclib.loads(request.content.read())
426            try:
427                function = self._getFunction(functionPath)
428            except Fault, f:
429                self._cbRender(f, request)
430            else:
431                request.setHeader("content-type", "text/xml")
432                defer.maybeDeferred(function, *args).addErrback(
433                    self._ebRender).addCallback(self._cbRender, request)
434           
435            return server.NOT_DONE_YET
436   
437        def _cbRender(self, result, request):
438            if isinstance(result, xmlrpc.Handler):
439                result = result.result
440            if not isinstance(result, Fault):
441                result = (result,)
442            try:
443                s = xmlrpclib.dumps(result, methodresponse=1)
444            except:
445                f = Fault(self.FAILURE, "can't serialize output")
446                s = xmlrpclib.dumps(f, methodresponse=1)
447            request.setHeader("content-length", str(len(s)))
448            request.write(s)
449            request.finish()
450     
451        def _ebRender(self, failure):
452            if isinstance(failure.value, Fault):
453                return failure.value
454            log.err(failure)
455            return Fault(self.FAILURE, "error")
456       
457        def _getFunction(self, methodName):
458            while self.__pause:
459                time.sleep(1)
460           
461            self.__numRequests = self.__numRequests + 1
462            function = None
463            try:
464                def defer_function(*args):
465                    return deferToThread(self.__XRMethods[methodName], 
466                                         *args)
467                function = defer_function
468                self.__logger.info(
469                    "[%s] processing defered XML-RPC call to: %s ..." % 
470                    (self.__numRequests, methodName))           
471            except KeyError:
472                self.__logger.warn(
473                    "[%s] fault %s on XML-RPC call to %s, method not found." % (
474                    self.__numRequests, self.NOT_FOUND, methodName))
475                raise xmlrpc.NoSuchFunction(self.NOT_FOUND, 
476                                            "method %s not found" % methodName)
477           
478            return function
479       
480        def register_function(self, functionRef, methodName):
481            self.__XRMethods[methodName] = functionRef
482           
483        def list_methods(self):
484            return self.__XRMethods.keys()
485       
486        def num_requests(self):
487            return self.__numRequests
488       
489        def pause(self):
490            self.__pause = True
491       
492        def cont(self):
493            self.__pause = False
494           
495    class twistedXMLRPCServer:
496        def __init__(self, host, ports, logger=None, threadPoolSize=100):
497            self.__host = host
498            self.__ports = ports
499           
500            if logger == None:
501                logger = hodDummyLogger()
502           
503            self.__logger = logger
504               
505            self.server_address = ['', '']
506            reactor.suggestThreadPoolSize(threadPoolSize)   
507   
508            self.__stopForever = threading.Event()
509            self.__stopForever.clear()
510            self.__mThread = None
511               
512            self.__xmlrpc = twistedXMLRPC(self.__logger)
513               
514        def _serve_forever(self):
515            if len(self.__ports) > 1:
516                randomPort = Random(os.getpid())
517                portSequence = range(self.__ports[0], self.__ports[1])
518   
519                maxTryCount = abs(self.__ports[0] - self.__ports[1])
520                tryCount = 0
521                while True:
522                    somePort = randomPort.choice(portSequence)
523                    self.server_address = (self.__host, int(somePort))
524                    if self.__host == '':
525                        self.server_address = (local_fqdn(), self.server_address[1])
526                    try:
527                        reactor.listenTCP(int(somePort), server.Site(
528                            self.__xmlrpc), interface=self.__host)
529                        reactor.run(installSignalHandlers=0)
530                    except:
531                        self.__logger.debug("Failed to bind to: %s:%s." % (
532                            self.__host, somePort))
533                        tryCount = tryCount + 1
534                        if tryCount > maxTryCount:
535                            self.__logger.warn("Failed to bind to: %s:%s" % (
536                                self.__host, self.__ports))
537                            sys.exit(1)
538                    else:
539                        break
540            else:
541                try:
542                    self.server_address = (self.__host, int(self.__ports[0]))
543                    if self.__host == '':
544                        self.server_address = (local_fqdn(), self.server_address[1])
545                    reactor.listenTCP(int(self.__ports[0]), server.Site(self.__xmlrpc), 
546                                      interface=self.__host)
547                    reactor.run(installSignalHandlers=0)
548                except:
549                    self.__logger.warn("Failed to bind to: %s:%s."% (
550                            self.__host, self.__ports[0]))
551                    sys.exit(1)
552           
553        def serve_forever(self):
554            """Handle requests until stopForever event flag indicates stop."""
555   
556            self.__mThread = threading.Thread(name="XRServer",
557                                              target=self._serve_forever)
558            self.__mThread.start()
559           
560            if not self.__mThread.isAlive():
561                raise Exception("Twisted XMLRPC server thread dead.")
562                   
563        def register_function(self, functionRef, methodName):
564            self.__xmlrpc.register_function(functionRef, methodName)
565       
566        def register_introspection_functions(self):
567            pass
568       
569        def register_instance(self, instance):
570            for method in dir(instance):
571                if not method.startswith('_'):
572                    self.register_function(getattr(instance, method), method)
573       
574        def pause(self):
575            self.__xmlrpc.pause()
576       
577        def cont(self):
578            self.__xmlrpc.cont()
579       
580        def stop(self):
581            def stop_thread():
582                time.sleep(2)
583                reactor.stop()
584               
585            self.__stopForever.set()
586           
587            stopThread = threading.Thread(name='XRStop', target=stop_thread)
588            stopThread.start()
589               
590            return True
591           
592        def is_alive(self):
593            status = False
594            if reactor.running == 1:
595                status = True
596           
597            return status
598       
599        def status(self):
600            """Return status information on running XMLRPC Server."""
601            stat = { 'XR server address'     : self.server_address,
602                     'XR methods'            : self.system_listMethods(),
603                     'XR server alive'       : self.is_alive(),
604                     'XR requests processed' : self.__xmlrpc.num_requests(),
605                     'XR server stop flag'   : self.__stopForever.isSet()}
606            return(stat)
607       
608        def system_listMethods(self):
609            return self.__xmlrpc.list_methods()
610       
611        def get_server_address(self):
612            waitCount = 0
613            while self.server_address == '':
614                if waitCount == 9:
615                    break 
616                time.sleep(1)
617                waitCount = waitCount + 1
618               
619            return self.server_address
620except ImportError:
621    pass
Note: See TracBrowser for help on using the repository browser.