source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/python/pyAbacus/JythonAbacus.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 2.7 KB
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred import *

from org.apache.hadoop.abacus import *

from java.util import *

import sys

# The mapper, reducer, and combiner simply delegate to the abacus
# ValueAggregator base classes, which perform the actual aggregation.
class AbacusMapper(ValueAggregatorMapper):
    def map(self, key, value, output, reporter):
        ValueAggregatorMapper.map(self, key, value, output, reporter)

class AbacusReducer(ValueAggregatorReducer):
    def reduce(self, key, values, output, reporter):
        ValueAggregatorReducer.reduce(self, key, values, output, reporter)

class AbacusCombiner(ValueAggregatorCombiner):
    def reduce(self, key, values, output, reporter):
        ValueAggregatorCombiner.reduce(self, key, values, output, reporter)

def printUsage(code):
    print "Abacus <input> <output> <numOfReducers> <inputformat> <specfile>"
    sys.exit(code)

def main(args):
    if len(args) < 6:
        printUsage(1)

    # Positional arguments (args[0] is the script name).
    inDir = args[1]
    outDir = args[2]
    numOfReducers = int(args[3])
    theInputFormat = args[4]
    specFile = args[5]

    print "numOfReducers: ", numOfReducers, "theInputFormat: ", theInputFormat, "specFile: ", specFile

    # Configure the job; the spec file supplies the aggregator settings.
    conf = JobConf(AbacusMapper)
    conf.setJobName("recordcount")
    conf.addDefaultResource(Path(specFile))

    if theInputFormat == "textinputformat":
        conf.setInputFormat(TextInputFormat)
    else:
        conf.setInputFormat(SequenceFileInputFormat)
    conf.setOutputFormat(TextOutputFormat)
    conf.setMapOutputKeyClass(Text)
    conf.setMapOutputValueClass(Text)
    conf.setOutputKeyClass(Text)
    conf.setOutputValueClass(Text)
    conf.setNumMapTasks(1)
    conf.setNumReduceTasks(numOfReducers)

    conf.setMapperClass(AbacusMapper)
    conf.setCombinerClass(AbacusCombiner)
    conf.setReducerClass(AbacusReducer)
    conf.setInputPath(Path(inDir))
    conf.setOutputPath(Path(outDir))

    JobClient.runJob(conf)

if __name__ == "__main__":
    main(sys.argv)
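For reference, a minimal usage sketch: it calls main() with an argv-style list shaped exactly as printUsage describes. The directory names, reducer count, and spec-file name are placeholder assumptions, and actually running it would require a Jython interpreter with the Hadoop 0.20 abacus classes on the classpath.

# Hypothetical driver sketch; the argument values are placeholders and only
# their order comes from printUsage in the listing above.
import JythonAbacus

JythonAbacus.main([
    "JythonAbacus",        # args[0]: script name, not used by main()
    "input",               # <input> directory
    "output",              # <output> directory
    "2",                   # <numOfReducers>, converted with int()
    "textinputformat",     # <inputformat>: any other value selects SequenceFileInputFormat
    "aggregatorspec.xml",  # <specfile> loaded via conf.addDefaultResource()
])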