source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 23.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17
18package org.apache.hadoop.io.file.tfile;
19
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.EOFException;
23import java.io.IOException;
24import java.util.Random;
25
26import junit.framework.Assert;
27import junit.framework.TestCase;
28
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.fs.FSDataOutputStream;
31import org.apache.hadoop.fs.FileSystem;
32import org.apache.hadoop.fs.Path;
33import org.apache.hadoop.io.WritableUtils;
34import org.apache.hadoop.io.file.tfile.TFile.Reader;
35import org.apache.hadoop.io.file.tfile.TFile.Writer;
36import org.apache.hadoop.io.file.tfile.TFile.Reader.Location;
37import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
38
39/**
40 *
41 * Byte arrays test case class using GZ compression codec, base class of none
42 * and LZO compression classes.
43 *
44 */
45public class TestTFileByteArrays extends TestCase {
46  private static String ROOT =
47      System.getProperty("test.build.data", "/tmp/tfile-test");
48  private final static int BLOCK_SIZE = 512;
49  private final static int BUF_SIZE = 64;
50  private final static int K = 1024;
51  protected boolean skip = false;
52
53  private static final String KEY = "key";
54  private static final String VALUE = "value";
55
56  private FileSystem fs;
57  private Configuration conf;
58  private Path path;
59  private FSDataOutputStream out;
60  private Writer writer;
61
62  private String compression = Compression.Algorithm.GZ.getName();
63  private String comparator = "memcmp";
64  private String outputFile = "TFileTestByteArrays";
65  /*
66   * pre-sampled numbers of records in one block, based on the given the
67   * generated key and value strings
68   */
69  // private int records1stBlock = 4314;
70  // private int records2ndBlock = 4108;
71  private int records1stBlock = 4480;
72  private int records2ndBlock = 4263;
73
74  public void init(String compression, String comparator, String outputFile,
75      int numRecords1stBlock, int numRecords2ndBlock) {
76    this.compression = compression;
77    this.comparator = comparator;
78    this.outputFile = outputFile;
79    this.records1stBlock = numRecords1stBlock;
80    this.records2ndBlock = numRecords2ndBlock;
81  }
82
83  @Override
84  public void setUp() throws IOException {
85    conf = new Configuration();
86    path = new Path(ROOT, outputFile);
87    fs = path.getFileSystem(conf);
88    out = fs.create(path);
89    writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
90  }
91
92  @Override
93  public void tearDown() throws IOException {
94    if (!skip)
95    fs.delete(path, true);
96  }
97
98  public void testNoDataEntry() throws IOException {
99    if (skip) 
100      return;
101    closeOutput();
102
103    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
104    Assert.assertTrue(reader.isSorted());
105    Scanner scanner = reader.createScanner();
106    Assert.assertTrue(scanner.atEnd());
107    scanner.close();
108    reader.close();
109  }
110
111  public void testOneDataEntry() throws IOException {
112    if (skip)
113      return;
114    writeRecords(1);
115    readRecords(1);
116
117    checkBlockIndex(1, 0, 0);
118    readValueBeforeKey(1, 0);
119    readKeyWithoutValue(1, 0);
120    readValueWithoutKey(1, 0);
121    readKeyManyTimes(1, 0);
122  }
123
124  public void testTwoDataEntries() throws IOException {
125    if (skip)
126      return;
127    writeRecords(2);
128    readRecords(2);
129  }
130
131  /**
132   * Fill up exactly one block.
133   *
134   * @throws IOException
135   */
136  public void testOneBlock() throws IOException {
137    if (skip)
138      return;
139    // just under one block
140    writeRecords(records1stBlock);
141    readRecords(records1stBlock);
142    // last key should be in the first block (block 0)
143    checkBlockIndex(records1stBlock, records1stBlock - 1, 0);
144  }
145
146  /**
147   * One block plus one record.
148   *
149   * @throws IOException
150   */
151  public void testOneBlockPlusOneEntry() throws IOException {
152    if (skip)
153      return;
154    writeRecords(records1stBlock + 1);
155    readRecords(records1stBlock + 1);
156    checkBlockIndex(records1stBlock + 1, records1stBlock - 1, 0);
157    checkBlockIndex(records1stBlock + 1, records1stBlock, 1);
158  }
159
160  public void testTwoBlocks() throws IOException {
161    if (skip)
162      return;
163    writeRecords(records1stBlock + 5);
164    readRecords(records1stBlock + 5);
165    checkBlockIndex(records1stBlock + 5, records1stBlock + 4, 1);
166  }
167
168  public void testThreeBlocks() throws IOException {
169    if (skip) 
170      return;
171    writeRecords(2 * records1stBlock + 5);
172    readRecords(2 * records1stBlock + 5);
173
174    checkBlockIndex(2 * records1stBlock + 5, 2 * records1stBlock + 4, 2);
175    // 1st key in file
176    readValueBeforeKey(2 * records1stBlock + 5, 0);
177    readKeyWithoutValue(2 * records1stBlock + 5, 0);
178    readValueWithoutKey(2 * records1stBlock + 5, 0);
179    readKeyManyTimes(2 * records1stBlock + 5, 0);
180    // last key in file
181    readValueBeforeKey(2 * records1stBlock + 5, 2 * records1stBlock + 4);
182    readKeyWithoutValue(2 * records1stBlock + 5, 2 * records1stBlock + 4);
183    readValueWithoutKey(2 * records1stBlock + 5, 2 * records1stBlock + 4);
184    readKeyManyTimes(2 * records1stBlock + 5, 2 * records1stBlock + 4);
185
186    // 1st key in mid block, verify block indexes then read
187    checkBlockIndex(2 * records1stBlock + 5, records1stBlock - 1, 0);
188    checkBlockIndex(2 * records1stBlock + 5, records1stBlock, 1);
189    readValueBeforeKey(2 * records1stBlock + 5, records1stBlock);
190    readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock);
191    readValueWithoutKey(2 * records1stBlock + 5, records1stBlock);
192    readKeyManyTimes(2 * records1stBlock + 5, records1stBlock);
193
194    // last key in mid block, verify block indexes then read
195    checkBlockIndex(2 * records1stBlock + 5, records1stBlock + records2ndBlock
196        - 1, 1);
197    checkBlockIndex(2 * records1stBlock + 5, records1stBlock + records2ndBlock,
198        2);
199    readValueBeforeKey(2 * records1stBlock + 5, records1stBlock
200        + records2ndBlock - 1);
201    readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock
202        + records2ndBlock - 1);
203    readValueWithoutKey(2 * records1stBlock + 5, records1stBlock
204        + records2ndBlock - 1);
205    readKeyManyTimes(2 * records1stBlock + 5, records1stBlock + records2ndBlock
206        - 1);
207
208    // mid in mid block
209    readValueBeforeKey(2 * records1stBlock + 5, records1stBlock + 10);
210    readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock + 10);
211    readValueWithoutKey(2 * records1stBlock + 5, records1stBlock + 10);
212    readKeyManyTimes(2 * records1stBlock + 5, records1stBlock + 10);
213  }
214
215  Location locate(Scanner scanner, byte[] key) throws IOException {
216    if (scanner.seekTo(key) == true) {
217      return scanner.currentLocation;
218    }
219    return scanner.endLocation;
220  }
221 
222  public void testLocate() throws IOException {
223    if (skip)
224      return;
225    writeRecords(3 * records1stBlock);
226    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
227    Scanner scanner = reader.createScanner();
228    Location loc2 =
229        locate(scanner, composeSortedKey(KEY, 3 * records1stBlock, 2)
230            .getBytes());
231    Location locLastIn1stBlock =
232        locate(scanner, composeSortedKey(KEY, 3 * records1stBlock,
233            records1stBlock - 1).getBytes());
234    Location locFirstIn2ndBlock =
235        locate(scanner, composeSortedKey(KEY, 3 * records1stBlock,
236            records1stBlock).getBytes());
237    Location locX = locate(scanner, "keyX".getBytes());
238    Assert.assertEquals(scanner.endLocation, locX);
239    scanner.close();
240    reader.close();
241  }
242
243  public void testFailureWriterNotClosed() throws IOException {
244    if (skip)
245      return;
246    Reader reader = null;
247    try {
248      reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
249      Assert.fail("Cannot read before closing the writer.");
250    }
251    catch (IOException e) {
252      // noop, expecting exceptions
253    }
254    finally {
255      if (reader != null) {
256        reader.close();
257      }
258    }
259  }
260
261  public void testFailureWriteMetaBlocksWithSameName() throws IOException {
262    if (skip)
263      return;
264    writer.append("keyX".getBytes(), "valueX".getBytes());
265
266    // create a new metablock
267    DataOutputStream outMeta =
268        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
269    outMeta.write(123);
270    outMeta.write("foo".getBytes());
271    outMeta.close();
272    // add the same metablock
273    try {
274      DataOutputStream outMeta2 =
275          writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
276      Assert.fail("Cannot create metablocks with the same name.");
277    }
278    catch (Exception e) {
279      // noop, expecting exceptions
280    }
281    closeOutput();
282  }
283
284  public void testFailureGetNonExistentMetaBlock() throws IOException {
285    if (skip)
286      return;
287    writer.append("keyX".getBytes(), "valueX".getBytes());
288
289    // create a new metablock
290    DataOutputStream outMeta =
291        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
292    outMeta.write(123);
293    outMeta.write("foo".getBytes());
294    outMeta.close();
295    closeOutput();
296
297    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
298    DataInputStream mb = reader.getMetaBlock("testX");
299    Assert.assertNotNull(mb);
300    mb.close();
301    try {
302      DataInputStream mbBad = reader.getMetaBlock("testY");
303      Assert.assertNull(mbBad);
304      Assert.fail("Error on handling non-existent metablocks.");
305    }
306    catch (Exception e) {
307      // noop, expecting exceptions
308    }
309    reader.close();
310  }
311
312  public void testFailureWriteRecordAfterMetaBlock() throws IOException {
313    if (skip)
314      return;
315    // write a key/value first
316    writer.append("keyX".getBytes(), "valueX".getBytes());
317    // create a new metablock
318    DataOutputStream outMeta =
319        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
320    outMeta.write(123);
321    outMeta.write("dummy".getBytes());
322    outMeta.close();
323    // add more key/value
324    try {
325      writer.append("keyY".getBytes(), "valueY".getBytes());
326      Assert.fail("Cannot add key/value after start adding meta blocks.");
327    }
328    catch (Exception e) {
329      // noop, expecting exceptions
330    }
331    closeOutput();
332  }
333
334  public void testFailureReadValueManyTimes() throws IOException {
335    if (skip)
336      return;
337    writeRecords(5);
338
339    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
340    Scanner scanner = reader.createScanner();
341
342    byte[] vbuf = new byte[BUF_SIZE];
343    int vlen = scanner.entry().getValueLength();
344    scanner.entry().getValue(vbuf);
345    Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0);
346    try {
347      scanner.entry().getValue(vbuf);
348      Assert.fail("Cannot get the value mlutiple times.");
349    }
350    catch (Exception e) {
351      // noop, expecting exceptions
352    }
353
354    scanner.close();
355    reader.close();
356  }
357
358  public void testFailureBadCompressionCodec() throws IOException {
359    if (skip)
360      return;
361    closeOutput();
362    out = fs.create(path);
363    try {
364      writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf);
365      Assert.fail("Error on handling invalid compression codecs.");
366    }
367    catch (Exception e) {
368      // noop, expecting exceptions
369      // e.printStackTrace();
370    }
371  }
372
373  public void testFailureOpenEmptyFile() throws IOException {
374    if (skip)
375      return;
376    closeOutput();
377    // create an absolutely empty file
378    path = new Path(fs.getWorkingDirectory(), outputFile);
379    out = fs.create(path);
380    out.close();
381    try {
382      Reader reader =
383          new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
384      Assert.fail("Error on handling empty files.");
385    }
386    catch (EOFException e) {
387      // noop, expecting exceptions
388    }
389  }
390
391  public void testFailureOpenRandomFile() throws IOException {
392    if (skip)
393      return;
394    closeOutput();
395    // create an random file
396    path = new Path(fs.getWorkingDirectory(), outputFile);
397    out = fs.create(path);
398    Random rand = new Random();
399    byte[] buf = new byte[K];
400    // fill with > 1MB data
401    for (int nx = 0; nx < K + 2; nx++) {
402      rand.nextBytes(buf);
403      out.write(buf);
404    }
405    out.close();
406    try {
407      Reader reader =
408          new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
409      Assert.fail("Error on handling random files.");
410    }
411    catch (IOException e) {
412      // noop, expecting exceptions
413    }
414  }
415
416  public void testFailureKeyLongerThan64K() throws IOException {
417    if (skip)
418      return;
419    byte[] buf = new byte[64 * K + 1];
420    Random rand = new Random();
421    rand.nextBytes(buf);
422    try {
423      writer.append(buf, "valueX".getBytes());
424    }
425    catch (IndexOutOfBoundsException e) {
426      // noop, expecting exceptions
427    }
428    closeOutput();
429  }
430
431  public void testFailureOutOfOrderKeys() throws IOException {
432    if (skip)
433      return;
434    try {
435      writer.append("keyM".getBytes(), "valueM".getBytes());
436      writer.append("keyA".getBytes(), "valueA".getBytes());
437      Assert.fail("Error on handling out of order keys.");
438    }
439    catch (Exception e) {
440      // noop, expecting exceptions
441      // e.printStackTrace();
442    }
443
444    closeOutput();
445  }
446
447  public void testFailureNegativeOffset() throws IOException {
448    if (skip)
449      return;
450    try {
451      writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6);
452      Assert.fail("Error on handling negative offset.");
453    }
454    catch (Exception e) {
455      // noop, expecting exceptions
456    }
457    closeOutput();
458  }
459
460  public void testFailureNegativeOffset_2() throws IOException {
461    if (skip)
462      return;
463    closeOutput();
464
465    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
466    Scanner scanner = reader.createScanner();
467    try {
468      scanner.lowerBound("keyX".getBytes(), -1, 4);
469      Assert.fail("Error on handling negative offset.");
470    }
471    catch (Exception e) {
472      // noop, expecting exceptions
473    }
474    finally {
475      reader.close();
476      scanner.close();
477    }
478    closeOutput();
479  }
480
481  public void testFailureNegativeLength() throws IOException {
482    if (skip)
483      return;
484    try {
485      writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6);
486      Assert.fail("Error on handling negative length.");
487    }
488    catch (Exception e) {
489      // noop, expecting exceptions
490    }
491    closeOutput();
492  }
493
494  public void testFailureNegativeLength_2() throws IOException {
495    if (skip)
496      return;
497    closeOutput();
498
499    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
500    Scanner scanner = reader.createScanner();
501    try {
502      scanner.lowerBound("keyX".getBytes(), 0, -1);
503      Assert.fail("Error on handling negative length.");
504    }
505    catch (Exception e) {
506      // noop, expecting exceptions
507    }
508    finally {
509      scanner.close();
510      reader.close();
511    }
512    closeOutput();
513  }
514
515  public void testFailureNegativeLength_3() throws IOException {
516    if (skip)
517      return;
518    writeRecords(3);
519
520    Reader reader =
521        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
522    Scanner scanner = reader.createScanner();
523    try {
524      // test negative array offset
525      try {
526        scanner.seekTo("keyY".getBytes(), -1, 4);
527        Assert.fail("Failed to handle negative offset.");
528      } catch (Exception e) {
529        // noop, expecting exceptions
530      }
531
532      // test negative array length
533      try {
534        scanner.seekTo("keyY".getBytes(), 0, -2);
535        Assert.fail("Failed to handle negative key length.");
536      } catch (Exception e) {
537        // noop, expecting exceptions
538      }
539    } finally {
540      reader.close();
541      scanner.close();
542    }
543  }
544
545  public void testFailureCompressionNotWorking() throws IOException {
546    if (skip)
547      return;
548    long rawDataSize = writeRecords(10 * records1stBlock, false);
549    if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) {
550      Assert.assertTrue(out.getPos() < rawDataSize);
551    }
552    closeOutput();
553  }
554
555  public void testFailureFileWriteNotAt0Position() throws IOException {
556    if (skip)
557      return;
558    closeOutput();
559    out = fs.create(path);
560    out.write(123);
561
562    try {
563      writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
564      Assert.fail("Failed to catch file write not at position 0.");
565    }
566    catch (Exception e) {
567      // noop, expecting exceptions
568    }
569    closeOutput();
570  }
571
572  private long writeRecords(int count) throws IOException {
573    return writeRecords(count, true);
574  }
575
576  private long writeRecords(int count, boolean close) throws IOException {
577    long rawDataSize = writeRecords(writer, count);
578    if (close) {
579      closeOutput();
580    }
581    return rawDataSize;
582  }
583
584  static long writeRecords(Writer writer, int count) throws IOException {
585    long rawDataSize = 0;
586    int nx;
587    for (nx = 0; nx < count; nx++) {
588      byte[] key = composeSortedKey(KEY, count, nx).getBytes();
589      byte[] value = (VALUE + nx).getBytes();
590      writer.append(key, value);
591      rawDataSize +=
592          WritableUtils.getVIntSize(key.length) + key.length
593              + WritableUtils.getVIntSize(value.length) + value.length;
594    }
595    return rawDataSize;
596  }
597
598  /**
599   * Insert some leading 0's in front of the value, to make the keys sorted.
600   *
601   * @param prefix
602   * @param total
603   * @param value
604   * @return
605   */
606  static String composeSortedKey(String prefix, int total, int value) {
607    return String.format("%s%010d", prefix, value);
608  }
609
610  /**
611   * Calculate how many digits are in the 10-based integer.
612   *
613   * @param value
614   * @return
615   */
616  private static int numberDigits(int value) {
617    int digits = 0;
618    while ((value = value / 10) > 0) {
619      digits++;
620    }
621    return digits;
622  }
623
624  private void readRecords(int count) throws IOException {
625    readRecords(fs, path, count, conf);
626  }
627
628  static void readRecords(FileSystem fs, Path path, int count,
629      Configuration conf) throws IOException {
630    Reader reader =
631        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
632    Scanner scanner = reader.createScanner();
633
634    try {
635      for (int nx = 0; nx < count; nx++, scanner.advance()) {
636        Assert.assertFalse(scanner.atEnd());
637        // Assert.assertTrue(scanner.next());
638
639        byte[] kbuf = new byte[BUF_SIZE];
640        int klen = scanner.entry().getKeyLength();
641        scanner.entry().getKey(kbuf);
642        Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
643            count, nx));
644
645        byte[] vbuf = new byte[BUF_SIZE];
646        int vlen = scanner.entry().getValueLength();
647        scanner.entry().getValue(vbuf);
648        Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + nx);
649      }
650
651      Assert.assertTrue(scanner.atEnd());
652      Assert.assertFalse(scanner.advance());
653    }
654    finally {
655      scanner.close();
656      reader.close();
657    }
658  }
659
660  private void checkBlockIndex(int count, int recordIndex,
661      int blockIndexExpected) throws IOException {
662    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
663    Scanner scanner = reader.createScanner();
664    scanner.seekTo(composeSortedKey(KEY, count, recordIndex).getBytes());
665    Assert.assertEquals(blockIndexExpected, scanner.currentLocation
666        .getBlockIndex());
667    scanner.close();
668    reader.close();
669  }
670
671  private void readValueBeforeKey(int count, int recordIndex)
672      throws IOException {
673    Reader reader =
674        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
675    Scanner scanner =
676        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
677            .getBytes(), null);
678
679    try {
680      byte[] vbuf = new byte[BUF_SIZE];
681      int vlen = scanner.entry().getValueLength();
682      scanner.entry().getValue(vbuf);
683      Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + recordIndex);
684
685      byte[] kbuf = new byte[BUF_SIZE];
686      int klen = scanner.entry().getKeyLength();
687      scanner.entry().getKey(kbuf);
688      Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY,
689          count, recordIndex));
690    }
691    finally {
692      scanner.close();
693      reader.close();
694    }
695  }
696
697  private void readKeyWithoutValue(int count, int recordIndex)
698      throws IOException {
699    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
700    Scanner scanner =
701        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
702            .getBytes(), null);
703
704    try {
705      // read the indexed key
706      byte[] kbuf1 = new byte[BUF_SIZE];
707      int klen1 = scanner.entry().getKeyLength();
708      scanner.entry().getKey(kbuf1);
709      Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
710          count, recordIndex));
711
712      if (scanner.advance() && !scanner.atEnd()) {
713        // read the next key following the indexed
714        byte[] kbuf2 = new byte[BUF_SIZE];
715        int klen2 = scanner.entry().getKeyLength();
716        scanner.entry().getKey(kbuf2);
717        Assert.assertEquals(new String(kbuf2, 0, klen2), composeSortedKey(KEY,
718            count, recordIndex + 1));
719      }
720    }
721    finally {
722      scanner.close();
723      reader.close();
724    }
725  }
726
727  private void readValueWithoutKey(int count, int recordIndex)
728      throws IOException {
729    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
730
731    Scanner scanner =
732        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
733            .getBytes(), null);
734
735    byte[] vbuf1 = new byte[BUF_SIZE];
736    int vlen1 = scanner.entry().getValueLength();
737    scanner.entry().getValue(vbuf1);
738    Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex);
739
740    if (scanner.advance() && !scanner.atEnd()) {
741      byte[] vbuf2 = new byte[BUF_SIZE];
742      int vlen2 = scanner.entry().getValueLength();
743      scanner.entry().getValue(vbuf2);
744      Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE
745          + (recordIndex + 1));
746    }
747
748    scanner.close();
749    reader.close();
750  }
751
752  private void readKeyManyTimes(int count, int recordIndex) throws IOException {
753    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
754
755    Scanner scanner =
756        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
757            .getBytes(), null);
758
759    // read the indexed key
760    byte[] kbuf1 = new byte[BUF_SIZE];
761    int klen1 = scanner.entry().getKeyLength();
762    scanner.entry().getKey(kbuf1);
763    Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
764        count, recordIndex));
765
766    klen1 = scanner.entry().getKeyLength();
767    scanner.entry().getKey(kbuf1);
768    Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
769        count, recordIndex));
770
771    klen1 = scanner.entry().getKeyLength();
772    scanner.entry().getKey(kbuf1);
773    Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
774        count, recordIndex));
775
776    scanner.close();
777    reader.close();
778  }
779
780  private void closeOutput() throws IOException {
781    if (writer != null) {
782      writer.close();
783      writer = null;
784    }
785    if (out != null) {
786      out.close();
787      out = null;
788    }
789  }
790}
Note: See TracBrowser for help on using the repository browser.