source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.5 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;

import junit.framework.TestCase;
import org.apache.commons.logging.*;

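/**
 * Tests for {@link SequenceFileAsBinaryOutputFormat}: writing raw serialized
 * bytes, the key/value class configuration defaults, and the
 * record-compression check in checkOutputSpecs.
 */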
public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
  private static final Log LOG =
      LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());

  private static final int RECORDS = 10000;
  // An arbitrary task attempt id used for testing.
  private static final String attempt = "attempt_200707121733_0001_m_000000_0";

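  /**
   * Writes RECORDS random (int, double) pairs as raw serialized bytes through
   * SequenceFileAsBinaryOutputFormat, then reads them back with
   * SequenceFileInputFormat and checks every record against the same seeded
   * Random sequence.
   */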
  public void testBinary() throws IOException {
    JobConf job = new JobConf();
    FileSystem fs = FileSystem.getLocal(job);

    Path dir =
      new Path(new Path(new Path(System.getProperty("test.build.data",".")),
                        FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
    Path file = new Path(dir, "testbinary.seq");
    Random r = new Random();
    long seed = r.nextLong();
    r.setSeed(seed);

    fs.delete(dir, true);
    if (!fs.mkdirs(dir)) {
      fail("Failed to create output directory");
    }

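    // Simulate a running task attempt: set the attempt id and point the
    // job's output and work paths at the directories created above.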
    job.set("mapred.task.id", attempt);
    FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
    FileOutputFormat.setWorkOutputPath(job, dir);

    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
                                          IntWritable.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
                                          DoubleWritable.class);

    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
                                                       CompressionType.BLOCK);

    BytesWritable bkey = new BytesWritable();
    BytesWritable bval = new BytesWritable();

    RecordWriter<BytesWritable, BytesWritable> writer =
      new SequenceFileAsBinaryOutputFormat().getRecordWriter(fs,
                                                       job, file.toString(),
                                                       Reporter.NULL);

    IntWritable iwritable = new IntWritable();
    DoubleWritable dwritable = new DoubleWritable();
    DataOutputBuffer outbuf = new DataOutputBuffer();
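    // Serialize each Writable into the DataOutputBuffer and hand the raw
    // bytes to the writer as BytesWritable key/value pairs.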
    LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
    try {
      for (int i = 0; i < RECORDS; ++i) {
        iwritable = new IntWritable(r.nextInt());
        iwritable.write(outbuf);
        bkey.set(outbuf.getData(), 0, outbuf.getLength());
        outbuf.reset();
        dwritable = new DoubleWritable(r.nextDouble());
        dwritable.write(outbuf);
        bval.set(outbuf.getData(), 0, outbuf.getLength());
        outbuf.reset();
        writer.write(bkey, bval);
      }
    } finally {
      writer.close(Reporter.NULL);
    }

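    // Read the file back through SequenceFileInputFormat and verify each
    // record against a Random re-seeded with the same seed.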
    InputFormat<IntWritable,DoubleWritable> iformat =
                    new SequenceFileInputFormat<IntWritable,DoubleWritable>();
    int count = 0;
    r.setSeed(seed);
    DataInputBuffer buf = new DataInputBuffer();
    final int NUM_SPLITS = 3;
    SequenceFileInputFormat.addInputPath(job, file);
    LOG.info("Reading data by SequenceFileInputFormat");
    for (InputSplit split : iformat.getSplits(job, NUM_SPLITS)) {
      RecordReader<IntWritable,DoubleWritable> reader =
        iformat.getRecordReader(split, job, Reporter.NULL);
      try {
        int sourceInt;
        double sourceDouble;
        while (reader.next(iwritable, dwritable)) {
          sourceInt = r.nextInt();
          sourceDouble = r.nextDouble();
          assertEquals(
              "Keys don't match: " + "*" + iwritable.get() + ":" +
                                           sourceInt + "*",
              sourceInt, iwritable.get());
          assertTrue(
              "Vals don't match: " + "*" + dwritable.get() + ":" +
                                           sourceDouble + "*",
              Double.compare(dwritable.get(), sourceDouble) == 0);
          ++count;
        }
      } finally {
        reader.close();
      }
    }
    assertEquals("Some records not found", RECORDS, count);
  }

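  /**
   * The sequence-file key/value classes should default to the job's output
   * key/value classes until they are set explicitly.
   */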
  public void testSequenceOutputClassDefaultsToMapRedOutputClass()
         throws IOException {
    JobConf job = new JobConf();
    FileSystem fs = FileSystem.getLocal(job);

    // Set arbitrary job output classes to test the
    // getSequenceFileOutput{Key,Value}Class defaults.
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(BooleanWritable.class);

    assertEquals("SequenceFileOutputKeyClass should default to outputKeyClass",
             FloatWritable.class,
             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
                                                                         job));
    assertEquals("SequenceFileOutputValueClass should default to "
             + "outputValueClass",
             BooleanWritable.class,
             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
                                                                         job));

    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
                                          IntWritable.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
                                          DoubleWritable.class);

    assertEquals("SequenceFileOutputKeyClass not updated",
             IntWritable.class,
             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
                                                                         job));
    assertEquals("SequenceFileOutputValueClass not updated",
             DoubleWritable.class,
             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
                                                                         job));
  }

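  /**
   * checkOutputSpecs should accept block compression but reject record
   * compression, which SequenceFileAsBinaryOutputFormat does not support.
   */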
  public void testcheckOutputSpecsForbidRecordCompression() throws IOException {
    JobConf job = new JobConf();
    FileSystem fs = FileSystem.getLocal(job);
    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
    Path outputdir = new Path(System.getProperty("test.build.data",".")
                              + "/output");

    fs.delete(dir, true);
    fs.delete(outputdir, true);
    if (!fs.mkdirs(dir)) {
      fail("Failed to create output directory");
    }

    FileOutputFormat.setWorkOutputPath(job, dir);

    // Without an output path, FileOutputFormat.checkOutputSpecs would throw
    // an InvalidJobConfException.
    FileOutputFormat.setOutputPath(job, outputdir);

    // SequenceFileAsBinaryOutputFormat doesn't support record compression;
    // checkOutputSpecs should reject it but accept block compression.
    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);

    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
                                                       CompressionType.BLOCK);
    try {
      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
    } catch (Exception e) {
      fail("Block compression should be allowed for "
                       + "SequenceFileAsBinaryOutputFormat: "
                       + "caught " + e.getClass().getName());
    }

    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
                                                       CompressionType.RECORD);
    try {
      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
      fail("Record compression should not be allowed for "
                           + "SequenceFileAsBinaryOutputFormat");
    } catch (InvalidJobConfException ie) {
      // expected
    } catch (Exception e) {
      fail("Expected " + InvalidJobConfException.class.getName()
                       + " but caught " + e.getClass().getName());
    }
  }
}