source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/pipes/impl/sort.cc @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 3.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "hadoop/Pipes.hh"
20#include "hadoop/TemplateFactory.hh"
21
22class SortMap: public HadoopPipes::Mapper {
23private:
24  /* the fraction 0.0 to 1.0 of records to keep */
25  float keepFraction;
26  /* the number of records kept so far */
27  long long keptRecords;
28  /* the total number of records */
29  long long totalRecords;
30  static const std::string MAP_KEEP_PERCENT;
31public:
32  /*
33   * Look in the config to find the fraction of records to keep.
34   */
35  SortMap(HadoopPipes::TaskContext& context){
36    const HadoopPipes::JobConf* conf = context.getJobConf();
37    if (conf->hasKey(MAP_KEEP_PERCENT)) {
38      keepFraction = conf->getFloat(MAP_KEEP_PERCENT) / 100.0;
39    } else {
40      keepFraction = 1.0;
41    }
42    keptRecords = 0;
43    totalRecords = 0;
44  }
45
46  void map(HadoopPipes::MapContext& context) {
47    totalRecords += 1;
48    while ((float) keptRecords / totalRecords < keepFraction) {
49      keptRecords += 1;
50      context.emit(context.getInputKey(), context.getInputValue());
51    }
52  }
53};
54
55const std::string SortMap::MAP_KEEP_PERCENT("hadoop.sort.map.keep.percent");
56
57class SortReduce: public HadoopPipes::Reducer {
58private:
59  /* the fraction 0.0 to 1.0 of records to keep */
60  float keepFraction;
61  /* the number of records kept so far */
62  long long keptRecords;
63  /* the total number of records */
64  long long totalRecords;
65  static const std::string REDUCE_KEEP_PERCENT;
66public:
67  SortReduce(HadoopPipes::TaskContext& context){
68    const HadoopPipes::JobConf* conf = context.getJobConf();
69    if (conf->hasKey(REDUCE_KEEP_PERCENT)) {
70      keepFraction = conf->getFloat(REDUCE_KEEP_PERCENT) / 100.0;
71    } else {
72      keepFraction = 1.0;
73    }
74    keptRecords = 0;
75    totalRecords = 0;
76  }
77
78  void reduce(HadoopPipes::ReduceContext& context) {
79    while (context.nextValue()) {
80      totalRecords += 1;
81      while ((float) keptRecords / totalRecords < keepFraction) {
82        keptRecords += 1;
83        context.emit(context.getInputKey(), context.getInputValue());
84      }
85    }
86  }
87};
88
89const std::string
90  SortReduce::REDUCE_KEEP_PERCENT("hadoop.sort.reduce.keep.percent");
91
92int main(int argc, char *argv[]) {
93  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<SortMap,
94                                                           SortReduce>());
95}
96
Note: See TracBrowser for help on using the repository browser.