source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 12.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.*;
22import java.util.*;
23import junit.framework.TestCase;
24
25import org.apache.commons.logging.*;
26import org.apache.hadoop.fs.*;
27import org.apache.hadoop.io.*;
28import org.apache.hadoop.io.compress.*;
29import org.apache.hadoop.util.LineReader;
30import org.apache.hadoop.util.ReflectionUtils;
31
32public class TestTextInputFormat extends TestCase {
33  private static final Log LOG =
34    LogFactory.getLog(TestTextInputFormat.class.getName());
35
36  private static int MAX_LENGTH = 10000;
37 
38  private static JobConf defaultConf = new JobConf();
39  private static FileSystem localFs = null; 
40  static {
41    try {
42      localFs = FileSystem.getLocal(defaultConf);
43    } catch (IOException e) {
44      throw new RuntimeException("init failure", e);
45    }
46  }
47  private static Path workDir = 
48    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
49             "TestTextInputFormat");
50 
51  public void testFormat() throws Exception {
52    JobConf job = new JobConf();
53    Path file = new Path(workDir, "test.txt");
54
55    // A reporter that does nothing
56    Reporter reporter = Reporter.NULL;
57   
58    int seed = new Random().nextInt();
59    LOG.info("seed = "+seed);
60    Random random = new Random(seed);
61
62    localFs.delete(workDir, true);
63    FileInputFormat.setInputPaths(job, workDir);
64
65    // for a variety of lengths
66    for (int length = 0; length < MAX_LENGTH;
67         length+= random.nextInt(MAX_LENGTH/10)+1) {
68
69      LOG.debug("creating; entries = " + length);
70
71      // create a file with length entries
72      Writer writer = new OutputStreamWriter(localFs.create(file));
73      try {
74        for (int i = 0; i < length; i++) {
75          writer.write(Integer.toString(i));
76          writer.write("\n");
77        }
78      } finally {
79        writer.close();
80      }
81
82      // try splitting the file in a variety of sizes
83      TextInputFormat format = new TextInputFormat();
84      format.configure(job);
85      LongWritable key = new LongWritable();
86      Text value = new Text();
87      for (int i = 0; i < 3; i++) {
88        int numSplits = random.nextInt(MAX_LENGTH/20)+1;
89        LOG.debug("splitting: requesting = " + numSplits);
90        InputSplit[] splits = format.getSplits(job, numSplits);
91        LOG.debug("splitting: got =        " + splits.length);
92
93        if (length == 0) {
94           assertEquals("Files of length 0 are not returned from FileInputFormat.getSplits().", 
95                        1, splits.length);
96           assertEquals("Empty file length == 0", 0, splits[0].getLength());
97        }
98
99        // check each split
100        BitSet bits = new BitSet(length);
101        for (int j = 0; j < splits.length; j++) {
102          LOG.debug("split["+j+"]= " + splits[j]);
103          RecordReader<LongWritable, Text> reader =
104            format.getRecordReader(splits[j], job, reporter);
105          try {
106            int count = 0;
107            while (reader.next(key, value)) {
108              int v = Integer.parseInt(value.toString());
109              LOG.debug("read " + v);
110              if (bits.get(v)) {
111                LOG.warn("conflict with " + v + 
112                         " in split " + j +
113                         " at position "+reader.getPos());
114              }
115              assertFalse("Key in multiple partitions.", bits.get(v));
116              bits.set(v);
117              count++;
118            }
119            LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
120          } finally {
121            reader.close();
122          }
123        }
124        assertEquals("Some keys in no partition.", length, bits.cardinality());
125      }
126
127    }
128  }
129
130  private static LineReader makeStream(String str) throws IOException {
131    return new LineReader(new ByteArrayInputStream
132                                             (str.getBytes("UTF-8")), 
133                                           defaultConf);
134  }
135  private static LineReader makeStream(String str, int bufsz) throws IOException {
136    return new LineReader(new ByteArrayInputStream
137                                             (str.getBytes("UTF-8")), 
138                                           bufsz);
139  }
140 
141  public void testUTF8() throws Exception {
142    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
143    Text line = new Text();
144    in.readLine(line);
145    assertEquals("readLine changed utf8 characters", 
146                 "abcd\u20acbdcd\u20ac", line.toString());
147    in = makeStream("abc\u200axyz");
148    in.readLine(line);
149    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
150  }
151
152  /**
153   * Test readLine for various kinds of line termination sequneces.
154   * Varies buffer size to stress test.  Also check that returned
155   * value matches the string length.
156   *
157   * @throws Exception
158   */
159  public void testNewLines() throws Exception {
160    final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
161    final int STRLENBYTES = STR.getBytes().length;
162    Text out = new Text();
163    for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
164      LineReader in = makeStream(STR, bufsz);
165      int c = 0;
166      c += in.readLine(out); //"a"\n
167      assertEquals("line1 length, bufsz:"+bufsz, 1, out.getLength());
168      c += in.readLine(out); //"bb"\n
169      assertEquals("line2 length, bufsz:"+bufsz, 2, out.getLength());
170      c += in.readLine(out); //""\n
171      assertEquals("line3 length, bufsz:"+bufsz, 0, out.getLength());
172      c += in.readLine(out); //"ccc"\r
173      assertEquals("line4 length, bufsz:"+bufsz, 3, out.getLength());
174      c += in.readLine(out); //dddd\r
175      assertEquals("line5 length, bufsz:"+bufsz, 4, out.getLength());
176      c += in.readLine(out); //""\r
177      assertEquals("line6 length, bufsz:"+bufsz, 0, out.getLength());
178      c += in.readLine(out); //""\r\n
179      assertEquals("line7 length, bufsz:"+bufsz, 0, out.getLength());
180      c += in.readLine(out); //""\r\n
181      assertEquals("line8 length, bufsz:"+bufsz, 0, out.getLength());
182      c += in.readLine(out); //"eeeee"EOF
183      assertEquals("line9 length, bufsz:"+bufsz, 5, out.getLength());
184      assertEquals("end of file, bufsz: "+bufsz, 0, in.readLine(out));
185      assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
186    }
187  }
188
189  /**
190   * Test readLine for correct interpretation of maxLineLength
191   * (returned string should be clipped at maxLineLength, and the
192   * remaining bytes on the same line should be thrown out).
193   * Also check that returned value matches the string length.
194   * Varies buffer size to stress test.
195   *
196   * @throws Exception
197   */
198  public void testMaxLineLength() throws Exception {
199    final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
200    final int STRLENBYTES = STR.getBytes().length;
201    Text out = new Text();
202    for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
203      LineReader in = makeStream(STR, bufsz);
204      int c = 0;
205      c += in.readLine(out, 1);
206      assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
207      c += in.readLine(out, 1);
208      assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
209      c += in.readLine(out, 1);
210      assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
211      c += in.readLine(out, 3);
212      assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
213      c += in.readLine(out, 10);
214      assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
215      c += in.readLine(out, 8);
216      assertEquals("line5 length, bufsz: "+bufsz, 5, out.getLength());
217      assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
218      assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
219    }
220  }
221
222  private static void writeFile(FileSystem fs, Path name, 
223                                CompressionCodec codec,
224                                String contents) throws IOException {
225    OutputStream stm;
226    if (codec == null) {
227      stm = fs.create(name);
228    } else {
229      stm = codec.createOutputStream(fs.create(name));
230    }
231    stm.write(contents.getBytes());
232    stm.close();
233  }
234 
  /** Reporter that {@code readSplit} hands to {@code getRecordReader}. */
  private static final Reporter voidReporter = Reporter.NULL;
