source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapreduce.lib.output;
20
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.UnsupportedEncodingException;
24
25import org.apache.hadoop.conf.Configuration;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.fs.FSDataOutputStream;
29
30import org.apache.hadoop.io.NullWritable;
31import org.apache.hadoop.io.Text;
32import org.apache.hadoop.io.compress.CompressionCodec;
33import org.apache.hadoop.io.compress.GzipCodec;
34import org.apache.hadoop.mapreduce.OutputFormat;
35import org.apache.hadoop.mapreduce.RecordWriter;
36import org.apache.hadoop.mapreduce.TaskAttemptContext;
37import org.apache.hadoop.util.*;
38
39/** An {@link OutputFormat} that writes plain text files. */
40public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
41  protected static class LineRecordWriter<K, V>
42    extends RecordWriter<K, V> {
43    private static final String utf8 = "UTF-8";
44    private static final byte[] newline;
45    static {
46      try {
47        newline = "\n".getBytes(utf8);
48      } catch (UnsupportedEncodingException uee) {
49        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
50      }
51    }
52
53    protected DataOutputStream out;
54    private final byte[] keyValueSeparator;
55
56    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
57      this.out = out;
58      try {
59        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
60      } catch (UnsupportedEncodingException uee) {
61        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
62      }
63    }
64
65    public LineRecordWriter(DataOutputStream out) {
66      this(out, "\t");
67    }
68
69    /**
70     * Write the object to the byte stream, handling Text as a special
71     * case.
72     * @param o the object to print
73     * @throws IOException if the write throws, we pass it on
74     */
75    private void writeObject(Object o) throws IOException {
76      if (o instanceof Text) {
77        Text to = (Text) o;
78        out.write(to.getBytes(), 0, to.getLength());
79      } else {
80        out.write(o.toString().getBytes(utf8));
81      }
82    }
83
84    public synchronized void write(K key, V value)
85      throws IOException {
86
87      boolean nullKey = key == null || key instanceof NullWritable;
88      boolean nullValue = value == null || value instanceof NullWritable;
89      if (nullKey && nullValue) {
90        return;
91      }
92      if (!nullKey) {
93        writeObject(key);
94      }
95      if (!(nullKey || nullValue)) {
96        out.write(keyValueSeparator);
97      }
98      if (!nullValue) {
99        writeObject(value);
100      }
101      out.write(newline);
102    }
103
104    public synchronized 
105    void close(TaskAttemptContext context) throws IOException {
106      out.close();
107    }
108  }
109
110  public RecordWriter<K, V> 
111         getRecordWriter(TaskAttemptContext job
112                         ) throws IOException, InterruptedException {
113    Configuration conf = job.getConfiguration();
114    boolean isCompressed = getCompressOutput(job);
115    String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
116                                       "\t");
117    CompressionCodec codec = null;
118    String extension = "";
119    if (isCompressed) {
120      Class<? extends CompressionCodec> codecClass = 
121        getOutputCompressorClass(job, GzipCodec.class);
122      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
123      extension = codec.getDefaultExtension();
124    }
125    Path file = getDefaultWorkFile(job, extension);
126    FileSystem fs = file.getFileSystem(conf);
127    if (!isCompressed) {
128      FSDataOutputStream fileOut = fs.create(file, false);
129      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
130    } else {
131      FSDataOutputStream fileOut = fs.create(file, false);
132      return new LineRecordWriter<K, V>(new DataOutputStream
133                                        (codec.createOutputStream(fileOut)),
134                                        keyValueSeparator);
135    }
136  }
137}
138
Note: See TracBrowser for help on using the repository browser.