#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""manage component descriptors"""
# -*- python -*-

import random

from sets import Set
from pprint import pformat
from hodlib.Common.util import local_fqdn
from hodlib.Common.tcp import tcpSocket, tcpError


class Schema:
  """Describe one field of a descriptor: its name, value type and delimiter.

  The supported value types are STRING, LIST and MAP. `delim` is the
  separator associated with the field; the getSchema() helpers below pair
  ':' with path-like LIST fields and use the ',' default otherwise.
  """
  STRING, LIST, MAP = range(3)

  def __init__(self, name, type = STRING, delim=','):
    self.name = name
    self.type = type
    self.delim = delim

  def getName(self):
    """Return the field name."""
    return self.name

  def getType(self):
    """Return the field type (one of STRING, LIST, MAP)."""
    return self.type

  def getDelim(self):
    """Return the delimiter associated with this field."""
    return self.delim


class _Merger:
  """Helpers to merge lists and to add key/value pairs to a dictionary."""

  def mergeList(x, y, uniq=True):
    """Return a new list holding the items of `x` followed by those of `y`.

    When `uniq` is true, duplicate items are dropped (items must then be
    hashable). The first occurrence of each item is kept, so the result
    order is stable instead of depending on hash order.
    """
    merged = list(x) + list(y)
    if not uniq:
      return merged
    seen = {}
    result = []
    for item in merged:
      if item not in seen:
        seen[item] = True
        result.append(item)
    return result
  mergeList = staticmethod(mergeList)

  def mergeMap(to, add):
    """Copy into `to` every key of `add` that `to` does not already have.

    Existing entries of `to` are never overwritten. `to` is modified in
    place and also returned.
    """
    for key in add:
      to.setdefault(key, add[key])
    return to
  mergeMap = staticmethod(mergeMap)


class NodePoolDesc:
  """A schema for describing node pools."""

  def __init__(self, dict):
    """Build a node pool description from a shallow copy of `dict`.

    Raises ValueError if 'id' or 'batch-home' is missing. When an
    'options' entry is present it replaces the 'attrs' map.
    """
    self.dict = dict.copy()
    self.dict.setdefault('attrs', {})
    self._checkRequired()
    if 'options' in dict:
      self.dict['attrs'] = dict['options']

  def _checkRequired(self):
    """Raise ValueError unless the required keys are present."""
    if 'id' not in self.dict:
      raise ValueError("nodepool needs 'id'")
    # Use .get() so that a missing key is reported as the intended
    # ValueError instead of escaping from getPkgDir() as a KeyError.
    if self.dict.get('batch-home') is None:
      raise ValueError("nodepool %s needs 'batch-home'" % (self.getName()))

  def getName(self):
    """Return the node pool id."""
    return self.dict['id']

  def getPkgDir(self):
    """Return the 'batch-home' value."""
    return self.dict['batch-home']

  def getAttrs(self):
    """Return the attribute map."""
    return self.dict['attrs']

  def getSchema():
    """Return a dict mapping field name to Schema for node pool fields."""
    schema = {}
    for s in (Schema('id'),
              Schema('batch-home', Schema.LIST, ':'),
              Schema('attrs', Schema.MAP)):
      schema[s.getName()] = s
    return schema
  getSchema = staticmethod(getSchema)


class ServiceDesc:
  """A schema for describing services."""

  def __init__(self, dict):
    """Build a service description from a shallow copy of `dict`.

    Optional keys are filled with defaults. Raises ValueError if 'id' is
    missing. A 'hadoop-tar-ball' entry, when present, overrides 'tar'.
    """
    self.dict = dict.copy()
    self.dict.setdefault('external', False)
    self.dict.setdefault('attrs', {})
    self.dict.setdefault('envs', {})
    self.dict.setdefault('host', None)
    self.dict.setdefault('port', None)
    self.dict.setdefault('tar', None)
    self.dict.setdefault('pkgs', '')
    self.dict.setdefault('final-attrs', {})
    self._checkRequired()
    if 'hadoop-tar-ball' in self.dict:
      self.dict['tar'] = self.dict['hadoop-tar-ball']

  def _checkRequired(self):
    """Raise ValueError unless the required keys are present."""
    if 'id' not in self.dict:
      raise ValueError("service description needs 'id'")
    # 'pkgs' is not validated here; __init__ defaults it to ''.

  def getName(self):
    """Return the service id."""
    return self.dict['id']

  def isExternal(self):
    """True if the service is outside hod. e.g. connect to existing HDFS"""
    return self.dict['external']

  def getPkgDirs(self):
    """Return the 'pkgs' value ('' when not configured)."""
    return self.dict['pkgs']

  def getTar(self):
    """Return the 'tar' value (None when not configured)."""
    return self.dict['tar']

  def getAttrs(self):
    """Return the attribute map."""
    return self.dict['attrs']

  def getfinalAttrs(self):
    """Return the 'final-attrs' map."""
    return self.dict['final-attrs']

  def getEnvs(self):
    """Return the environment map."""
    return self.dict['envs']

  def getSchema():
    """Return a dict mapping field name to Schema for service fields."""
    schema = {}
    for s in (Schema('id'),
              Schema('external'),
              Schema('pkgs', Schema.LIST, ':'),
              Schema('tar', Schema.LIST, ":"),
              Schema('attrs', Schema.MAP),
              Schema('final-attrs', Schema.MAP),
              Schema('envs', Schema.MAP)):
      schema[s.getName()] = s
    return schema
  getSchema = staticmethod(getSchema)


