1 | #Licensed to the Apache Software Foundation (ASF) under one |
---|
2 | #or more contributor license agreements. See the NOTICE file |
---|
3 | #distributed with this work for additional information |
---|
4 | #regarding copyright ownership. The ASF licenses this file |
---|
5 | #to you under the Apache License, Version 2.0 (the |
---|
6 | #"License"); you may not use this file except in compliance |
---|
7 | #with the License. You may obtain a copy of the License at |
---|
8 | |
---|
9 | # http://www.apache.org/licenses/LICENSE-2.0 |
---|
10 | |
---|
11 | #Unless required by applicable law or agreed to in writing, software |
---|
12 | #distributed under the License is distributed on an "AS IS" BASIS, |
---|
13 | #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
14 | #See the License for the specific language governing permissions and |
---|
15 | #limitations under the License. |
---|
16 | """manage component descriptors""" |
---|
17 | # -*- python -*- |
---|
18 | |
---|
19 | import random |
---|
20 | |
---|
21 | from sets import Set |
---|
22 | from pprint import pformat |
---|
23 | from hodlib.Common.util import local_fqdn |
---|
24 | from hodlib.Common.tcp import tcpSocket, tcpError |
---|
25 | |
---|
class Schema:
    """Describe a single descriptor field.

    A field has a name, a value type (one of the class constants STRING,
    LIST or MAP) and a delimiter string.
    """
    # Value-type constants.
    STRING = 0
    LIST = 1
    MAP = 2

    def __init__(self, name, type = STRING, delim=','):
        """Record the field name, its value type and its delimiter.

        type defaults to Schema.STRING and delim to ','.
        """
        self.name, self.type, self.delim = name, type, delim

    def getName(self):
        """Return the field name given at construction."""
        return self.name

    def getType(self):
        """Return the value type (STRING, LIST or MAP)."""
        return self.type

    def getDelim(self):
        """Return the delimiter string given at construction."""
        return self.delim
---|
44 | |
---|
45 | class _Merger: |
---|
46 | """A class to merge lists and add key/value |
---|
47 | pairs to a dictionary""" |
---|
48 | def mergeList(x, y, uniq=True): |
---|
49 | l = [] |
---|
50 | l.extend(x) |
---|
51 | l.extend(y) |
---|
52 | if not uniq: |
---|
53 | return l |
---|
54 | |
---|
55 | s = Set(l) |
---|
56 | l = list(s) |
---|
57 | return l |
---|
58 | |
---|
59 | mergeList = staticmethod(mergeList) |
---|
60 | |
---|
61 | def mergeMap(to, add): |
---|
62 | |
---|
63 | for k in add: |
---|
64 | to.setdefault(k, add[k]) |
---|
65 | |
---|
66 | return to |
---|
67 | |
---|
68 | mergeMap = staticmethod(mergeMap) |
---|
69 | |
---|
class NodePoolDesc:
    """Descriptor for a node pool, backed by a dictionary.

    Keys used: 'id' and 'batch-home' (both required), 'attrs' (defaults to
    an empty dict) and an optional 'options' entry that replaces 'attrs'.
    """

    def __init__(self, dict):
        """Build the descriptor from a shallow copy of `dict`.

        Raises ValueError when 'id' or 'batch-home' is missing (or when
        'batch-home' is None).
        """
        self.dict = dict.copy()

        self.dict.setdefault('attrs', {})

        self._checkRequired()

        # When the caller supplies 'options', it takes the place of 'attrs'.
        if 'options' in dict:
            self.dict['attrs'] = dict['options']

    def _checkRequired(self):
        """Raise ValueError if a required key is absent."""
        if 'id' not in self.dict:
            raise ValueError("nodepool needs 'id'")

        # Use get() rather than getPkgDir(): indexing a missing 'batch-home'
        # would raise KeyError before this check could report the problem.
        # NOTE(review): the message says 'pkgs' although the key that is
        # checked is 'batch-home'; text kept unchanged for compatibility.
        if self.dict.get('batch-home') is None:
            raise ValueError("nodepool %s needs 'pkgs'" % (self.getName()))

    def getName(self):
        """Return the value stored under 'id'."""
        return self.dict['id']

    def getPkgDir(self):
        """Return the value stored under 'batch-home'."""
        return self.dict['batch-home']

    def getAttrs(self):
        """Return the value stored under 'attrs'."""
        return self.dict['attrs']

    def getSchema():
        """Return a dict mapping each field name to its Schema object."""
        fields = (
            Schema('id'),
            Schema('batch-home', Schema.LIST, ':'),
            Schema('attrs', Schema.MAP),
        )

        schema = {}
        for s in fields:
            schema[s.getName()] = s

        return schema

    getSchema = staticmethod(getSchema)
