source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 5.8 KB
Line 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.*;
22import java.util.*;
23import junit.framework.TestCase;
24
25import org.apache.commons.logging.*;
26
27import org.apache.hadoop.fs.*;
28import org.apache.hadoop.io.*;
29import org.apache.hadoop.conf.*;
30
31public class TestSequenceFileInputFilter extends TestCase {
32  private static final Log LOG = FileInputFormat.LOG;
33
34  private static final int MAX_LENGTH = 15000;
35  private static final Configuration conf = new Configuration();
36  private static final JobConf job = new JobConf(conf);
37  private static final FileSystem fs;
38  private static final Path inDir = new Path(System.getProperty("test.build.data",".") + "/mapred");
39  private static final Path inFile = new Path(inDir, "test.seq");
40  private static final Random random = new Random(1);
41  private static final Reporter reporter = Reporter.NULL;
42 
43  static {
44    FileInputFormat.setInputPaths(job, inDir);
45    try {
46      fs = FileSystem.getLocal(conf);
47    } catch (IOException e) {
48      e.printStackTrace();
49      throw new RuntimeException(e);
50    }
51  }
52
53  private static void createSequenceFile(int numRecords) throws Exception {
54    // create a file with length entries
55    SequenceFile.Writer writer =
56      SequenceFile.createWriter(fs, conf, inFile,
57                                Text.class, BytesWritable.class);
58    try {
59      for (int i = 1; i <= numRecords; i++) {
60        Text key = new Text(Integer.toString(i));
61        byte[] data = new byte[random.nextInt(10)];
62        random.nextBytes(data);
63        BytesWritable value = new BytesWritable(data);
64        writer.append(key, value);
65      }
66    } finally {
67      writer.close();
68    }
69  }
70
71
72  private int countRecords(int numSplits) throws IOException {
73    InputFormat<Text, BytesWritable> format =
74      new SequenceFileInputFilter<Text, BytesWritable>();
75    Text key = new Text();
76    BytesWritable value = new BytesWritable();
77    if (numSplits==0) {
78      numSplits =
79        random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
80    }
81    InputSplit[] splits = format.getSplits(job, numSplits);
82     
83    // check each split
84    int count = 0;
85    LOG.info("Generated " + splits.length + " splits.");
86    for (int j = 0; j < splits.length; j++) {
87      RecordReader<Text, BytesWritable> reader =
88        format.getRecordReader(splits[j], job, reporter);
89      try {
90        while (reader.next(key, value)) {
91          LOG.info("Accept record "+key.toString());
92          count++;
93        }
94      } finally {
95        reader.close();
96      }
97    }
98    return count;
99  }
100 
101  public void testRegexFilter() throws Exception {
102    // set the filter class
103    LOG.info("Testing Regex Filter with patter: \\A10*");
104    SequenceFileInputFilter.setFilterClass(job, 
105                                           SequenceFileInputFilter.RegexFilter.class);
106    SequenceFileInputFilter.RegexFilter.setPattern(job, "\\A10*");
107   
108    // clean input dir
109    fs.delete(inDir, true);
110 
111    // for a variety of lengths
112    for (int length = 1; length < MAX_LENGTH;
113         length+= random.nextInt(MAX_LENGTH/10)+1) {
114      LOG.info("******Number of records: "+length);
115      createSequenceFile(length);
116      int count = countRecords(0);
117      assertEquals(count, length==0?0:(int)Math.log10(length)+1);
118    }
119   
120    // clean up
121    fs.delete(inDir, true);
122  }
123
124  public void testPercentFilter() throws Exception {
125    LOG.info("Testing Percent Filter with frequency: 1000");
126    // set the filter class
127    SequenceFileInputFilter.setFilterClass(job, 
128                                           SequenceFileInputFilter.PercentFilter.class);
129    SequenceFileInputFilter.PercentFilter.setFrequency(job, 1000);
130     
131    // clean input dir
132    fs.delete(inDir, true);
133   
134    // for a variety of lengths
135    for (int length = 0; length < MAX_LENGTH;
136         length+= random.nextInt(MAX_LENGTH/10)+1) {
137      LOG.info("******Number of records: "+length);
138      createSequenceFile(length);
139      int count = countRecords(1);
140      LOG.info("Accepted "+count+" records");
141      int expectedCount = length/1000;
142      if (expectedCount*1000!=length)
143        expectedCount++;
144      assertEquals(count, expectedCount);
145    }
146     
147    // clean up
148    fs.delete(inDir, true);
149  }
150 
151  public void testMD5Filter() throws Exception {
152    // set the filter class
153    LOG.info("Testing MD5 Filter with frequency: 1000");
154    SequenceFileInputFilter.setFilterClass(job, 
155                                           SequenceFileInputFilter.MD5Filter.class);
156    SequenceFileInputFilter.MD5Filter.setFrequency(job, 1000);
157     
158    // clean input dir
159    fs.delete(inDir, true);
160   
161    // for a variety of lengths
162    for (int length = 0; length < MAX_LENGTH;
163         length+= random.nextInt(MAX_LENGTH/10)+1) {
164      LOG.info("******Number of records: "+length);
165      createSequenceFile(length);
166      LOG.info("Accepted "+countRecords(0)+" records");
167    }
168    // clean up
169    fs.delete(inDir, true);
170  }
171
172  public static void main(String[] args) throws Exception {
173    TestSequenceFileInputFilter filter = new TestSequenceFileInputFilter();
174    filter.testRegexFilter();
175  }
176}
Note: See TracBrowser for help on using the repository browser.