class CommandDesc:
  """Describe how a command is to be run.

  Wraps a plain dict; the _get* accessors name the keys that are read.
  """

  def __init__(self, dict):
    """a class for how a command is described"""
    # The dict is stored as-is (not copied).
    self.dict = dict

  def __repr__(self):
    return pformat(self.dict)

  def _getName(self):
    """return the name of the command to be run"""
    return self.dict['name']

  def _getProgram(self):
    """return where the program is """
    return self.dict['program']

  def _getArgv(self):
    """return the arguments for the command to be run"""
    return self.dict['argv']

  def _getEnvs(self):
    """return the environment in which the command is to be run"""
    return self.dict['envs']

  def _getPkgDirs(self):
    """return the packages for this command"""
    return self.dict['pkgdirs']

  def _getWorkDirs(self):
    """return the working directories for this command"""
    return self.dict['workdirs']

  def _getAttrs(self):
    """return the list of attributes for this command"""
    return self.dict['attrs']

  def _getfinalAttrs(self):
    """return the final xml params list for this command"""
    return self.dict['final-attrs']

  def _getForeground(self):
    """return if the command is to be run in foreground or not"""
    return self.dict['fg']

  def _getStdin(self):
    """return the 'stdin' value for this command"""
    return self.dict['stdin']

  def _formatMap(mapping):
    """Return a list of 'key=value' strings, one per entry of `mapping`."""
    return ['%s=%s' % (k, mapping[k]) for k in mapping]
  _formatMap = staticmethod(_formatMap)

  def toString(cmdDesc):
    """Return a csv string representation of `cmdDesc`.

    'name', 'program' and 'pkgdirs' are always emitted and must be present
    (a missing one raises KeyError). The remaining keys are emitted only
    when they are present in the description.
    """
    d = cmdDesc.dict
    row = []
    row.append('name=%s' % (cmdDesc._getName()))
    row.append('program=%s' % (cmdDesc._getProgram()))
    row.append('pkgdirs=%s' % CommandDesc._csv(cmdDesc._getPkgDirs(), ':'))
    if 'argv' in d:
      row.append('argv=%s' % CommandDesc._csv(cmdDesc._getArgv()))
    if 'envs' in d:
      envs = CommandDesc._formatMap(cmdDesc._getEnvs())
      row.append('envs=%s' % CommandDesc._csv(envs))
    if 'workdirs' in d:
      row.append('workdirs=%s' % CommandDesc._csv(cmdDesc._getWorkDirs(), ':'))
    if 'attrs' in d:
      attrs = CommandDesc._formatMap(cmdDesc._getAttrs())
      row.append('attrs=%s' % CommandDesc._csv(attrs))
    if 'final-attrs' in d:
      # Serialize the 'final-attrs' map itself (not 'attrs') through _csv.
      fattrs = CommandDesc._formatMap(cmdDesc._getfinalAttrs())
      row.append('final-attrs=%s' % CommandDesc._csv(fattrs))
    if 'fg' in d:
      row.append('fg=%s' % (cmdDesc._getForeground()))
    if 'stdin' in d:
      row.append('stdin=%s' % (cmdDesc._getStdin()))
    return CommandDesc._csv(row)
  toString = staticmethod(toString)

  def _csv(row, delim=','):
    """Return `row` as a single csv-formatted line separated by `delim`.

    Fields are never quoted; special characters such as the delimiter are
    escaped with a backslash instead. The writer's trailing newline is
    stripped.
    """
    import cStringIO
    import csv

    queue = cStringIO.StringIO()
    writer = csv.writer(queue, delimiter=delim, escapechar='\\',
                        quoting=csv.QUOTE_NONE, doublequote=False,
                        lineterminator='\n')
    writer.writerow(row)
    return queue.getvalue().rstrip('\n')
  _csv = staticmethod(_csv)