1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
2 | #or more contributor license agreements. See the NOTICE file |
---|
3 | #distributed with this work for additional information |
---|
4 | #regarding copyright ownership. The ASF licenses this file |
---|
5 | #to you under the Apache License, Version 2.0 (the |
---|
6 | #"License"); you may not use this file except in compliance |
---|
7 | #with the License. You may obtain a copy of the License at |
---|
8 | |
---|
9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
10 | |
---|
11 | #Unless required by applicable law or agreed to in writing, software |
---|
12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
14 | #See the License for the specific language governing permissions and |
---|
15 | #limitations under the License. |
---|
16 | """define Hdfs as subclass of Service""" |
---|
17 | |
---|
18 | # -*- python -*- |
---|
19 | |
---|
20 | import os |
---|
21 | |
---|
22 | from service import * |
---|
23 | from hodlib.Hod.nodePool import * |
---|
24 | from hodlib.Common.desc import CommandDesc |
---|
25 | from hodlib.Common.util import get_exception_string, parseEquals |
---|
26 | |
---|
class HdfsExternal(MasterSlave):
  """Proxy to an externally managed HDFS instance.

  HOD never launches a namenode or datanodes for this service; the
  master is treated as already launched and initialized, and the
  namenode address comes from the configured 'fs.default.name'.
  """

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    # The external namenode is assumed to be up already.
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    """No node is requested for the master of an external HDFS."""
    return None

  def getMasterCommands(self, serviceDict):
    """Nothing to run for the master; it is managed externally."""
    return []

  def getAdminCommands(self, serviceDict):
    """No admin commands are run against an external HDFS."""
    return []

  def getWorkerCommands(self, serviceDict):
    """Nothing to run on workers; datanodes are managed externally."""
    return []

  def getMasterAddrs(self):
    """Return the configured namenode address as a one-element list."""
    return [self.serviceDesc.getfinalAttrs()['fs.default.name']]

  def setMasterParams(self, dict):
    """Record namenode host/port information into the final attrs."""
    finalAttrs = self.serviceDesc.dict['final-attrs']
    finalAttrs['fs.default.name'] = "%s:%s" % (dict['host'],
                                               dict['fs_port'])

    if self.version < 16:
      # NOTE(review): this branch reads 'info_port' from the service
      # descriptor rather than from the 'dict' parameter (unlike the
      # post-2185 branch below) — presumably intentional; verify.
      finalAttrs['dfs.info.port'] = str(self.serviceDesc.dict['info_port'])
    else:
      # After Hadoop-2185
      finalAttrs['dfs.http.address'] = "%s:%s" % (dict['host'],
                                                  dict['info_port'])

  def getInfoAddrs(self):
    """Return the HDFS web UI address as a one-element list."""
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version >= 16:
      # After Hadoop-2185
      infoaddr = attrs['dfs.http.address']
    else:
      # Pre-16: the web UI runs on the namenode host at dfs.info.port.
      host, _port = attrs['fs.default.name'].split(":")
      infoaddr = host + ':' + attrs['dfs.info.port']
    return [infoaddr]
