source: proiecte/HadoopJUnit/hadoop-0.20.1/contrib/hod/hodlib/GridServices/hdfs.py @ 120

#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""Define Hdfs as a subclass of Service."""

# -*- python -*-

import os

from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
from hodlib.Common.util import get_exception_string, parseEquals
class HdfsExternal(MasterSlave):
  """Dummy proxy to an external HDFS instance."""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

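  # The external HDFS is managed outside HOD, so nothing needs to be
  # launched: the request- and command-producing methods below are no-ops.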
  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['fs.default.name']
    return [addr]

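  # Record the externally supplied namenode address, using the pre- or
  # post-HADOOP-2185 property names depending on the Hadoop version.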
  def setMasterParams(self, dict):
    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])

    if self.version < 16:
      self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
                                      str(self.serviceDesc.dict['info_port'])
    else:
      # After HADOOP-2185
      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['fs.default.name']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['dfs.info.port']
    else:
      # After HADOOP-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]

class Hdfs(MasterSlave):
  """HDFS instance provisioned and managed by HOD itself."""

  def __init__(self, serviceDesc, nodePool, required_node, version,
               format=True, upgrade=False, workers_per_ring=1):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
    self.runAdminCommands = True
    self.infoAddr = None
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version
    self.workers_per_ring = workers_per_ring

  def getMasterRequest(self):
    req = NodeRequest(1, [], False)
    return req

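  # The namenode may be launched up to twice: first with -format when a new
  # cluster is being set up, then either normally or with -upgrade.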
  def getMasterCommands(self, serviceDict):
    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

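  # On an upgrade, wait for the namenode to leave safe mode, then finalize
  # the upgrade; runAdminCommands ensures this is done only once.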
  def getAdminCommands(self, serviceDict):
    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                         True, True))

    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
    workerCmds = []
    for id in range(1, self.workers_per_ring + 1):
      workerCmds.append(self._getDataNodeCommand(str(id)))

    return workerCmds

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def setMasterParams(self, list):
    dict = self._parseEquals(list)
    self.masterAddr = dict['fs.default.name']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
    else:
      # After HADOOP-2185
      self.infoAddr = dict['dfs.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)

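  # Lay out HDFS directories under each parent work directory: the first
  # parent gets the dfs-name dir, every parent gets a dfs-data dir, and the
  # data dirs are joined into a comma-separated dfs.data.dir.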
  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'dfs-data')
      datadir.append(dir)
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')

      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

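  # Build the CommandDesc for the namenode. 'fillinhostport' and 'fillinport'
  # are placeholders that HOD fills in with the actual host and free ports
  # when the command is launched.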
  def _getNameNodeCommand(self, format=False, upgrade=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing HADOOP-2185, added the following. Earlier versions don't
      # care about this.
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'namenode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if format:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

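  # Build a 'hadoop dfsadmin <adminCommand>' CommandDesc directed at the
  # running namenode.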
  def _getNameNodeAdminCommand(self, adminCommand, wait=True,
                               ignoreFailures=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'dfsadmin' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['dfsadmin']
    argv.append(adminCommand)
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if wait:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    if ignoreFailures:
      dict['ignorefailures'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

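  # Build the CommandDesc for one datanode; 'id' makes the work directories
  # unique when multiple datanodes run under the same hodring.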
  def _getDataNodeCommand(self, id):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # Adding the following for HADOOP-2185.
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'

    if self.version >= 18:
      # After HADOOP-3283
      # TODO: check for major as well as minor versions
      attrs['dfs.datanode.ipc.address'] = 'fillinhostport'

    # Unique workdirs in case of multiple datanodes per hodring
    pd = []
    for dir in parentDirs:
      dir = dir + "-" + id
      pd.append(dir)
    parentDirs = pd
    # End of unique workdirs

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    dict = { 'name' : 'datanode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['datanode']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()

    cmd = CommandDesc(dict)
    return cmd

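# A minimal usage sketch (illustrative only; in practice the HOD framework
# constructs these objects, and the serviceDesc and nodePool values are
# assumed to come from hodlib.Common.desc and hodlib.Hod.nodePool):
#
#   hdfs = Hdfs(serviceDesc, nodePool, required_node=None, version=18,
#               format=True, workers_per_ring=2)
#   masterCmds = hdfs.getMasterCommands({})  # namenode -format, then namenode
#   workerCmds = hdfs.getWorkerCommands({})  # two datanode CommandDescs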