source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/io/TestSequenceFile.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 20.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.io;
20
21import java.io.*;
22import java.util.*;
23import junit.framework.TestCase;
24
25import org.apache.commons.logging.*;
26
27import org.apache.hadoop.fs.*;
28import org.apache.hadoop.io.SequenceFile.CompressionType;
29import org.apache.hadoop.io.compress.CompressionCodec;
30import org.apache.hadoop.io.compress.DefaultCodec;
31import org.apache.hadoop.util.ReflectionUtils;
32import org.apache.hadoop.conf.*;
33
34
35/** Support for flat files of binary key/value pairs. */
36public class TestSequenceFile extends TestCase {
37  private static final Log LOG = LogFactory.getLog(TestSequenceFile.class);
38
39  private static Configuration conf = new Configuration();
40 
41  public TestSequenceFile(String name) { super(name); }
42
43  /** Unit tests for SequenceFile. */
44  public void testZlibSequenceFile() throws Exception {
45    LOG.info("Testing SequenceFile with DefaultCodec");
46    compressedSeqFileTest(new DefaultCodec());
47    LOG.info("Successfully tested SequenceFile with DefaultCodec");
48  }
49 
50  public void compressedSeqFileTest(CompressionCodec codec) throws Exception {
51    int count = 1024 * 10;
52    int megabytes = 1;
53    int factor = 5;
54    Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq");
55    Path recordCompressedFile = 
56      new Path(System.getProperty("test.build.data",".")+"/test.rc.seq");
57    Path blockCompressedFile = 
58      new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
59 
60    int seed = new Random().nextInt();
61    LOG.info("Seed = " + seed);
62
63    FileSystem fs = FileSystem.getLocal(conf);
64    try {
65      // SequenceFile.Writer
66      writeTest(fs, count, seed, file, CompressionType.NONE, null);
67      readTest(fs, count, seed, file);
68
69      sortTest(fs, count, megabytes, factor, false, file);
70      checkSort(fs, count, seed, file);
71
72      sortTest(fs, count, megabytes, factor, true, file);
73      checkSort(fs, count, seed, file);
74
75      mergeTest(fs, count, seed, file, CompressionType.NONE, false, 
76                factor, megabytes);
77      checkSort(fs, count, seed, file);
78
79      mergeTest(fs, count, seed, file, CompressionType.NONE, true, 
80                factor, megabytes);
81      checkSort(fs, count, seed, file);
82       
83      // SequenceFile.RecordCompressWriter
84      writeTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, 
85                codec);
86      readTest(fs, count, seed, recordCompressedFile);
87
88      sortTest(fs, count, megabytes, factor, false, recordCompressedFile);
89      checkSort(fs, count, seed, recordCompressedFile);
90
91      sortTest(fs, count, megabytes, factor, true, recordCompressedFile);
92      checkSort(fs, count, seed, recordCompressedFile);
93
94      mergeTest(fs, count, seed, recordCompressedFile, 
95                CompressionType.RECORD, false, factor, megabytes);
96      checkSort(fs, count, seed, recordCompressedFile);
97
98      mergeTest(fs, count, seed, recordCompressedFile, 
99                CompressionType.RECORD, true, factor, megabytes);
100      checkSort(fs, count, seed, recordCompressedFile);
101       
102      // SequenceFile.BlockCompressWriter
103      writeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
104                codec);
105      readTest(fs, count, seed, blockCompressedFile);
106
107      sortTest(fs, count, megabytes, factor, false, blockCompressedFile);
108      checkSort(fs, count, seed, blockCompressedFile);
109
110      sortTest(fs, count, megabytes, factor, true, blockCompressedFile);
111      checkSort(fs, count, seed, blockCompressedFile);
112
113      mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
114                false, factor, megabytes);
115      checkSort(fs, count, seed, blockCompressedFile);
116
117      mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
118                true, factor, megabytes);
119      checkSort(fs, count, seed, blockCompressedFile);
120
121    } finally {
122      fs.close();
123    }
124  }
125
126  private static void writeTest(FileSystem fs, int count, int seed, Path file, 
127                                CompressionType compressionType, CompressionCodec codec)
128    throws IOException {
129    fs.delete(file, true);
130    LOG.info("creating " + count + " records with " + compressionType +
131             " compression");
132    SequenceFile.Writer writer = 
133      SequenceFile.createWriter(fs, conf, file, 
134                                RandomDatum.class, RandomDatum.class, compressionType, codec);
135    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
136    for (int i = 0; i < count; i++) {
137      generator.next();
138      RandomDatum key = generator.getKey();
139      RandomDatum value = generator.getValue();
140
141      writer.append(key, value);
142    }
143    writer.close();
144  }
145
146  private static void readTest(FileSystem fs, int count, int seed, Path file)
147    throws IOException {
148    LOG.debug("reading " + count + " records");
149    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
150    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
151
152    RandomDatum k = new RandomDatum();
153    RandomDatum v = new RandomDatum();
154    DataOutputBuffer rawKey = new DataOutputBuffer();
155    SequenceFile.ValueBytes rawValue = reader.createValueBytes();
156   
157    for (int i = 0; i < count; i++) {
158      generator.next();
159      RandomDatum key = generator.getKey();
160      RandomDatum value = generator.getValue();
161
162      try {
163        if ((i%5) == 0) {
164          // Testing 'raw' apis
165          rawKey.reset();
166          reader.nextRaw(rawKey, rawValue);
167        } else {
168          // Testing 'non-raw' apis
169          if ((i%2) == 0) {
170            reader.next(k);
171            reader.getCurrentValue(v);
172          } else {
173            reader.next(k, v);
174          }
175         
176          // Check
177          if (!k.equals(key))
178            throw new RuntimeException("wrong key at " + i);
179          if (!v.equals(value))
180            throw new RuntimeException("wrong value at " + i);
181        }
182      } catch (IOException ioe) {
183        LOG.info("Problem on row " + i);
184        LOG.info("Expected key = " + key);
185        LOG.info("Expected len = " + key.getLength());
186        LOG.info("Actual key = " + k);
187        LOG.info("Actual len = " + k.getLength());
188        LOG.info("Expected value = " + value);
189        LOG.info("Expected len = " + value.getLength());
190        LOG.info("Actual value = " + v);
191        LOG.info("Actual len = " + v.getLength());
192        LOG.info("Key equals: " + k.equals(key));
193        LOG.info("value equals: " + v.equals(value));
194        throw ioe;
195      }
196
197    }
198    reader.close();
199  }
200
201
202  private static void sortTest(FileSystem fs, int count, int megabytes, 
203                               int factor, boolean fast, Path file)
204    throws IOException {
205    fs.delete(new Path(file+".sorted"), true);
206    SequenceFile.Sorter sorter = newSorter(fs, fast, megabytes, factor);
207    LOG.debug("sorting " + count + " records");
208    sorter.sort(file, file.suffix(".sorted"));
209    LOG.info("done sorting " + count + " debug");
210  }
211
212  private static void checkSort(FileSystem fs, int count, int seed, Path file)
213    throws IOException {
214    LOG.info("sorting " + count + " records in memory for debug");
215    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
216    SortedMap<RandomDatum, RandomDatum> map =
217      new TreeMap<RandomDatum, RandomDatum>();
218    for (int i = 0; i < count; i++) {
219      generator.next();
220      RandomDatum key = generator.getKey();
221      RandomDatum value = generator.getValue();
222      map.put(key, value);
223    }
224
225    LOG.debug("checking order of " + count + " records");
226    RandomDatum k = new RandomDatum();
227    RandomDatum v = new RandomDatum();
228    Iterator<Map.Entry<RandomDatum, RandomDatum>> iterator =
229      map.entrySet().iterator();
230    SequenceFile.Reader reader =
231      new SequenceFile.Reader(fs, file.suffix(".sorted"), conf);
232    for (int i = 0; i < count; i++) {
233      Map.Entry<RandomDatum, RandomDatum> entry = iterator.next();
234      RandomDatum key = entry.getKey();
235      RandomDatum value = entry.getValue();
236
237      reader.next(k, v);
238
239      if (!k.equals(key))
240        throw new RuntimeException("wrong key at " + i);
241      if (!v.equals(value))
242        throw new RuntimeException("wrong value at " + i);
243    }
244
245    reader.close();
246    LOG.debug("sucessfully checked " + count + " records");
247  }
248
249  private static void mergeTest(FileSystem fs, int count, int seed, Path file, 
250                                CompressionType compressionType,
251                                boolean fast, int factor, int megabytes)
252    throws IOException {
253
254    LOG.debug("creating "+factor+" files with "+count/factor+" records");
255
256    SequenceFile.Writer[] writers = new SequenceFile.Writer[factor];
257    Path[] names = new Path[factor];
258    Path[] sortedNames = new Path[factor];
259   
260    for (int i = 0; i < factor; i++) {
261      names[i] = file.suffix("."+i);
262      sortedNames[i] = names[i].suffix(".sorted");
263      fs.delete(names[i], true);
264      fs.delete(sortedNames[i], true);
265      writers[i] = SequenceFile.createWriter(fs, conf, names[i], 
266                                             RandomDatum.class, RandomDatum.class, compressionType);
267    }
268
269    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
270
271    for (int i = 0; i < count; i++) {
272      generator.next();
273      RandomDatum key = generator.getKey();
274      RandomDatum value = generator.getValue();
275
276      writers[i%factor].append(key, value);
277    }
278
279    for (int i = 0; i < factor; i++)
280      writers[i].close();
281
282    for (int i = 0; i < factor; i++) {
283      LOG.debug("sorting file " + i + " with " + count/factor + " records");
284      newSorter(fs, fast, megabytes, factor).sort(names[i], sortedNames[i]);
285    }
286
287    LOG.info("merging " + factor + " files with " + count/factor + " debug");
288    fs.delete(new Path(file+".sorted"), true);
289    newSorter(fs, fast, megabytes, factor)
290      .merge(sortedNames, file.suffix(".sorted"));
291  }
292
293  private static SequenceFile.Sorter newSorter(FileSystem fs, 
294                                               boolean fast,
295                                               int megabytes, int factor) {
296    SequenceFile.Sorter sorter = 
297      fast
298      ? new SequenceFile.Sorter(fs, new RandomDatum.Comparator(),
299                                RandomDatum.class, RandomDatum.class, conf)
300      : new SequenceFile.Sorter(fs, RandomDatum.class, RandomDatum.class, conf);
301    sorter.setMemory(megabytes * 1024*1024);
302    sorter.setFactor(factor);
303    return sorter;
304  }
305
306  /** Unit tests for SequenceFile metadata. */
307  public void testSequenceFileMetadata() throws Exception {
308    LOG.info("Testing SequenceFile with metadata");
309    int count = 1024 * 10;
310    int megabytes = 1;
311    int factor = 5;
312    CompressionCodec codec = new DefaultCodec();
313    Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq.metadata");
314    Path recordCompressedFile = 
315      new Path(System.getProperty("test.build.data",".")+"/test.rc.seq.metadata");
316    Path blockCompressedFile = 
317      new Path(System.getProperty("test.build.data",".")+"/test.bc.seq.metadata");
318 
319    FileSystem fs = FileSystem.getLocal(conf);
320    SequenceFile.Metadata theMetadata = new SequenceFile.Metadata();
321    theMetadata.set(new Text("name_1"), new Text("value_1"));
322    theMetadata.set(new Text("name_2"), new Text("value_2"));
323    theMetadata.set(new Text("name_3"), new Text("value_3"));
324    theMetadata.set(new Text("name_4"), new Text("value_4"));
325   
326    int seed = new Random().nextInt();
327   
328    try {
329      // SequenceFile.Writer
330      writeMetadataTest(fs, count, seed, file, CompressionType.NONE, null, theMetadata);
331      SequenceFile.Metadata aMetadata = readMetadata(fs, file);
332      if (!theMetadata.equals(aMetadata)) {
333        LOG.info("The original metadata:\n" + theMetadata.toString());
334        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
335        throw new RuntimeException("metadata not match:  " + 1);
336      }
337      // SequenceFile.RecordCompressWriter
338      writeMetadataTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, 
339                        codec, theMetadata);
340      aMetadata = readMetadata(fs, recordCompressedFile);
341      if (!theMetadata.equals(aMetadata)) {
342        LOG.info("The original metadata:\n" + theMetadata.toString());
343        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
344        throw new RuntimeException("metadata not match:  " + 2);
345      }
346      // SequenceFile.BlockCompressWriter
347      writeMetadataTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
348                        codec, theMetadata);
349      aMetadata =readMetadata(fs, blockCompressedFile);
350      if (!theMetadata.equals(aMetadata)) {
351        LOG.info("The original metadata:\n" + theMetadata.toString());
352        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
353        throw new RuntimeException("metadata not match:  " + 3);
354      }
355    } finally {
356      fs.close();
357    }
358    LOG.info("Successfully tested SequenceFile with metadata");
359  }
360 
361 
362  private static SequenceFile.Metadata readMetadata(FileSystem fs, Path file)
363    throws IOException {
364    LOG.info("reading file: " + file.toString() + "\n");
365    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
366    SequenceFile.Metadata meta = reader.getMetadata(); 
367    reader.close();
368    return meta;
369  }
370
371  private static void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
372                                        CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
373    throws IOException {
374    fs.delete(file, true);
375    LOG.info("creating " + count + " records with metadata and with" + compressionType +
376             " compression");
377    SequenceFile.Writer writer = 
378      SequenceFile.createWriter(fs, conf, file, 
379                                RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
380    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
381    for (int i = 0; i < count; i++) {
382      generator.next();
383      RandomDatum key = generator.getKey();
384      RandomDatum value = generator.getValue();
385
386      writer.append(key, value);
387    }
388    writer.close();
389  }
390
391  public void testClose() throws IOException {
392    Configuration conf = new Configuration();
393    LocalFileSystem fs = FileSystem.getLocal(conf);
394 
395    // create a sequence file 1
396    Path path1 = new Path(System.getProperty("test.build.data",".")+"/test1.seq");
397    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path1,
398        Text.class, NullWritable.class, CompressionType.BLOCK);
399    writer.append(new Text("file1-1"), NullWritable.get());
400    writer.append(new Text("file1-2"), NullWritable.get());
401    writer.close();
402 
403    Path path2 = new Path(System.getProperty("test.build.data",".")+"/test2.seq");
404    writer = SequenceFile.createWriter(fs, conf, path2, Text.class,
405        NullWritable.class, CompressionType.BLOCK);
406    writer.append(new Text("file2-1"), NullWritable.get());
407    writer.append(new Text("file2-2"), NullWritable.get());
408    writer.close();
409 
410    // Create a reader which uses 4 BuiltInZLibInflater instances
411    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path1, conf);
412    // Returns the 4 BuiltInZLibInflater instances to the CodecPool
413    reader.close();
414    // The second close _could_ erroneously returns the same
415    // 4 BuiltInZLibInflater instances to the CodecPool again
416    reader.close();
417 
418    // The first reader gets 4 BuiltInZLibInflater instances from the CodecPool
419    SequenceFile.Reader reader1 = new SequenceFile.Reader(fs, path1, conf);
420    // read first value from reader1
421    Text text = new Text();
422    reader1.next(text);
423    assertEquals("file1-1", text.toString());
424   
425    // The second reader _could_ get the same 4 BuiltInZLibInflater
426    // instances from the CodePool as reader1
427    SequenceFile.Reader reader2 = new SequenceFile.Reader(fs, path2, conf);
428   
429    // read first value from reader2
430    reader2.next(text);
431    assertEquals("file2-1", text.toString());
432    // read second value from reader1
433    reader1.next(text);
434    assertEquals("file1-2", text.toString());
435    // read second value from reader2 (this throws an exception)
436    reader2.next(text);
437    assertEquals("file2-2", text.toString());
438 
439    assertFalse(reader1.next(text));
440    assertFalse(reader2.next(text));
441  }
442
443  /** For debugging and testing. */
444  public static void main(String[] args) throws Exception {
445    int count = 1024 * 1024;
446    int megabytes = 1;
447    int factor = 10;
448    boolean create = true;
449    boolean rwonly = false;
450    boolean check = false;
451    boolean fast = false;
452    boolean merge = false;
453    String compressType = "NONE";
454    String compressionCodec = "org.apache.hadoop.io.compress.DefaultCodec";
455    Path file = null;
456    int seed = new Random().nextInt();
457
458    String usage = "Usage: SequenceFile " +
459      "[-count N] " + 
460      "[-seed #] [-check] [-compressType <NONE|RECORD|BLOCK>] " + 
461      "-codec <compressionCodec> " + 
462      "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
463      " file";
464    if (args.length == 0) {
465      System.err.println(usage);
466      System.exit(-1);
467    }
468   
469    FileSystem fs = null;
470    try {
471      for (int i=0; i < args.length; ++i) {       // parse command line
472        if (args[i] == null) {
473          continue;
474        } else if (args[i].equals("-count")) {
475          count = Integer.parseInt(args[++i]);
476        } else if (args[i].equals("-megabytes")) {
477          megabytes = Integer.parseInt(args[++i]);
478        } else if (args[i].equals("-factor")) {
479          factor = Integer.parseInt(args[++i]);
480        } else if (args[i].equals("-seed")) {
481          seed = Integer.parseInt(args[++i]);
482        } else if (args[i].equals("-rwonly")) {
483          rwonly = true;
484        } else if (args[i].equals("-nocreate")) {
485          create = false;
486        } else if (args[i].equals("-check")) {
487          check = true;
488        } else if (args[i].equals("-fast")) {
489          fast = true;
490        } else if (args[i].equals("-merge")) {
491          merge = true;
492        } else if (args[i].equals("-compressType")) {
493          compressType = args[++i];
494        } else if (args[i].equals("-codec")) {
495          compressionCodec = args[++i];
496        } else {
497          // file is required parameter
498          file = new Path(args[i]);
499        }
500      }
501       
502      fs = file.getFileSystem(conf);
503
504      LOG.info("count = " + count);
505      LOG.info("megabytes = " + megabytes);
506      LOG.info("factor = " + factor);
507      LOG.info("create = " + create);
508      LOG.info("seed = " + seed);
509      LOG.info("rwonly = " + rwonly);
510      LOG.info("check = " + check);
511      LOG.info("fast = " + fast);
512      LOG.info("merge = " + merge);
513      LOG.info("compressType = " + compressType);
514      LOG.info("compressionCodec = " + compressionCodec);
515      LOG.info("file = " + file);
516
517      if (rwonly && (!create || merge || fast)) {
518        System.err.println(usage);
519        System.exit(-1);
520      }
521
522      CompressionType compressionType = 
523        CompressionType.valueOf(compressType);
524      CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(
525                                                                             conf.getClassByName(compressionCodec), 
526                                                                             conf);
527
528      if (rwonly || (create && !merge)) {
529        writeTest(fs, count, seed, file, compressionType, codec);
530        readTest(fs, count, seed, file);
531      }
532
533      if (!rwonly) {
534        if (merge) {
535          mergeTest(fs, count, seed, file, compressionType, 
536                    fast, factor, megabytes);
537        } else {
538          sortTest(fs, count, megabytes, factor, fast, file);
539        }
540      }
541   
542      if (check) {
543        checkSort(fs, count, seed, file);
544      }
545    } finally {
546      fs.close();
547    }
548  }
549}
Note: See TracBrowser for help on using the repository browser.