---|
75 | |
---|
class Hdfs(MasterSlave):
  """HDFS service provisioned by HOD: one namenode plus datanodes."""

  def __init__(self, serviceDesc, nodePool, required_node, version,
               format=True, upgrade=False, workers_per_ring = 1):
    """Set up an HDFS service description.

    format           -- run 'namenode -format' before starting.
    upgrade          -- start the namenode with '-upgrade' and run the
                        one-shot dfsadmin upgrade commands.
    workers_per_ring -- number of datanodes launched per hodring.
    """
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    # Namenode location; filled in by setMasterParams.
    self.masterNode = None
    self.masterAddr = None
    self.infoAddr = None
    # Upgrade admin commands must be handed out at most once.
    self.runAdminCommands = True
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version
    self.workers_per_ring = workers_per_ring

  def getMasterRequest(self):
    """Request one unconstrained node for the namenode."""
    return NodeRequest(1, [], False)

  def getMasterCommands(self, serviceDict):
    """Build the master's command list: optional format run, then the
    namenode start (with '-upgrade' when requested)."""
    cmds = []
    if self.format:
      cmds.append(self._getNameNodeCommand(True))
    if self.upgrade:
      cmds.append(self._getNameNodeCommand(False, True))
    else:
      cmds.append(self._getNameNodeCommand(False))
    return cmds

  def getAdminCommands(self, serviceDict):
    """Return one-shot dfsadmin commands (only needed for an upgrade)."""
    cmds = []
    if self.runAdminCommands and self.upgrade:
      # Wait until the namenode leaves safe mode, then finalize the
      # upgrade in the background, ignoring failures.
      cmds.append(self._getNameNodeAdminCommand('-safemode wait'))
      cmds.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                True, True))
    # Never hand these out a second time.
    self.runAdminCommands = False
    return cmds

  def getWorkerCommands(self, serviceDict):
    """One datanode command per worker slot; ids start at 1."""
    return [self._getDataNodeCommand(str(id))
            for id in range(1, self.workers_per_ring + 1)]

  def setMasterNodes(self, list):
    """Remember the node allocated for the namenode (first of list)."""
    self.masterNode = list[0]

  def getMasterAddrs(self):
    """Return the namenode address as a one-element list."""
    return [self.masterAddr]

  def getInfoAddrs(self):
    """Return the HDFS web UI address as a one-element list."""
    return [self.infoAddr]

  def getWorkers(self):
    """Return the list of worker nodes."""
    return self.workers

  def setMasterParams(self, list):
    """Parse 'key=value' strings published by the master and record
    the namenode address, host, and web UI address."""
    params = self._parseEquals(list)
    self.masterAddr = params['fs.default.name']
    host, _port = self.masterAddr.split(":")
    self.masterNode = host
    if self.version >= 16:
      # After Hadoop-2185
      self.infoAddr = params['dfs.http.address']
    else:
      # Pre-16: web UI runs on the namenode host at dfs.info.port.
      self.infoAddr = self.masterNode + ':' + params['dfs.info.port']

  def _parseEquals(self, list):
    """Turn a list of 'key=value' strings into a dict."""
    return parseEquals(list)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    """Populate workDirs/attrs/envs with the HDFS directory layout.

    For every parent directory, <p> and <p>/<subDir> are added to
    workDirs and a dfs-data directory is collected; the dfs-name and
    hadoop-tmp directories are placed under the first parent only.
    """
    namedir = None
    hadooptmpdir = None
    datadir = []

    for parent in parentDirs:
      workDirs.append(parent)
      workDirs.append(os.path.join(parent, subDir))
      datadir.append(os.path.join(parent, subDir, 'dfs-data'))
      if hadooptmpdir is None:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(parent, subDir, 'hadoop-tmp')
      if namedir is None:
        namedir = os.path.join(parent, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"


  def _getNameNodeCommand(self, format=False, upgrade=False):
    """Describe the namenode process (or a 'namenode -format' run).

    format  -- emit a foreground 'namenode -format' command.
    upgrade -- (when format is False) start with '-upgrade'.
    """
    sd = self.serviceDesc
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    # Leave placeholders for values the hodring fills in at launch.
    attrs.setdefault('fs.default.name', 'fillinhostport')
    if self.version < 16:
      attrs.setdefault('dfs.info.port', 'fillinport')
    else:
      # Addressing Hadoop-2185, added the following. Earlier versions
      # don't care about this.
      attrs.setdefault('dfs.http.address', 'fillinhostport')

    self._setWorkDirs(workDirs, envs, attrs, self.workDirs, 'hdfs-nn')

    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')

    desc = { 'name' : 'namenode',
             'program' : os.path.join('bin', 'hadoop'),
             'argv' : argv,
             'envs' : envs,
             'pkgdirs' : sd.getPkgDirs(),
             'workdirs' : workDirs,
             'final-attrs' : attrs,
             'attrs' : sd.getAttrs() }
    if format:
      # A format run must complete (and can prompt on stdin) before
      # anything else starts.
      desc['fg'] = 'true'
      desc['stdin'] = 'Y'
    return CommandDesc(desc)

  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
    """Describe a 'hadoop dfsadmin <adminCommand>' run against the
    known namenode.

    wait           -- run in the foreground with stdin attached.
    ignoreFailures -- mark the command so failures are not fatal.
    Raises ValueError when the namenode address is not yet known.
    """
    sd = self.serviceDesc
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    nn = self.masterAddr
    if nn is None:
      raise ValueError("Can't get namenode address")
    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, self.workDirs, 'hdfs-nn')

    desc = { 'name' : 'dfsadmin',
             'program' : os.path.join('bin', 'hadoop'),
             'argv' : ['dfsadmin', adminCommand],
             'envs' : envs,
             'pkgdirs' : sd.getPkgDirs(),
             'workdirs' : workDirs,
             'final-attrs' : attrs,
             'attrs' : sd.getAttrs() }
    if wait:
      desc['fg'] = 'true'
      desc['stdin'] = 'Y'
    if ignoreFailures:
      desc['ignorefailures'] = 'Y'
    return CommandDesc(desc)

  def _getDataNodeCommand(self, id):
    """Describe a datanode process for worker slot 'id'.

    Work directories get a '-<id>' suffix so multiple datanodes on
    one hodring do not collide.  Raises ValueError when the namenode
    address is not yet known.
    """
    sd = self.serviceDesc
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    nn = self.masterAddr
    if nn is None:
      raise ValueError("Can't get namenode address")
    attrs['fs.default.name'] = nn

    if self.version < 16:
      attrs.setdefault('dfs.datanode.port', 'fillinport')
      attrs.setdefault('dfs.datanode.info.port', 'fillinport')
    else:
      # Adding the following. Hadoop-2185
      attrs.setdefault('dfs.datanode.address', 'fillinhostport')
      attrs.setdefault('dfs.datanode.http.address', 'fillinhostport')
    if self.version >= 18:
      # After HADOOP-3283
      # TODO: check for major as well as minor versions
      attrs['dfs.datanode.ipc.address'] = 'fillinhostport'

    # unique workdirs in case of multiple datanodes per hodring
    parentDirs = [d + "-" + id for d in self.workDirs]

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    desc = { 'name' : 'datanode',
             'program' : os.path.join('bin', 'hadoop'),
             'argv' : ['datanode'],
             'envs' : envs,
             'pkgdirs' : sd.getPkgDirs(),
             'workdirs' : workDirs,
             'final-attrs' : attrs,
             'attrs' : sd.getAttrs() }
    return CommandDesc(desc)
---|
310 | |
---|