---|
114 | |
---|
class ServiceDesc:
    """Descriptor for a service, backed by a dictionary.

    'id' is the only required key.  Every other key read by the accessors
    is given a default in __init__ when the caller did not supply it.
    """

    def __init__(self, dict):
        """Build the descriptor from a shallow copy of `dict`.

        Raises ValueError when 'id' is missing.
        """
        self.dict = dict.copy()

        # Fresh containers are created per instance so that descriptors
        # never share a default dict.
        self.dict.setdefault('external', False)
        self.dict.setdefault('attrs', {})
        self.dict.setdefault('envs', {})
        self.dict.setdefault('host', None)
        self.dict.setdefault('port', None)
        self.dict.setdefault('tar', None)
        self.dict.setdefault('pkgs', '')
        self.dict.setdefault('final-attrs', {})
        self._checkRequired()
        # A 'hadoop-tar-ball' entry, when present, overrides 'tar'.
        if 'hadoop-tar-ball' in self.dict:
            self.dict['tar'] = self.dict['hadoop-tar-ball']

    def _checkRequired(self):
        """Raise ValueError if 'id' is absent."""
        if 'id' not in self.dict:
            raise ValueError("service description needs 'id'")

        # 'pkgs' is deliberately not checked here: it defaults to '' in
        # __init__, so a service description without packages is accepted.

    def getName(self):
        """Return the value stored under 'id'."""
        return self.dict['id']

    def isExternal(self):
        """True if the service is outside hod.
        e.g. connect to existing HDFS"""

        return self.dict['external']

    def getPkgDirs(self):
        """Return the value stored under 'pkgs' ('' by default)."""
        return self.dict['pkgs']

    def getTar(self):
        """Return the value stored under 'tar' (None by default)."""
        return self.dict['tar']

    def getAttrs(self):
        """Return the value stored under 'attrs'."""
        return self.dict['attrs']

    def getfinalAttrs(self):
        """Return the value stored under 'final-attrs'."""
        return self.dict['final-attrs']

    def getEnvs(self):
        """Return the value stored under 'envs'."""
        return self.dict['envs']

    def getSchema():
        """Return a dict mapping each field name to its Schema object."""
        fields = (
            Schema('id'),
            Schema('external'),
            Schema('pkgs', Schema.LIST, ':'),
            Schema('tar', Schema.LIST, ":"),
            Schema('attrs', Schema.MAP),
            Schema('final-attrs', Schema.MAP),
            Schema('envs', Schema.MAP),
        )

        schema = {}
        for s in fields:
            schema[s.getName()] = s

        return schema

    getSchema = staticmethod(getSchema)
