1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.io.*; |
---|
22 | import java.util.*; |
---|
23 | |
---|
24 | import org.apache.hadoop.fs.*; |
---|
25 | import org.apache.hadoop.io.IntWritable; |
---|
26 | import org.apache.hadoop.io.LongWritable; |
---|
27 | import org.apache.hadoop.io.Text; |
---|
28 | import org.apache.hadoop.mapred.JobClient; |
---|
29 | import org.apache.hadoop.mapred.JobConf; |
---|
30 | import org.apache.hadoop.mapred.Mapper; |
---|
31 | import org.apache.hadoop.mapred.OutputCollector; |
---|
32 | import org.apache.hadoop.mapred.Reducer; |
---|
33 | import org.apache.hadoop.mapred.Reporter; |
---|
34 | import org.apache.hadoop.util.*; |
---|
35 | import org.apache.hadoop.mapred.MapReduceBase; |
---|
36 | import org.apache.hadoop.filecache.*; |
---|
37 | import java.net.URI; |
---|
38 | |
---|
39 | public class MRCaching { |
---|
40 | static String testStr = "This is a test file " + "used for testing caching " |
---|
41 | + "jars, zip and normal files."; |
---|
42 | |
---|
43 | /** |
---|
44 | * Using the wordcount example and adding caching to it. The cache |
---|
45 | * archives/files are set and then are checked in the map if they have been |
---|
46 | * localized or not. |
---|
47 | */ |
---|
48 | public static class MapClass extends MapReduceBase |
---|
49 | implements Mapper<LongWritable, Text, Text, IntWritable> { |
---|
50 | |
---|
51 | JobConf conf; |
---|
52 | |
---|
53 | private final static IntWritable one = new IntWritable(1); |
---|
54 | |
---|
55 | private Text word = new Text(); |
---|
56 | |
---|
57 | public void configure(JobConf jconf) { |
---|
58 | conf = jconf; |
---|
59 | try { |
---|
60 | Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); |
---|
61 | Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); |
---|
62 | // read the cached files (unzipped, unjarred and text) |
---|
63 | // and put it into a single file TEST_ROOT_DIR/test.txt |
---|
64 | String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp"); |
---|
65 | Path file = new Path("file:///", TEST_ROOT_DIR); |
---|
66 | FileSystem fs = FileSystem.getLocal(conf); |
---|
67 | if (!fs.mkdirs(file)) { |
---|
68 | throw new IOException("Mkdirs failed to create " + file.toString()); |
---|
69 | } |
---|
70 | Path fileOut = new Path(file, "test.txt"); |
---|
71 | fs.delete(fileOut, true); |
---|
72 | DataOutputStream out = fs.create(fileOut); |
---|
73 | for (int i = 0; i < localArchives.length; i++) { |
---|
74 | // read out the files from these archives |
---|
75 | File f = new File(localArchives[i].toString()); |
---|
76 | File txt = new File(f, "test.txt"); |
---|
77 | FileInputStream fin = new FileInputStream(txt); |
---|
78 | DataInputStream din = new DataInputStream(fin); |
---|
79 | String str = din.readLine(); |
---|
80 | din.close(); |
---|
81 | out.writeBytes(str); |
---|
82 | out.writeBytes("\n"); |
---|
83 | } |
---|
84 | for (int i = 0; i < localFiles.length; i++) { |
---|
85 | // read out the files from these archives |
---|
86 | File txt = new File(localFiles[i].toString()); |
---|
87 | FileInputStream fin = new FileInputStream(txt); |
---|
88 | DataInputStream din = new DataInputStream(fin); |
---|
89 | String str = din.readLine(); |
---|
90 | out.writeBytes(str); |
---|
91 | out.writeBytes("\n"); |
---|
92 | } |
---|
93 | out.close(); |
---|
94 | } catch (IOException ie) { |
---|
95 | System.out.println(StringUtils.stringifyException(ie)); |
---|
96 | } |
---|
97 | } |
---|
98 | |
---|
99 | public void map(LongWritable key, Text value, |
---|
100 | OutputCollector<Text, IntWritable> output, |
---|
101 | Reporter reporter) throws IOException { |
---|
102 | String line = value.toString(); |
---|
103 | StringTokenizer itr = new StringTokenizer(line); |
---|
104 | while (itr.hasMoreTokens()) { |
---|
105 | word.set(itr.nextToken()); |
---|
106 | output.collect(word, one); |
---|
107 | } |
---|
108 | |
---|
109 | } |
---|
110 | } |
---|
111 | |
---|
112 | /** |
---|
113 | * Using the wordcount example and adding caching to it. The cache |
---|
114 | * archives/files are set and then are checked in the map if they have been |
---|
115 | * symlinked or not. |
---|
116 | */ |
---|
117 | public static class MapClass2 extends MapClass { |
---|
118 | |
---|
119 | JobConf conf; |
---|
120 | |
---|
121 | public void configure(JobConf jconf) { |
---|
122 | conf = jconf; |
---|
123 | try { |
---|
124 | // read the cached files (unzipped, unjarred and text) |
---|
125 | // and put it into a single file TEST_ROOT_DIR/test.txt |
---|
126 | String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp"); |
---|
127 | Path file = new Path("file:///", TEST_ROOT_DIR); |
---|
128 | FileSystem fs = FileSystem.getLocal(conf); |
---|
129 | if (!fs.mkdirs(file)) { |
---|
130 | throw new IOException("Mkdirs failed to create " + file.toString()); |
---|
131 | } |
---|
132 | Path fileOut = new Path(file, "test.txt"); |
---|
133 | fs.delete(fileOut, true); |
---|
134 | DataOutputStream out = fs.create(fileOut); |
---|
135 | String[] symlinks = new String[6]; |
---|
136 | symlinks[0] = "."; |
---|
137 | symlinks[1] = "testjar"; |
---|
138 | symlinks[2] = "testzip"; |
---|
139 | symlinks[3] = "testtgz"; |
---|
140 | symlinks[4] = "testtargz"; |
---|
141 | symlinks[5] = "testtar"; |
---|
142 | |
---|
143 | for (int i = 0; i < symlinks.length; i++) { |
---|
144 | // read out the files from these archives |
---|
145 | File f = new File(symlinks[i]); |
---|
146 | File txt = new File(f, "test.txt"); |
---|
147 | FileInputStream fin = new FileInputStream(txt); |
---|
148 | BufferedReader reader = new BufferedReader(new InputStreamReader(fin)); |
---|
149 | String str = reader.readLine(); |
---|
150 | reader.close(); |
---|
151 | out.writeBytes(str); |
---|
152 | out.writeBytes("\n"); |
---|
153 | } |
---|
154 | out.close(); |
---|
155 | } catch (IOException ie) { |
---|
156 | System.out.println(StringUtils.stringifyException(ie)); |
---|
157 | } |
---|
158 | } |
---|
159 | } |
---|
160 | |
---|
161 | /** |
---|
162 | * A reducer class that just emits the sum of the input values. |
---|
163 | */ |
---|
164 | public static class ReduceClass extends MapReduceBase |
---|
165 | implements Reducer<Text, IntWritable, Text, IntWritable> { |
---|
166 | |
---|
167 | public void reduce(Text key, Iterator<IntWritable> values, |
---|
168 | OutputCollector<Text, IntWritable> output, |
---|
169 | Reporter reporter) throws IOException { |
---|
170 | int sum = 0; |
---|
171 | while (values.hasNext()) { |
---|
172 | sum += values.next().get(); |
---|
173 | } |
---|
174 | output.collect(key, new IntWritable(sum)); |
---|
175 | } |
---|
176 | } |
---|
177 | |
---|
178 | public static class TestResult { |
---|
179 | public RunningJob job; |
---|
180 | public boolean isOutputOk; |
---|
181 | TestResult(RunningJob job, boolean isOutputOk) { |
---|
182 | this.job = job; |
---|
183 | this.isOutputOk = isOutputOk; |
---|
184 | } |
---|
185 | } |
---|
186 | |
---|
187 | static void setupCache(String cacheDir, FileSystem fs) |
---|
188 | throws IOException { |
---|
189 | Path localPath = new Path("build/test/cache"); |
---|
190 | Path txtPath = new Path(localPath, new Path("test.txt")); |
---|
191 | Path jarPath = new Path(localPath, new Path("test.jar")); |
---|
192 | Path zipPath = new Path(localPath, new Path("test.zip")); |
---|
193 | Path tarPath = new Path(localPath, new Path("test.tgz")); |
---|
194 | Path tarPath1 = new Path(localPath, new Path("test.tar.gz")); |
---|
195 | Path tarPath2 = new Path(localPath, new Path("test.tar")); |
---|
196 | Path cachePath = new Path(cacheDir); |
---|
197 | fs.delete(cachePath, true); |
---|
198 | if (!fs.mkdirs(cachePath)) { |
---|
199 | throw new IOException("Mkdirs failed to create " + cachePath.toString()); |
---|
200 | } |
---|
201 | fs.copyFromLocalFile(txtPath, cachePath); |
---|
202 | fs.copyFromLocalFile(jarPath, cachePath); |
---|
203 | fs.copyFromLocalFile(zipPath, cachePath); |
---|
204 | fs.copyFromLocalFile(tarPath, cachePath); |
---|
205 | fs.copyFromLocalFile(tarPath1, cachePath); |
---|
206 | fs.copyFromLocalFile(tarPath2, cachePath); |
---|
207 | } |
---|
208 | |
---|
209 | public static TestResult launchMRCache(String indir, |
---|
210 | String outdir, String cacheDir, |
---|
211 | JobConf conf, String input) |
---|
212 | throws IOException { |
---|
213 | setupCache(cacheDir, FileSystem.get(conf)); |
---|
214 | return launchMRCache(indir,outdir, cacheDir, conf, input, false); |
---|
215 | } |
---|
216 | |
---|
217 | public static TestResult launchMRCache(String indir, |
---|
218 | String outdir, String cacheDir, |
---|
219 | JobConf conf, String input, |
---|
220 | boolean withSymlink) |
---|
221 | throws IOException { |
---|
222 | String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp")) |
---|
223 | .toString().replace(' ', '+'); |
---|
224 | //if (TEST_ROOT_DIR.startsWith("C:")) TEST_ROOT_DIR = "/tmp"; |
---|
225 | conf.set("test.build.data", TEST_ROOT_DIR); |
---|
226 | final Path inDir = new Path(indir); |
---|
227 | final Path outDir = new Path(outdir); |
---|
228 | FileSystem fs = FileSystem.get(conf); |
---|
229 | fs.delete(outDir, true); |
---|
230 | if (!fs.mkdirs(inDir)) { |
---|
231 | throw new IOException("Mkdirs failed to create " + inDir.toString()); |
---|
232 | } |
---|
233 | { |
---|
234 | System.out.println("HERE:"+inDir); |
---|
235 | DataOutputStream file = fs.create(new Path(inDir, "part-0")); |
---|
236 | file.writeBytes(input); |
---|
237 | file.close(); |
---|
238 | } |
---|
239 | conf.setJobName("cachetest"); |
---|
240 | |
---|
241 | // the keys are words (strings) |
---|
242 | conf.setOutputKeyClass(Text.class); |
---|
243 | // the values are counts (ints) |
---|
244 | conf.setOutputValueClass(IntWritable.class); |
---|
245 | |
---|
246 | conf.setCombinerClass(MRCaching.ReduceClass.class); |
---|
247 | conf.setReducerClass(MRCaching.ReduceClass.class); |
---|
248 | FileInputFormat.setInputPaths(conf, inDir); |
---|
249 | FileOutputFormat.setOutputPath(conf, outDir); |
---|
250 | conf.setNumMapTasks(1); |
---|
251 | conf.setNumReduceTasks(1); |
---|
252 | conf.setSpeculativeExecution(false); |
---|
253 | URI[] uris = new URI[6]; |
---|
254 | if (!withSymlink) { |
---|
255 | conf.setMapperClass(MRCaching.MapClass.class); |
---|
256 | uris[0] = fs.getUri().resolve(cacheDir + "/test.txt"); |
---|
257 | uris[1] = fs.getUri().resolve(cacheDir + "/test.jar"); |
---|
258 | uris[2] = fs.getUri().resolve(cacheDir + "/test.zip"); |
---|
259 | uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz"); |
---|
260 | uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz"); |
---|
261 | uris[5] = fs.getUri().resolve(cacheDir + "/test.tar"); |
---|
262 | } else { |
---|
263 | DistributedCache.createSymlink(conf); |
---|
264 | conf.setMapperClass(MRCaching.MapClass2.class); |
---|
265 | uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt"); |
---|
266 | uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar"); |
---|
267 | uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip"); |
---|
268 | uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz"); |
---|
269 | uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz"); |
---|
270 | uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar"); |
---|
271 | } |
---|
272 | DistributedCache.addCacheFile(uris[0], conf); |
---|
273 | for (int i = 1; i < 6; i++) { |
---|
274 | DistributedCache.addCacheArchive(uris[i], conf); |
---|
275 | } |
---|
276 | RunningJob job = JobClient.runJob(conf); |
---|
277 | int count = 0; |
---|
278 | // after the job ran check to see if the input from the localized cache |
---|
279 | // match the real string. check if there are 3 instances or not. |
---|
280 | Path result = new Path(TEST_ROOT_DIR + "/test.txt"); |
---|
281 | { |
---|
282 | BufferedReader file = new BufferedReader |
---|
283 | (new InputStreamReader(FileSystem.getLocal(conf).open(result))); |
---|
284 | String line = file.readLine(); |
---|
285 | while (line != null) { |
---|
286 | if (!testStr.equals(line)) |
---|
287 | return new TestResult(job, false); |
---|
288 | count++; |
---|
289 | line = file.readLine(); |
---|
290 | |
---|
291 | } |
---|
292 | file.close(); |
---|
293 | } |
---|
294 | if (count != 6) |
---|
295 | return new TestResult(job, false); |
---|
296 | |
---|
297 | return new TestResult(job, true); |
---|
298 | |
---|
299 | } |
---|
300 | } |
---|