source: proiecte/HadoopJUnit/hadoop-0.20.1/src/contrib/failmon/bin/scheduler.py @ 176

Last change on this file since 176 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 6.5 KB
#!/usr/bin/python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Schedule FailMon execution on the nodes listed in conf/hosts.list,
# according to the properties file conf/global.config.
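#
# For reference, conf/global.config is an INI-style file read with
# ConfigParser; the keys below are the ones this script looks up in its
# [Default] section (the values shown are only illustrative, and an
# interval of 0 disables the corresponding job):
#
#   [Default]
#   ssh.username = hadoop
#   max.connections = 10
#   failmon.dir = /home/hadoop/failmon
#   hdfs.files.max = 100
#   log.hadoop.interval = 3600
#   log.system.interval = 3600
#   nics.interval = 300
#   cpu.interval = 300
#   disks.interval = 300
#   sensors.interval = 300
#   upload.interval = 300
#
# conf/hosts.list is expected to contain one hostname per line. Both files
# are opened via relative paths, so the script should be run from the
# FailMon installation directory.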

import time
import ConfigParser
import subprocess
import threading
import random

jobs = []
username = "user"
connections = 10
failmonDir = ""
maxFiles = 100

# This class represents a thread that connects to a set of cluster
# nodes to locally execute monitoring jobs. These jobs are specified
# as a shell command in the constructor.
class sshThread(threading.Thread):

    def __init__(self, threadname, username, command, failmonDir):
        threading.Thread.__init__(self)
        self.name = threadname
        self.username = username
        self.command = command
        self.failmonDir = failmonDir
        self.hosts = []

    def addHost(self, host):
        self.hosts.append(host)

    def run(self):
        for host in self.hosts:
            toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
            print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
            subprocess.check_call(toRun)
            print "Done!"
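
# For each of its hosts, sshThread.run() invokes the equivalent of the
# following (user, host, directory and job name here are purely illustrative):
#
#   ssh user@node01 "cd /home/hadoop/failmon ; bin/failmon.sh --only cpu"
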
# This class represents a monitoring job. The param member is a string
# that can be passed in the '--only' list of jobs given to the Java
# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
# node.
class Job:
    def __init__(self, param, interval):
        self.param = param
        self.interval = interval
        self.counter = interval
        return

    def reset(self):
        self.counter = self.interval

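# For example, Job("cpu", 300) asks for the 'cpu' monitoring job to be
# scheduled every 300 seconds (the interval value is illustrative; the
# actual values come from conf/global.config at runtime).
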
# This function reads the configuration file to get the values of the
# configuration parameters.
def getJobs(file):
    global username
    global connections
    global jobs
    global failmonDir
    global maxFiles

    conf = ConfigParser.SafeConfigParser()
    conf.read(file)

    username = conf.get("Default", "ssh.username")
    connections = int(conf.get("Default", "max.connections"))
    failmonDir = conf.get("Default", "failmon.dir")
    # parse as an integer, like the other numeric parameters
    maxFiles = int(conf.get("Default", "hdfs.files.max"))

    # Hadoop Log
    interval = int(conf.get("Default", "log.hadoop.interval"))

    if interval != 0:
        jobs.append(Job("hadoopLog", interval))

    # System Log
    interval = int(conf.get("Default", "log.system.interval"))

    if interval != 0:
        jobs.append(Job("systemLog", interval))

    # NICs
    interval = int(conf.get("Default", "nics.interval"))

    if interval != 0:
        jobs.append(Job("nics", interval))

    # CPU
    interval = int(conf.get("Default", "cpu.interval"))

    if interval != 0:
        jobs.append(Job("cpu", interval))

    # Disks
    interval = int(conf.get("Default", "disks.interval"))

    if interval != 0:
        jobs.append(Job("disks", interval))

    # sensors
    interval = int(conf.get("Default", "sensors.interval"))

    if interval != 0:
        jobs.append(Job("sensors", interval))

    # upload
    interval = int(conf.get("Default", "upload.interval"))

    if interval != 0:
        jobs.append(Job("upload", interval))

    return


# Compute the gcd (Greatest Common Divisor) of two integers
def GCD(a, b):
    assert isinstance(a, int)
    assert isinstance(b, int)

    while a:
        a, b = b % a, a

    return b

# Compute the gcd (Greatest Common Divisor) of a list of integers
def listGCD(joblist):
    assert isinstance(joblist, list)

    if (len(joblist) == 1):
        return joblist[0].interval

    g = GCD(joblist[0].interval, joblist[1].interval)

    for i in range(2, len(joblist)):
        g = GCD(g, joblist[i].interval)

    return g

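# For example, if the configured intervals are 300, 300 and 3600 seconds
# (illustrative values), listGCD() returns 300, so the main loop below
# wakes up every 300 seconds and only runs the jobs whose counters have
# expired.
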
# Merge all failmon files created on the HDFS into a single file
def mergeFiles():
    global username
    global failmonDir
    hostList = []
    hosts = open('./conf/hosts.list', 'r')
    for host in hosts:
        hostList.append(host.strip())
    randomHost = random.sample(hostList, 1)
    mergeCommand = "bin/failmon.sh --mergeFiles"
    toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
    print "Invoking command on", randomHost[0], ":\t", mergeCommand, "...",
    subprocess.check_call(toRun)
    print "Done!"
    return

# The actual scheduling is done here
def main():
    getJobs("./conf/global.config")

    for job in jobs:
        print "Configuration: ", job.param, "every", job.interval, "seconds"

    globalInterval = listGCD(jobs)

    while True:
        time.sleep(globalInterval)
        params = []

        for job in jobs:
            job.counter -= globalInterval

            if (job.counter <= 0):
                params.append(job.param)
                job.reset()

        if (len(params) == 0):
            continue

        onlyStr = "--only " + params[0]
        for i in range(1, len(params)):
            onlyStr += ',' + params[i]

        command = "bin/failmon.sh " + onlyStr
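        # At this point command looks like, e.g., "bin/failmon.sh --only cpu,disks"
        # (the job names in this example depend on which counters just expired).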

        # execute on all nodes
        hosts = open('./conf/hosts.list', 'r')
        threadList = []
        # create a thread for every connection
        for i in range(0, connections):
            threadList.append(sshThread(i, username, command, failmonDir))

        # distribute the hosts across the threads in round-robin fashion,
        # so each thread handles roughly an equal share
        cur = 0
        for host in hosts:
            threadList[cur].addHost(host.strip())
            cur += 1
            if (cur == len(threadList)):
                cur = 0

        # start the ssh threads, so the command runs on all hosts in parallel
        for ready in threadList:
            ready.start()

        # wait for all the ssh threads to finish
        for ssht in threading.enumerate():
            if ssht != threading.currentThread():
                ssht.join()

        # if an upload has been done, then maybe we need to merge the
        # HDFS files
        if "upload" in params:
            mergeFiles()

    return


if __name__ == '__main__':
    main()