source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 3.1 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.util.concurrent.LinkedBlockingQueue;
23
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26
27import org.apache.hadoop.fs.FileSystem;
28import org.apache.hadoop.fs.Path;
29
30class CleanupQueue {
31
32  public static final Log LOG =
33    LogFactory.getLog(CleanupQueue.class);
34
35  private static PathCleanupThread cleanupThread;
36
37  /**
38   * Create a singleton path-clean-up queue. It can be used to delete
39   * paths(directories/files) in a separate thread. This constructor creates a
40   * clean-up thread and also starts it as a daemon. Callers can instantiate one
41   * CleanupQueue per JVM and can use it for deleting paths. Use
42   * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
43   * deletion.
44   */
45  public CleanupQueue() {
46    synchronized (PathCleanupThread.class) {
47      if (cleanupThread == null) {
48        cleanupThread = new PathCleanupThread();
49      }
50    }
51  }
52 
53  public void addToQueue(JobConf conf, Path...paths) {
54    cleanupThread.addToQueue(conf,paths);
55  }
56
57  private static class PathCleanupThread extends Thread {
58
59    static class PathAndConf {
60      JobConf conf;
61      Path path;
62      PathAndConf(JobConf conf, Path path) {
63        this.conf = conf;
64        this.path = path;
65      }
66    }
67    // cleanup queue which deletes files/directories of the paths queued up.
68    private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
69
70    public PathCleanupThread() {
71      setName("Directory/File cleanup thread");
72      setDaemon(true);
73      start();
74    }
75
76    public void addToQueue(JobConf conf,Path... paths) {
77      for (Path p : paths) {
78        try {
79          queue.put(new PathAndConf(conf,p));
80        } catch (InterruptedException ie) {}
81      }
82    }
83
84    public void run() {
85      LOG.debug(getName() + " started.");
86      PathAndConf pathAndConf = null;
87      while (true) {
88        try {
89          pathAndConf = queue.take();
90          // delete the path.
91          FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
92          fs.delete(pathAndConf.path, true);
93          LOG.debug("DELETED " + pathAndConf.path);
94        } catch (InterruptedException t) {
95          return;
96        } catch (Exception e) {
97          LOG.warn("Error deleting path" + pathAndConf.path);
98        } 
99      }
100    }
101  }
102}
Note: See TracBrowser for help on using the repository browser.