source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/filecache/DistributedCache.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 33.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.filecache;
20
21import org.apache.commons.logging.*;
22import java.io.*;
23import java.util.*;
24import org.apache.hadoop.conf.*;
25import org.apache.hadoop.util.*;
26import org.apache.hadoop.fs.*;
27
28import java.net.URI;
29
30/**
31 * Distribute application-specific large, read-only files efficiently.
32 *
33 * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce
34 * framework to cache files (text, archives, jars etc.) needed by applications.
35 * </p>
36 *
37 * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached
38 * via the {@link org.apache.hadoop.mapred.JobConf}.
39 * The <code>DistributedCache</code> assumes that the
40 * files specified via hdfs:// urls are already present on the
41 * {@link FileSystem} at the path specified by the url.</p>
42 *
43 * <p>The framework will copy the necessary files on to the slave node before
44 * any tasks for the job are executed on that node. Its efficiency stems from
45 * the fact that the files are only copied once per job and the ability to
46 * cache archives which are un-archived on the slaves.</p>
47 *
48 * <p><code>DistributedCache</code> can be used to distribute simple, read-only
49 * data/text files and/or more complex types such as archives, jars etc.
50 * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
51 * Jars may be optionally added to the classpath of the tasks, a rudimentary
52 * software distribution mechanism.  Files have execution permissions.
53 * Optionally users can also direct it to symlink the distributed cache file(s)
54 * into the working directory of the task.</p>
55 *
56 * <p><code>DistributedCache</code> tracks modification timestamps of the cache
57 * files. Clearly the cache files should not be modified by the application
58 * or externally while the job is executing.</p>
59 *
60 * <p>Here is an illustrative example on how to use the
61 * <code>DistributedCache</code>:</p>
62 * <p><blockquote><pre>
63 *     // Setting up the cache for the application
64 *     
65 *     1. Copy the requisite files to the <code>FileSystem</code>:
66 *     
67 *     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat 
68 *     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip 
69 *     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
70 *     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
71 *     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
72 *     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
73 *     
74 *     2. Setup the application's <code>JobConf</code>:
75 *     
76 *     JobConf job = new JobConf();
77 *     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
78 *                                   job);
79 *     DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
80 *     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
81 *     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
82 *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
83 *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
84 *     
85 *     3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
86 *     or {@link org.apache.hadoop.mapred.Reducer}:
87 *     
88 *     public static class MapClass extends MapReduceBase 
89 *     implements Mapper&lt;K, V, K, V&gt; {
90 *     
91 *       private Path[] localArchives;
92 *       private Path[] localFiles;
93 *       
94 *       public void configure(JobConf job) {
95 *         // Get the cached archives/files
96 *         localArchives = DistributedCache.getLocalCacheArchives(job);
97 *         localFiles = DistributedCache.getLocalCacheFiles(job);
98 *       }
99 *       
100 *       public void map(K key, V value,
101 *                       OutputCollector&lt;K, V&gt; output, Reporter reporter)
102 *       throws IOException {
103 *         // Use data from the cached archives/files here
104 *         // ...
105 *         // ...
106 *         output.collect(k, v);
107 *       }
108 *     }
109 *     
110 * </pre></blockquote></p>
111 *
112 * @see org.apache.hadoop.mapred.JobConf
113 * @see org.apache.hadoop.mapred.JobClient
114 */
115public class DistributedCache {
116  // cacheID to cacheStatus mapping
117  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
118 
119  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
120 
121  // default total cache size
122  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
123
124  private static final Log LOG =
125    LogFactory.getLog(DistributedCache.class);
126 
127  /**
128   * Get the locally cached file or archive; it could either be
129   * previously cached (and valid) or copy it from the {@link FileSystem} now.
130   *
131   * @param cache the cache to be localized, this should be specified as
132   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
133   * or hostname:port is provided the file is assumed to be in the filesystem
134   * being used in the Configuration
135   * @param conf The Confguration file which contains the filesystem
136   * @param baseDir The base cache Dir where you wnat to localize the files/archives
137   * @param fileStatus The file status on the dfs.
138   * @param isArchive if the cache is an archive or a file. In case it is an
139   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
140   *  be unzipped/unjarred/untarred automatically
141   *  and the directory where the archive is unzipped/unjarred/untarred is
142   *  returned as the Path.
143   *  In case of a file, the path to the file is returned
144   * @param confFileStamp this is the hdfs file modification timestamp to verify that the
145   * file to be cached hasn't changed since the job started
146   * @param currentWorkDir this is the directory where you would want to create symlinks
147   * for the locally cached files/archives
148   * @return the path to directory where the archives are unjarred in case of archives,
149   * the path to the file where the file is copied locally
150   * @throws IOException
151   */
152  public static Path getLocalCache(URI cache, Configuration conf, 
153                                   Path baseDir, FileStatus fileStatus,
154                                   boolean isArchive, long confFileStamp,
155                                   Path currentWorkDir) 
156  throws IOException {
157    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
158        confFileStamp, currentWorkDir, true);
159  }
160  /**
161   * Get the locally cached file or archive; it could either be
162   * previously cached (and valid) or copy it from the {@link FileSystem} now.
163   *
164   * @param cache the cache to be localized, this should be specified as
165   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
166   * or hostname:port is provided the file is assumed to be in the filesystem
167   * being used in the Configuration
168   * @param conf The Confguration file which contains the filesystem
169   * @param baseDir The base cache Dir where you wnat to localize the files/archives
170   * @param fileStatus The file status on the dfs.
171   * @param isArchive if the cache is an archive or a file. In case it is an
172   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
173   *  be unzipped/unjarred/untarred automatically
174   *  and the directory where the archive is unzipped/unjarred/untarred is
175   *  returned as the Path.
176   *  In case of a file, the path to the file is returned
177   * @param confFileStamp this is the hdfs file modification timestamp to verify that the
178   * file to be cached hasn't changed since the job started
179   * @param currentWorkDir this is the directory where you would want to create symlinks
180   * for the locally cached files/archives
181   * @param honorSymLinkConf if this is false, then the symlinks are not
182   * created even if conf says so (this is required for an optimization in task
183   * launches
184   * @return the path to directory where the archives are unjarred in case of archives,
185   * the path to the file where the file is copied locally
186   * @throws IOException
187   */
188  public static Path getLocalCache(URI cache, Configuration conf, 
189      Path baseDir, FileStatus fileStatus,
190      boolean isArchive, long confFileStamp,
191      Path currentWorkDir, boolean honorSymLinkConf) 
192  throws IOException {
193    String cacheId = makeRelative(cache, conf);
194    CacheStatus lcacheStatus;
195    Path localizedPath;
196    synchronized (cachedArchives) {
197      lcacheStatus = cachedArchives.get(cacheId);
198      if (lcacheStatus == null) {
199        // was never localized
200        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
201        cachedArchives.put(cacheId, lcacheStatus);
202      }
203
204      synchronized (lcacheStatus) {
205        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
206            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
207        lcacheStatus.refcount++;
208      }
209    }
210
211    // try deleting stuff if you can
212    long size = 0;
213    synchronized (baseDirSize) {
214      Long get = baseDirSize.get(baseDir);
215      if ( get != null ) {
216        size = get.longValue();
217      }
218    }
219    // setting the cache size to a default of 10GB
220    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
221    if (allowedSize < size) {
222      // try some cache deletions
223      deleteCache(conf);
224    }
225    return localizedPath;
226  }
227
228 
229  /**
230   * Get the locally cached file or archive; it could either be
231   * previously cached (and valid) or copy it from the {@link FileSystem} now.
232   *
233   * @param cache the cache to be localized, this should be specified as
234   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
235   * or hostname:port is provided the file is assumed to be in the filesystem
236   * being used in the Configuration
237   * @param conf The Confguration file which contains the filesystem
238   * @param baseDir The base cache Dir where you wnat to localize the files/archives
239   * @param isArchive if the cache is an archive or a file. In case it is an
240   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
241   *  be unzipped/unjarred/untarred automatically
242   *  and the directory where the archive is unzipped/unjarred/untarred
243   *  is returned as the Path.
244   *  In case of a file, the path to the file is returned
245   * @param confFileStamp this is the hdfs file modification timestamp to verify that the
246   * file to be cached hasn't changed since the job started
247   * @param currentWorkDir this is the directory where you would want to create symlinks
248   * for the locally cached files/archives
249   * @return the path to directory where the archives are unjarred in case of archives,
250   * the path to the file where the file is copied locally
251   * @throws IOException
252
253   */
254  public static Path getLocalCache(URI cache, Configuration conf, 
255                                   Path baseDir, boolean isArchive,
256                                   long confFileStamp, Path currentWorkDir) 
257  throws IOException {
258    return getLocalCache(cache, conf, 
259                         baseDir, null, isArchive,
260                         confFileStamp, currentWorkDir);
261  }
262 
263  /**
264   * This is the opposite of getlocalcache. When you are done with
265   * using the cache, you need to release the cache
266   * @param cache The cache URI to be released
267   * @param conf configuration which contains the filesystem the cache
268   * is contained in.
269   * @throws IOException
270   */
271  public static void releaseCache(URI cache, Configuration conf)
272    throws IOException {
273    String cacheId = makeRelative(cache, conf);
274    synchronized (cachedArchives) {
275      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
276      if (lcacheStatus == null)
277        return;
278      synchronized (lcacheStatus) {
279        lcacheStatus.refcount--;
280      }
281    }
282  }
283 
284  // To delete the caches which have a refcount of zero
285 
286  private static void deleteCache(Configuration conf) throws IOException {
287    // try deleting cache Status with refcount of zero
288    synchronized (cachedArchives) {
289      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
290        String cacheId = (String) it.next();
291        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
292        synchronized (lcacheStatus) {
293          if (lcacheStatus.refcount == 0) {
294            // delete this cache entry
295            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
296            synchronized (baseDirSize) {
297              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
298              if ( dirSize != null ) {
299                dirSize -= lcacheStatus.size;
300                baseDirSize.put(lcacheStatus.baseDir, dirSize);
301              }
302            }
303            it.remove();
304          }
305        }
306      }
307    }
308  }
309
310  /*
311   * Returns the relative path of the dir this cache will be localized in
312   * relative path that this cache will be localized in. For
313   * hdfs://hostname:port/absolute_path -- the relative path is
314   * hostname/absolute path -- if it is just /absolute_path -- then the
315   * relative path is hostname of DFS this mapred cluster is running
316   * on/absolute_path
317   */
318  public static String makeRelative(URI cache, Configuration conf)
319    throws IOException {
320    String host = cache.getHost();
321    if (host == null) {
322      host = cache.getScheme();
323    }
324    if (host == null) {
325      URI defaultUri = FileSystem.get(conf).getUri();
326      host = defaultUri.getHost();
327      if (host == null) {
328        host = defaultUri.getScheme();
329      }
330    }
331    String path = host + cache.getPath();
332    path = path.replace(":/","/");                // remove windows device colon
333    return path;
334  }
335
336  private static Path cacheFilePath(Path p) {
337    return new Path(p, p.getName());
338  }
339
340  // the method which actually copies the caches locally and unjars/unzips them
341  // and does chmod for the files
342  private static Path localizeCache(Configuration conf, 
343                                    URI cache, long confFileStamp,
344                                    CacheStatus cacheStatus,
345                                    FileStatus fileStatus,
346                                    boolean isArchive, 
347                                    Path currentWorkDir,boolean honorSymLinkConf) 
348  throws IOException {
349    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
350    if(cache.getFragment() == null) {
351        doSymlink = false;
352    }
353    FileSystem fs = getFileSystem(cache, conf);
354    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
355    File flink = new File(link);
356    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
357                           cacheStatus, fileStatus)) {
358      if (isArchive) {
359        if (doSymlink){
360          if (!flink.exists())
361            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
362                             link);
363        }
364        return cacheStatus.localLoadPath;
365      }
366      else {
367        if (doSymlink){
368          if (!flink.exists())
369            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
370                             link);
371        }
372        return cacheFilePath(cacheStatus.localLoadPath);
373      }
374    } else {
375      // remove the old archive
376      // if the old archive cannot be removed since it is being used by another
377      // job
378      // return null
379      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
380        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
381                              + " is in use and cannot be refreshed");
382     
383      FileSystem localFs = FileSystem.getLocal(conf);
384      localFs.delete(cacheStatus.localLoadPath, true);
385      synchronized (baseDirSize) {
386        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
387        if ( dirSize != null ) {
388          dirSize -= cacheStatus.size;
389          baseDirSize.put(cacheStatus.baseDir, dirSize);
390        }
391      }
392      Path parchive = new Path(cacheStatus.localLoadPath,
393                               new Path(cacheStatus.localLoadPath.getName()));
394     
395      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
396        throw new IOException("Mkdirs failed to create directory " + 
397                              cacheStatus.localLoadPath.toString());
398      }
399
400      String cacheId = cache.getPath();
401      fs.copyToLocalFile(new Path(cacheId), parchive);
402      if (isArchive) {
403        String tmpArchive = parchive.toString().toLowerCase();
404        File srcFile = new File(parchive.toString());
405        File destDir = new File(parchive.getParent().toString());
406        if (tmpArchive.endsWith(".jar")) {
407          RunJar.unJar(srcFile, destDir);
408        } else if (tmpArchive.endsWith(".zip")) {
409          FileUtil.unZip(srcFile, destDir);
410        } else if (isTarFile(tmpArchive)) {
411          FileUtil.unTar(srcFile, destDir);
412        }
413        // else will not do anyhting
414        // and copy the file into the dir as it is
415      }
416     
417      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
418      cacheStatus.size = cacheSize;
419      synchronized (baseDirSize) {
420        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
421        if( dirSize == null ) {
422          dirSize = Long.valueOf(cacheSize);
423        } else {
424          dirSize += cacheSize;
425        }
426        baseDirSize.put(cacheStatus.baseDir, dirSize);
427      }
428     
429      // do chmod here
430      try {
431        FileUtil.chmod(parchive.toString(), "+x");
432      } catch(InterruptedException e) {
433        LOG.warn("Exception in chmod" + e.toString());
434      }
435
436      // update cacheStatus to reflect the newly cached file
437      cacheStatus.currentStatus = true;
438      cacheStatus.mtime = getTimestamp(conf, cache);
439    }
440   
441    if (isArchive){
442      if (doSymlink){
443        if (!flink.exists())
444          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
445                           link);
446      }
447      return cacheStatus.localLoadPath;
448    }
449    else {
450      if (doSymlink){
451        if (!flink.exists())
452          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
453                           link);
454      }
455      return cacheFilePath(cacheStatus.localLoadPath);
456    }
457  }
458
459  private static boolean isTarFile(String filename) {
460    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
461           filename.endsWith(".tar"));
462  }
463 
464  // Checks if the cache has already been localized and is fresh
465  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
466                                          URI cache, long confFileStamp, 
467                                          CacheStatus lcacheStatus,
468                                          FileStatus fileStatus) 
469  throws IOException {
470    // check for existence of the cache
471    if (lcacheStatus.currentStatus == false) {
472      return false;
473    } else {
474      long dfsFileStamp;
475      if (fileStatus != null) {
476        dfsFileStamp = fileStatus.getModificationTime();
477      } else {
478        dfsFileStamp = getTimestamp(conf, cache);
479      }
480
481      // ensure that the file on hdfs hasn't been modified since the job started
482      if (dfsFileStamp != confFileStamp) {
483        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
484        throw new IOException("File: " + cache + 
485                              " has changed on HDFS since job started");
486      }
487     
488      if (dfsFileStamp != lcacheStatus.mtime) {
489        // needs refreshing
490        return false;
491      }
492    }
493   
494    return true;
495  }
496
497  /**
498   * Returns mtime of a given cache file on hdfs.
499   * @param conf configuration
500   * @param cache cache file
501   * @return mtime of a given cache file on hdfs
502   * @throws IOException
503   */
504  public static long getTimestamp(Configuration conf, URI cache)
505    throws IOException {
506    FileSystem fileSystem = FileSystem.get(cache, conf);
507    Path filePath = new Path(cache.getPath());
508
509    return fileSystem.getFileStatus(filePath).getModificationTime();
510  }
511 
512  /**
513   * This method create symlinks for all files in a given dir in another directory
514   * @param conf the configuration
515   * @param jobCacheDir the target directory for creating symlinks
516   * @param workDir the directory in which the symlinks are created
517   * @throws IOException
518   */
519  public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
520    throws IOException{
521    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
522           workDir == null || (!workDir.isDirectory())) {
523      return;
524    }
525    boolean createSymlink = getSymlink(conf);
526    if (createSymlink){
527      File[] list = jobCacheDir.listFiles();
528      for (int i=0; i < list.length; i++){
529        FileUtil.symLink(list[i].getAbsolutePath(),
530                         new File(workDir, list[i].getName()).toString());
531      }
532    } 
533  }
534 
535  private static String getFileSysName(URI url) {
536    String fsname = url.getScheme();
537    if ("hdfs".equals(fsname)) {
538      String host = url.getHost();
539      int port = url.getPort();
540      return (port == (-1)) ? host : (host + ":" + port);
541    } else {
542      return null;
543    }
544  }
545 
546  private static FileSystem getFileSystem(URI cache, Configuration conf)
547    throws IOException {
548    String fileSysName = getFileSysName(cache);
549    if (fileSysName != null)
550      return FileSystem.getNamed(fileSysName, conf);
551    else
552      return FileSystem.get(conf);
553  }
554
555  /**
556   * Set the configuration with the given set of archives
557   * @param archives The list of archives that need to be localized
558   * @param conf Configuration which will be changed
559   */
560  public static void setCacheArchives(URI[] archives, Configuration conf) {
561    String sarchives = StringUtils.uriToString(archives);
562    conf.set("mapred.cache.archives", sarchives);
563  }
564
565  /**
566   * Set the configuration with the given set of files
567   * @param files The list of files that need to be localized
568   * @param conf Configuration which will be changed
569   */
570  public static void setCacheFiles(URI[] files, Configuration conf) {
571    String sfiles = StringUtils.uriToString(files);
572    conf.set("mapred.cache.files", sfiles);
573  }
574
575  /**
576   * Get cache archives set in the Configuration
577   * @param conf The configuration which contains the archives
578   * @return A URI array of the caches set in the Configuration
579   * @throws IOException
580   */
581  public static URI[] getCacheArchives(Configuration conf) throws IOException {
582    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
583  }
584
585  /**
586   * Get cache files set in the Configuration
587   * @param conf The configuration which contains the files
588   * @return A URI array of the files set in the Configuration
589   * @throws IOException
590   */
591
592  public static URI[] getCacheFiles(Configuration conf) throws IOException {
593    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
594  }
595
596  /**
597   * Return the path array of the localized caches
598   * @param conf Configuration that contains the localized archives
599   * @return A path array of localized caches
600   * @throws IOException
601   */
602  public static Path[] getLocalCacheArchives(Configuration conf)
603    throws IOException {
604    return StringUtils.stringToPath(conf
605                                    .getStrings("mapred.cache.localArchives"));
606  }
607
608  /**
609   * Return the path array of the localized files
610   * @param conf Configuration that contains the localized files
611   * @return A path array of localized files
612   * @throws IOException
613   */
614  public static Path[] getLocalCacheFiles(Configuration conf)
615    throws IOException {
616    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
617  }
618
619  /**
620   * Get the timestamps of the archives
621   * @param conf The configuration which stored the timestamps
622   * @return a string array of timestamps
623   * @throws IOException
624   */
625  public static String[] getArchiveTimestamps(Configuration conf) {
626    return conf.getStrings("mapred.cache.archives.timestamps");
627  }
628
629
630  /**
631   * Get the timestamps of the files
632   * @param conf The configuration which stored the timestamps
633   * @return a string array of timestamps
634   * @throws IOException
635   */
636  public static String[] getFileTimestamps(Configuration conf) {
637    return conf.getStrings("mapred.cache.files.timestamps");
638  }
639
640  /**
641   * This is to check the timestamp of the archives to be localized
642   * @param conf Configuration which stores the timestamp's
643   * @param timestamps comma separated list of timestamps of archives.
644   * The order should be the same as the order in which the archives are added.
645   */
646  public static void setArchiveTimestamps(Configuration conf, String timestamps) {
647    conf.set("mapred.cache.archives.timestamps", timestamps);
648  }
649
650  /**
651   * This is to check the timestamp of the files to be localized
652   * @param conf Configuration which stores the timestamp's
653   * @param timestamps comma separated list of timestamps of files.
654   * The order should be the same as the order in which the files are added.
655   */
656  public static void setFileTimestamps(Configuration conf, String timestamps) {
657    conf.set("mapred.cache.files.timestamps", timestamps);
658  }
659 
660  /**
661   * Set the conf to contain the location for localized archives
662   * @param conf The conf to modify to contain the localized caches
663   * @param str a comma separated list of local archives
664   */
665  public static void setLocalArchives(Configuration conf, String str) {
666    conf.set("mapred.cache.localArchives", str);
667  }
668
669  /**
670   * Set the conf to contain the location for localized files
671   * @param conf The conf to modify to contain the localized caches
672   * @param str a comma separated list of local files
673   */
674  public static void setLocalFiles(Configuration conf, String str) {
675    conf.set("mapred.cache.localFiles", str);
676  }
677
678  /**
679   * Add a archives to be localized to the conf
680   * @param uri The uri of the cache to be localized
681   * @param conf Configuration to add the cache to
682   */
683  public static void addCacheArchive(URI uri, Configuration conf) {
684    String archives = conf.get("mapred.cache.archives");
685    conf.set("mapred.cache.archives", archives == null ? uri.toString()
686             : archives + "," + uri.toString());
687  }
688 
689  /**
690   * Add a file to be localized to the conf
691   * @param uri The uri of the cache to be localized
692   * @param conf Configuration to add the cache to
693   */
694  public static void addCacheFile(URI uri, Configuration conf) {
695    String files = conf.get("mapred.cache.files");
696    conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
697             + uri.toString());
698  }
699
700  /**
701   * Add an file path to the current set of classpath entries It adds the file
702   * to cache as well.
703   *
704   * @param file Path of the file to be added
705   * @param conf Configuration that contains the classpath setting
706   */
707  public static void addFileToClassPath(Path file, Configuration conf)
708    throws IOException {
709    String classpath = conf.get("mapred.job.classpath.files");
710    conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
711             : classpath + System.getProperty("path.separator") + file.toString());
712    FileSystem fs = FileSystem.get(conf);
713    URI uri = fs.makeQualified(file).toUri();
714
715    addCacheFile(uri, conf);
716  }
717
718  /**
719   * Get the file entries in classpath as an array of Path
720   *
721   * @param conf Configuration that contains the classpath setting
722   */
723  public static Path[] getFileClassPaths(Configuration conf) {
724    String classpath = conf.get("mapred.job.classpath.files");
725    if (classpath == null)
726      return null;
727    ArrayList list = Collections.list(new StringTokenizer(classpath, System
728                                                          .getProperty("path.separator")));
729    Path[] paths = new Path[list.size()];
730    for (int i = 0; i < list.size(); i++) {
731      paths[i] = new Path((String) list.get(i));
732    }
733    return paths;
734  }
735
736  /**
737   * Add an archive path to the current set of classpath entries. It adds the
738   * archive to cache as well.
739   *
740   * @param archive Path of the archive to be added
741   * @param conf Configuration that contains the classpath setting
742   */
743  public static void addArchiveToClassPath(Path archive, Configuration conf)
744    throws IOException {
745    String classpath = conf.get("mapred.job.classpath.archives");
746    conf.set("mapred.job.classpath.archives", classpath == null ? archive
747             .toString() : classpath + System.getProperty("path.separator")
748             + archive.toString());
749    FileSystem fs = FileSystem.get(conf);
750    URI uri = fs.makeQualified(archive).toUri();
751
752    addCacheArchive(uri, conf);
753  }
754
755  /**
756   * Get the archive entries in classpath as an array of Path
757   *
758   * @param conf Configuration that contains the classpath setting
759   */
760  public static Path[] getArchiveClassPaths(Configuration conf) {
761    String classpath = conf.get("mapred.job.classpath.archives");
762    if (classpath == null)
763      return null;
764    ArrayList list = Collections.list(new StringTokenizer(classpath, System
765                                                          .getProperty("path.separator")));
766    Path[] paths = new Path[list.size()];
767    for (int i = 0; i < list.size(); i++) {
768      paths[i] = new Path((String) list.get(i));
769    }
770    return paths;
771  }
772
773  /**
774   * This method allows you to create symlinks in the current working directory
775   * of the task to all the cache files/archives
776   * @param conf the jobconf
777   */
778  public static void createSymlink(Configuration conf){
779    conf.set("mapred.create.symlink", "yes");
780  }
781 
782  /**
783   * This method checks to see if symlinks are to be create for the
784   * localized cache files in the current working directory
785   * @param conf the jobconf
786   * @return true if symlinks are to be created- else return false
787   */
788  public static boolean getSymlink(Configuration conf){
789    String result = conf.get("mapred.create.symlink");
790    if ("yes".equals(result)){
791      return true;
792    }
793    return false;
794  }
795
796  /**
797   * This method checks if there is a conflict in the fragment names
798   * of the uris. Also makes sure that each uri has a fragment. It
799   * is only to be called if you want to create symlinks for
800   * the various archives and files.
801   * @param uriFiles The uri array of urifiles
802   * @param uriArchives the uri array of uri archives
803   */
804  public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
805    if ((uriFiles == null) && (uriArchives == null)){
806      return true;
807    }
808    if (uriFiles != null){
809      for (int i = 0; i < uriFiles.length; i++){
810        String frag1 = uriFiles[i].getFragment();
811        if (frag1 == null)
812          return false;
813        for (int j=i+1; j < uriFiles.length; j++){
814          String frag2 = uriFiles[j].getFragment();
815          if (frag2 == null)
816            return false;
817          if (frag1.equalsIgnoreCase(frag2))
818            return false;
819        }
820        if (uriArchives != null){
821          for (int j = 0; j < uriArchives.length; j++){
822            String frag2 = uriArchives[j].getFragment();
823            if (frag2 == null){
824              return false;
825            }
826            if (frag1.equalsIgnoreCase(frag2))
827              return false;
828            for (int k=j+1; k < uriArchives.length; k++){
829              String frag3 = uriArchives[k].getFragment();
830              if (frag3 == null)
831                return false;
832              if (frag2.equalsIgnoreCase(frag3))
833                return false;
834            }
835          }
836        }
837      }
838    }
839    return true;
840  }
841
842  private static class CacheStatus {
843    // false, not loaded yet, true is loaded
844    boolean currentStatus;
845
846    // the local load path of this cache
847    Path localLoadPath;
848   
849    //the base dir where the cache lies
850    Path baseDir;
851   
852    //the size of this cache
853    long size;
854
855    // number of instances using this cache
856    int refcount;
857
858    // the cache-file modification time
859    long mtime;
860
861    public CacheStatus(Path baseDir, Path localLoadPath) {
862      super();
863      this.currentStatus = false;
864      this.localLoadPath = localLoadPath;
865      this.refcount = 0;
866      this.mtime = -1;
867      this.baseDir = baseDir;
868      this.size = 0;
869    }
870  }
871
872  /**
873   * Clear the entire contents of the cache and delete the backing files. This
874   * should only be used when the server is reinitializing, because the users
875   * are going to lose their files.
876   */
877  public static void purgeCache(Configuration conf) throws IOException {
878    synchronized (cachedArchives) {
879      FileSystem localFs = FileSystem.getLocal(conf);
880      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
881        try {
882          localFs.delete(f.getValue().localLoadPath, true);
883        } catch (IOException ie) {
884          LOG.debug("Error cleaning up cache", ie);
885        }
886      }
887      cachedArchives.clear();
888    }
889  }
890}
Note: See TracBrowser for help on using the repository browser.