source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.4 KB
Line 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19package org.apache.hadoop.mapreduce.lib.input;
20
21import java.io.IOException;
22
23import org.apache.hadoop.conf.Configuration;
24import org.apache.hadoop.fs.FSDataInputStream;
25import org.apache.hadoop.fs.FileSystem;
26import org.apache.hadoop.fs.Path;
27import org.apache.hadoop.io.LongWritable;
28import org.apache.hadoop.io.Text;
29import org.apache.hadoop.io.compress.CompressionCodec;
30import org.apache.hadoop.io.compress.CompressionCodecFactory;
31import org.apache.hadoop.mapreduce.InputSplit;
32import org.apache.hadoop.mapreduce.RecordReader;
33import org.apache.hadoop.mapreduce.TaskAttemptContext;
34import org.apache.hadoop.util.LineReader;
35import org.apache.commons.logging.LogFactory;
36import org.apache.commons.logging.Log;
37
38/**
39 * Treats keys as offset in file and value as line.
40 */
41public class LineRecordReader extends RecordReader<LongWritable, Text> {
42  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
43
44  private CompressionCodecFactory compressionCodecs = null;
45  private long start;
46  private long pos;
47  private long end;
48  private LineReader in;
49  private int maxLineLength;
50  private LongWritable key = null;
51  private Text value = null;
52
53  public void initialize(InputSplit genericSplit,
54                         TaskAttemptContext context) throws IOException {
55    FileSplit split = (FileSplit) genericSplit;
56    Configuration job = context.getConfiguration();
57    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
58                                    Integer.MAX_VALUE);
59    start = split.getStart();
60    end = start + split.getLength();
61    final Path file = split.getPath();
62    compressionCodecs = new CompressionCodecFactory(job);
63    final CompressionCodec codec = compressionCodecs.getCodec(file);
64
65    // open the file and seek to the start of the split
66    FileSystem fs = file.getFileSystem(job);
67    FSDataInputStream fileIn = fs.open(split.getPath());
68    boolean skipFirstLine = false;
69    if (codec != null) {
70      in = new LineReader(codec.createInputStream(fileIn), job);
71      end = Long.MAX_VALUE;
72    } else {
73      if (start != 0) {
74        skipFirstLine = true;
75        --start;
76        fileIn.seek(start);
77      }
78      in = new LineReader(fileIn, job);
79    }
80    if (skipFirstLine) {  // skip first line and re-establish "start".
81      start += in.readLine(new Text(), 0,
82                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
83    }
84    this.pos = start;
85  }
86 
87  public boolean nextKeyValue() throws IOException {
88    if (key == null) {
89      key = new LongWritable();
90    }
91    key.set(pos);
92    if (value == null) {
93      value = new Text();
94    }
95    int newSize = 0;
96    while (pos < end) {
97      newSize = in.readLine(value, maxLineLength,
98                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
99                                     maxLineLength));
100      if (newSize == 0) {
101        break;
102      }
103      pos += newSize;
104      if (newSize < maxLineLength) {
105        break;
106      }
107
108      // line too long. try again
109      LOG.info("Skipped line of size " + newSize + " at pos " + 
110               (pos - newSize));
111    }
112    if (newSize == 0) {
113      key = null;
114      value = null;
115      return false;
116    } else {
117      return true;
118    }
119  }
120
121  @Override
122  public LongWritable getCurrentKey() {
123    return key;
124  }
125
126  @Override
127  public Text getCurrentValue() {
128    return value;
129  }
130
131  /**
132   * Get the progress within the split
133   */
134  public float getProgress() {
135    if (start == end) {
136      return 0.0f;
137    } else {
138      return Math.min(1.0f, (pos - start) / (float)(end - start));
139    }
140  }
141 
142  public synchronized void close() throws IOException {
143    if (in != null) {
144      in.close(); 
145    }
146  }
147}
Note: See TracBrowser for help on using the repository browser.