source: proiecte/HadoopJUnit/hadoop-0.20.1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 12.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs.server.namenode;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.List;
24import java.util.Map;
25import java.util.SortedMap;
26import java.util.SortedSet;
27import java.util.TreeMap;
28import java.util.TreeSet;
29
30import org.apache.commons.logging.Log;
31import org.apache.commons.logging.LogFactory;
32import org.apache.hadoop.fs.Path;
33import org.apache.hadoop.hdfs.protocol.FSConstants;
34
35/**
36 * LeaseManager does the lease housekeeping for writing on files.   
37 * This class also provides useful static methods for lease recovery.
38 *
39 * Lease Recovery Algorithm
40 * 1) Namenode retrieves lease information
41 * 2) For each file f in the lease, consider the last block b of f
42 * 2.1) Get the datanodes which contains b
43 * 2.2) Assign one of the datanodes as the primary datanode p
44
45 * 2.3) p obtains a new generation stamp form the namenode
46 * 2.4) p get the block info from each datanode
47 * 2.5) p computes the minimum block length
48 * 2.6) p updates the datanodes, which have a valid generation stamp,
49 *      with the new generation stamp and the minimum block length
50 * 2.7) p acknowledges the namenode the update results
51
52 * 2.8) Namenode updates the BlockInfo
53 * 2.9) Namenode removes f from the lease
54 *      and removes the lease once all files have been removed
55 * 2.10) Namenode commit changes to edit log
56 */
57public class LeaseManager {
58  public static final Log LOG = LogFactory.getLog(LeaseManager.class);
59
60  private final FSNamesystem fsnamesystem;
61
62  private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
63  private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;
64
65  //
66  // Used for handling lock-leases
67  // Mapping: leaseHolder -> Lease
68  //
69  private SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
70  // Set of: Lease
71  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
72
73  //
74  // Map path names to leases. It is protected by the sortedLeases lock.
75  // The map stores pathnames in lexicographical order.
76  //
77  private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
78
79  LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
80
81  Lease getLease(String holder) {
82    return leases.get(holder);
83  }
84 
85  SortedSet<Lease> getSortedLeases() {return sortedLeases;}
86
87  /** @return the lease containing src */
88  public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
89
90  /** @return the number of leases currently in the system */
91  public synchronized int countLease() {return sortedLeases.size();}
92
93  /** @return the number of paths contained in all leases */
94  synchronized int countPath() {
95    int count = 0;
96    for(Lease lease : sortedLeases) {
97      count += lease.getPaths().size();
98    }
99    return count;
100  }
101 
102  /**
103   * Adds (or re-adds) the lease for the specified file.
104   */
105  synchronized void addLease(String holder, String src) {
106    Lease lease = getLease(holder);
107    if (lease == null) {
108      lease = new Lease(holder);
109      leases.put(holder, lease);
110      sortedLeases.add(lease);
111    } else {
112      renewLease(lease);
113    }
114    sortedLeasesByPath.put(src, lease);
115    lease.paths.add(src);
116  }
117
118  /**
119   * Remove the specified lease and src.
120   */
121  synchronized void removeLease(Lease lease, String src) {
122    sortedLeasesByPath.remove(src);
123    if (!lease.removePath(src)) {
124      LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
125    }
126
127    if (!lease.hasPath()) {
128      leases.remove(lease.holder);
129      if (!sortedLeases.remove(lease)) {
130        LOG.error(lease + " not found in sortedLeases");
131      }
132    }
133  }
134
135  /**
136   * Remove the lease for the specified holder and src
137   */
138  synchronized void removeLease(String holder, String src) {
139    Lease lease = getLease(holder);
140    if (lease != null) {
141      removeLease(lease, src);
142    }
143  }
144
145  /**
146   * Finds the pathname for the specified pendingFile
147   */
148  synchronized String findPath(INodeFileUnderConstruction pendingFile
149      ) throws IOException {
150    Lease lease = getLease(pendingFile.clientName);
151    if (lease != null) {
152      String src = lease.findPath(pendingFile);
153      if (src != null) {
154        return src;
155      }
156    }
157    throw new IOException("pendingFile (=" + pendingFile + ") not found."
158        + "(lease=" + lease + ")");
159  }
160
161  /**
162   * Renew the lease(s) held by the given client
163   */
164  synchronized void renewLease(String holder) {
165    renewLease(getLease(holder));
166  }
167  synchronized void renewLease(Lease lease) {
168    if (lease != null) {
169      sortedLeases.remove(lease);
170      lease.renew();
171      sortedLeases.add(lease);
172    }
173  }
174
175  /************************************************************
176   * A Lease governs all the locks held by a single client.
177   * For each client there's a corresponding lease, whose
178   * timestamp is updated when the client periodically
179   * checks in.  If the client dies and allows its lease to
180   * expire, all the corresponding locks can be released.
181   *************************************************************/
182  class Lease implements Comparable<Lease> {
183    private final String holder;
184    private long lastUpdate;
185    private final Collection<String> paths = new TreeSet<String>();
186 
187    /** Only LeaseManager object can create a lease */
188    private Lease(String holder) {
189      this.holder = holder;
190      renew();
191    }
192    /** Only LeaseManager object can renew a lease */
193    private void renew() {
194      this.lastUpdate = FSNamesystem.now();
195    }
196
197    /** @return true if the Hard Limit Timer has expired */
198    public boolean expiredHardLimit() {
199      return FSNamesystem.now() - lastUpdate > hardLimit;
200    }
201
202    /** @return true if the Soft Limit Timer has expired */
203    public boolean expiredSoftLimit() {
204      return FSNamesystem.now() - lastUpdate > softLimit;
205    }
206
207    /**
208     * @return the path associated with the pendingFile and null if not found.
209     */
210    private String findPath(INodeFileUnderConstruction pendingFile) {
211      for(String src : paths) {
212        if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
213          return src;
214        }
215      }
216      return null;
217    }
218
219    /** Does this lease contain any path? */
220    boolean hasPath() {return !paths.isEmpty();}
221
222    boolean removePath(String src) {
223      return paths.remove(src);
224    }
225
226    /** {@inheritDoc} */
227    public String toString() {
228      return "[Lease.  Holder: " + holder
229          + ", pendingcreates: " + paths.size() + "]";
230    }
231 
232    /** {@inheritDoc} */
233    public int compareTo(Lease o) {
234      Lease l1 = this;
235      Lease l2 = o;
236      long lu1 = l1.lastUpdate;
237      long lu2 = l2.lastUpdate;
238      if (lu1 < lu2) {
239        return -1;
240      } else if (lu1 > lu2) {
241        return 1;
242      } else {
243        return l1.holder.compareTo(l2.holder);
244      }
245    }
246 
247    /** {@inheritDoc} */
248    public boolean equals(Object o) {
249      if (!(o instanceof Lease)) {
250        return false;
251      }
252      Lease obj = (Lease) o;
253      if (lastUpdate == obj.lastUpdate &&
254          holder.equals(obj.holder)) {
255        return true;
256      }
257      return false;
258    }
259 
260    /** {@inheritDoc} */
261    public int hashCode() {
262      return holder.hashCode();
263    }
264   
265    Collection<String> getPaths() {
266      return paths;
267    }
268   
269    void replacePath(String oldpath, String newpath) {
270      paths.remove(oldpath);
271      paths.add(newpath);
272    }
273  }
274
275  synchronized void changeLease(String src, String dst,
276      String overwrite, String replaceBy) {
277    if (LOG.isDebugEnabled()) {
278      LOG.debug(getClass().getSimpleName() + ".changelease: " +
279               " src=" + src + ", dest=" + dst + 
280               ", overwrite=" + overwrite +
281               ", replaceBy=" + replaceBy);
282    }
283
284    final int len = overwrite.length();
285    for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(src, sortedLeasesByPath)) {
286      final String oldpath = entry.getKey();
287      final Lease lease = entry.getValue();
288      //overwrite must be a prefix of oldpath
289      final String newpath = replaceBy + oldpath.substring(len);
290      if (LOG.isDebugEnabled()) {
291        LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
292      }
293      lease.replacePath(oldpath, newpath);
294      sortedLeasesByPath.remove(oldpath);
295      sortedLeasesByPath.put(newpath, lease);
296    }
297  }
298
299  synchronized void removeLeaseWithPrefixPath(String prefix) {
300    for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(prefix, sortedLeasesByPath)) {
301      if (LOG.isDebugEnabled()) {
302        LOG.debug(LeaseManager.class.getSimpleName()
303            + ".removeLeaseWithPrefixPath: entry=" + entry);
304      }
305      removeLease(entry.getValue(), entry.getKey());   
306    }
307  }
308
309  static private List<Map.Entry<String, Lease>> findLeaseWithPrefixPath(
310      String prefix, SortedMap<String, Lease> path2lease) {
311    if (LOG.isDebugEnabled()) {
312      LOG.debug(LeaseManager.class.getSimpleName() + ".findLease: prefix=" + prefix);
313    }
314
315    List<Map.Entry<String, Lease>> entries = new ArrayList<Map.Entry<String, Lease>>();
316    final int srclen = prefix.length();
317
318    for(Map.Entry<String, Lease> entry : path2lease.tailMap(prefix).entrySet()) {
319      final String p = entry.getKey();
320      if (!p.startsWith(prefix)) {
321        return entries;
322      }
323      if (p.length() == srclen || p.charAt(srclen) == Path.SEPARATOR_CHAR) {
324        entries.add(entry);
325      }
326    }
327    return entries;
328  }
329
330  public void setLeasePeriod(long softLimit, long hardLimit) {
331    this.softLimit = softLimit;
332    this.hardLimit = hardLimit; 
333  }
334 
335  /******************************************************
336   * Monitor checks for leases that have expired,
337   * and disposes of them.
338   ******************************************************/
339  class Monitor implements Runnable {
340    final String name = getClass().getSimpleName();
341
342    /** Check leases periodically. */
343    public void run() {
344      for(; fsnamesystem.isRunning(); ) {
345        synchronized(fsnamesystem) {
346          checkLeases();
347        }
348
349        try {
350          Thread.sleep(2000);
351        } catch(InterruptedException ie) {
352          if (LOG.isDebugEnabled()) {
353            LOG.debug(name + " is interrupted", ie);
354          }
355        }
356      }
357    }
358  }
359
360  /** Check the leases beginning from the oldest. */
361  private synchronized void checkLeases() {
362    for(; sortedLeases.size() > 0; ) {
363      final Lease oldest = sortedLeases.first();
364      if (!oldest.expiredHardLimit()) {
365        return;
366      }
367
368      LOG.info("Lease " + oldest + " has expired hard limit");
369
370      final List<String> removing = new ArrayList<String>();
371      // need to create a copy of the oldest lease paths, becuase
372      // internalReleaseLease() removes paths corresponding to empty files,
373      // i.e. it needs to modify the collection being iterated over
374      // causing ConcurrentModificationException
375      String[] leasePaths = new String[oldest.getPaths().size()];
376      oldest.getPaths().toArray(leasePaths);
377      for(String p : leasePaths) {
378        try {
379          fsnamesystem.internalReleaseLease(oldest, p);
380        } catch (IOException e) {
381          LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
382          removing.add(p);
383        }
384      }
385
386      for(String p : removing) {
387        removeLease(oldest, p);
388      }
389    }
390  }
391
392  /** {@inheritDoc} */
393  public synchronized String toString() {
394    return getClass().getSimpleName() + "= {"
395        + "\n leases=" + leases
396        + "\n sortedLeases=" + sortedLeases
397        + "\n sortedLeasesByPath=" + sortedLeasesByPath
398        + "\n}";
399  }
400}
Note: See TracBrowser for help on using the repository browser.