source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/pipes/impl/wordcount-nopipe.cc @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.5 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
#include "hadoop/SerialUtils.hh"

#include <stdio.h>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <vector>

const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
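
// Mapper: splits each input line on spaces and emits a (word, "1") pair
// per token, incrementing the INPUT_WORDS counter by the number of words.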
class WordCountMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* inputWords;

  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for(unsigned int i=0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
    context.incrementCounter(inputWords, words.size());
  }
};
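
// Reducer: sums the "1" values emitted for each word and writes
// (word, total), incrementing the OUTPUT_WORDS counter per distinct word.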
class WordCountReduce: public HadoopPipes::Reducer {
public:
  HadoopPipes::TaskContext::Counter* outputWords;

  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    context.incrementCounter(outputWords, 1);
  }
};
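
// Custom RecordReader: the input split holds a serialized local file name;
// each record is one line of that file, keyed by its starting byte offset.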
class WordCountReader: public HadoopPipes::RecordReader {
private:
  int64_t bytesTotal;
  int64_t bytesRead;
  FILE* file;
public:
  WordCountReader(HadoopPipes::MapContext& context) {
    std::string filename;
    HadoopUtils::StringInStream stream(context.getInputSplit());
    HadoopUtils::deserializeString(filename, stream);
    struct stat statResult;
    int statError = stat(filename.c_str(), &statResult);
    HADOOP_ASSERT(statError == 0, "failed to stat " + filename);
    bytesTotal = statResult.st_size;
    bytesRead = 0;
    file = fopen(filename.c_str(), "rt");
    HADOOP_ASSERT(file != NULL, "failed to open " + filename);
  }

  ~WordCountReader() {
    fclose(file);
  }
  virtual bool next(std::string& key, std::string& value) {
    // The key is the byte offset of the start of the line; the value is
    // the line's text without the trailing newline. A final line that
    // ends at EOF without a newline is dropped, since false is returned.
    key = HadoopUtils::toString(ftell(file));
    int ch = getc(file);
    bytesRead += 1;
    value.clear();
    while (ch != EOF && ch != '\n') {
      value += (char) ch;
      ch = getc(file);
      bytesRead += 1;
    }
    return ch != EOF;
  }

  /**
   * The progress of the record reader through the split as a value between
   * 0.0 and 1.0.
   */
  virtual float getProgress() {
    if (bytesTotal > 0) {
      return (float)bytesRead / bytesTotal;
    } else {
      return 1.0f;
    }
  }
};
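
// Custom RecordWriter: writes "key -> value" lines into a part-<N> file
// created directly under the job's local work output directory.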
class WordCountWriter: public HadoopPipes::RecordWriter {
private:
  FILE* file;
public:
  WordCountWriter(HadoopPipes::ReduceContext& context) {
    const HadoopPipes::JobConf* job = context.getJobConf();
    int part = job->getInt("mapred.task.partition");
    std::string outDir = job->get("mapred.work.output.dir");
    // remove the "file:" scheme prefix from the output directory URI
    std::string::size_type posn = outDir.find(":");
    HADOOP_ASSERT(posn != std::string::npos,
                  "no scheme found in output dir: " + outDir);
    outDir.erase(0, posn+1);
    mkdir(outDir.c_str(), 0777);
    std::string outFile = outDir + "/part-" + HadoopUtils::toString(part);
    file = fopen(outFile.c_str(), "wt");
    HADOOP_ASSERT(file != NULL, "can't open file for writing: " + outFile);
  }

  ~WordCountWriter() {
    fclose(file);
  }

  void emit(const std::string& key, const std::string& value) {
    fprintf(file, "%s -> %s\n", key.c_str(), value.c_str());
  }
};
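
// runTask wires the components together. The TemplateFactory arguments are
// <mapper, reducer, partitioner, combiner, record reader, record writer>,
// with void selecting the framework default. Because this example supplies
// its own reader and writer, the job is expected to be submitted with
// hadoop.pipes.java.recordreader and hadoop.pipes.java.recordwriter set
// to false.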
int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap,
                              WordCountReduce, void, void, WordCountReader,
                              WordCountWriter>());
}