[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.mapreduce; |
---|
| 20 | |
---|
| 21 | import java.io.IOException; |
---|
| 22 | import java.util.Iterator; |
---|
| 23 | import java.util.NoSuchElementException; |
---|
| 24 | |
---|
| 25 | import org.apache.hadoop.conf.Configuration; |
---|
| 26 | import org.apache.hadoop.io.BytesWritable; |
---|
| 27 | import org.apache.hadoop.io.DataInputBuffer; |
---|
| 28 | import org.apache.hadoop.io.RawComparator; |
---|
| 29 | import org.apache.hadoop.io.serializer.Deserializer; |
---|
| 30 | import org.apache.hadoop.io.serializer.SerializationFactory; |
---|
| 31 | import org.apache.hadoop.mapred.RawKeyValueIterator; |
---|
| 32 | import org.apache.hadoop.util.Progressable; |
---|
| 33 | |
---|
| 34 | /** |
---|
| 35 | * The context passed to the {@link Reducer}. |
---|
| 36 | * @param <KEYIN> the class of the input keys |
---|
| 37 | * @param <VALUEIN> the class of the input values |
---|
| 38 | * @param <KEYOUT> the class of the output keys |
---|
| 39 | * @param <VALUEOUT> the class of the output values |
---|
| 40 | */ |
---|
| 41 | public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> |
---|
| 42 | extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { |
---|
| 43 | private RawKeyValueIterator input; |
---|
| 44 | private Counter inputCounter; |
---|
| 45 | private RawComparator<KEYIN> comparator; |
---|
| 46 | private KEYIN key; // current key |
---|
| 47 | private VALUEIN value; // current value |
---|
| 48 | private boolean firstValue = false; // first value in key |
---|
| 49 | private boolean nextKeyIsSame = false; // more w/ this key |
---|
| 50 | private boolean hasMore; // more in file |
---|
| 51 | protected Progressable reporter; |
---|
| 52 | private Deserializer<KEYIN> keyDeserializer; |
---|
| 53 | private Deserializer<VALUEIN> valueDeserializer; |
---|
| 54 | private DataInputBuffer buffer = new DataInputBuffer(); |
---|
| 55 | private BytesWritable currentRawKey = new BytesWritable(); |
---|
| 56 | private ValueIterable iterable = new ValueIterable(); |
---|
| 57 | |
---|
| 58 | public ReduceContext(Configuration conf, TaskAttemptID taskid, |
---|
| 59 | RawKeyValueIterator input, |
---|
| 60 | Counter inputCounter, |
---|
| 61 | RecordWriter<KEYOUT,VALUEOUT> output, |
---|
| 62 | OutputCommitter committer, |
---|
| 63 | StatusReporter reporter, |
---|
| 64 | RawComparator<KEYIN> comparator, |
---|
| 65 | Class<KEYIN> keyClass, |
---|
| 66 | Class<VALUEIN> valueClass |
---|
| 67 | ) throws InterruptedException, IOException{ |
---|
| 68 | super(conf, taskid, output, committer, reporter); |
---|
| 69 | this.input = input; |
---|
| 70 | this.inputCounter = inputCounter; |
---|
| 71 | this.comparator = comparator; |
---|
| 72 | SerializationFactory serializationFactory = new SerializationFactory(conf); |
---|
| 73 | this.keyDeserializer = serializationFactory.getDeserializer(keyClass); |
---|
| 74 | this.keyDeserializer.open(buffer); |
---|
| 75 | this.valueDeserializer = serializationFactory.getDeserializer(valueClass); |
---|
| 76 | this.valueDeserializer.open(buffer); |
---|
| 77 | hasMore = input.next(); |
---|
| 78 | } |
---|
| 79 | |
---|
| 80 | /** Start processing next unique key. */ |
---|
| 81 | public boolean nextKey() throws IOException,InterruptedException { |
---|
| 82 | while (hasMore && nextKeyIsSame) { |
---|
| 83 | nextKeyValue(); |
---|
| 84 | } |
---|
| 85 | if (hasMore) { |
---|
| 86 | return nextKeyValue(); |
---|
| 87 | } else { |
---|
| 88 | return false; |
---|
| 89 | } |
---|
| 90 | } |
---|
| 91 | |
---|
| 92 | /** |
---|
| 93 | * Advance to the next key/value pair. |
---|
| 94 | */ |
---|
| 95 | @Override |
---|
| 96 | public boolean nextKeyValue() throws IOException, InterruptedException { |
---|
| 97 | if (!hasMore) { |
---|
| 98 | key = null; |
---|
| 99 | value = null; |
---|
| 100 | return false; |
---|
| 101 | } |
---|
| 102 | firstValue = !nextKeyIsSame; |
---|
| 103 | DataInputBuffer next = input.getKey(); |
---|
| 104 | currentRawKey.set(next.getData(), next.getPosition(), |
---|
| 105 | next.getLength() - next.getPosition()); |
---|
| 106 | buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); |
---|
| 107 | key = keyDeserializer.deserialize(key); |
---|
| 108 | next = input.getValue(); |
---|
| 109 | buffer.reset(next.getData(), next.getPosition(), next.getLength()); |
---|
| 110 | value = valueDeserializer.deserialize(value); |
---|
| 111 | hasMore = input.next(); |
---|
| 112 | inputCounter.increment(1); |
---|
| 113 | if (hasMore) { |
---|
| 114 | next = input.getKey(); |
---|
| 115 | nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, |
---|
| 116 | currentRawKey.getLength(), |
---|
| 117 | next.getData(), |
---|
| 118 | next.getPosition(), |
---|
| 119 | next.getLength() - next.getPosition() |
---|
| 120 | ) == 0; |
---|
| 121 | } else { |
---|
| 122 | nextKeyIsSame = false; |
---|
| 123 | } |
---|
| 124 | return true; |
---|
| 125 | } |
---|
| 126 | |
---|
| 127 | public KEYIN getCurrentKey() { |
---|
| 128 | return key; |
---|
| 129 | } |
---|
| 130 | |
---|
| 131 | @Override |
---|
| 132 | public VALUEIN getCurrentValue() { |
---|
| 133 | return value; |
---|
| 134 | } |
---|
| 135 | |
---|
| 136 | protected class ValueIterator implements Iterator<VALUEIN> { |
---|
| 137 | |
---|
| 138 | @Override |
---|
| 139 | public boolean hasNext() { |
---|
| 140 | return firstValue || nextKeyIsSame; |
---|
| 141 | } |
---|
| 142 | |
---|
| 143 | @Override |
---|
| 144 | public VALUEIN next() { |
---|
| 145 | // if this is the first record, we don't need to advance |
---|
| 146 | if (firstValue) { |
---|
| 147 | firstValue = false; |
---|
| 148 | return value; |
---|
| 149 | } |
---|
| 150 | // if this isn't the first record and the next key is different, they |
---|
| 151 | // can't advance it here. |
---|
| 152 | if (!nextKeyIsSame) { |
---|
| 153 | throw new NoSuchElementException("iterate past last value"); |
---|
| 154 | } |
---|
| 155 | // otherwise, go to the next key/value pair |
---|
| 156 | try { |
---|
| 157 | nextKeyValue(); |
---|
| 158 | return value; |
---|
| 159 | } catch (IOException ie) { |
---|
| 160 | throw new RuntimeException("next value iterator failed", ie); |
---|
| 161 | } catch (InterruptedException ie) { |
---|
| 162 | // this is bad, but we can't modify the exception list of java.util |
---|
| 163 | throw new RuntimeException("next value iterator interrupted", ie); |
---|
| 164 | } |
---|
| 165 | } |
---|
| 166 | |
---|
| 167 | @Override |
---|
| 168 | public void remove() { |
---|
| 169 | throw new UnsupportedOperationException("remove not implemented"); |
---|
| 170 | } |
---|
| 171 | |
---|
| 172 | } |
---|
| 173 | |
---|
| 174 | protected class ValueIterable implements Iterable<VALUEIN> { |
---|
| 175 | private ValueIterator iterator = new ValueIterator(); |
---|
| 176 | @Override |
---|
| 177 | public Iterator<VALUEIN> iterator() { |
---|
| 178 | return iterator; |
---|
| 179 | } |
---|
| 180 | } |
---|
| 181 | |
---|
| 182 | /** |
---|
| 183 | * Iterate through the values for the current key, reusing the same value |
---|
| 184 | * object, which is stored in the context. |
---|
| 185 | * @return the series of values associated with the current key. All of the |
---|
| 186 | * objects returned directly and indirectly from this method are reused. |
---|
| 187 | */ |
---|
| 188 | public |
---|
| 189 | Iterable<VALUEIN> getValues() throws IOException, InterruptedException { |
---|
| 190 | return iterable; |
---|
| 191 | } |
---|
| 192 | } |
---|