source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/pipes/impl/wordcount-part.cc @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit project

  • Property svn:executable set to *
File size: 2.5 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";

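// Mapper: splits each input value on spaces and emits (word, "1") for
// every token, tracking the number of input words in a custom counter.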
class WordCountMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* inputWords;

  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for (unsigned int i = 0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
    context.incrementCounter(inputWords, words.size());
  }
};
44
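// Reducer: sums the "1" values collected for each key and emits
// (word, total). It is also reused as the combiner in main() below.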
class WordCountReduce: public HadoopPipes::Reducer {
public:
  HadoopPipes::TaskContext::Counter* outputWords;

  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    context.incrementCounter(outputWords, 1);
  }
};
62
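// Partitioner: routes every key to reduce task 0. This demonstrates the
// custom-partitioner hook; with more than one reducer it funnels all
// work to a single task, so a production job would hash the key instead.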
class WordCountPartitioner: public HadoopPipes::Partitioner {
public:
  WordCountPartitioner(HadoopPipes::TaskContext& context) {}
  virtual int partition(const std::string& key, int numOfReduces) {
    return 0;
  }
};
70
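// The TemplateFactory parameters are <mapper, reducer, partitioner,
// combiner>; WordCountReduce appears twice because it also serves as
// the combiner.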
int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap,
                              WordCountReduce, WordCountPartitioner,
                              WordCountReduce>());
}

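/*
 * Illustrative invocation (the paths and program name below are
 * examples, not part of this file): compile against the Pipes headers,
 * copy the executable to HDFS, then submit with something like:
 *
 *   bin/hadoop pipes \
 *     -D hadoop.pipes.java.recordreader=true \
 *     -D hadoop.pipes.java.recordwriter=true \
 *     -input in-dir -output out-dir \
 *     -program bin/wordcount-part
 */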