#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""define Hdfs as subclass of Service"""

# -*- python -*-
---|
| 20 | import os |
---|
| 21 | |
---|
| 22 | from service import * |
---|
| 23 | from hodlib.Hod.nodePool import * |
---|
| 24 | from hodlib.Common.desc import CommandDesc |
---|
| 25 | from hodlib.Common.util import get_exception_string, parseEquals |
---|
| 26 | |
---|
class HdfsExternal(MasterSlave):
  """Dummy proxy to an externally-managed HDFS instance.

  HOD does not launch or control this HDFS: the master is treated as
  already running, so all command-generating methods return nothing.
  """

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    # The external namenode is assumed to be up already.
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    """No node needs to be requested for an external master."""
    return None

  def getMasterCommands(self, serviceDict):
    """No commands are needed to start an external master."""
    return []

  def getAdminCommands(self, serviceDict):
    """No admin commands are needed for an external master."""
    return []

  def getWorkerCommands(self, serviceDict):
    """No worker (datanode) commands: HOD does not manage this HDFS."""
    return []

  def getMasterAddrs(self):
    """Return the configured namenode address (fs.default.name)."""
    attrs = self.serviceDesc.getfinalAttrs()
    return [attrs['fs.default.name']]

  def setMasterParams(self, dict):
    """Record host/port information for the external namenode.

    dict -- mapping with 'host', 'fs_port' and 'info_port' entries.
    """
    finalAttrs = self.serviceDesc.dict['final-attrs']
    finalAttrs['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])

    if self.version < 16:
      # Pre Hadoop-2185: only the info port is configurable.
      # BUG FIX: read 'info_port' from the params passed in, as the
      # post-2185 branch below does; the old code read it from
      # serviceDesc.dict, which does not carry that key.
      finalAttrs['dfs.info.port'] = str(dict['info_port'])
    else:
      # After Hadoop-2185 the web UI address is a full host:port.
      finalAttrs['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    """Return the HDFS web-UI address derived from the final attrs."""
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      # Combine the namenode host with the separate info port.
      host = attrs['fs.default.name'].split(":")[0]
      infoaddr = host + ':' + attrs['dfs.info.port']
    else:
      # After Hadoop-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]
---|
| 75 | |
---|
class Hdfs(MasterSlave):
  """HDFS service that HOD provisions and manages (namenode + datanodes)."""

  def __init__(self, serviceDesc, nodePool, required_node, version, \
               format=True, upgrade=False,
               workers_per_ring = 1):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None          # host running the namenode
    self.masterAddr = None          # host:port value of fs.default.name
    self.runAdminCommands = True    # admin commands are issued only once
    self.infoAddr = None            # namenode web-UI address
    self._isLost = False
    self.format = format            # format the filesystem before starting
    self.upgrade = upgrade          # start the namenode with -upgrade
    self.workers = []
    self.version = version
    self.workers_per_ring = workers_per_ring  # datanodes per hodring

  def getMasterRequest(self):
    """Request a single, unconstrained node for the namenode."""
    return NodeRequest(1, [], False)

  def getMasterCommands(self, serviceDict):
    """Return commands to (optionally format and) start the namenode."""
    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

  def getAdminCommands(self, serviceDict):
    """Return one-shot dfsadmin commands needed after an upgrade.

    Issued at most once: runAdminCommands is cleared on first call.
    """
    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      # Wait for the namenode to leave safemode, then finalize the upgrade.
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                          True, True))

    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
    """Return one datanode command per worker slot in the ring."""
    return [self._getDataNodeCommand(str(n))
            for n in range(1, self.workers_per_ring + 1)]

  def setMasterNodes(self, list):
    """Record the node allocated for the namenode (first entry of list)."""
    self.masterNode = list[0]

  def getMasterAddrs(self):
    """Return the namenode address as a one-element list."""
    return [self.masterAddr]

  def getInfoAddrs(self):
    """Return the namenode web-UI address as a one-element list."""
    return [self.infoAddr]

  def getWorkers(self):
    """Return the list of worker nodes."""
    return self.workers

  def setMasterParams(self, list):
    """Parse 'key=value' strings reported by the master and record the
    namenode address, host and web-UI address."""
    params = self._parseEquals(list)
    self.masterAddr = params['fs.default.name']
    # The host part of host:port identifies the master node.
    self.masterNode = self.masterAddr.split(":")[0]
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + params['dfs.info.port']
    else:
      # After Hadoop-2185
      self.infoAddr = params['dfs.http.address']

  def _parseEquals(self, list):
    """Delegate to the shared 'key=value' parser."""
    return parseEquals(list)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    """Populate workDirs/attrs/envs with per-directory dfs paths.

    The first parent directory supplies dfs.name.dir and hadoop.tmp.dir;
    every parent directory contributes one dfs.data.dir entry.
    """
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      datadir.append(os.path.join(p, subDir, 'dfs-data'))
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

  def _getNameNodeCommand(self, format=False, upgrade=False):
    """Build the CommandDesc that runs 'bin/hadoop namenode'.

    format  -- if True, run 'namenode -format' in the foreground,
               answering the confirmation prompt with 'Y'.
    upgrade -- if True (and format is False), run 'namenode -upgrade'.
    """
    sd = self.serviceDesc

    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing Hadoop-2185: newer versions configure the full http
      # address instead of just a port. Earlier versions don't care.
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, self.workDirs, 'hdfs-nn')

    # 'cmdDesc' avoids shadowing the builtin 'dict'.
    cmdDesc = { 'name' : 'namenode' }
    cmdDesc['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    cmdDesc['argv'] = argv
    cmdDesc['envs'] = envs
    cmdDesc['pkgdirs'] = sd.getPkgDirs()
    cmdDesc['workdirs'] = workDirs
    cmdDesc['final-attrs'] = attrs
    cmdDesc['attrs'] = sd.getAttrs()
    if format:
      # Formatting runs in the foreground and feeds 'Y' to the prompt.
      cmdDesc['fg'] = 'true'
      cmdDesc['stdin'] = 'Y'
    return CommandDesc(cmdDesc)

  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
    """Build a CommandDesc for 'bin/hadoop dfsadmin <adminCommand>'.

    wait           -- run in the foreground (blocks until done).
    ignoreFailures -- mark the command so failures are not fatal.
    Raises ValueError if the namenode address is not yet known.
    """
    sd = self.serviceDesc

    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      # py3-compatible raise (was: raise ValueError, "...")
      raise ValueError("Can't get namenode address")

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, self.workDirs, 'hdfs-nn')

    cmdDesc = { 'name' : 'dfsadmin' }
    cmdDesc['program'] = os.path.join('bin', 'hadoop')
    cmdDesc['argv'] = ['dfsadmin', adminCommand]
    cmdDesc['envs'] = envs
    cmdDesc['pkgdirs'] = sd.getPkgDirs()
    cmdDesc['workdirs'] = workDirs
    cmdDesc['final-attrs'] = attrs
    cmdDesc['attrs'] = sd.getAttrs()
    if wait:
      cmdDesc['fg'] = 'true'
      cmdDesc['stdin'] = 'Y'
    if ignoreFailures:
      cmdDesc['ignorefailures'] = 'Y'
    return CommandDesc(cmdDesc)

  def _getDataNodeCommand(self, id):
    """Build the CommandDesc that runs 'bin/hadoop datanode'.

    id -- string suffix that makes work directories unique when several
          datanodes run on one hodring.
    Raises ValueError if the namenode address is not yet known.
    """
    sd = self.serviceDesc

    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      # py3-compatible raise (was: raise ValueError, "...")
      raise ValueError("Can't get namenode address")

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # Hadoop-2185: newer versions take full host:port addresses.
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'

    if self.version >= 18:
      # After HADOOP-3283: datanodes also expose an IPC address.
      # TODO: check for major as well as minor versions
      attrs['dfs.datanode.ipc.address'] = 'fillinhostport'

    # Unique workdirs in case of multiple datanodes per hodring.
    parentDirs = [d + "-" + id for d in self.workDirs]

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    cmdDesc = { 'name' : 'datanode' }
    cmdDesc['program'] = os.path.join('bin', 'hadoop')
    cmdDesc['argv'] = ['datanode']
    cmdDesc['envs'] = envs
    cmdDesc['pkgdirs'] = sd.getPkgDirs()
    cmdDesc['workdirs'] = workDirs
    cmdDesc['final-attrs'] = attrs
    cmdDesc['attrs'] = sd.getAttrs()

    return CommandDesc(cmdDesc)
---|
| 310 | |
---|