source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/MRCaching.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 11.1 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.filecache.*;
import java.net.URI;

public class MRCaching {
  static String testStr = "This is a test file " + "used for testing caching "
    + "jars, zip and normal files.";

  /**
   * Uses the wordcount example with caching added. The cache
   * archives/files are registered on the job, and the map side then
   * checks whether they have been localized (see the illustrative
   * sketch after this class).
   */
  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

    JobConf conf;

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    public void configure(JobConf jconf) {
      conf = jconf;
      try {
        Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        // read the cached files (unzipped, unjarred and text)
        // and put them into a single file TEST_ROOT_DIR/test.txt
        String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
        Path file = new Path("file:///", TEST_ROOT_DIR);
        FileSystem fs = FileSystem.getLocal(conf);
        if (!fs.mkdirs(file)) {
          throw new IOException("Mkdirs failed to create " + file.toString());
        }
        Path fileOut = new Path(file, "test.txt");
        fs.delete(fileOut, true);
        DataOutputStream out = fs.create(fileOut);
        for (int i = 0; i < localArchives.length; i++) {
          // read out the files from these archives
          File f = new File(localArchives[i].toString());
          File txt = new File(f, "test.txt");
          FileInputStream fin = new FileInputStream(txt);
          DataInputStream din = new DataInputStream(fin);
          String str = din.readLine();
          din.close();
          out.writeBytes(str);
          out.writeBytes("\n");
        }
        for (int i = 0; i < localFiles.length; i++) {
          // read out the cached plain (non-archive) files
          File txt = new File(localFiles[i].toString());
          FileInputStream fin = new FileInputStream(txt);
          DataInputStream din = new DataInputStream(fin);
          String str = din.readLine();
          din.close();
          out.writeBytes(str);
          out.writeBytes("\n");
        }
        out.close();
      } catch (IOException ie) {
        System.out.println(StringUtils.stringifyException(ie));
      }
    }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }
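
  /**
   * Illustrative sketch only (not part of the original test): the
   * submit-side / task-side pairing used above. The mapper's configure()
   * reads back, via DistributedCache.getLocalCacheArchives(), the local
   * paths of archives registered with DistributedCache.addCacheArchive()
   * in launchMRCache below. The URI here is hypothetical.
   */
  static void cacheApiSketch(JobConf conf) throws IOException {
    // submit side: register an archive to be localized on every task node
    DistributedCache.addCacheArchive(URI.create("hdfs:///cache/test.zip"), conf);
    // task side (normally inside configure()): the unpacked local directories
    Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
    System.out.println("localized archives: " + Arrays.toString(localArchives));
  }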

  /**
   * Uses the wordcount example with caching added. The cache
   * archives/files are registered on the job with symlinks, and the map
   * side then checks whether the symlinks have been created (see the
   * illustrative sketch after this class).
   */
  public static class MapClass2 extends MapClass {

    JobConf conf;

    public void configure(JobConf jconf) {
      conf = jconf;
      try {
        // read the cached files (unzipped, unjarred and text)
        // and put them into a single file TEST_ROOT_DIR/test.txt
        String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
        Path file = new Path("file:///", TEST_ROOT_DIR);
        FileSystem fs = FileSystem.getLocal(conf);
        if (!fs.mkdirs(file)) {
          throw new IOException("Mkdirs failed to create " + file.toString());
        }
        Path fileOut = new Path(file, "test.txt");
        fs.delete(fileOut, true);
        DataOutputStream out = fs.create(fileOut);
        String[] symlinks = new String[6];
        symlinks[0] = ".";
        symlinks[1] = "testjar";
        symlinks[2] = "testzip";
        symlinks[3] = "testtgz";
        symlinks[4] = "testtargz";
        symlinks[5] = "testtar";

        for (int i = 0; i < symlinks.length; i++) {
          // read test.txt through each symlink in the task working directory
          File f = new File(symlinks[i]);
          File txt = new File(f, "test.txt");
          FileInputStream fin = new FileInputStream(txt);
          BufferedReader reader = new BufferedReader(new InputStreamReader(fin));
          String str = reader.readLine();
          reader.close();
          out.writeBytes(str);
          out.writeBytes("\n");
        }
        out.close();
      } catch (IOException ie) {
        System.out.println(StringUtils.stringifyException(ie));
      }
    }
  }
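
  /**
   * Illustrative sketch only (not part of the original test): the "#name"
   * fragment on a cache URI asks the framework to symlink the localized
   * file or archive under that name in the task working directory, which
   * is why MapClass2 can open "testjar", "testzip", etc. by fixed names.
   * The cacheDir argument here stands in for the real cache directory.
   */
  static URI symlinkedCacheUri(FileSystem fs, String cacheDir) {
    // e.g. hdfs://host:port/cache/test.jar#testjar -> ./testjar in the task dir
    return fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
  }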

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class ReduceClass extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static class TestResult {
    public RunningJob job;
    public boolean isOutputOk;
    TestResult(RunningJob job, boolean isOutputOk) {
      this.job = job;
      this.isOutputOk = isOutputOk;
    }
  }

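  /**
   * Copies the cache fixtures from build/test/cache into cacheDir on the
   * given file system. Each fixture (test.txt, and a test.txt inside each
   * of the jar/zip/tar archives) is expected to hold a single line equal
   * to testStr; the count check in launchMRCache relies on that.
   */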
  static void setupCache(String cacheDir, FileSystem fs)
  throws IOException {
    Path localPath = new Path("build/test/cache");
    Path txtPath = new Path(localPath, new Path("test.txt"));
    Path jarPath = new Path(localPath, new Path("test.jar"));
    Path zipPath = new Path(localPath, new Path("test.zip"));
    Path tarPath = new Path(localPath, new Path("test.tgz"));
    Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
    Path tarPath2 = new Path(localPath, new Path("test.tar"));
    Path cachePath = new Path(cacheDir);
    fs.delete(cachePath, true);
    if (!fs.mkdirs(cachePath)) {
      throw new IOException("Mkdirs failed to create " + cachePath.toString());
    }
    fs.copyFromLocalFile(txtPath, cachePath);
    fs.copyFromLocalFile(jarPath, cachePath);
    fs.copyFromLocalFile(zipPath, cachePath);
    fs.copyFromLocalFile(tarPath, cachePath);
    fs.copyFromLocalFile(tarPath1, cachePath);
    fs.copyFromLocalFile(tarPath2, cachePath);
  }

  public static TestResult launchMRCache(String indir,
                                         String outdir, String cacheDir,
                                         JobConf conf, String input)
  throws IOException {
    setupCache(cacheDir, FileSystem.get(conf));
    return launchMRCache(indir, outdir, cacheDir, conf, input, false);
  }

  public static TestResult launchMRCache(String indir,
                                         String outdir, String cacheDir,
                                         JobConf conf, String input,
                                         boolean withSymlink)
    throws IOException {
    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
      .toString().replace(' ', '+');
    //if (TEST_ROOT_DIR.startsWith("C:")) TEST_ROOT_DIR = "/tmp";
    conf.set("test.build.data", TEST_ROOT_DIR);
    final Path inDir = new Path(indir);
    final Path outDir = new Path(outdir);
    FileSystem fs = FileSystem.get(conf);
    fs.delete(outDir, true);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      System.out.println("HERE:" + inDir);
      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
    conf.setJobName("cachetest");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setCombinerClass(MRCaching.ReduceClass.class);
    conf.setReducerClass(MRCaching.ReduceClass.class);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(1);
    conf.setNumReduceTasks(1);
    conf.setSpeculativeExecution(false);
    URI[] uris = new URI[6];
    if (!withSymlink) {
      conf.setMapperClass(MRCaching.MapClass.class);
      uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
      uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
      uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
      uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
      uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
      uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
    } else {
      DistributedCache.createSymlink(conf);
      conf.setMapperClass(MRCaching.MapClass2.class);
      uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
      uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
      uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
      uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
      uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
      uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
    }
    DistributedCache.addCacheFile(uris[0], conf);
    for (int i = 1; i < 6; i++) {
      DistributedCache.addCacheArchive(uris[i], conf);
    }
    RunningJob job = JobClient.runJob(conf);
    int count = 0;
    // after the job has run, check that every line written from the localized
    // cache matches the test string and that there are exactly 6 such lines
    Path result = new Path(TEST_ROOT_DIR + "/test.txt");
    {
      BufferedReader file = new BufferedReader
         (new InputStreamReader(FileSystem.getLocal(conf).open(result)));
      String line = file.readLine();
      while (line != null) {
        if (!testStr.equals(line))
          return new TestResult(job, false);
        count++;
        line = file.readLine();
      }
      file.close();
    }
    if (count != 6)
      return new TestResult(job, false);

    return new TestResult(job, true);
  }
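
  /**
   * Illustrative sketch only (not part of the original test): a minimal
   * driver showing how launchMRCache is typically invoked. The JobConf and
   * the /tmp scratch directories used here are hypothetical; this helper is
   * normally driven from JUnit tests elsewhere in this package.
   */
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(MRCaching.class);
    TestResult ret = launchMRCache("/tmp/mrcache/in", "/tmp/mrcache/out",
                                   "/tmp/mrcache/cache", conf,
                                   "the quick brown fox\n");
    System.out.println("cached input matched testStr: " + ret.isOutputOk);
  }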
}