source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestCollect.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 4.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import org.apache.hadoop.fs.*;
21import org.apache.hadoop.io.*;
22import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
23
24import junit.framework.TestCase;
25import java.io.*;
26import java.util.*;
27
28/**
29 * TestCollect checks if the collect can handle simultaneous invocations.
30 */
31public class TestCollect extends TestCase
32{
33  final static Path OUTPUT_DIR = new Path("build/test/test.collect.output");
34  static final int NUM_FEEDERS = 10;
35  static final int NUM_COLLECTS_PER_THREAD = 1000;
36 
37  /**
38   * Map is a Mapper that spawns threads which simultaneously call collect.
39   * Each thread has a specific range to write to the buffer and is unique to
40   * the thread. This is a synchronization test for the map's collect.
41   */
42   
43  static class Map
44    implements Mapper<Text, Text, IntWritable, IntWritable> {
45   
46    public void configure(JobConf job) {
47    }
48   
49    public void map(Text key, Text val,
50                    final OutputCollector<IntWritable, IntWritable> out,
51                    Reporter reporter) throws IOException {
52      // Class for calling collect in separate threads
53      class CollectFeeder extends Thread {
54        int id; // id for the thread
55       
56        public CollectFeeder(int id) {
57          this.id = id;
58        }
59       
60        public void run() {
61          for (int j = 1; j <= NUM_COLLECTS_PER_THREAD; j++) {
62            try {
63              out.collect(new IntWritable((id * NUM_COLLECTS_PER_THREAD) + j), 
64                                          new IntWritable(0));
65            } catch (IOException ioe) { }
66          }
67        }
68      }
69     
70      CollectFeeder [] feeders = new CollectFeeder[NUM_FEEDERS];
71     
72      // start the feeders
73      for (int i = 0; i < NUM_FEEDERS; i++) {
74        feeders[i] = new CollectFeeder(i);
75        feeders[i].start();
76      }
77      // wait for them to finish
78      for (int i = 0; i < NUM_FEEDERS; i++) {
79        try {
80          feeders[i].join();
81        } catch (InterruptedException ie) {
82          throw new IOException(ie.toString());
83        }
84      }
85    }
86   
87    public void close() {
88    }
89  }
90 
91  static class Reduce
92  implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
93 
94    static int numSeen;
95    static int actualSum;
96    public void configure(JobConf job) { }
97
98    public void reduce(IntWritable key, Iterator<IntWritable> val,
99                       OutputCollector<IntWritable, IntWritable> out,
100                       Reporter reporter) throws IOException {
101      actualSum += key.get(); // keep the running count of the seen values
102      numSeen++; // number of values seen so far
103     
104      // using '1+2+3+...n =  n*(n+1)/2' to validate
105      int expectedSum = numSeen * (numSeen + 1) / 2;
106      if (expectedSum != actualSum) {
107        throw new IOException("Collect test failed!! Ordering mismatch.");
108      }
109    }
110
111    public void close() { }
112  }
113 
114  public void configure(JobConf conf) throws IOException {
115    conf.setJobName("TestCollect");
116    conf.setJarByClass(TestCollect.class);
117   
118    conf.setInputFormat(RandomInputFormat.class); // for self data generation
119    conf.setOutputKeyClass(IntWritable.class);
120    conf.setOutputValueClass(IntWritable.class);
121    FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
122   
123    conf.setMapperClass(Map.class);
124    conf.setReducerClass(Reduce.class);
125    conf.setNumMapTasks(1);
126    conf.setNumReduceTasks(1);
127  }
128 
129  public void testCollect() throws IOException {
130    JobConf conf = new JobConf();
131    configure(conf);
132    try {
133      JobClient.runJob(conf);
134      // check if all the values were seen by the reducer
135      if (Reduce.numSeen != (NUM_COLLECTS_PER_THREAD * NUM_FEEDERS)) {
136        throw new IOException("Collect test failed!! Total does not match.");
137      }
138    } catch (IOException ioe) {
139      throw ioe;
140    } finally {
141      FileSystem fs = FileSystem.get(conf);
142      fs.delete(OUTPUT_DIR, true);
143    }
144  }
145 
146  public static void main(String[] args) throws IOException {
147    new TestCollect().testCollect();
148  }
149}
150
Note: See TracBrowser for help on using the repository browser.