---|
191 | |
---|
class CommandDesc:
    """a class for how a command is described

    The description is a dictionary.  toString() requires the keys 'name',
    'program' and 'pkgdirs'; 'argv', 'envs', 'workdirs', 'attrs',
    'final-attrs', 'fg' and 'stdin' are emitted only when present.
    """

    def __init__(self, dict):
        """Wrap `dict`; it is stored by reference, not copied."""
        self.dict = dict

    def __repr__(self):
        """Return the pretty-printed underlying dictionary."""
        return pformat(self.dict)

    def _getName(self):
        """return the name of the command to be run"""
        return self.dict['name']

    def _getProgram(self):
        """return where the program is """
        return self.dict['program']

    def _getArgv(self):
        """return the arguments for the command to be run"""
        return self.dict['argv']

    def _getEnvs(self):
        """return the environment in which the command is to be run"""
        return self.dict['envs']

    def _getPkgDirs(self):
        """return the packages for this command"""
        return self.dict['pkgdirs']

    def _getWorkDirs(self):
        """return the working directories for this command"""
        return self.dict['workdirs']

    def _getAttrs(self):
        """return the list of attributes for this command"""
        return self.dict['attrs']

    def _getfinalAttrs(self):
        """return the final xml params list for this command"""
        return self.dict['final-attrs']

    def _getForeground(self):
        """return if the command is to be run in foreground or not"""
        return self.dict['fg']

    def _getStdin(self):
        """return the value stored under 'stdin'"""
        return self.dict['stdin']

    def _pairs(mapping):
        """return a list of 'key=value' strings, one per entry of mapping"""
        return ['%s=%s' % (k, mapping[k]) for k in mapping]

    _pairs = staticmethod(_pairs)

    def toString(cmdDesc):
        """return a string representation of this command

        The result is one comma-delimited row of 'key=value' fields in the
        order name, program, pkgdirs, argv, envs, workdirs, attrs,
        final-attrs, fg, stdin.  List values are themselves joined with
        _csv (':' for pkgdirs and workdirs, ',' otherwise), so delimiters
        inside nested values end up backslash-escaped in the outer row.
        Raises KeyError if 'name', 'program' or 'pkgdirs' is missing.
        """
        row = []
        row.append('name=%s' % (cmdDesc._getName()))
        row.append('program=%s' % (cmdDesc._getProgram()))
        row.append('pkgdirs=%s' % CommandDesc._csv(cmdDesc._getPkgDirs(), ':'))

        if 'argv' in cmdDesc.dict:
            row.append('argv=%s' % CommandDesc._csv(cmdDesc._getArgv()))

        if 'envs' in cmdDesc.dict:
            envs = CommandDesc._pairs(cmdDesc._getEnvs())
            row.append('envs=%s' % CommandDesc._csv(envs))

        if 'workdirs' in cmdDesc.dict:
            row.append('workdirs=%s' % CommandDesc._csv(cmdDesc._getWorkDirs(), ':'))

        if 'attrs' in cmdDesc.dict:
            attrs = CommandDesc._pairs(cmdDesc._getAttrs())
            row.append('attrs=%s' % CommandDesc._csv(attrs))

        if 'final-attrs' in cmdDesc.dict:
            # Read the final attributes themselves (the previous code read
            # 'attrs' here and then called a misspelled '_cvs' helper).
            fattrs = CommandDesc._pairs(cmdDesc._getfinalAttrs())
            row.append('final-attrs=%s' % CommandDesc._csv(fattrs))

        if 'fg' in cmdDesc.dict:
            row.append('fg=%s' % (cmdDesc._getForeground()))

        if 'stdin' in cmdDesc.dict:
            row.append('stdin=%s' % (cmdDesc._getStdin()))

        return CommandDesc._csv(row)

    toString = staticmethod(toString)

    def _csv(row, delim=','):
        """return a string in csv format

        row is written as a single record using delim as the field
        delimiter, with no quoting and a backslash as the escape character.
        The trailing line terminator is stripped from the result.
        """
        import csv
        try:
            # Python 2: the csv module writes byte strings.
            from cStringIO import StringIO
        except ImportError:
            # Python 3: cStringIO is gone; csv writes text.
            from io import StringIO

        queue = StringIO()
        writer = csv.writer(queue, delimiter=delim, escapechar='\\', quoting=csv.QUOTE_NONE,
                            doublequote=False, lineterminator='\n')
        writer.writerow(row)
        return queue.getvalue().rstrip('\n')

    _csv = staticmethod(_csv)
---|