1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.filecache; |
---|
20 | |
---|
21 | import org.apache.commons.logging.*; |
---|
22 | import java.io.*; |
---|
23 | import java.util.*; |
---|
24 | import org.apache.hadoop.conf.*; |
---|
25 | import org.apache.hadoop.util.*; |
---|
26 | import org.apache.hadoop.fs.*; |
---|
27 | |
---|
28 | import java.net.URI; |
---|
29 | |
---|
30 | /** |
---|
31 | * Distribute application-specific large, read-only files efficiently. |
---|
32 | * |
---|
33 | * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce |
---|
34 | * framework to cache files (text, archives, jars etc.) needed by applications. |
---|
35 | * </p> |
---|
36 | * |
---|
37 | * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached |
---|
38 | * via the {@link org.apache.hadoop.mapred.JobConf}. |
---|
39 | * The <code>DistributedCache</code> assumes that the |
---|
40 | * files specified via hdfs:// urls are already present on the |
---|
41 | * {@link FileSystem} at the path specified by the url.</p> |
---|
42 | * |
---|
43 | * <p>The framework will copy the necessary files on to the slave node before |
---|
44 | * any tasks for the job are executed on that node. Its efficiency stems from |
---|
45 | * the fact that the files are only copied once per job and the ability to |
---|
46 | * cache archives which are un-archived on the slaves.</p> |
---|
47 | * |
---|
48 | * <p><code>DistributedCache</code> can be used to distribute simple, read-only |
---|
49 | * data/text files and/or more complex types such as archives, jars etc. |
---|
50 | * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. |
---|
51 | * Jars may be optionally added to the classpath of the tasks, a rudimentary |
---|
52 | * software distribution mechanism. Files have execution permissions. |
---|
53 | * Optionally users can also direct it to symlink the distributed cache file(s) |
---|
54 | * into the working directory of the task.</p> |
---|
55 | * |
---|
56 | * <p><code>DistributedCache</code> tracks modification timestamps of the cache |
---|
57 | * files. Clearly the cache files should not be modified by the application |
---|
58 | * or externally while the job is executing.</p> |
---|
59 | * |
---|
60 | * <p>Here is an illustrative example on how to use the |
---|
61 | * <code>DistributedCache</code>:</p> |
---|
62 | * <p><blockquote><pre> |
---|
63 | * // Setting up the cache for the application |
---|
64 | * |
---|
65 | * 1. Copy the requisite files to the <code>FileSystem</code>: |
---|
66 | * |
---|
67 | * $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat |
---|
68 | * $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip |
---|
69 | * $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar |
---|
70 | * $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar |
---|
71 | * $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz |
---|
72 | * $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz |
---|
73 | * |
---|
74 | * 2. Setup the application's <code>JobConf</code>: |
---|
75 | * |
---|
76 | * JobConf job = new JobConf(); |
---|
77 | * DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), |
---|
78 | * job); |
---|
79 | * DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job); |
---|
80 | * DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); |
---|
81 | * DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job); |
---|
82 | * DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job); |
---|
83 | * DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job); |
---|
84 | * |
---|
85 | * 3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper} |
---|
86 | * or {@link org.apache.hadoop.mapred.Reducer}: |
---|
87 | * |
---|
88 | * public static class MapClass extends MapReduceBase |
---|
89 | * implements Mapper&lt;K, V, K, V&gt; { |
---|
90 | * |
---|
91 | * private Path[] localArchives; |
---|
92 | * private Path[] localFiles; |
---|
93 | * |
---|
94 | * public void configure(JobConf job) { |
---|
95 | * // Get the cached archives/files |
---|
96 | * localArchives = DistributedCache.getLocalCacheArchives(job); |
---|
97 | * localFiles = DistributedCache.getLocalCacheFiles(job); |
---|
98 | * } |
---|
99 | * |
---|
100 | * public void map(K key, V value, |
---|
101 | * OutputCollector&lt;K, V&gt; output, Reporter reporter) |
---|
102 | * throws IOException { |
---|
103 | * // Use data from the cached archives/files here |
---|
104 | * // ... |
---|
105 | * // ... |
---|
106 | * output.collect(k, v); |
---|
107 | * } |
---|
108 | * } |
---|
109 | * |
---|
110 | * </pre></blockquote></p> |
---|
111 | * |
---|
112 | * @see org.apache.hadoop.mapred.JobConf |
---|
113 | * @see org.apache.hadoop.mapred.JobClient |
---|
114 | */ |
---|
115 | public class DistributedCache { |
---|
116 | // cacheID to cacheStatus mapping |
---|
117 | private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>(); |
---|
118 | |
---|
119 | private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>(); |
---|
120 | |
---|
121 | // default total cache size |
---|
122 | private static final long DEFAULT_CACHE_SIZE = 10737418240L; |
---|
123 | |
---|
124 | private static final Log LOG = |
---|
125 | LogFactory.getLog(DistributedCache.class); |
---|
126 | |
---|
127 | /** |
---|
128 | * Get the locally cached file or archive; it could either be |
---|
129 | * previously cached (and valid) or copy it from the {@link FileSystem} now. |
---|
130 | * |
---|
131 | * @param cache the cache to be localized, this should be specified as |
---|
132 | * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema |
---|
133 | * or hostname:port is provided the file is assumed to be in the filesystem |
---|
134 | * being used in the Configuration |
---|
135 | * @param conf The Confguration file which contains the filesystem |
---|
136 | * @param baseDir The base cache Dir where you wnat to localize the files/archives |
---|
137 | * @param fileStatus The file status on the dfs. |
---|
138 | * @param isArchive if the cache is an archive or a file. In case it is an |
---|
139 | * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will |
---|
140 | * be unzipped/unjarred/untarred automatically |
---|
141 | * and the directory where the archive is unzipped/unjarred/untarred is |
---|
142 | * returned as the Path. |
---|
143 | * In case of a file, the path to the file is returned |
---|
144 | * @param confFileStamp this is the hdfs file modification timestamp to verify that the |
---|
145 | * file to be cached hasn't changed since the job started |
---|
146 | * @param currentWorkDir this is the directory where you would want to create symlinks |
---|
147 | * for the locally cached files/archives |
---|
148 | * @return the path to directory where the archives are unjarred in case of archives, |
---|
149 | * the path to the file where the file is copied locally |
---|
150 | * @throws IOException |
---|
151 | */ |
---|
152 | public static Path getLocalCache(URI cache, Configuration conf, |
---|
153 | Path baseDir, FileStatus fileStatus, |
---|
154 | boolean isArchive, long confFileStamp, |
---|
155 | Path currentWorkDir) |
---|
156 | throws IOException { |
---|
157 | return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, |
---|
158 | confFileStamp, currentWorkDir, true); |
---|
159 | } |
---|
160 | /** |
---|
161 | * Get the locally cached file or archive; it could either be |
---|
162 | * previously cached (and valid) or copy it from the {@link FileSystem} now. |
---|
163 | * |
---|
164 | * @param cache the cache to be localized, this should be specified as |
---|
165 | * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema |
---|
166 | * or hostname:port is provided the file is assumed to be in the filesystem |
---|
167 | * being used in the Configuration |
---|
168 | * @param conf The Confguration file which contains the filesystem |
---|
169 | * @param baseDir The base cache Dir where you wnat to localize the files/archives |
---|
170 | * @param fileStatus The file status on the dfs. |
---|
171 | * @param isArchive if the cache is an archive or a file. In case it is an |
---|
172 | * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will |
---|
173 | * be unzipped/unjarred/untarred automatically |
---|
174 | * and the directory where the archive is unzipped/unjarred/untarred is |
---|
175 | * returned as the Path. |
---|
176 | * In case of a file, the path to the file is returned |
---|
177 | * @param confFileStamp this is the hdfs file modification timestamp to verify that the |
---|
178 | * file to be cached hasn't changed since the job started |
---|
179 | * @param currentWorkDir this is the directory where you would want to create symlinks |
---|
180 | * for the locally cached files/archives |
---|
181 | * @param honorSymLinkConf if this is false, then the symlinks are not |
---|
182 | * created even if conf says so (this is required for an optimization in task |
---|
183 | * launches |
---|
184 | * @return the path to directory where the archives are unjarred in case of archives, |
---|
185 | * the path to the file where the file is copied locally |
---|
186 | * @throws IOException |
---|
187 | */ |
---|
188 | public static Path getLocalCache(URI cache, Configuration conf, |
---|
189 | Path baseDir, FileStatus fileStatus, |
---|
190 | boolean isArchive, long confFileStamp, |
---|
191 | Path currentWorkDir, boolean honorSymLinkConf) |
---|
192 | throws IOException { |
---|
193 | String cacheId = makeRelative(cache, conf); |
---|
194 | CacheStatus lcacheStatus; |
---|
195 | Path localizedPath; |
---|
196 | synchronized (cachedArchives) { |
---|
197 | lcacheStatus = cachedArchives.get(cacheId); |
---|
198 | if (lcacheStatus == null) { |
---|
199 | // was never localized |
---|
200 | lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId))); |
---|
201 | cachedArchives.put(cacheId, lcacheStatus); |
---|
202 | } |
---|
203 | |
---|
204 | synchronized (lcacheStatus) { |
---|
205 | localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, |
---|
206 | fileStatus, isArchive, currentWorkDir, honorSymLinkConf); |
---|
207 | lcacheStatus.refcount++; |
---|
208 | } |
---|
209 | } |
---|
210 | |
---|
211 | // try deleting stuff if you can |
---|
212 | long size = 0; |
---|
213 | synchronized (baseDirSize) { |
---|
214 | Long get = baseDirSize.get(baseDir); |
---|
215 | if ( get != null ) { |
---|
216 | size = get.longValue(); |
---|
217 | } |
---|
218 | } |
---|
219 | // setting the cache size to a default of 10GB |
---|
220 | long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE); |
---|
221 | if (allowedSize < size) { |
---|
222 | // try some cache deletions |
---|
223 | deleteCache(conf); |
---|
224 | } |
---|
225 | return localizedPath; |
---|
226 | } |
---|
227 | |
---|
228 | |
---|
229 | /** |
---|
230 | * Get the locally cached file or archive; it could either be |
---|
231 | * previously cached (and valid) or copy it from the {@link FileSystem} now. |
---|
232 | * |
---|
233 | * @param cache the cache to be localized, this should be specified as |
---|
234 | * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema |
---|
235 | * or hostname:port is provided the file is assumed to be in the filesystem |
---|
236 | * being used in the Configuration |
---|
237 | * @param conf The Confguration file which contains the filesystem |
---|
238 | * @param baseDir The base cache Dir where you wnat to localize the files/archives |
---|
239 | * @param isArchive if the cache is an archive or a file. In case it is an |
---|
240 | * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will |
---|
241 | * be unzipped/unjarred/untarred automatically |
---|
242 | * and the directory where the archive is unzipped/unjarred/untarred |
---|
243 | * is returned as the Path. |
---|
244 | * In case of a file, the path to the file is returned |
---|
245 | * @param confFileStamp this is the hdfs file modification timestamp to verify that the |
---|
246 | * file to be cached hasn't changed since the job started |
---|
247 | * @param currentWorkDir this is the directory where you would want to create symlinks |
---|
248 | * for the locally cached files/archives |
---|
249 | * @return the path to directory where the archives are unjarred in case of archives, |
---|
250 | * the path to the file where the file is copied locally |
---|
251 | * @throws IOException |
---|
252 | |
---|
253 | */ |
---|
254 | public static Path getLocalCache(URI cache, Configuration conf, |
---|
255 | Path baseDir, boolean isArchive, |
---|
256 | long confFileStamp, Path currentWorkDir) |
---|
257 | throws IOException { |
---|
258 | return getLocalCache(cache, conf, |
---|
259 | baseDir, null, isArchive, |
---|
260 | confFileStamp, currentWorkDir); |
---|
261 | } |
---|
262 | |
---|
263 | /** |
---|
264 | * This is the opposite of getlocalcache. When you are done with |
---|
265 | * using the cache, you need to release the cache |
---|
266 | * @param cache The cache URI to be released |
---|
267 | * @param conf configuration which contains the filesystem the cache |
---|
268 | * is contained in. |
---|
269 | * @throws IOException |
---|
270 | */ |
---|
271 | public static void releaseCache(URI cache, Configuration conf) |
---|
272 | throws IOException { |
---|
273 | String cacheId = makeRelative(cache, conf); |
---|
274 | synchronized (cachedArchives) { |
---|
275 | CacheStatus lcacheStatus = cachedArchives.get(cacheId); |
---|
276 | if (lcacheStatus == null) |
---|
277 | return; |
---|
278 | synchronized (lcacheStatus) { |
---|
279 | lcacheStatus.refcount--; |
---|
280 | } |
---|
281 | } |
---|
282 | } |
---|
283 | |
---|
284 | // To delete the caches which have a refcount of zero |
---|
285 | |
---|
286 | private static void deleteCache(Configuration conf) throws IOException { |
---|
287 | // try deleting cache Status with refcount of zero |
---|
288 | synchronized (cachedArchives) { |
---|
289 | for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) { |
---|
290 | String cacheId = (String) it.next(); |
---|
291 | CacheStatus lcacheStatus = cachedArchives.get(cacheId); |
---|
292 | synchronized (lcacheStatus) { |
---|
293 | if (lcacheStatus.refcount == 0) { |
---|
294 | // delete this cache entry |
---|
295 | FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true); |
---|
296 | synchronized (baseDirSize) { |
---|
297 | Long dirSize = baseDirSize.get(lcacheStatus.baseDir); |
---|
298 | if ( dirSize != null ) { |
---|
299 | dirSize -= lcacheStatus.size; |
---|
300 | baseDirSize.put(lcacheStatus.baseDir, dirSize); |
---|
301 | } |
---|
302 | } |
---|
303 | it.remove(); |
---|
304 | } |
---|
305 | } |
---|
306 | } |
---|
307 | } |
---|
308 | } |
---|
309 | |
---|
310 | /* |
---|
311 | * Returns the relative path of the dir this cache will be localized in |
---|
312 | * relative path that this cache will be localized in. For |
---|
313 | * hdfs://hostname:port/absolute_path -- the relative path is |
---|
314 | * hostname/absolute path -- if it is just /absolute_path -- then the |
---|
315 | * relative path is hostname of DFS this mapred cluster is running |
---|
316 | * on/absolute_path |
---|
317 | */ |
---|
318 | public static String makeRelative(URI cache, Configuration conf) |
---|
319 | throws IOException { |
---|
320 | String host = cache.getHost(); |
---|
321 | if (host == null) { |
---|
322 | host = cache.getScheme(); |
---|
323 | } |
---|
324 | if (host == null) { |
---|
325 | URI defaultUri = FileSystem.get(conf).getUri(); |
---|
326 | host = defaultUri.getHost(); |
---|
327 | if (host == null) { |
---|
328 | host = defaultUri.getScheme(); |
---|
329 | } |
---|
330 | } |
---|
331 | String path = host + cache.getPath(); |
---|
332 | path = path.replace(":/","/"); // remove windows device colon |
---|
333 | return path; |
---|
334 | } |
---|
335 | |
---|
336 | private static Path cacheFilePath(Path p) { |
---|
337 | return new Path(p, p.getName()); |
---|
338 | } |
---|
339 | |
---|
340 | // the method which actually copies the caches locally and unjars/unzips them |
---|
341 | // and does chmod for the files |
---|
342 | private static Path localizeCache(Configuration conf, |
---|
343 | URI cache, long confFileStamp, |
---|
344 | CacheStatus cacheStatus, |
---|
345 | FileStatus fileStatus, |
---|
346 | boolean isArchive, |
---|
347 | Path currentWorkDir,boolean honorSymLinkConf) |
---|
348 | throws IOException { |
---|
349 | boolean doSymlink = honorSymLinkConf && getSymlink(conf); |
---|
350 | if(cache.getFragment() == null) { |
---|
351 | doSymlink = false; |
---|
352 | } |
---|
353 | FileSystem fs = getFileSystem(cache, conf); |
---|
354 | String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment(); |
---|
355 | File flink = new File(link); |
---|
356 | if (ifExistsAndFresh(conf, fs, cache, confFileStamp, |
---|
357 | cacheStatus, fileStatus)) { |
---|
358 | if (isArchive) { |
---|
359 | if (doSymlink){ |
---|
360 | if (!flink.exists()) |
---|
361 | FileUtil.symLink(cacheStatus.localLoadPath.toString(), |
---|
362 | link); |
---|
363 | } |
---|
364 | return cacheStatus.localLoadPath; |
---|
365 | } |
---|
366 | else { |
---|
367 | if (doSymlink){ |
---|
368 | if (!flink.exists()) |
---|
369 | FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), |
---|
370 | link); |
---|
371 | } |
---|
372 | return cacheFilePath(cacheStatus.localLoadPath); |
---|
373 | } |
---|
374 | } else { |
---|
375 | // remove the old archive |
---|
376 | // if the old archive cannot be removed since it is being used by another |
---|
377 | // job |
---|
378 | // return null |
---|
379 | if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true)) |
---|
380 | throw new IOException("Cache " + cacheStatus.localLoadPath.toString() |
---|
381 | + " is in use and cannot be refreshed"); |
---|
382 | |
---|
383 | FileSystem localFs = FileSystem.getLocal(conf); |
---|
384 | localFs.delete(cacheStatus.localLoadPath, true); |
---|
385 | synchronized (baseDirSize) { |
---|
386 | Long dirSize = baseDirSize.get(cacheStatus.baseDir); |
---|
387 | if ( dirSize != null ) { |
---|
388 | dirSize -= cacheStatus.size; |
---|
389 | baseDirSize.put(cacheStatus.baseDir, dirSize); |
---|
390 | } |
---|
391 | } |
---|
392 | Path parchive = new Path(cacheStatus.localLoadPath, |
---|
393 | new Path(cacheStatus.localLoadPath.getName())); |
---|
394 | |
---|
395 | if (!localFs.mkdirs(cacheStatus.localLoadPath)) { |
---|
396 | throw new IOException("Mkdirs failed to create directory " + |
---|
397 | cacheStatus.localLoadPath.toString()); |
---|
398 | } |
---|
399 | |
---|
400 | String cacheId = cache.getPath(); |
---|
401 | fs.copyToLocalFile(new Path(cacheId), parchive); |
---|
402 | if (isArchive) { |
---|
403 | String tmpArchive = parchive.toString().toLowerCase(); |
---|
404 | File srcFile = new File(parchive.toString()); |
---|
405 | File destDir = new File(parchive.getParent().toString()); |
---|
406 | if (tmpArchive.endsWith(".jar")) { |
---|
407 | RunJar.unJar(srcFile, destDir); |
---|
408 | } else if (tmpArchive.endsWith(".zip")) { |
---|
409 | FileUtil.unZip(srcFile, destDir); |
---|
410 | } else if (isTarFile(tmpArchive)) { |
---|
411 | FileUtil.unTar(srcFile, destDir); |
---|
412 | } |
---|
413 | // else will not do anyhting |
---|
414 | // and copy the file into the dir as it is |
---|
415 | } |
---|
416 | |
---|
417 | long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString())); |
---|
418 | cacheStatus.size = cacheSize; |
---|
419 | synchronized (baseDirSize) { |
---|
420 | Long dirSize = baseDirSize.get(cacheStatus.baseDir); |
---|
421 | if( dirSize == null ) { |
---|
422 | dirSize = Long.valueOf(cacheSize); |
---|
423 | } else { |
---|
424 | dirSize += cacheSize; |
---|
425 | } |
---|
426 | baseDirSize.put(cacheStatus.baseDir, dirSize); |
---|
427 | } |
---|
428 | |
---|
429 | // do chmod here |
---|
430 | try { |
---|
431 | FileUtil.chmod(parchive.toString(), "+x"); |
---|
432 | } catch(InterruptedException e) { |
---|
433 | LOG.warn("Exception in chmod" + e.toString()); |
---|
434 | } |
---|
435 | |
---|
436 | // update cacheStatus to reflect the newly cached file |
---|
437 | cacheStatus.currentStatus = true; |
---|
438 | cacheStatus.mtime = getTimestamp(conf, cache); |
---|
439 | } |
---|
440 | |
---|
441 | if (isArchive){ |
---|
442 | if (doSymlink){ |
---|
443 | if (!flink.exists()) |
---|
444 | FileUtil.symLink(cacheStatus.localLoadPath.toString(), |
---|
445 | link); |
---|
446 | } |
---|
447 | return cacheStatus.localLoadPath; |
---|
448 | } |
---|
449 | else { |
---|
450 | if (doSymlink){ |
---|
451 | if (!flink.exists()) |
---|
452 | FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), |
---|
453 | link); |
---|
454 | } |
---|
455 | return cacheFilePath(cacheStatus.localLoadPath); |
---|
456 | } |
---|
457 | } |
---|
458 | |
---|
459 | private static boolean isTarFile(String filename) { |
---|
460 | return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") || |
---|
461 | filename.endsWith(".tar")); |
---|
462 | } |
---|
463 | |
---|
464 | // Checks if the cache has already been localized and is fresh |
---|
465 | private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, |
---|
466 | URI cache, long confFileStamp, |
---|
467 | CacheStatus lcacheStatus, |
---|
468 | FileStatus fileStatus) |
---|
469 | throws IOException { |
---|
470 | // check for existence of the cache |
---|
471 | if (lcacheStatus.currentStatus == false) { |
---|
472 | return false; |
---|
473 | } else { |
---|
474 | long dfsFileStamp; |
---|
475 | if (fileStatus != null) { |
---|
476 | dfsFileStamp = fileStatus.getModificationTime(); |
---|
477 | } else { |
---|
478 | dfsFileStamp = getTimestamp(conf, cache); |
---|
479 | } |
---|
480 | |
---|
481 | // ensure that the file on hdfs hasn't been modified since the job started |
---|
482 | if (dfsFileStamp != confFileStamp) { |
---|
483 | LOG.fatal("File: " + cache + " has changed on HDFS since job started"); |
---|
484 | throw new IOException("File: " + cache + |
---|
485 | " has changed on HDFS since job started"); |
---|
486 | } |
---|
487 | |
---|
488 | if (dfsFileStamp != lcacheStatus.mtime) { |
---|
489 | // needs refreshing |
---|
490 | return false; |
---|
491 | } |
---|
492 | } |
---|
493 | |
---|
494 | return true; |
---|
495 | } |
---|
496 | |
---|
497 | /** |
---|
498 | * Returns mtime of a given cache file on hdfs. |
---|
499 | * @param conf configuration |
---|
500 | * @param cache cache file |
---|
501 | * @return mtime of a given cache file on hdfs |
---|
502 | * @throws IOException |
---|
503 | */ |
---|
504 | public static long getTimestamp(Configuration conf, URI cache) |
---|
505 | throws IOException { |
---|
506 | FileSystem fileSystem = FileSystem.get(cache, conf); |
---|
507 | Path filePath = new Path(cache.getPath()); |
---|
508 | |
---|
509 | return fileSystem.getFileStatus(filePath).getModificationTime(); |
---|
510 | } |
---|
511 | |
---|
512 | /** |
---|
513 | * This method create symlinks for all files in a given dir in another directory |
---|
514 | * @param conf the configuration |
---|
515 | * @param jobCacheDir the target directory for creating symlinks |
---|
516 | * @param workDir the directory in which the symlinks are created |
---|
517 | * @throws IOException |
---|
518 | */ |
---|
519 | public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir) |
---|
520 | throws IOException{ |
---|
521 | if ((jobCacheDir == null || !jobCacheDir.isDirectory()) || |
---|
522 | workDir == null || (!workDir.isDirectory())) { |
---|
523 | return; |
---|
524 | } |
---|
525 | boolean createSymlink = getSymlink(conf); |
---|
526 | if (createSymlink){ |
---|
527 | File[] list = jobCacheDir.listFiles(); |
---|
528 | for (int i=0; i < list.length; i++){ |
---|
529 | FileUtil.symLink(list[i].getAbsolutePath(), |
---|
530 | new File(workDir, list[i].getName()).toString()); |
---|
531 | } |
---|
532 | } |
---|
533 | } |
---|
534 | |
---|
535 | private static String getFileSysName(URI url) { |
---|
536 | String fsname = url.getScheme(); |
---|
537 | if ("hdfs".equals(fsname)) { |
---|
538 | String host = url.getHost(); |
---|
539 | int port = url.getPort(); |
---|
540 | return (port == (-1)) ? host : (host + ":" + port); |
---|
541 | } else { |
---|
542 | return null; |
---|
543 | } |
---|
544 | } |
---|
545 | |
---|
546 | private static FileSystem getFileSystem(URI cache, Configuration conf) |
---|
547 | throws IOException { |
---|
548 | String fileSysName = getFileSysName(cache); |
---|
549 | if (fileSysName != null) |
---|
550 | return FileSystem.getNamed(fileSysName, conf); |
---|
551 | else |
---|
552 | return FileSystem.get(conf); |
---|
553 | } |
---|
554 | |
---|
555 | /** |
---|
556 | * Set the configuration with the given set of archives |
---|
557 | * @param archives The list of archives that need to be localized |
---|
558 | * @param conf Configuration which will be changed |
---|
559 | */ |
---|
560 | public static void setCacheArchives(URI[] archives, Configuration conf) { |
---|
561 | String sarchives = StringUtils.uriToString(archives); |
---|
562 | conf.set("mapred.cache.archives", sarchives); |
---|
563 | } |
---|
564 | |
---|
565 | /** |
---|
566 | * Set the configuration with the given set of files |
---|
567 | * @param files The list of files that need to be localized |
---|
568 | * @param conf Configuration which will be changed |
---|
569 | */ |
---|
570 | public static void setCacheFiles(URI[] files, Configuration conf) { |
---|
571 | String sfiles = StringUtils.uriToString(files); |
---|
572 | conf.set("mapred.cache.files", sfiles); |
---|
573 | } |
---|
574 | |
---|
575 | /** |
---|
576 | * Get cache archives set in the Configuration |
---|
577 | * @param conf The configuration which contains the archives |
---|
578 | * @return A URI array of the caches set in the Configuration |
---|
579 | * @throws IOException |
---|
580 | */ |
---|
581 | public static URI[] getCacheArchives(Configuration conf) throws IOException { |
---|
582 | return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives")); |
---|
583 | } |
---|
584 | |
---|
585 | /** |
---|
586 | * Get cache files set in the Configuration |
---|
587 | * @param conf The configuration which contains the files |
---|
588 | * @return A URI array of the files set in the Configuration |
---|
589 | * @throws IOException |
---|
590 | */ |
---|
591 | |
---|
592 | public static URI[] getCacheFiles(Configuration conf) throws IOException { |
---|
593 | return StringUtils.stringToURI(conf.getStrings("mapred.cache.files")); |
---|
594 | } |
---|
595 | |
---|
596 | /** |
---|
597 | * Return the path array of the localized caches |
---|
598 | * @param conf Configuration that contains the localized archives |
---|
599 | * @return A path array of localized caches |
---|
600 | * @throws IOException |
---|
601 | */ |
---|
602 | public static Path[] getLocalCacheArchives(Configuration conf) |
---|
603 | throws IOException { |
---|
604 | return StringUtils.stringToPath(conf |
---|
605 | .getStrings("mapred.cache.localArchives")); |
---|
606 | } |
---|
607 | |
---|
608 | /** |
---|
609 | * Return the path array of the localized files |
---|
610 | * @param conf Configuration that contains the localized files |
---|
611 | * @return A path array of localized files |
---|
612 | * @throws IOException |
---|
613 | */ |
---|
614 | public static Path[] getLocalCacheFiles(Configuration conf) |
---|
615 | throws IOException { |
---|
616 | return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles")); |
---|
617 | } |
---|
618 | |
---|
619 | /** |
---|
620 | * Get the timestamps of the archives |
---|
621 | * @param conf The configuration which stored the timestamps |
---|
622 | * @return a string array of timestamps |
---|
623 | * @throws IOException |
---|
624 | */ |
---|
625 | public static String[] getArchiveTimestamps(Configuration conf) { |
---|
626 | return conf.getStrings("mapred.cache.archives.timestamps"); |
---|
627 | } |
---|
628 | |
---|
629 | |
---|
630 | /** |
---|
631 | * Get the timestamps of the files |
---|
632 | * @param conf The configuration which stored the timestamps |
---|
633 | * @return a string array of timestamps |
---|
634 | * @throws IOException |
---|
635 | */ |
---|
636 | public static String[] getFileTimestamps(Configuration conf) { |
---|
637 | return conf.getStrings("mapred.cache.files.timestamps"); |
---|
638 | } |
---|
639 | |
---|
640 | /** |
---|
641 | * This is to check the timestamp of the archives to be localized |
---|
642 | * @param conf Configuration which stores the timestamp's |
---|
643 | * @param timestamps comma separated list of timestamps of archives. |
---|
644 | * The order should be the same as the order in which the archives are added. |
---|
645 | */ |
---|
646 | public static void setArchiveTimestamps(Configuration conf, String timestamps) { |
---|
647 | conf.set("mapred.cache.archives.timestamps", timestamps); |
---|
648 | } |
---|
649 | |
---|
650 | /** |
---|
651 | * This is to check the timestamp of the files to be localized |
---|
652 | * @param conf Configuration which stores the timestamp's |
---|
653 | * @param timestamps comma separated list of timestamps of files. |
---|
654 | * The order should be the same as the order in which the files are added. |
---|
655 | */ |
---|
656 | public static void setFileTimestamps(Configuration conf, String timestamps) { |
---|
657 | conf.set("mapred.cache.files.timestamps", timestamps); |
---|
658 | } |
---|
659 | |
---|
660 | /** |
---|
661 | * Set the conf to contain the location for localized archives |
---|
662 | * @param conf The conf to modify to contain the localized caches |
---|
663 | * @param str a comma separated list of local archives |
---|
664 | */ |
---|
665 | public static void setLocalArchives(Configuration conf, String str) { |
---|
666 | conf.set("mapred.cache.localArchives", str); |
---|
667 | } |
---|
668 | |
---|
669 | /** |
---|
670 | * Set the conf to contain the location for localized files |
---|
671 | * @param conf The conf to modify to contain the localized caches |
---|
672 | * @param str a comma separated list of local files |
---|
673 | */ |
---|
674 | public static void setLocalFiles(Configuration conf, String str) { |
---|
675 | conf.set("mapred.cache.localFiles", str); |
---|
676 | } |
---|
677 | |
---|
678 | /** |
---|
679 | * Add a archives to be localized to the conf |
---|
680 | * @param uri The uri of the cache to be localized |
---|
681 | * @param conf Configuration to add the cache to |
---|
682 | */ |
---|
683 | public static void addCacheArchive(URI uri, Configuration conf) { |
---|
684 | String archives = conf.get("mapred.cache.archives"); |
---|
685 | conf.set("mapred.cache.archives", archives == null ? uri.toString() |
---|
686 | : archives + "," + uri.toString()); |
---|
687 | } |
---|
688 | |
---|
689 | /** |
---|
690 | * Add a file to be localized to the conf |
---|
691 | * @param uri The uri of the cache to be localized |
---|
692 | * @param conf Configuration to add the cache to |
---|
693 | */ |
---|
694 | public static void addCacheFile(URI uri, Configuration conf) { |
---|
695 | String files = conf.get("mapred.cache.files"); |
---|
696 | conf.set("mapred.cache.files", files == null ? uri.toString() : files + "," |
---|
697 | + uri.toString()); |
---|
698 | } |
---|
699 | |
---|
700 | /** |
---|
701 | * Add an file path to the current set of classpath entries It adds the file |
---|
702 | * to cache as well. |
---|
703 | * |
---|
704 | * @param file Path of the file to be added |
---|
705 | * @param conf Configuration that contains the classpath setting |
---|
706 | */ |
---|
707 | public static void addFileToClassPath(Path file, Configuration conf) |
---|
708 | throws IOException { |
---|
709 | String classpath = conf.get("mapred.job.classpath.files"); |
---|
710 | conf.set("mapred.job.classpath.files", classpath == null ? file.toString() |
---|
711 | : classpath + System.getProperty("path.separator") + file.toString()); |
---|
712 | FileSystem fs = FileSystem.get(conf); |
---|
713 | URI uri = fs.makeQualified(file).toUri(); |
---|
714 | |
---|
715 | addCacheFile(uri, conf); |
---|
716 | } |
---|
717 | |
---|
718 | /** |
---|
719 | * Get the file entries in classpath as an array of Path |
---|
720 | * |
---|
721 | * @param conf Configuration that contains the classpath setting |
---|
722 | */ |
---|
723 | public static Path[] getFileClassPaths(Configuration conf) { |
---|
724 | String classpath = conf.get("mapred.job.classpath.files"); |
---|
725 | if (classpath == null) |
---|
726 | return null; |
---|
727 | ArrayList list = Collections.list(new StringTokenizer(classpath, System |
---|
728 | .getProperty("path.separator"))); |
---|
729 | Path[] paths = new Path[list.size()]; |
---|
730 | for (int i = 0; i < list.size(); i++) { |
---|
731 | paths[i] = new Path((String) list.get(i)); |
---|
732 | } |
---|
733 | return paths; |
---|
734 | } |
---|
735 | |
---|
736 | /** |
---|
737 | * Add an archive path to the current set of classpath entries. It adds the |
---|
738 | * archive to cache as well. |
---|
739 | * |
---|
740 | * @param archive Path of the archive to be added |
---|
741 | * @param conf Configuration that contains the classpath setting |
---|
742 | */ |
---|
743 | public static void addArchiveToClassPath(Path archive, Configuration conf) |
---|
744 | throws IOException { |
---|
745 | String classpath = conf.get("mapred.job.classpath.archives"); |
---|
746 | conf.set("mapred.job.classpath.archives", classpath == null ? archive |
---|
747 | .toString() : classpath + System.getProperty("path.separator") |
---|
748 | + archive.toString()); |
---|
749 | FileSystem fs = FileSystem.get(conf); |
---|
750 | URI uri = fs.makeQualified(archive).toUri(); |
---|
751 | |
---|
752 | addCacheArchive(uri, conf); |
---|
753 | } |
---|
754 | |
---|
755 | /** |
---|
756 | * Get the archive entries in classpath as an array of Path |
---|
757 | * |
---|
758 | * @param conf Configuration that contains the classpath setting |
---|
759 | */ |
---|
760 | public static Path[] getArchiveClassPaths(Configuration conf) { |
---|
761 | String classpath = conf.get("mapred.job.classpath.archives"); |
---|
762 | if (classpath == null) |
---|
763 | return null; |
---|
764 | ArrayList list = Collections.list(new StringTokenizer(classpath, System |
---|
765 | .getProperty("path.separator"))); |
---|
766 | Path[] paths = new Path[list.size()]; |
---|
767 | for (int i = 0; i < list.size(); i++) { |
---|
768 | paths[i] = new Path((String) list.get(i)); |
---|
769 | } |
---|
770 | return paths; |
---|
771 | } |
---|
772 | |
---|
773 | /** |
---|
774 | * This method allows you to create symlinks in the current working directory |
---|
775 | * of the task to all the cache files/archives |
---|
776 | * @param conf the jobconf |
---|
777 | */ |
---|
778 | public static void createSymlink(Configuration conf){ |
---|
779 | conf.set("mapred.create.symlink", "yes"); |
---|
780 | } |
---|
781 | |
---|
782 | /** |
---|
783 | * This method checks to see if symlinks are to be create for the |
---|
784 | * localized cache files in the current working directory |
---|
785 | * @param conf the jobconf |
---|
786 | * @return true if symlinks are to be created- else return false |
---|
787 | */ |
---|
788 | public static boolean getSymlink(Configuration conf){ |
---|
789 | String result = conf.get("mapred.create.symlink"); |
---|
790 | if ("yes".equals(result)){ |
---|
791 | return true; |
---|
792 | } |
---|
793 | return false; |
---|
794 | } |
---|
795 | |
---|
796 | /** |
---|
797 | * This method checks if there is a conflict in the fragment names |
---|
798 | * of the uris. Also makes sure that each uri has a fragment. It |
---|
799 | * is only to be called if you want to create symlinks for |
---|
800 | * the various archives and files. |
---|
801 | * @param uriFiles The uri array of urifiles |
---|
802 | * @param uriArchives the uri array of uri archives |
---|
803 | */ |
---|
804 | public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){ |
---|
805 | if ((uriFiles == null) && (uriArchives == null)){ |
---|
806 | return true; |
---|
807 | } |
---|
808 | if (uriFiles != null){ |
---|
809 | for (int i = 0; i < uriFiles.length; i++){ |
---|
810 | String frag1 = uriFiles[i].getFragment(); |
---|
811 | if (frag1 == null) |
---|
812 | return false; |
---|
813 | for (int j=i+1; j < uriFiles.length; j++){ |
---|
814 | String frag2 = uriFiles[j].getFragment(); |
---|
815 | if (frag2 == null) |
---|
816 | return false; |
---|
817 | if (frag1.equalsIgnoreCase(frag2)) |
---|
818 | return false; |
---|
819 | } |
---|
820 | if (uriArchives != null){ |
---|
821 | for (int j = 0; j < uriArchives.length; j++){ |
---|
822 | String frag2 = uriArchives[j].getFragment(); |
---|
823 | if (frag2 == null){ |
---|
824 | return false; |
---|
825 | } |
---|
826 | if (frag1.equalsIgnoreCase(frag2)) |
---|
827 | return false; |
---|
828 | for (int k=j+1; k < uriArchives.length; k++){ |
---|
829 | String frag3 = uriArchives[k].getFragment(); |
---|
830 | if (frag3 == null) |
---|
831 | return false; |
---|
832 | if (frag2.equalsIgnoreCase(frag3)) |
---|
833 | return false; |
---|
834 | } |
---|
835 | } |
---|
836 | } |
---|
837 | } |
---|
838 | } |
---|
839 | return true; |
---|
840 | } |
---|
841 | |
---|
842 | private static class CacheStatus { |
---|
843 | // false, not loaded yet, true is loaded |
---|
844 | boolean currentStatus; |
---|
845 | |
---|
846 | // the local load path of this cache |
---|
847 | Path localLoadPath; |
---|
848 | |
---|
849 | //the base dir where the cache lies |
---|
850 | Path baseDir; |
---|
851 | |
---|
852 | //the size of this cache |
---|
853 | long size; |
---|
854 | |
---|
855 | // number of instances using this cache |
---|
856 | int refcount; |
---|
857 | |
---|
858 | // the cache-file modification time |
---|
859 | long mtime; |
---|
860 | |
---|
861 | public CacheStatus(Path baseDir, Path localLoadPath) { |
---|
862 | super(); |
---|
863 | this.currentStatus = false; |
---|
864 | this.localLoadPath = localLoadPath; |
---|
865 | this.refcount = 0; |
---|
866 | this.mtime = -1; |
---|
867 | this.baseDir = baseDir; |
---|
868 | this.size = 0; |
---|
869 | } |
---|
870 | } |
---|
871 | |
---|
872 | /** |
---|
873 | * Clear the entire contents of the cache and delete the backing files. This |
---|
874 | * should only be used when the server is reinitializing, because the users |
---|
875 | * are going to lose their files. |
---|
876 | */ |
---|
877 | public static void purgeCache(Configuration conf) throws IOException { |
---|
878 | synchronized (cachedArchives) { |
---|
879 | FileSystem localFs = FileSystem.getLocal(conf); |
---|
880 | for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) { |
---|
881 | try { |
---|
882 | localFs.delete(f.getValue().localLoadPath, true); |
---|
883 | } catch (IOException ie) { |
---|
884 | LOG.debug("Error cleaning up cache", ie); |
---|
885 | } |
---|
886 | } |
---|
887 | cachedArchives.clear(); |
---|
888 | } |
---|
889 | } |
---|
890 | } |
---|