source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 6.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapreduce;
20
21import java.io.IOException;
22import java.util.Iterator;
23import java.util.NoSuchElementException;
24
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.io.BytesWritable;
27import org.apache.hadoop.io.DataInputBuffer;
28import org.apache.hadoop.io.RawComparator;
29import org.apache.hadoop.io.serializer.Deserializer;
30import org.apache.hadoop.io.serializer.SerializationFactory;
31import org.apache.hadoop.mapred.RawKeyValueIterator;
32import org.apache.hadoop.util.Progressable;
33
34/**
35 * The context passed to the {@link Reducer}.
36 * @param <KEYIN> the class of the input keys
37 * @param <VALUEIN> the class of the input values
38 * @param <KEYOUT> the class of the output keys
39 * @param <VALUEOUT> the class of the output values
40 */
41public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
42    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
43  private RawKeyValueIterator input;
44  private Counter inputCounter;
45  private RawComparator<KEYIN> comparator;
46  private KEYIN key;                                  // current key
47  private VALUEIN value;                              // current value
48  private boolean firstValue = false;                 // first value in key
49  private boolean nextKeyIsSame = false;              // more w/ this key
50  private boolean hasMore;                            // more in file
51  protected Progressable reporter;
52  private Deserializer<KEYIN> keyDeserializer;
53  private Deserializer<VALUEIN> valueDeserializer;
54  private DataInputBuffer buffer = new DataInputBuffer();
55  private BytesWritable currentRawKey = new BytesWritable();
56  private ValueIterable iterable = new ValueIterable();
57
58  public ReduceContext(Configuration conf, TaskAttemptID taskid,
59                       RawKeyValueIterator input, 
60                       Counter inputCounter,
61                       RecordWriter<KEYOUT,VALUEOUT> output,
62                       OutputCommitter committer,
63                       StatusReporter reporter,
64                       RawComparator<KEYIN> comparator,
65                       Class<KEYIN> keyClass,
66                       Class<VALUEIN> valueClass
67                       ) throws InterruptedException, IOException{
68    super(conf, taskid, output, committer, reporter);
69    this.input = input;
70    this.inputCounter = inputCounter;
71    this.comparator = comparator;
72    SerializationFactory serializationFactory = new SerializationFactory(conf);
73    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
74    this.keyDeserializer.open(buffer);
75    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
76    this.valueDeserializer.open(buffer);
77    hasMore = input.next();
78  }
79
80  /** Start processing next unique key. */
81  public boolean nextKey() throws IOException,InterruptedException {
82    while (hasMore && nextKeyIsSame) {
83      nextKeyValue();
84    }
85    if (hasMore) {
86      return nextKeyValue();
87    } else {
88      return false;
89    }
90  }
91
92  /**
93   * Advance to the next key/value pair.
94   */
95  @Override
96  public boolean nextKeyValue() throws IOException, InterruptedException {
97    if (!hasMore) {
98      key = null;
99      value = null;
100      return false;
101    }
102    firstValue = !nextKeyIsSame;
103    DataInputBuffer next = input.getKey();
104    currentRawKey.set(next.getData(), next.getPosition(), 
105                      next.getLength() - next.getPosition());
106    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
107    key = keyDeserializer.deserialize(key);
108    next = input.getValue();
109    buffer.reset(next.getData(), next.getPosition(), next.getLength());
110    value = valueDeserializer.deserialize(value);
111    hasMore = input.next();
112    inputCounter.increment(1);
113    if (hasMore) {
114      next = input.getKey();
115      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
116                                         currentRawKey.getLength(),
117                                         next.getData(),
118                                         next.getPosition(),
119                                         next.getLength() - next.getPosition()
120                                         ) == 0;
121    } else {
122      nextKeyIsSame = false;
123    }
124    return true;
125  }
126
127  public KEYIN getCurrentKey() {
128    return key;
129  }
130
131  @Override
132  public VALUEIN getCurrentValue() {
133    return value;
134  }
135
136  protected class ValueIterator implements Iterator<VALUEIN> {
137
138    @Override
139    public boolean hasNext() {
140      return firstValue || nextKeyIsSame;
141    }
142
143    @Override
144    public VALUEIN next() {
145      // if this is the first record, we don't need to advance
146      if (firstValue) {
147        firstValue = false;
148        return value;
149      }
150      // if this isn't the first record and the next key is different, they
151      // can't advance it here.
152      if (!nextKeyIsSame) {
153        throw new NoSuchElementException("iterate past last value");
154      }
155      // otherwise, go to the next key/value pair
156      try {
157        nextKeyValue();
158        return value;
159      } catch (IOException ie) {
160        throw new RuntimeException("next value iterator failed", ie);
161      } catch (InterruptedException ie) {
162        // this is bad, but we can't modify the exception list of java.util
163        throw new RuntimeException("next value iterator interrupted", ie);       
164      }
165    }
166
167    @Override
168    public void remove() {
169      throw new UnsupportedOperationException("remove not implemented");
170    }
171   
172  }
173
174  protected class ValueIterable implements Iterable<VALUEIN> {
175    private ValueIterator iterator = new ValueIterator();
176    @Override
177    public Iterator<VALUEIN> iterator() {
178      return iterator;
179    } 
180  }
181 
182  /**
183   * Iterate through the values for the current key, reusing the same value
184   * object, which is stored in the context.
185   * @return the series of values associated with the current key. All of the
186   * objects returned directly and indirectly from this method are reused.
187   */
188  public 
189  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
190    return iterable;
191  }
192}
Note: See TracBrowser for help on using the repository browser.