source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/Reducer.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 6.0 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.RawKeyValueIterator;

/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 *
 * <p><code>Reducer</code> implementations
 * can access the {@link Configuration} for the job via the
 * {@link JobContext#getConfiguration()} method.</p>
 *
 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *
 *   <h4 id="Shuffle">Shuffle</h4>
 *
 *   <p>The <code>Reducer</code> copies the sorted output from each
 *   {@link Mapper} using HTTP across the network.</p>
 *   </li>
 *
 *   <li>
 *   <h4 id="Sort">Sort</h4>
 *
 *   <p>The framework merge-sorts <code>Reducer</code> inputs by
 *   <code>key</code>s (since different <code>Mapper</code>s may have
 *   output the same key).</p>
 *
 *   <p>The shuffle and sort phases occur simultaneously, i.e. while outputs
 *   are being fetched they are merged.</p>
 *
 *   <h5 id="SecondarySort">Secondary Sort</h5>
 *
 *   <p>To achieve a secondary sort on the values returned by the value
 *   iterator, the application should extend the key with the secondary
 *   key and define a grouping comparator. The keys will be sorted using the
 *   entire key, but will be grouped using the grouping comparator to decide
 *   which keys and values are sent in the same call to reduce. The grouping
 *   comparator is specified via
 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
 *   controlled by
 *   {@link Job#setSortComparatorClass(Class)}.</p>
 *
 *   For example, say that you want to find duplicate web pages and tag them
 *   all with the url of the "best" known example. You would set up the job
 *   like this (a driver sketch follows the list):
 *   <ul>
 *     <li>Map Input Key: url</li>
 *     <li>Map Input Value: document</li>
 *     <li>Map Output Key: document checksum, url pagerank</li>
 *     <li>Map Output Value: url</li>
 *     <li>Partitioner: by checksum</li>
 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 *     <li>OutputValueGroupingComparator: by checksum</li>
 *   </ul>
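 *
 *   <p>A minimal driver sketch for that job, assuming a
 *   <code>Configuration conf</code> in scope; the partitioner and comparator
 *   class names are illustrative, not part of this API:</p>
 *   <pre>
 *   Job job = new Job(conf, "dedup");
 *   job.setPartitionerClass(ChecksumPartitioner.class);
 *   job.setSortComparatorClass(ChecksumThenPageRankComparator.class);
 *   job.setGroupingComparatorClass(ChecksumGroupingComparator.class);
 *   </pre>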
 *   </li>
 *
 *   <li>
 *   <h4 id="Reduce">Reduce</h4>
 *
 *   <p>In this phase the
 *   {@link #reduce(Object, Iterable, Context)}
 *   method is called for each <code>&lt;key, (collection of values)&gt;</code> in
 *   the sorted inputs.</p>
 *   <p>The output of the reduce task is typically written to a
 *   {@link RecordWriter} via
 *   {@link Context#write(Object, Object)}.</p>
 *   </li>
 * </ol>
 *
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
 *                                                 Key,IntWritable&gt; {
 *   private IntWritable result = new IntWritable();
 *
 *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 * </pre></blockquote></p>
 *
 * @see Mapper
 * @see Partitioner
 */
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

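  /**
   * The context passed to {@link #setup}, {@link #reduce}, {@link #cleanup}
   * and {@link #run}; it provides the grouped input keys and values and
   * collects the reducer's output.
   */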
  public class Context
    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input,
                   Counter inputCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputCounter, output, committer, reporter,
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
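   *
   * <p>A typical override reads job parameters once per task. A minimal
   * sketch, assuming a hypothetical <code>example.case.sensitive</code>
   * configuration key set by the driver:</p>
   * <pre>
   * private boolean caseSensitive;
   *
   * protected void setup(Context context) {
   *   // Read the (hypothetical) flag once, rather than per reduce() call.
   *   caseSensitive =
   *     context.getConfiguration().getBoolean("example.case.sensitive", true);
   * }
   * </pre>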
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for (VALUEIN value : values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
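   *
   * <p>For example, a subclass could override it to skip keys that fail some
   * filter before they reach {@link #reduce}. A minimal sketch, where
   * <code>isWanted</code> is a hypothetical application-defined predicate:</p>
   * <pre>
   * public void run(Context context) throws IOException, InterruptedException {
   *   setup(context);
   *   while (context.nextKey()) {
   *     if (isWanted(context.getCurrentKey())) {  // hypothetical filter
   *       reduce(context.getCurrentKey(), context.getValues(), context);
   *     }
   *   }
   *   cleanup(context);
   * }
   * </pre>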
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
}