source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/testing/testHod.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.1 KB
Line 
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import getpass
import os
import re
import stat
import sys
import threading
import time
import unittest
17
# Derive the HOD root directory by stripping everything from "/testing/"
# onwards off the resolved path of the running script, then make that root
# importable so the project imports below resolve.
myDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/testing/.*", "", myDirectory)

sys.path.append(rootDirectory)

import tempfile
from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
from hodlib.Hod.hod import hodRunner, hodState
from hodlib.Common.desc import NodePoolDesc

# Handed to BaseTestSuite by HodTestSuite; presumably the names of tests to
# skip (empty here) -- confirm against testing.lib.
excludes = []

# Information about all clusters is written to a file called clusters.state.
from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, \
                           INVALID_STATE_FILE_MSGS

# Temp directory prefix
TMP_DIR_PREFIX = os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
36
# build a config object with all required keys for initializing hod.
def setupConf():
  """Return a fresh configuration dictionary for constructing a hodRunner.

  The dictionary has three top-level keys:
    'hod'              -- general settings (state directory, temp dir, ...)
    'resource_manager' -- placeholder values, all set to 'dummy'
    'nodepooldesc'     -- a NodePoolDesc built from the 'resource_manager'
                          section
  """
  hodSection = {
    'original-dir' : os.getcwd(),
    'stream' : True,
    # store all the info about clusters in this directory
    'user_state' : '/tmp/hodtest',
    'debug' : 3,
    'java-home' : os.getenv('JAVA_HOME'),
    'cluster' : 'dummy',
    'cluster-factor' : 1.8,
    'xrs-port-range' : (32768,65536),
    'allocate-wait-time' : 3600,
    'temp-dir' : '/tmp/hod'
  }
  # just set everything to dummy. Need something to initialize the
  # node pool description object.
  resourceManagerSection = {
    'id' : 'dummy',
    'batch-home' : 'dummy',
    'queue' : 'dummy',
  }
  cfg = {
    'hod' : hodSection,
    'resource_manager' : resourceManagerSection,
  }
  cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
  return cfg
