/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.util.Progressable;

/**
 * The context passed to the {@link Reducer}.
 *
 * <p>Wraps a {@link RawKeyValueIterator} and exposes it as a sequence of
 * keys, each with an iterable of its values. Two consecutive records belong
 * to the same key when the supplied {@link RawComparator} reports their
 * serialized keys as equal. The key and value objects handed out are the
 * ones stored in this context and are passed back to the deserializers on
 * every record.
 *
 * @param <KEYIN> the class of the input keys
 * @param <VALUEIN> the class of the input values
 * @param <KEYOUT> the class of the output keys
 * @param <VALUEOUT> the class of the output values
 */
public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private final RawKeyValueIterator input;
  private final Counter inputCounter;
  private final RawComparator<KEYIN> comparator;
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key
  private boolean hasMore;                            // more in file
  // Not assigned anywhere in this class; kept for subclasses.
  protected Progressable reporter;
  private final Deserializer<KEYIN> keyDeserializer;
  private final Deserializer<VALUEIN> valueDeserializer;
  // Single buffer shared by both deserializers; it is re-pointed at the key
  // bytes and then at the value bytes for every record.
  private final DataInputBuffer buffer = new DataInputBuffer();
  // Copy of the current key's serialized bytes, kept so it can be compared
  // with the following record's key after the input has advanced.
  private final BytesWritable currentRawKey = new BytesWritable();
  private final ValueIterable iterable = new ValueIterable();

  /**
   * Create the context, open the key and value deserializers on the shared
   * buffer, and advance the input to its first record.
   *
   * @param conf the configuration used to look up the deserializers
   * @param taskid the task attempt this context belongs to
   * @param input the raw key/value records to read
   * @param inputCounter incremented by one for every record consumed
   * @param output passed through to the superclass
   * @param committer passed through to the superclass
   * @param reporter passed through to the superclass
   * @param comparator compares serialized keys to detect key boundaries
   * @param keyClass the class of the input keys
   * @param valueClass the class of the input values
   * @throws IOException if opening a deserializer or advancing the input
   *         fails
   * @throws InterruptedException declared for callers; see the superclass
   */
  public ReduceContext(Configuration conf, TaskAttemptID taskid,
                       RawKeyValueIterator input,
                       Counter inputCounter,
                       RecordWriter<KEYOUT,VALUEOUT> output,
                       OutputCommitter committer,
                       StatusReporter reporter,
                       RawComparator<KEYIN> comparator,
                       Class<KEYIN> keyClass,
                       Class<VALUEIN> valueClass
                       ) throws InterruptedException, IOException {
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputCounter = inputCounter;
    this.comparator = comparator;
    SerializationFactory serializationFactory =
        new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
  }

  /**
   * Start processing next unique key.
   *
   * <p>Any values of the current key that have not been consumed yet are
   * read and discarded first.
   *
   * @return {@code true} if a record for a new key was loaded,
   *         {@code false} if the input is exhausted
   */
  public boolean nextKey() throws IOException, InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      return nextKeyValue();
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   *
   * @return {@code true} if a pair was loaded; {@code false} if the input is
   *         exhausted, in which case the current key and value are set to
   *         {@code null}
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;

    // Copy the raw key so it survives input.next() below, then deserialize
    // it from that copy.
    DataInputBuffer next = input.getKey();
    currentRawKey.set(next.getData(), next.getPosition(),
                      next.getLength() - next.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);

    // reset() takes a byte count, so subtract the start position exactly as
    // is done for the key; passing getLength() alone would expose bytes
    // beyond the end of this value to the deserializer.
    next = input.getValue();
    buffer.reset(next.getData(), next.getPosition(),
                 next.getLength() - next.getPosition());
    value = valueDeserializer.deserialize(value);

    hasMore = input.next();
    inputCounter.increment(1);

    // Peek at the following record's key to learn whether the current key
    // continues.
    if (hasMore) {
      next = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                         currentRawKey.getLength(),
                                         next.getData(),
                                         next.getPosition(),
                                         next.getLength() - next.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    return true;
  }

  /**
   * @return the current key, or {@code null} once the input is exhausted
   */
  public KEYIN getCurrentKey() {
    return key;
  }

  /**
   * @return the current value, or {@code null} once the input is exhausted
   */
  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }

  /**
   * Iterator over the values of the current key. It reads from and advances
   * the enclosing context, so it shares state with {@link #nextKeyValue()}.
   */
  protected class ValueIterator implements Iterator<VALUEIN> {

    @Override
    public boolean hasNext() {
      return firstValue || nextKeyIsSame;
    }

    /**
     * @return the next value for the current key
     * @throws NoSuchElementException if the current key has no more values
     * @throws RuntimeException wrapping an {@link IOException} or
     *         {@link InterruptedException} raised while advancing the input
     */
    @Override
    public VALUEIN next() {
      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util.
        // Restore the interrupt flag so code further up the stack can still
        // observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("next value iterator interrupted", ie);
      }
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("remove not implemented");
    }
  }

  /**
   * Iterable over the values of the current key. Every call to
   * {@link #iterator()} returns the same {@link ValueIterator} instance.
   */
  protected class ValueIterable implements Iterable<VALUEIN> {
    private final ValueIterator iterator = new ValueIterator();

    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    }
  }

  /**
   * Iterate through the values for the current key, reusing the same value
   * object, which is stored in the context.
   * @return the series of values associated with the current key. All of the
   * objects returned directly and indirectly from this method are reused.
   */
  public Iterable<VALUEIN> getValues()
      throws IOException, InterruptedException {
    return iterable;
  }
}