source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/python/WordCount.py @ 141

Last change on this file since 141 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 2.2 KB
Line 
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19from org.apache.hadoop.fs import Path
20from org.apache.hadoop.io import *
21from org.apache.hadoop.mapred import *
22
23import sys
24import getopt
25
class WordCountMap(Mapper, MapReduceBase):
    """Mapper that emits a (word, 1) pair for every token of each record.

    The record value is converted to a string and split on whitespace;
    each token becomes a Text key paired with the shared count below.
    """

    # A single IntWritable(1) instance, reused as the value of every
    # emitted pair.
    one = IntWritable(1)

    def map(self, key, value, output, reporter):
        # ``key`` and ``reporter`` are not used by this mapper.
        line = value.toString()
        tokens = line.split()
        for token in tokens:
            output.collect(Text(token), self.one)
31
class Summer(Reducer, MapReduceBase):
    """Reducer that adds up the integer counts collected for each key.

    ``main`` registers this class as both the combiner and the reducer.
    That is valid because integer addition is associative and commutative.
    """

    def reduce(self, key, values, output, reporter):
        """Sum every IntWritable in ``values`` and emit (key, total).

        ``values`` is consumed through its hasNext()/next() methods;
        ``reporter`` is not used.
        """
        # Named ``total`` rather than ``sum`` so the builtin is not shadowed.
        total = 0
        while values.hasNext():
            total += values.next().get()
        output.collect(key, IntWritable(total))
38
def printUsage(code):
    """Print the command-line usage string and exit with status ``code``.

    The message goes to stderr for a non-zero (error) exit status and to
    stdout otherwise. It is written with ``stream.write`` instead of the
    Python 2 ``print`` statement so the function parses under both
    Python 2 (Jython) and Python 3.

    Never returns normally: ``sys.exit`` raises SystemExit.
    """
    # A plain if/else (not a conditional expression) keeps this usable on
    # older Jython releases.
    if code:
        stream = sys.stderr
    else:
        stream = sys.stdout
    stream.write("wordcount [-m <maps>] [-r <reduces>] <input> <output>\n")
    sys.exit(code)
42
def main(args):
    """Configure and run the word-count MapReduce job.

    ``args`` is an argv-style list. ``args[0]`` (the program name) is
    skipped. The optional ``-m <maps>`` and ``-r <reduces>`` flags set the
    task counts. Exactly two positional arguments must follow: the input
    path and the output path. A malformed command line (unknown flag,
    wrong number of positionals, or a non-integer task count) prints the
    usage string and exits with status 1.
    """
    # Validate the command line before building the job so that a bad
    # invocation fails fast.
    try:
        flags, other_args = getopt.getopt(args[1:], "m:r:")
    except getopt.GetoptError:
        printUsage(1)
    if len(other_args) != 2:
        printUsage(1)

    conf = JobConf(WordCountMap)
    conf.setJobName("wordcount")

    conf.setOutputKeyClass(Text)
    conf.setOutputValueClass(IntWritable)

    conf.setMapperClass(WordCountMap)
    # Summing counts is associative, so the reducer can also run as the
    # combiner.
    conf.setCombinerClass(Summer)
    conf.setReducerClass(Summer)

    for flag, value in flags:
        try:
            count = int(value)
        except ValueError:
            # Report usage for a non-numeric task count instead of
            # crashing with a traceback.
            printUsage(1)
        if flag == "-m":
            conf.setNumMapTasks(count)
        elif flag == "-r":
            conf.setNumReduceTasks(count)

    # Use the static FileInputFormat / FileOutputFormat helpers from
    # org.apache.hadoop.mapred instead of the older JobConf.setInputPath /
    # setOutputPath methods.
    FileInputFormat.addInputPath(conf, Path(other_args[0]))
    FileOutputFormat.setOutputPath(conf, Path(other_args[1]))
    JobClient.runJob(conf)
68
if __name__ == "__main__":
    # Run the job only when this file is executed as a script. The full
    # argv, including the program name, is handed to main().
    main(args=sys.argv)
Note: See TracBrowser for help on using the repository browser.