[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.filecache; |
---|
| 20 | |
---|
| 21 | import org.apache.commons.logging.*; |
---|
| 22 | import java.io.*; |
---|
| 23 | import java.util.*; |
---|
| 24 | import org.apache.hadoop.conf.*; |
---|
| 25 | import org.apache.hadoop.util.*; |
---|
| 26 | import org.apache.hadoop.fs.*; |
---|
| 27 | |
---|
| 28 | import java.net.URI; |
---|
| 29 | |
---|
| 30 | /** |
---|
| 31 | * Distribute application-specific large, read-only files efficiently. |
---|
| 32 | * |
---|
| 33 | * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce |
---|
| 34 | * framework to cache files (text, archives, jars etc.) needed by applications. |
---|
| 35 | * </p> |
---|
| 36 | * |
---|
| 37 | * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached |
---|
| 38 | * via the {@link org.apache.hadoop.mapred.JobConf}. |
---|
| 39 | * The <code>DistributedCache</code> assumes that the |
---|
| 40 | * files specified via hdfs:// urls are already present on the |
---|
| 41 | * {@link FileSystem} at the path specified by the url.</p> |
---|
| 42 | * |
---|
| 43 | * <p>The framework will copy the necessary files on to the slave node before |
---|
| 44 | * any tasks for the job are executed on that node. Its efficiency stems from |
---|
| 45 | * the fact that the files are only copied once per job and the ability to |
---|
| 46 | * cache archives which are un-archived on the slaves.</p> |
---|
| 47 | * |
---|
| 48 | * <p><code>DistributedCache</code> can be used to distribute simple, read-only |
---|
| 49 | * data/text files and/or more complex types such as archives, jars etc. |
---|
| 50 | * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. |
---|
| 51 | * Jars may be optionally added to the classpath of the tasks, a rudimentary |
---|
| 52 | * software distribution mechanism. Files have execution permissions. |
---|
| 53 | * Optionally users can also direct it to symlink the distributed cache file(s) |
---|
| 54 | * into the working directory of the task.</p> |
---|
| 55 | * |
---|
| 56 | * <p><code>DistributedCache</code> tracks modification timestamps of the cache |
---|
| 57 | * files. Clearly the cache files should not be modified by the application |
---|
| 58 | * or externally while the job is executing.</p> |
---|
| 59 | * |
---|
| 60 | * <p>Here is an illustrative example on how to use the |
---|
| 61 | * <code>DistributedCache</code>:</p> |
---|
| 62 | * <p><blockquote><pre> |
---|
| 63 | * // Setting up the cache for the application |
---|
| 64 | * |
---|
| 65 | * 1. Copy the requisite files to the <code>FileSystem</code>: |
---|
| 66 | * |
---|
| 67 | * $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat |
---|
| 68 | * $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip |
---|
| 69 | * $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar |
---|
| 70 | * $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar |
---|
| 71 | * $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz |
---|
| 72 | * $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz |
---|
| 73 | * |
---|
| 74 | * 2. Setup the application's <code>JobConf</code>: |
---|
| 75 | * |
---|
| 76 | * JobConf job = new JobConf(); |
---|
| 77 | * DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), |
---|
| 78 | * job); |
---|
| 79 | * DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job); |
---|
| 80 | * DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); |
---|
| 81 | * DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job); |
---|
| 82 | * DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job); |
---|
| 83 | * DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job); |
---|
| 84 | * |
---|
| 85 | * 3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper} |
---|
| 86 | * or {@link org.apache.hadoop.mapred.Reducer}: |
---|
| 87 | * |
---|
| 88 | * public static class MapClass extends MapReduceBase |
---|
| 89 | * implements Mapper&lt;K, V, K, V&gt; { |
---|
| 90 | * |
---|
| 91 | * private Path[] localArchives; |
---|
| 92 | * private Path[] localFiles; |
---|
| 93 | * |
---|
| 94 | * public void configure(JobConf job) { |
---|
| 95 | * // Get the cached archives/files |
---|
| 96 | * localArchives = DistributedCache.getLocalCacheArchives(job); |
---|
| 97 | * localFiles = DistributedCache.getLocalCacheFiles(job); |
---|
| 98 | * } |
---|
| 99 | * |
---|
| 100 | * public void map(K key, V value, |
---|
| 101 | * OutputCollector&lt;K, V&gt; output, Reporter reporter) |
---|
| 102 | * throws IOException { |
---|
| 103 | * // Use data from the cached archives/files here |
---|
| 104 | * // ... |
---|
| 105 | * // ... |
---|
| 106 | * output.collect(k, v); |
---|
| 107 | * } |
---|
| 108 | * } |
---|
| 109 | * |
---|
| 110 | * </pre></blockquote></p> |
---|
| 111 | * |
---|
| 112 | * @see org.apache.hadoop.mapred.JobConf |
---|
| 113 | * @see org.apache.hadoop.mapred.JobClient |
---|
| 114 | */ |
---|
| 115 | public class DistributedCache { |
---|
| 116 | // cacheID to cacheStatus mapping |
---|
| 117 | private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>(); |
---|
| 118 | |
---|
| 119 | private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>(); |
---|
| 120 | |
---|
| 121 | // default total cache size |
---|
| 122 | private static final long DEFAULT_CACHE_SIZE = 10737418240L; |
---|
| 123 | |
---|
| 124 | private static final Log LOG = |
---|
| 125 | LogFactory.getLog(DistributedCache.class); |
---|
| 126 | |
---|
| 127 | /** |
---|
| 128 | * Get the locally cached file or archive; it could either be |
---|
| 129 | * previously cached (and valid) or copy it from the {@link FileSystem} now. |
---|
| 130 | * |
---|
| 131 | * @param cache the cache to be localized, this should be specified as |
---|
| 132 | * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema |
---|
| 133 | * or hostname:port is provided the file is assumed to be in the filesystem |
---|
| 134 | * being used in the Configuration |
---|
| 135 | * @param conf The Confguration file which contains the filesystem |
---|
| 136 | * @param baseDir The base cache Dir where you wnat to localize the files/archives |
---|
| 137 | * @param fileStatus The file status on the dfs. |
---|
| 138 | * @param isArchive if the cache is an archive or a file. In case it is an |
---|
| 139 | * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will |
---|
| 140 | * be unzipped/unjarred/untarred automatically |
---|
| 141 | * and the directory where the archive is unzipped/unjarred/untarred is |
---|
| 142 | * returned as the Path. |
---|
| 143 | * In case of a file, the path to the file is returned |
---|
| 144 | * @param confFileStamp this is the hdfs file modification timestamp to verify that the |
---|
| 145 | * file to be cached hasn't changed since the job started |
---|
| 146 | * @param currentWorkDir this is the directory where you would want to create symlinks |
---|
| 147 | * for the locally cached files/archives |
---|
| 148 | * @return the path to directory where the archives are unjarred in case of archives, |
---|
| 149 | * the path to the file where the file is copied locally |
---|
| 150 | * @throws IOException |
---|
| 151 | */ |
---|
| 152 | public static Path getLocalCache(URI cache, Configuration conf, |
---|
| 153 | Path baseDir, FileStatus fileStatus, |
---|
| 154 | boolean isArchive, long confFileStamp, |
---|
| 155 | Path currentWorkDir) |
---|
| 156 | throws IOException { |
---|
| 157 | return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, |
---|
| 158 | confFileStamp, currentWorkDir, true); |
---|
| 159 | } |
---|
| 160 | /** |
---|
| 161 | * Get the locally cached file or archive; it could either be |
---|
| 162 | * previously cached (and valid) or copy it from the {@link FileSystem} now. |
---|
| 163 | * |
---|
| 164 | * @param cache the cache to be localized, this should be specified as |
---|
| 165 | * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema |
---|
| 166 | * or hostname:port is provided the file is assumed to be in the filesystem |
---|
| 167 | * being used in the Configuration |
---|
| 168 | * @param conf The Confguration file which contains the filesystem |
---|
| 169 | * @param baseDir The base cache Dir where you wnat to localize the files/archives |
---|
| 170 | * @param fileStatus The file status on the dfs. |
---|
| 171 | * @param isArchive if the cache is an archive or a file. In case it is an |
---|
| 172 | * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will |
---|
| 173 | * be unzipped/unjarred/untarred automatically |
---|
| 174 | * and the directory where the archive is unzipped/unjarred/untarred is |
---|
| 175 | * returned as the Path. |
---|
| 176 | * In case of a file, the path to the file is returned |
---|
| 177 | * @param confFileStamp this is the hdfs file modification timestamp to verify that the |
---|
| 178 | * file to be cached hasn't changed since the job started |
---|
| 179 | * @param currentWorkDir this is the directory where you would want to create symlinks |
---|
| 180 | * for the locally cached files/archives |
---|
| 181 | * @param honorSymLinkConf if this is false, then the symlinks are not |
---|
| 182 | * created even if conf says so (this is required for an optimization in task |
---|
| 183 | * launches |
---|
| 184 | * @return the path to directory where the archives are unjarred in case of archives, |
---|
| 185 | * the path to the file where the file is copied locally |
---|
| 186 | * @throws IOException |
---|
| 187 | */ |
---|
| 188 | public static Path getLocalCache(URI cache, Configuration conf, |
---|
| 189 | Path baseDir, FileStatus fileStatus, |
---|
| 190 | boolean isArchive, long confFileStamp, |
---|
| 191 | Path currentWorkDir, boolean honorSymLinkConf) |
---|
| 192 | throws IOException { |
---|
| 193 | String cacheId = makeRelative(cache, conf); |
---|
| 194 | CacheStatus lcacheStatus; |
---|
| 195 | Path localizedPath; |
---|
| 196 | synchronized (cachedArchives) { |
---|
| 197 | lcacheStatus = cachedArchives.get(cacheId); |
---|
| 198 | if (lcacheStatus == null) { |
---|
| 199 | // was never localized |
---|
| 200 | lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId))); |
---|
| 201 | cachedArchives.put(cacheId, lcacheStatus); |
---|
| 202 | } |
---|
| 203 | |
---|
| 204 | synchronized (lcacheStatus) { |
---|
| 205 | localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, |
---|
| 206 | fileStatus, isArchive, currentWorkDir, honorSymLinkConf); |
---|
| 207 | lcacheStatus.refcount++; |
---|
| 208 | } |
---|
| 209 | } |
---|
| 210 | |
---|
| 211 | // try deleting stuff if you can |
---|
| 212 | long size = 0; |
---|
| 213 | synchronized (baseDirSize) { |
---|
| 214 | Long get = baseDirSize.get(baseDir); |
---|
| 215 | if ( get != null ) { |
---|
| 216 | size = get.longValue(); |
---|
| 217 | } |
---|
| 218 | } |
---|
| 219 | // setting the cache size to a default of 10GB |
---|
| 220 | long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE); |
---|
| 221 | if (allowedSize < size) { |
---|
| 222 | // try some cache deletions |
---|
| 223 | deleteCache(conf); |
---|
| 224 | } |
---|
| 225 | return localizedPath; |
---|
| 226 | } |
---|
| 227 | |
---|
| 228 | |
---|
| 229 | /** |
---|
| 230 | * Get the locally cached file or archive; it could either be |
---|
| 231 | * previously cached (and valid) or copy it from the {@link FileSystem} now. |
---|
| 232 | * |
---|
| 233 | * @param cache the cache to be localized, this should be specified as |
---|
| 234 | * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema |
---|
| 235 | * or hostname:port is provided the file is assumed to be in the filesystem |
---|
| 236 | * being used in the Configuration |
---|
| 237 | * @param conf The Confguration file which contains the filesystem |
---|
| 238 | * @param baseDir The base cache Dir where you wnat to localize the files/archives |
---|
| 239 | * @param isArchive if the cache is an archive or a file. In case it is an |
---|
| 240 | * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will |
---|
| 241 | * be unzipped/unjarred/untarred automatically |
---|
| 242 | * and the directory where the archive is unzipped/unjarred/untarred |
---|
| 243 | * is returned as the Path. |
---|
| 244 | * In case of a file, the path to the file is returned |
---|
| 245 | * @param confFileStamp this is the hdfs file modification timestamp to verify that the |
---|
| 246 | * file to be cached hasn't changed since the job started |
---|
| 247 | * @param currentWorkDir this is the directory where you would want to create symlinks |
---|
| 248 | * for the locally cached files/archives |
---|
| 249 | * @return the path to directory where the archives are unjarred in case of archives, |
---|
| 250 | * the path to the file where the file is copied locally |
---|
| 251 | * @throws IOException |
---|
| 252 | |
---|
| 253 | */ |
---|
| 254 | public static Path getLocalCache(URI cache, Configuration conf, |
---|
| 255 | Path baseDir, boolean isArchive, |
---|
| 256 | long confFileStamp, Path currentWorkDir) |
---|
| 257 | throws IOException { |
---|
| 258 | return getLocalCache(cache, conf, |
---|
| 259 | baseDir, null, isArchive, |
---|
| 260 | confFileStamp, currentWorkDir); |
---|
| 261 | } |
---|
| 262 | |
---|
| 263 | /** |
---|
| 264 | * This is the opposite of getlocalcache. When you are done with |
---|
| 265 | * using the cache, you need to release the cache |
---|
| 266 | * @param cache The cache URI to be released |
---|
| 267 | * @param conf configuration which contains the filesystem the cache |
---|
| 268 | * is contained in. |
---|
| 269 | * @throws IOException |
---|
| 270 | */ |
---|
| 271 | public static void releaseCache(URI cache, Configuration conf) |
---|
| 272 | throws IOException { |
---|
| 273 | String cacheId = makeRelative(cache, conf); |
---|
| 274 | synchronized (cachedArchives) { |
---|
| 275 | CacheStatus lcacheStatus = cachedArchives.get(cacheId); |
---|
| 276 | if (lcacheStatus == null) |
---|
| 277 | return; |
---|
| 278 | synchronized (lcacheStatus) { |
---|
| 279 | lcacheStatus.refcount--; |
---|
| 280 | } |
---|
| 281 | } |
---|
| 282 | } |
---|
| 283 | |
---|
| 284 | // To delete the caches which have a refcount of zero |
---|
| 285 | |
---|
| 286 | private static void deleteCache(Configuration conf) throws IOException { |
---|
| 287 | // try deleting cache Status with refcount of zero |
---|
| 288 | synchronized (cachedArchives) { |
---|
| 289 | for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) { |
---|
| 290 | String cacheId = (String) it.next(); |
---|
| 291 | CacheStatus lcacheStatus = cachedArchives.get(cacheId); |
---|
| 292 | synchronized (lcacheStatus) { |
---|
| 293 | if (lcacheStatus.refcount == 0) { |
---|
| 294 | // delete this cache entry |
---|
| 295 | FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true); |
---|
| 296 | synchronized (baseDirSize) { |
---|
| 297 | Long dirSize = baseDirSize.get(lcacheStatus.baseDir); |
---|
| 298 | if ( dirSize != null ) { |
---|
| 299 | dirSize -= lcacheStatus.size; |
---|
| 300 | baseDirSize.put(lcacheStatus.baseDir, dirSize); |
---|
| 301 | } |
---|
| 302 | } |
---|
| 303 | it.remove(); |
---|
| 304 | } |
---|
| 305 | } |
---|
| 306 | } |
---|
| 307 | } |
---|
| 308 | } |
---|
| 309 | |
---|
| 310 | /* |
---|
| 311 | * Returns the relative path of the dir this cache will be localized in |
---|
| 312 | * relative path that this cache will be localized in. For |
---|
| 313 | * hdfs://hostname:port/absolute_path -- the relative path is |
---|
| 314 | * hostname/absolute path -- if it is just /absolute_path -- then the |
---|
| 315 | * relative path is hostname of DFS this mapred cluster is running |
---|
| 316 | * on/absolute_path |
---|
| 317 | */ |
---|
| 318 | public static String makeRelative(URI cache, Configuration conf) |
---|
| 319 | throws IOException { |
---|
| 320 | String host = cache.getHost(); |
---|
| 321 | if (host == null) { |
---|
| 322 | host = cache.getScheme(); |
---|
| 323 | } |
---|
| 324 | if (host == null) { |
---|
| 325 | URI defaultUri = FileSystem.get(conf).getUri(); |
---|
| 326 | host = defaultUri.getHost(); |
---|
| 327 | if (host == null) { |
---|
| 328 | host = defaultUri.getScheme(); |
---|
| 329 | } |
---|
| 330 | } |
---|
| 331 | String path = host + cache.getPath(); |
---|
| 332 | path = path.replace(":/","/"); // remove windows device colon |
---|
| 333 | return path; |
---|
| 334 | } |
---|
| 335 | |
---|
| 336 | private static Path cacheFilePath(Path p) { |
---|
| 337 | return new Path(p, p.getName()); |
---|
| 338 | } |
---|
| 339 | |
---|
| 340 | // the method which actually copies the caches locally and unjars/unzips them |
---|
| 341 | // and does chmod for the files |
---|
| 342 | private static Path localizeCache(Configuration conf, |
---|
| 343 | URI cache, long confFileStamp, |
---|
| 344 | CacheStatus cacheStatus, |
---|
| 345 | FileStatus fileStatus, |
---|
| 346 | boolean isArchive, |
---|
| 347 | Path currentWorkDir,boolean honorSymLinkConf) |
---|
| 348 | throws IOException { |
---|
| 349 | boolean doSymlink = honorSymLinkConf && getSymlink(conf); |
---|
| 350 | if(cache.getFragment() == null) { |
---|
| 351 | doSymlink = false; |
---|
| 352 | } |
---|
| 353 | FileSystem fs = getFileSystem(cache, conf); |
---|
| 354 | String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment(); |
---|
| 355 | File flink = new File(link); |
---|
| 356 | if (ifExistsAndFresh(conf, fs, cache, confFileStamp, |
---|
| 357 | cacheStatus, fileStatus)) { |
---|
| 358 | if (isArchive) { |
---|
| 359 | if (doSymlink){ |
---|
| 360 | if (!flink.exists()) |
---|
| 361 | FileUtil.symLink(cacheStatus.localLoadPath.toString(), |
---|
| 362 | link); |
---|
| 363 | } |
---|
| 364 | return cacheStatus.localLoadPath; |
---|
| 365 | } |
---|
| 366 | else { |
---|
| 367 | if (doSymlink){ |
---|
| 368 | if (!flink.exists()) |
---|
| 369 | FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), |
---|
| 370 | link); |
---|
| 371 | } |
---|
| 372 | return cacheFilePath(cacheStatus.localLoadPath); |
---|
| 373 | } |
---|
| 374 | } else { |
---|
| 375 | // remove the old archive |
---|
| 376 | // if the old archive cannot be removed since it is being used by another |
---|
| 377 | // job |
---|
| 378 | // return null |
---|
| 379 | if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true)) |
---|
| 380 | throw new IOException("Cache " + cacheStatus.localLoadPath.toString() |
---|
| 381 | + " is in use and cannot be refreshed"); |
---|
| 382 | |
---|
| 383 | FileSystem localFs = FileSystem.getLocal(conf); |
---|
| 384 | localFs.delete(cacheStatus.localLoadPath, true); |
---|
| 385 | synchronized (baseDirSize) { |
---|
| 386 | Long dirSize = baseDirSize.get(cacheStatus.baseDir); |
---|
| 387 | if ( dirSize != null ) { |
---|
| 388 | dirSize -= cacheStatus.size; |
---|
| 389 | baseDirSize.put(cacheStatus.baseDir, dirSize); |
---|
| 390 | } |
---|
| 391 | } |
---|
| 392 | Path parchive = new Path(cacheStatus.localLoadPath, |
---|
| 393 | new Path(cacheStatus.localLoadPath.getName())); |
---|
| 394 | |
---|
| 395 | if (!localFs.mkdirs(cacheStatus.localLoadPath)) { |
---|
| 396 | throw new IOException("Mkdirs failed to create directory " + |
---|
| 397 | cacheStatus.localLoadPath.toString()); |
---|
| 398 | } |
---|
| 399 | |
---|
| 400 | String cacheId = cache.getPath(); |
---|
| 401 | fs.copyToLocalFile(new Path(cacheId), parchive); |
---|
| 402 | if (isArchive) { |
---|
| 403 | String tmpArchive = parchive.toString().toLowerCase(); |
---|
| 404 | File srcFile = new File(parchive.toString()); |
---|
| 405 | File destDir = new File(parchive.getParent().toString()); |
---|
| 406 | if (tmpArchive.endsWith(".jar")) { |
---|
| 407 | RunJar.unJar(srcFile, destDir); |
---|
| 408 | } else if (tmpArchive.endsWith(".zip")) { |
---|
| 409 | FileUtil.unZip(srcFile, destDir); |
---|
| 410 | } else if (isTarFile(tmpArchive)) { |
---|
| 411 | FileUtil.unTar(srcFile, destDir); |
---|
| 412 | } |
---|
| 413 | // else will not do anyhting |
---|
| 414 | // and copy the file into the dir as it is |
---|
| 415 | } |
---|
| 416 | |
---|
| 417 | long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString())); |
---|
| 418 | cacheStatus.size = cacheSize; |
---|
| 419 | synchronized (baseDirSize) { |
---|
| 420 | Long dirSize = baseDirSize.get(cacheStatus.baseDir); |
---|
| 421 | if( dirSize == null ) { |
---|
| 422 | dirSize = Long.valueOf(cacheSize); |
---|
| 423 | } else { |
---|
| 424 | dirSize += cacheSize; |
---|
| 425 | } |
---|
| 426 | baseDirSize.put(cacheStatus.baseDir, dirSize); |
---|
| 427 | } |
---|
| 428 | |
---|
| 429 | // do chmod here |
---|
| 430 | try { |
---|
| 431 | FileUtil.chmod(parchive.toString(), "+x"); |
---|
| 432 | } catch(InterruptedException e) { |
---|
| 433 | LOG.warn("Exception in chmod" + e.toString()); |
---|
| 434 | } |
---|
| 435 | |
---|
| 436 | // update cacheStatus to reflect the newly cached file |
---|
| 437 | cacheStatus.currentStatus = true; |
---|
| 438 | cacheStatus.mtime = getTimestamp(conf, cache); |
---|
| 439 | } |
---|
| 440 | |
---|
| 441 | if (isArchive){ |
---|
| 442 | if (doSymlink){ |
---|
| 443 | if (!flink.exists()) |
---|
| 444 | FileUtil.symLink(cacheStatus.localLoadPath.toString(), |
---|
| 445 | link); |
---|
| 446 | } |
---|
| 447 | return cacheStatus.localLoadPath; |
---|
| 448 | } |
---|
| 449 | else { |
---|
| 450 | if (doSymlink){ |
---|
| 451 | if (!flink.exists()) |
---|
| 452 | FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), |
---|
| 453 | link); |
---|
| 454 | } |
---|
| 455 | return cacheFilePath(cacheStatus.localLoadPath); |
---|
| 456 | } |
---|
| 457 | } |
---|
| 458 | |
---|
| 459 | private static boolean isTarFile(String filename) { |
---|
| 460 | return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") || |
---|
| 461 | filename.endsWith(".tar")); |
---|
| 462 | } |
---|
| 463 | |
---|
| 464 | // Checks if the cache has already been localized and is fresh |
---|
| 465 | private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, |
---|
| 466 | URI cache, long confFileStamp, |
---|
| 467 | CacheStatus lcacheStatus, |
---|
| 468 | FileStatus fileStatus) |
---|
| 469 | throws IOException { |
---|
| 470 | // check for existence of the cache |
---|
| 471 | if (lcacheStatus.currentStatus == false) { |
---|
| 472 | return false; |
---|
| 473 | } else { |
---|
| 474 | long dfsFileStamp; |
---|
| 475 | if (fileStatus != null) { |
---|
| 476 | dfsFileStamp = fileStatus.getModificationTime(); |
---|
| 477 | } else { |
---|
| 478 | dfsFileStamp = getTimestamp(conf, cache); |
---|
| 479 | } |
---|
| 480 | |
---|
| 481 | // ensure that the file on hdfs hasn't been modified since the job started |
---|
| 482 | if (dfsFileStamp != confFileStamp) { |
---|
| 483 | LOG.fatal("File: " + cache + " has changed on HDFS since job started"); |
---|
| 484 | throw new IOException("File: " + cache + |
---|
| 485 | " has changed on HDFS since job started"); |
---|
| 486 | } |
---|
| 487 | |
---|
| 488 | if (dfsFileStamp != lcacheStatus.mtime) { |
---|
| 489 | // needs refreshing |
---|
| 490 | return false; |
---|
| 491 | } |
---|
| 492 | } |
---|
| 493 | |
---|
| 494 | return true; |
---|
| 495 | } |
---|
| 496 | |
---|
| 497 | /** |
---|
| 498 | * Returns mtime of a given cache file on hdfs. |
---|
| 499 | * @param conf configuration |
---|
| 500 | * @param cache cache file |
---|
| 501 | * @return mtime of a given cache file on hdfs |
---|
| 502 | * @throws IOException |
---|
| 503 | */ |
---|
| 504 | public static long getTimestamp(Configuration conf, URI cache) |
---|
| 505 | throws IOException { |
---|
| 506 | FileSystem fileSystem = FileSystem.get(cache, conf); |
---|
| 507 | Path filePath = new Path(cache.getPath()); |
---|
| 508 | |
---|
| 509 | return fileSystem.getFileStatus(filePath).getModificationTime(); |
---|
| 510 | } |
---|
| 511 | |
---|
| 512 | /** |
---|
| 513 | * This method create symlinks for all files in a given dir in another directory |
---|
| 514 | * @param conf the configuration |
---|
| 515 | * @param jobCacheDir the target directory for creating symlinks |
---|
| 516 | * @param workDir the directory in which the symlinks are created |
---|
| 517 | * @throws IOException |
---|
| 518 | */ |
---|
| 519 | public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir) |
---|
| 520 | throws IOException{ |
---|
| 521 | if ((jobCacheDir == null || !jobCacheDir.isDirectory()) || |
---|
| 522 | workDir == null || (!workDir.isDirectory())) { |
---|
| 523 | return; |
---|
| 524 | } |
---|
| 525 | boolean createSymlink = getSymlink(conf); |
---|
| 526 | if (createSymlink){ |
---|
| 527 | File[] list = jobCacheDir.listFiles(); |
---|
| 528 | for (int i=0; i < list.length; i++){ |
---|
| 529 | FileUtil.symLink(list[i].getAbsolutePath(), |
---|
| 530 | new File(workDir, list[i].getName()).toString()); |
---|
| 531 | } |
---|
| 532 | } |
---|
| 533 | } |
---|
| 534 | |
---|
| 535 | private static String getFileSysName(URI url) { |
---|
| 536 | String fsname = url.getScheme(); |
---|
| 537 | if ("hdfs".equals(fsname)) { |
---|
| 538 | String host = url.getHost(); |
---|
| 539 | int port = url.getPort(); |
---|
| 540 | return (port == (-1)) ? host : (host + ":" + port); |
---|
| 541 | } else { |
---|
| 542 | return null; |
---|
| 543 | } |
---|
| 544 | } |
---|
| 545 | |
---|
| 546 | private static FileSystem getFileSystem(URI cache, Configuration conf) |
---|
| 547 | throws IOException { |
---|
| 548 | String fileSysName = getFileSysName(cache); |
---|
| 549 | if (fileSysName != null) |
---|
| 550 | return FileSystem.getNamed(fileSysName, conf); |
---|
| 551 | else |
---|
| 552 | return FileSystem.get(conf); |
---|
| 553 | } |
---|
| 554 | |
---|
| 555 | /** |
---|
| 556 | * Set the configuration with the given set of archives |
---|
| 557 | * @param archives The list of archives that need to be localized |
---|
| 558 | * @param conf Configuration which will be changed |
---|
| 559 | */ |
---|
| 560 | public static void setCacheArchives(URI[] archives, Configuration conf) { |
---|
| 561 | String sarchives = StringUtils.uriToString(archives); |
---|
| 562 | conf.set("mapred.cache.archives", sarchives); |
---|
| 563 | } |
---|
| 564 | |
---|
| 565 | /** |
---|
| 566 | * Set the configuration with the given set of files |
---|
| 567 | * @param files The list of files that need to be localized |
---|
| 568 | * @param conf Configuration which will be changed |
---|
| 569 | */ |
---|
| 570 | public static void setCacheFiles(URI[] files, Configuration conf) { |
---|
| 571 | String sfiles = StringUtils.uriToString(files); |
---|
| 572 | conf.set("mapred.cache.files", sfiles); |
---|
| 573 | } |
---|
| 574 | |
---|
| 575 | /** |
---|
| 576 | * Get cache archives set in the Configuration |
---|
| 577 | * @param conf The configuration which contains the archives |
---|
| 578 | * @return A URI array of the caches set in the Configuration |
---|
| 579 | * @throws IOException |
---|
| 580 | */ |
---|
| 581 | public static URI[] getCacheArchives(Configuration conf) throws IOException { |
---|
| 582 | return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives")); |
---|
| 583 | } |
---|
| 584 | |
---|
| 585 | /** |
---|
| 586 | * Get cache files set in the Configuration |
---|
| 587 | * @param conf The configuration which contains the files |
---|
| 588 | * @return A URI array of the files set in the Configuration |
---|
| 589 | * @throws IOException |
---|
| 590 | */ |
---|
| 591 | |
---|
| 592 | public static URI[] getCacheFiles(Configuration conf) throws IOException { |
---|
| 593 | return StringUtils.stringToURI(conf.getStrings("mapred.cache.files")); |
---|
| 594 | } |
---|
| 595 | |
---|
| 596 | /** |
---|
| 597 | * Return the path array of the localized caches |
---|
| 598 | * @param conf Configuration that contains the localized archives |
---|
| 599 | * @return A path array of localized caches |
---|
| 600 | * @throws IOException |
---|
| 601 | */ |
---|
| 602 | public static Path[] getLocalCacheArchives(Configuration conf) |
---|
| 603 | throws IOException { |
---|
| 604 | return StringUtils.stringToPath(conf |
---|
| 605 | .getStrings("mapred.cache.localArchives")); |
---|
| 606 | } |
---|
| 607 | |
---|
| 608 | /** |
---|
| 609 | * Return the path array of the localized files |
---|
| 610 | * @param conf Configuration that contains the localized files |
---|
| 611 | * @return A path array of localized files |
---|
| 612 | * @throws IOException |
---|
| 613 | */ |
---|
| 614 | public static Path[] getLocalCacheFiles(Configuration conf) |
---|
| 615 | throws IOException { |
---|
| 616 | return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles")); |
---|
| 617 | } |
---|
| 618 | |
---|
| 619 | /** |
---|
| 620 | * Get the timestamps of the archives |
---|
| 621 | * @param conf The configuration which stored the timestamps |
---|
| 622 | * @return a string array of timestamps |
---|
| 623 | * @throws IOException |
---|
| 624 | */ |
---|
| 625 | public static String[] getArchiveTimestamps(Configuration conf) { |
---|
| 626 | return conf.getStrings("mapred.cache.archives.timestamps"); |
---|
| 627 | } |
---|
| 628 | |
---|
| 629 | |
---|
| 630 | /** |
---|
| 631 | * Get the timestamps of the files |
---|
| 632 | * @param conf The configuration which stored the timestamps |
---|
| 633 | * @return a string array of timestamps |
---|
| 634 | * @throws IOException |
---|
| 635 | */ |
---|
| 636 | public static String[] getFileTimestamps(Configuration conf) { |
---|
| 637 | return conf.getStrings("mapred.cache.files.timestamps"); |
---|
| 638 | } |
---|
| 639 | |
---|
| 640 | /** |
---|
| 641 | * This is to check the timestamp of the archives to be localized |
---|
| 642 | * @param conf Configuration which stores the timestamp's |
---|
| 643 | * @param timestamps comma separated list of timestamps of archives. |
---|
| 644 | * The order should be the same as the order in which the archives are added. |
---|
| 645 | */ |
---|
| 646 | public static void setArchiveTimestamps(Configuration conf, String timestamps) { |
---|
| 647 | conf.set("mapred.cache.archives.timestamps", timestamps); |
---|
| 648 | } |
---|
| 649 | |
---|
| 650 | /** |
---|
| 651 | * This is to check the timestamp of the files to be localized |
---|
| 652 | * @param conf Configuration which stores the timestamp's |
---|
| 653 | * @param timestamps comma separated list of timestamps of files. |
---|
| 654 | * The order should be the same as the order in which the files are added. |
---|
| 655 | */ |
---|
| 656 | public static void setFileTimestamps(Configuration conf, String timestamps) { |
---|
| 657 | conf.set("mapred.cache.files.timestamps", timestamps); |
---|
| 658 | } |
---|
| 659 | |
---|
| 660 | /** |
---|
| 661 | * Set the conf to contain the location for localized archives |
---|
| 662 | * @param conf The conf to modify to contain the localized caches |
---|
| 663 | * @param str a comma separated list of local archives |
---|
| 664 | */ |
---|
| 665 | public static void setLocalArchives(Configuration conf, String str) { |
---|
| 666 | conf.set("mapred.cache.localArchives", str); |
---|
| 667 | } |
---|
| 668 | |
---|
| 669 | /** |
---|
| 670 | * Set the conf to contain the location for localized files |
---|
| 671 | * @param conf The conf to modify to contain the localized caches |
---|
| 672 | * @param str a comma separated list of local files |
---|
| 673 | */ |
---|
| 674 | public static void setLocalFiles(Configuration conf, String str) { |
---|
| 675 | conf.set("mapred.cache.localFiles", str); |
---|
| 676 | } |
---|
| 677 | |
---|
| 678 | /** |
---|
| 679 | * Add a archives to be localized to the conf |
---|
| 680 | * @param uri The uri of the cache to be localized |
---|
| 681 | * @param conf Configuration to add the cache to |
---|
| 682 | */ |
---|
| 683 | public static void addCacheArchive(URI uri, Configuration conf) { |
---|
| 684 | String archives = conf.get("mapred.cache.archives"); |
---|
| 685 | conf.set("mapred.cache.archives", archives == null ? uri.toString() |
---|
| 686 | : archives + "," + uri.toString()); |
---|
| 687 | } |
---|
| 688 | |
---|
| 689 | /** |
---|
| 690 | * Add a file to be localized to the conf |
---|
| 691 | * @param uri The uri of the cache to be localized |
---|
| 692 | * @param conf Configuration to add the cache to |
---|
| 693 | */ |
---|
| 694 | public static void addCacheFile(URI uri, Configuration conf) { |
---|
| 695 | String files = conf.get("mapred.cache.files"); |
---|
| 696 | conf.set("mapred.cache.files", files == null ? uri.toString() : files + "," |
---|
| 697 | + uri.toString()); |
---|
| 698 | } |
---|
| 699 | |
---|
| 700 | /** |
---|
| 701 | * Add an file path to the current set of classpath entries It adds the file |
---|
| 702 | * to cache as well. |
---|
| 703 | * |
---|
| 704 | * @param file Path of the file to be added |
---|
| 705 | * @param conf Configuration that contains the classpath setting |
---|
| 706 | */ |
---|
| 707 | public static void addFileToClassPath(Path file, Configuration conf) |
---|
| 708 | throws IOException { |
---|
| 709 | String classpath = conf.get("mapred.job.classpath.files"); |
---|
| 710 | conf.set("mapred.job.classpath.files", classpath == null ? file.toString() |
---|
| 711 | : classpath + System.getProperty("path.separator") + file.toString()); |
---|
| 712 | FileSystem fs = FileSystem.get(conf); |
---|
| 713 | URI uri = fs.makeQualified(file).toUri(); |
---|
| 714 | |
---|
| 715 | addCacheFile(uri, conf); |
---|
| 716 | } |
---|
| 717 | |
---|
| 718 | /** |
---|
| 719 | * Get the file entries in classpath as an array of Path |
---|
| 720 | * |
---|
| 721 | * @param conf Configuration that contains the classpath setting |
---|
| 722 | */ |
---|
| 723 | public static Path[] getFileClassPaths(Configuration conf) { |
---|
| 724 | String classpath = conf.get("mapred.job.classpath.files"); |
---|
| 725 | if (classpath == null) |
---|
| 726 | return null; |
---|
| 727 | ArrayList list = Collections.list(new StringTokenizer(classpath, System |
---|
| 728 | .getProperty("path.separator"))); |
---|
| 729 | Path[] paths = new Path[list.size()]; |
---|
| 730 | for (int i = 0; i < list.size(); i++) { |
---|
| 731 | paths[i] = new Path((String) list.get(i)); |
---|
| 732 | } |
---|
| 733 | return paths; |
---|
| 734 | } |
---|
| 735 | |
---|
| 736 | /** |
---|
| 737 | * Add an archive path to the current set of classpath entries. It adds the |
---|
| 738 | * archive to cache as well. |
---|
| 739 | * |
---|
| 740 | * @param archive Path of the archive to be added |
---|
| 741 | * @param conf Configuration that contains the classpath setting |
---|
| 742 | */ |
---|
| 743 | public static void addArchiveToClassPath(Path archive, Configuration conf) |
---|
| 744 | throws IOException { |
---|
| 745 | String classpath = conf.get("mapred.job.classpath.archives"); |
---|
| 746 | conf.set("mapred.job.classpath.archives", classpath == null ? archive |
---|
| 747 | .toString() : classpath + System.getProperty("path.separator") |
---|
| 748 | + archive.toString()); |
---|
| 749 | FileSystem fs = FileSystem.get(conf); |
---|
| 750 | URI uri = fs.makeQualified(archive).toUri(); |
---|
| 751 | |
---|
| 752 | addCacheArchive(uri, conf); |
---|
| 753 | } |
---|
| 754 | |
---|
| 755 | /** |
---|
| 756 | * Get the archive entries in classpath as an array of Path |
---|
| 757 | * |
---|
| 758 | * @param conf Configuration that contains the classpath setting |
---|
| 759 | */ |
---|
| 760 | public static Path[] getArchiveClassPaths(Configuration conf) { |
---|
| 761 | String classpath = conf.get("mapred.job.classpath.archives"); |
---|
| 762 | if (classpath == null) |
---|
| 763 | return null; |
---|
| 764 | ArrayList list = Collections.list(new StringTokenizer(classpath, System |
---|
| 765 | .getProperty("path.separator"))); |
---|
| 766 | Path[] paths = new Path[list.size()]; |
---|
| 767 | for (int i = 0; i < list.size(); i++) { |
---|
| 768 | paths[i] = new Path((String) list.get(i)); |
---|
| 769 | } |
---|
| 770 | return paths; |
---|
| 771 | } |
---|
| 772 | |
---|
| 773 | /** |
---|
| 774 | * This method allows you to create symlinks in the current working directory |
---|
| 775 | * of the task to all the cache files/archives |
---|
| 776 | * @param conf the jobconf |
---|
| 777 | */ |
---|
| 778 | public static void createSymlink(Configuration conf){ |
---|
| 779 | conf.set("mapred.create.symlink", "yes"); |
---|
| 780 | } |
---|
| 781 | |
---|
| 782 | /** |
---|
| 783 | * This method checks to see if symlinks are to be create for the |
---|
| 784 | * localized cache files in the current working directory |
---|
| 785 | * @param conf the jobconf |
---|
| 786 | * @return true if symlinks are to be created- else return false |
---|
| 787 | */ |
---|
| 788 | public static boolean getSymlink(Configuration conf){ |
---|
| 789 | String result = conf.get("mapred.create.symlink"); |
---|
| 790 | if ("yes".equals(result)){ |
---|
| 791 | return true; |
---|
| 792 | } |
---|
| 793 | return false; |
---|
| 794 | } |
---|
| 795 | |
---|
| 796 | /** |
---|
| 797 | * This method checks if there is a conflict in the fragment names |
---|
| 798 | * of the uris. Also makes sure that each uri has a fragment. It |
---|
| 799 | * is only to be called if you want to create symlinks for |
---|
| 800 | * the various archives and files. |
---|
| 801 | * @param uriFiles The uri array of urifiles |
---|
| 802 | * @param uriArchives the uri array of uri archives |
---|
| 803 | */ |
---|
| 804 | public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){ |
---|
| 805 | if ((uriFiles == null) && (uriArchives == null)){ |
---|
| 806 | return true; |
---|
| 807 | } |
---|
| 808 | if (uriFiles != null){ |
---|
| 809 | for (int i = 0; i < uriFiles.length; i++){ |
---|
| 810 | String frag1 = uriFiles[i].getFragment(); |
---|
| 811 | if (frag1 == null) |
---|
| 812 | return false; |
---|
| 813 | for (int j=i+1; j < uriFiles.length; j++){ |
---|
| 814 | String frag2 = uriFiles[j].getFragment(); |
---|
| 815 | if (frag2 == null) |
---|
| 816 | return false; |
---|
| 817 | if (frag1.equalsIgnoreCase(frag2)) |
---|
| 818 | return false; |
---|
| 819 | } |
---|
| 820 | if (uriArchives != null){ |
---|
| 821 | for (int j = 0; j < uriArchives.length; j++){ |
---|
| 822 | String frag2 = uriArchives[j].getFragment(); |
---|
| 823 | if (frag2 == null){ |
---|
| 824 | return false; |
---|
| 825 | } |
---|
| 826 | if (frag1.equalsIgnoreCase(frag2)) |
---|
| 827 | return false; |
---|
| 828 | for (int k=j+1; k < uriArchives.length; k++){ |
---|
| 829 | String frag3 = uriArchives[k].getFragment(); |
---|
| 830 | if (frag3 == null) |
---|
| 831 | return false; |
---|
| 832 | if (frag2.equalsIgnoreCase(frag3)) |
---|
| 833 | return false; |
---|
| 834 | } |
---|
| 835 | } |
---|
| 836 | } |
---|
| 837 | } |
---|
| 838 | } |
---|
| 839 | return true; |
---|
| 840 | } |
---|
| 841 | |
---|
| 842 | private static class CacheStatus { |
---|
| 843 | // false, not loaded yet, true is loaded |
---|
| 844 | boolean currentStatus; |
---|
| 845 | |
---|
| 846 | // the local load path of this cache |
---|
| 847 | Path localLoadPath; |
---|
| 848 | |
---|
| 849 | //the base dir where the cache lies |
---|
| 850 | Path baseDir; |
---|
| 851 | |
---|
| 852 | //the size of this cache |
---|
| 853 | long size; |
---|
| 854 | |
---|
| 855 | // number of instances using this cache |
---|
| 856 | int refcount; |
---|
| 857 | |
---|
| 858 | // the cache-file modification time |
---|
| 859 | long mtime; |
---|
| 860 | |
---|
| 861 | public CacheStatus(Path baseDir, Path localLoadPath) { |
---|
| 862 | super(); |
---|
| 863 | this.currentStatus = false; |
---|
| 864 | this.localLoadPath = localLoadPath; |
---|
| 865 | this.refcount = 0; |
---|
| 866 | this.mtime = -1; |
---|
| 867 | this.baseDir = baseDir; |
---|
| 868 | this.size = 0; |
---|
| 869 | } |
---|
| 870 | } |
---|
| 871 | |
---|
| 872 | /** |
---|
| 873 | * Clear the entire contents of the cache and delete the backing files. This |
---|
| 874 | * should only be used when the server is reinitializing, because the users |
---|
| 875 | * are going to lose their files. |
---|
| 876 | */ |
---|
| 877 | public static void purgeCache(Configuration conf) throws IOException { |
---|
| 878 | synchronized (cachedArchives) { |
---|
| 879 | FileSystem localFs = FileSystem.getLocal(conf); |
---|
| 880 | for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) { |
---|
| 881 | try { |
---|
| 882 | localFs.delete(f.getValue().localLoadPath, true); |
---|
| 883 | } catch (IOException ie) { |
---|
| 884 | LOG.debug("Error cleaning up cache", ie); |
---|
| 885 | } |
---|
| 886 | } |
---|
| 887 | cachedArchives.clear(); |
---|
| 888 | } |
---|
| 889 | } |
---|
| 890 | } |
---|