#Licensed to the Apache Software Foundation (ASF) under one #or more contributor license agreements. See the NOTICE file #distributed with this work for additional information #regarding copyright ownership. The ASF licenses this file #to you under the Apache License, Version 2.0 (the #"License"); you may not use this file except in compliance #with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 #Unless required by applicable law or agreed to in writing, software #distributed under the License is distributed on an "AS IS" BASIS, #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #See the License for the specific language governing permissions and #limitations under the License. import unittest, getpass, os, sys, re, threading, time myDirectory = os.path.realpath(sys.argv[0]) rootDirectory = re.sub("/testing/.*", "", myDirectory) sys.path.append(rootDirectory) import tempfile from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster from hodlib.Hod.hod import hodRunner, hodState from hodlib.Common.desc import NodePoolDesc excludes = [] # Information about all clusters is written to a file called clusters.state. from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, \ INVALID_STATE_FILE_MSGS # Temp directory prefix TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser())) # build a config object with all required keys for initializing hod. def setupConf(): cfg = { 'hod' : { 'original-dir' : os.getcwd(), 'stream' : True, # store all the info about clusters in this directory 'user_state' : '/tmp/hodtest', 'debug' : 3, 'java-home' : os.getenv('JAVA_HOME'), 'cluster' : 'dummy', 'cluster-factor' : 1.8, 'xrs-port-range' : (32768,65536), 'allocate-wait-time' : 3600, 'temp-dir' : '/tmp/hod' }, # just set everything to dummy. Need something to initialize the # node pool description object. 'resource_manager' : { 'id' : 'dummy', 'batch-home' : 'dummy', 'queue' : 'dummy', } } cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager']) return cfg # Test class that defines methods to test invalid arguments to hod operations. class test_InvalidArgsOperations(unittest.TestCase): def setUp(self): self.cfg = setupConf() # initialize the mock objects self.log = MockLogger() self.cluster = MockHadoopCluster() # Use the test logger. This will be used for test verification. self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster) # Create the hodState object to set the test state you want. self.state = hodState(self.cfg['hod']['user_state']) if not os.path.exists(self.cfg['hod']['user_state']): os.path.mkdir(self.cfg['hod']['user_state']) p = os.path.join(self.cfg['hod']['user_state'], '%s.state' % TEST_CLUSTER_DATA_FILE) # ensure cluster data file exists, so write works in the tests. f = open(p, 'w') f.close() def tearDown(self): # clean up cluster data file and directory p = os.path.join(self.cfg['hod']['user_state'], '%s.state' % TEST_CLUSTER_DATA_FILE) os.remove(p) os.rmdir(self.cfg['hod']['user_state']) # Test that list works with deleted cluster directories - more than one entries which are invalid. def testListInvalidDirectory(self): userState = { os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1') : '123.dummy.id1', os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2') : '123.dummy.id2' } self.__setupClusterState(userState) self.client._op_list(['list']) # assert that required errors are logged. for clusterDir in userState.keys(): self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \ % (userState[clusterDir], clusterDir), 'info')) # simulate a test where a directory is deleted, and created again, without deallocation clusterDir = os.path.join(TMP_DIR_PREFIX, 'testListEmptyDirectory') os.makedirs(clusterDir) self.assertTrue(os.path.isdir(clusterDir)) userState = { clusterDir : '123.dummy.id3' } self.__setupClusterState(userState, False) self.client._op_list(['list']) self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \ % (userState[clusterDir], clusterDir), 'info')) os.rmdir(clusterDir) # Test that info works with a deleted cluster directory def testInfoInvalidDirectory(self): clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory') userState = { clusterDir : '456.dummy.id' } self.__setupClusterState(userState) self.client._op_info(['info', clusterDir]) self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical')) # simulate a test where a directory is deleted, and created again, without deallocation clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoEmptyDirectory') os.makedirs(clusterDir) self.assertTrue(os.path.isdir(clusterDir)) userState = { clusterDir : '456.dummy.id1' } self.__setupClusterState(userState, False) self.client._op_info(['info', clusterDir]) self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical')) os.rmdir(clusterDir) # Test info works with an invalid cluster directory def testInfoNonExistentDirectory(self): clusterDir = '/tmp/hod/testInfoNonExistentDirectory' self.client._op_info(['info', clusterDir]) self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical')) # Test that deallocation works on a deleted cluster directory # by clearing the job, and removing the state def testDeallocateInvalidDirectory(self): clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateInvalidDirectory') jobid = '789.dummy.id' userState = { clusterDir : jobid } self.__setupClusterState(userState) self.client._op_deallocate(['deallocate', clusterDir]) # verify job was deleted self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid)) # verify appropriate message was logged. self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical')) self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical')) # verify that the state information was cleared. userState = self.state.read(TEST_CLUSTER_DATA_FILE) self.assertFalse(clusterDir in userState.keys()) # simulate a test where a directory is deleted, and created again, without deallocation clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateEmptyDirectory') os.makedirs(clusterDir) self.assertTrue(os.path.isdir(clusterDir)) jobid = '789.dummy.id1' userState = { clusterDir : jobid } self.__setupClusterState(userState, False) self.client._op_deallocate(['deallocate', clusterDir]) # verify job was deleted self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid)) # verify appropriate message was logged. self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical')) self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical')) # verify that the state information was cleared. userState = self.state.read(TEST_CLUSTER_DATA_FILE) self.assertFalse(clusterDir in userState.keys()) os.rmdir(clusterDir) # Test that deallocation works on a nonexistent directory. def testDeallocateNonExistentDirectory(self): clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateNonExistentDirectory') self.client._op_deallocate(['deallocate', clusterDir]) # there should be no call.. self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None)) self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical')) # Test that allocation on an previously deleted directory fails. def testAllocateOnDeletedDirectory(self): clusterDir = os.path.join(TMP_DIR_PREFIX, 'testAllocateOnDeletedDirectory') os.makedirs(clusterDir) self.assertTrue(os.path.isdir(clusterDir)) jobid = '1234.abc.com' userState = { clusterDir : jobid } self.__setupClusterState(userState, False) self.client._op_allocate(['allocate', clusterDir, '3']) self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at "\ "cluster directory '%s'. HOD cannot determine if this cluster "\ "can be automatically deallocated. Deallocate the cluster if it "\ "is unused." % (clusterDir), 'critical')) os.rmdir(clusterDir) def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True): for clusterDir in clusterStateMap.keys(): # ensure directory doesn't exist, just in case. if verifyDirIsAbsent: self.assertFalse(os.path.exists(clusterDir)) # set up required state. self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap) # verify everything is stored correctly. state = self.state.read(TEST_CLUSTER_DATA_FILE) for clusterDir in clusterStateMap.keys(): self.assertTrue(clusterDir in state.keys()) self.assertEquals(clusterStateMap[clusterDir], state[clusterDir]) class test_InvalidHodStateFiles(unittest.TestCase): def setUp(self): self.rootDir = '/tmp/hod-%s' % getpass.getuser() self.cfg = setupConf() # creat a conf # Modify hod.user_state self.cfg['hod']['user_state'] = tempfile.mkdtemp(dir=self.rootDir, prefix='HodTestSuite.test_InvalidHodStateFiles_') self.log = MockLogger() # mock logger self.cluster = MockHadoopCluster() # mock hadoop cluster self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster) self.state = hodState(self.cfg['hod']['user_state']) self.statePath = os.path.join(self.cfg['hod']['user_state'], '%s.state' % \ TEST_CLUSTER_DATA_FILE) self.clusterDir = tempfile.mkdtemp(dir=self.rootDir, prefix='HodTestSuite.test_InvalidHodStateFiles_') def testOperationWithInvalidStateFile(self): jobid = '1234.hadoop.apache.org' # create user state file with invalid permissions stateFile = open(self.statePath, "w") os.chmod(self.statePath, 000) # has no read/write permissions self.client._hodRunner__cfg['hod']['operation'] = \ "info %s" % self.clusterDir ret = self.client.operation() os.chmod(self.statePath, 700) # restore permissions stateFile.close() os.remove(self.statePath) # print self.log._MockLogger__logLines self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \ os.path.realpath(self.statePath), 'critical')) self.assertEquals(ret, 1) def testAllocateWithInvalidStateFile(self): jobid = '1234.hadoop.apache.org' # create user state file with invalid permissions stateFile = open(self.statePath, "w") os.chmod(self.statePath, 0400) # has no write permissions self.client._hodRunner__cfg['hod']['operation'] = \ "allocate %s %s" % (self.clusterDir, '3') ret = self.client.operation() os.chmod(self.statePath, 700) # restore permissions stateFile.close() os.remove(self.statePath) # print self.log._MockLogger__logLines self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[2] % \ os.path.realpath(self.statePath), 'critical')) self.assertEquals(ret, 1) def testAllocateWithInvalidStateStore(self): jobid = '1234.hadoop.apache.org' self.client._hodRunner__cfg['hod']['operation'] = \ "allocate %s %s" % (self.clusterDir, 3) ###### check with no executable permissions ###### stateFile = open(self.statePath, "w") # create user state file os.chmod(self.cfg['hod']['user_state'], 0600) ret = self.client.operation() os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions stateFile.close() os.remove(self.statePath) # print self.log._MockLogger__logLines self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \ os.path.realpath(self.statePath), 'critical')) self.assertEquals(ret, 1) ###### check with no write permissions ###### stateFile = open(self.statePath, "w") # create user state file os.chmod(self.cfg['hod']['user_state'], 0500) ret = self.client.operation() os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions stateFile.close() os.remove(self.statePath) # print self.log._MockLogger__logLines self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \ os.path.realpath(self.statePath), 'critical')) self.assertEquals(ret, 1) def tearDown(self): if os.path.exists(self.clusterDir): os.rmdir(self.clusterDir) if os.path.exists(self.cfg['hod']['user_state']): os.rmdir(self.cfg['hod']['user_state']) class HodTestSuite(BaseTestSuite): def __init__(self): # suite setup BaseTestSuite.__init__(self, __name__, excludes) pass def cleanUp(self): # suite tearDown pass def RunHodTests(): # modulename_suite suite = HodTestSuite() testResult = suite.runTests() suite.cleanUp() return testResult if __name__ == "__main__": RunHodTests()