236 
237  private static List<Text> readSplit(TextInputFormat format, 
238                                      InputSplit split, 
239                                      JobConf job) throws IOException {
240    List<Text> result = new ArrayList<Text>();
241    RecordReader<LongWritable, Text> reader =
242      format.getRecordReader(split, job, voidReporter);
243    LongWritable key = reader.createKey();
244    Text value = reader.createValue();
245    while (reader.next(key, value)) {
246      result.add(value);
247      value = (Text) reader.createValue();
248    }
249    reader.close();
250    return result;
251  }
252 
253  /**
254   * Test using the gzip codec for reading
255   */
256  public static void testGzip() throws IOException {
257    JobConf job = new JobConf();
258    CompressionCodec gzip = new GzipCodec();
259    ReflectionUtils.setConf(gzip, job);
260    localFs.delete(workDir, true);
261    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
262              "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
263    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
264              "this is a test\nof gzip\n");
265    FileInputFormat.setInputPaths(job, workDir);
266    TextInputFormat format = new TextInputFormat();
267    format.configure(job);
268    InputSplit[] splits = format.getSplits(job, 100);
269    assertEquals("compressed splits == 2", 2, splits.length);
270    FileSplit tmp = (FileSplit) splits[0];
271    if (tmp.getPath().getName().equals("part2.txt.gz")) {
272      splits[0] = splits[1];
273      splits[1] = tmp;
274    }
275    List<Text> results = readSplit(format, splits[0], job);
276    assertEquals("splits[0] length", 6, results.size());
277    assertEquals("splits[0][5]", " dog", results.get(5).toString());
278    results = readSplit(format, splits[1], job);
279    assertEquals("splits[1] length", 2, results.size());
280    assertEquals("splits[1][0]", "this is a test", 
281                 results.get(0).toString());   
282    assertEquals("splits[1][1]", "of gzip", 
283                 results.get(1).toString());   
284  }
285
286  /**
287   * Test using the gzip codec and an empty input file
288   */
289  public static void testGzipEmpty() throws IOException {
290    JobConf job = new JobConf();
291    CompressionCodec gzip = new GzipCodec();
292    ReflectionUtils.setConf(gzip, job);
293    localFs.delete(workDir, true);
294    writeFile(localFs, new Path(workDir, "empty.gz"), gzip, "");
295    FileInputFormat.setInputPaths(job, workDir);
296    TextInputFormat format = new TextInputFormat();
297    format.configure(job);
298    InputSplit[] splits = format.getSplits(job, 100);
299    assertEquals("Compressed files of length 0 are not returned from FileInputFormat.getSplits().",
300                 1, splits.length);
301    List<Text> results = readSplit(format, splits[0], job);
302    assertEquals("Compressed empty file length == 0", 0, results.size());
303  }
304 
305  private static String unquote(String in) {
306    StringBuffer result = new StringBuffer();
307    for(int i=0; i < in.length(); ++i) {
308      char ch = in.charAt(i);
309      if (ch == '\\') {
310        ch = in.charAt(++i);
311        switch (ch) {
312        case 'n':
313          result.append('\n');
314          break;
315        case 'r':
316          result.append('\r');
317          break;
318        default:
319          result.append(ch);
320          break;
321        }
322      } else {
323        result.append(ch);
324      }
325    }
326    return result.toString();
327  }
328
329  /**
330   * Parse the command line arguments into lines and display the result.
331   * @param args
332   * @throws Exception
333   */
334  public static void main(String[] args) throws Exception {
335    for(String arg: args) {
336      System.out.println("Working on " + arg);
337      LineReader reader = makeStream(unquote(arg));
338      Text line = new Text();
339      int size = reader.readLine(line);
340      while (size > 0) {
341        System.out.println("Got: " + line.toString());
342        size = reader.readLine(line);
343      }
344      reader.close();
345    }
346  }
347}
Note: See TracBrowser for help on using the repository browser.