63
# Test class that defines methods to test invalid arguments to hod operations.
class test_InvalidArgsOperations(unittest.TestCase):
  """Exercise hodRunner operations against missing or stale cluster dirs.

  Each test records a cluster directory -> job id mapping in the user state
  file, runs one of the hodRunner `_op_*` methods, and then checks the
  messages captured by the MockLogger and the operations recorded by the
  MockHadoopCluster.
  """

  def setUp(self):
    """Build the config, mocks, client and an empty cluster state file."""
    self.cfg = setupConf()
    # initialize the mock objects
    self.log = MockLogger()
    self.cluster = MockHadoopCluster()

    # Use the test logger. This will be used for test verification.
    self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
    # Create the hodState object to set the test state you want.
    stateDir = self.cfg['hod']['user_state']
    self.state = hodState(stateDir)
    if not os.path.exists(stateDir):
      # os.mkdir is the real function; os.path has no mkdir attribute.
      os.mkdir(stateDir)
    p = os.path.join(stateDir, '%s.state' % TEST_CLUSTER_DATA_FILE)
    # ensure cluster data file exists, so write works in the tests.
    f = open(p, 'w')
    f.close()

  def tearDown(self):
    """Remove the cluster state file and the user state directory."""
    # clean up cluster data file and directory
    stateDir = self.cfg['hod']['user_state']
    p = os.path.join(stateDir, '%s.state' % TEST_CLUSTER_DATA_FILE)
    os.remove(p)
    os.rmdir(stateDir)

  # Test that list works with deleted cluster directories - more than one entries which are invalid.
  def testListInvalidDirectory(self):
    userState = { os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1') : '123.dummy.id1',
                  os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2') : '123.dummy.id2' }
    self.__setupClusterState(userState)
    self.client._op_list(['list'])
    # assert that required errors are logged.
    for clusterDir in userState.keys():
      self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \
                            % (userState[clusterDir], clusterDir), 'info'))

    # simulate a test where a directory is deleted, and created again, without deallocation
    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testListEmptyDirectory')
    os.makedirs(clusterDir)
    try:
      self.assertTrue(os.path.isdir(clusterDir))
      userState = { clusterDir : '123.dummy.id3' }
      self.__setupClusterState(userState, False)
      self.client._op_list(['list'])
      self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \
                            % (userState[clusterDir], clusterDir), 'info'))
    finally:
      # always remove the directory so a failed assertion does not make the
      # next run's os.makedirs fail.
      os.rmdir(clusterDir)

  # Test that info works with a deleted cluster directory
  def testInfoInvalidDirectory(self):
    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory')
    userState = { clusterDir : '456.dummy.id' }
    self.__setupClusterState(userState)
    self.client._op_info(['info', clusterDir])
    self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))

    # simulate a test where a directory is deleted, and created again, without deallocation
    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoEmptyDirectory')
    os.makedirs(clusterDir)
    try:
      self.assertTrue(os.path.isdir(clusterDir))
      userState = { clusterDir : '456.dummy.id1' }
      self.__setupClusterState(userState, False)
      self.client._op_info(['info', clusterDir])
      self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
    finally:
      os.rmdir(clusterDir)

  # Test info works with an invalid cluster directory
  def testInfoNonExistentDirectory(self):
    clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
    self.client._op_info(['info', clusterDir])
    self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical'))

  # Test that deallocation works on a deleted cluster directory
  # by clearing the job, and removing the state
  def testDeallocateInvalidDirectory(self):
    clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateInvalidDirectory')
    jobid = '789.dummy.id'
    userState = { clusterDir : jobid }
    self.__setupClusterState(userState)
    self.client._op_deallocate(['deallocate', clusterDir])
    # verify job was deleted
    self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
    # verify appropriate message was logged.
    self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
    self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
    # verify that the state information was cleared.
    userState = self.state.read(TEST_CLUSTER_DATA_FILE)
    self.assertFalse(clusterDir in userState.keys())

    # simulate a test where a directory is deleted, and created again, without deallocation
    clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateEmptyDirectory')
    os.makedirs(clusterDir)
    try:
      self.assertTrue(os.path.isdir(clusterDir))
      jobid = '789.dummy.id1'
      userState = { clusterDir : jobid }
      self.__setupClusterState(userState, False)
      self.client._op_deallocate(['deallocate', clusterDir])
      # verify job was deleted
      self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
      # verify appropriate message was logged.
      self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
      self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
      # verify that the state information was cleared.
      userState = self.state.read(TEST_CLUSTER_DATA_FILE)
      self.assertFalse(clusterDir in userState.keys())
    finally:
      os.rmdir(clusterDir)

  # Test that deallocation works on a nonexistent directory.
  def testDeallocateNonExistentDirectory(self):
    clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateNonExistentDirectory')
    self.client._op_deallocate(['deallocate', clusterDir])
    # there should be no call..
    self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
    self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical'))

  # Test that allocation on an previously deleted directory fails.
  def testAllocateOnDeletedDirectory(self):
    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testAllocateOnDeletedDirectory')
    os.makedirs(clusterDir)
    try:
      self.assertTrue(os.path.isdir(clusterDir))
      jobid = '1234.abc.com'
      userState = { clusterDir : jobid }
      self.__setupClusterState(userState, False)
      self.client._op_allocate(['allocate', clusterDir, '3'])
      self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at "\
                        "cluster directory '%s'. HOD cannot determine if this cluster "\
                        "can be automatically deallocated. Deallocate the cluster if it "\
                        "is unused." % (clusterDir), 'critical'))
    finally:
      os.rmdir(clusterDir)

  def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
    """Write clusterStateMap to the state file and verify it reads back.

    clusterStateMap   -- dict mapping cluster directory path to job id.
    verifyDirIsAbsent -- when true, first assert that none of the cluster
                         directories exist on disk.
    """
    for clusterDir in clusterStateMap.keys():
      # ensure directory doesn't exist, just in case.
      if verifyDirIsAbsent:
        self.assertFalse(os.path.exists(clusterDir))
    # set up required state.
    self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap)
    # verify everything is stored correctly.
    state = self.state.read(TEST_CLUSTER_DATA_FILE)
    for clusterDir in clusterStateMap.keys():
      self.assertTrue(clusterDir in state.keys())
      self.assertEqual(clusterStateMap[clusterDir], state[clusterDir])
