source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 3.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapreduce.lib.input;
20
21import java.io.IOException;
22
23
24import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.fs.FileSystem;
26import org.apache.hadoop.fs.Path;
27import org.apache.hadoop.io.*;
28import org.apache.hadoop.mapreduce.InputSplit;
29import org.apache.hadoop.mapreduce.RecordReader;
30import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32/** An {@link RecordReader} for {@link SequenceFile}s. */
33public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
34  private SequenceFile.Reader in;
35  private long start;
36  private long end;
37  private boolean more = true;
38  private K key = null;
39  private V value = null;
40  protected Configuration conf;
41 
42  @Override
43  public void initialize(InputSplit split, 
44                         TaskAttemptContext context
45                         ) throws IOException, InterruptedException {
46    FileSplit fileSplit = (FileSplit) split;
47    conf = context.getConfiguration();   
48    Path path = fileSplit.getPath();
49    FileSystem fs = path.getFileSystem(conf);
50    this.in = new SequenceFile.Reader(fs, path, conf);
51    this.end = fileSplit.getStart() + fileSplit.getLength();
52
53    if (fileSplit.getStart() > in.getPosition()) {
54      in.sync(fileSplit.getStart());                  // sync to start
55    }
56
57    this.start = in.getPosition();
58    more = start < end;
59  }
60
61  @Override
62  @SuppressWarnings("unchecked")
63  public boolean nextKeyValue() throws IOException, InterruptedException {
64    if (!more) {
65      return false;
66    }
67    long pos = in.getPosition();
68    key = (K) in.next(key);
69    if (key == null || (pos >= end && in.syncSeen())) {
70      more = false;
71      key = null;
72      value = null;
73    } else {
74      value = (V) in.getCurrentValue(value);
75    }
76    return more;
77  }
78
79  @Override
80  public K getCurrentKey() {
81    return key;
82  }
83 
84  @Override
85  public V getCurrentValue() {
86    return value;
87  }
88 
89  /**
90   * Return the progress within the input split
91   * @return 0.0 to 1.0 of the input byte range
92   */
93  public float getProgress() throws IOException {
94    if (end == start) {
95      return 0.0f;
96    } else {
97      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
98    }
99  }
100 
101  public synchronized void close() throws IOException { in.close(); }
102 
103}
104
Note: See TracBrowser for help on using the repository browser.