206
class test_InvalidHodStateFiles(unittest.TestCase):
  """Check hodRunner.operation() when the state file or its directory has
  permissions that prevent normal access.

  Each test restricts permissions, runs one operation, restores the
  permissions, and then expects a return code of 1 together with the
  matching INVALID_STATE_FILE_MSGS entry logged at 'critical' level.
  """

  def setUp(self):
    """Create per-test state and cluster directories plus the mock client."""
    self.rootDir = '/tmp/hod-%s' % getpass.getuser()
    # mkdtemp(dir=...) needs the parent directory to exist already.
    if not os.path.isdir(self.rootDir):
      os.makedirs(self.rootDir)
    self.cfg = setupConf() # create a conf
    # Modify hod.user_state
    self.cfg['hod']['user_state'] = tempfile.mkdtemp(dir=self.rootDir,
                              prefix='HodTestSuite.test_InvalidHodStateFiles_')
    self.log = MockLogger() # mock logger
    self.cluster = MockHadoopCluster() # mock hadoop cluster
    self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
    self.state = hodState(self.cfg['hod']['user_state'])
    self.statePath = os.path.join(self.cfg['hod']['user_state'], '%s.state' % \
                                  TEST_CLUSTER_DATA_FILE)
    self.clusterDir = tempfile.mkdtemp(dir=self.rootDir,
                              prefix='HodTestSuite.test_InvalidHodStateFiles_')

  def testOperationWithInvalidStateFile(self):
    # create user state file with invalid permissions
    stateFile = open(self.statePath, "w")
    try:
      os.chmod(self.statePath, 0) # has no read/write permissions
      self.client._hodRunner__cfg['hod']['operation'] = \
                                               "info %s" % self.clusterDir
      ret = self.client.operation()
    finally:
      # restore permissions (octal 0700) and clean up even if the operation
      # raised, so tearDown can still remove the directory.
      os.chmod(self.statePath, stat.S_IRWXU)
      stateFile.close()
      os.remove(self.statePath)

    # print self.log._MockLogger__logLines
    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
                          os.path.realpath(self.statePath), 'critical'))
    self.assertEqual(ret, 1)

  def testAllocateWithInvalidStateFile(self):
    # create user state file with invalid permissions
    stateFile = open(self.statePath, "w")
    try:
      os.chmod(self.statePath, stat.S_IRUSR) # 0400: has no write permissions
      self.client._hodRunner__cfg['hod']['operation'] = \
                                          "allocate %s %s" % (self.clusterDir, '3')
      ret = self.client.operation()
    finally:
      os.chmod(self.statePath, stat.S_IRWXU) # restore permissions (0700)
      stateFile.close()
      os.remove(self.statePath)

    # print self.log._MockLogger__logLines
    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[2] % \
                        os.path.realpath(self.statePath), 'critical'))
    self.assertEqual(ret, 1)

  def testAllocateWithInvalidStateStore(self):
    self.client._hodRunner__cfg['hod']['operation'] = \
                                      "allocate %s %s" % (self.clusterDir, 3)
    stateDir = self.cfg['hod']['user_state']

    # Directory modes to try, in order:
    #   0600 -- no executable permissions
    #   0500 -- no write permissions
    restrictedModes = (stat.S_IRUSR | stat.S_IWUSR,
                       stat.S_IRUSR | stat.S_IXUSR)
    for dirMode in restrictedModes:
      stateFile = open(self.statePath, "w") # create user state file
      try:
        os.chmod(stateDir, dirMode)
        ret = self.client.operation()
      finally:
        # The directory must be fully accessible again before the state
        # file inside it can be removed.
        os.chmod(stateDir, stat.S_IRWXU) # restore permissions (0700)
        stateFile.close()
        os.remove(self.statePath)
      # print self.log._MockLogger__logLines
      self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
                            os.path.realpath(self.statePath), 'critical'))
      self.assertEqual(ret, 1)

  def tearDown(self):
    """Remove the temporary cluster and user state directories."""
    if os.path.exists(self.clusterDir): os.rmdir(self.clusterDir)
    if os.path.exists(self.cfg['hod']['user_state']):
      os.rmdir(self.cfg['hod']['user_state'])
290
291
class HodTestSuite(BaseTestSuite):
  """BaseTestSuite set up with this module's name and the `excludes` list."""

  def __init__(self):
    # suite setup
    BaseTestSuite.__init__(self, __name__, excludes)

  def cleanUp(self):
    """Suite-level tear down; nothing to clean up at present."""
    # suite tearDown
    pass
301
def RunHodTests():
  """Run the HOD test suite, clean it up, and return runTests()'s result."""
  # modulename_suite
  suite = HodTestSuite()
  testResult = suite.runTests()
  suite.cleanUp()
  return testResult
308
# Run the suite only when this file is executed as a script.
if __name__ == "__main__":
  RunHodTests()
Note: See TracBrowser for help on using the repository browser.