source: proiecte/HadoopJUnit/hadoop-0.20.1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 44.1 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs.server.namenode;
19
20import java.io.*;
21import java.util.*;
22
23import org.apache.hadoop.conf.Configuration;
24import org.apache.hadoop.fs.FileStatus;
25import org.apache.hadoop.fs.Path;
26import org.apache.hadoop.fs.ContentSummary;
27import org.apache.hadoop.fs.permission.*;
28import org.apache.hadoop.metrics.MetricsRecord;
29import org.apache.hadoop.metrics.MetricsUtil;
30import org.apache.hadoop.metrics.MetricsContext;
31import org.apache.hadoop.hdfs.protocol.FSConstants;
32import org.apache.hadoop.hdfs.protocol.Block;
33import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
34import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
35import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
36
37/*************************************************
38 * FSDirectory stores the filesystem directory state.
39 * It handles writing/loading values to disk, and logging
40 * changes as we go.
41 *
42 * It keeps the filename->blockset mapping always-current
43 * and logged to disk.
44 *
45 *************************************************/
46class FSDirectory implements FSConstants, Closeable {
47
48  final FSNamesystem namesystem;
49  final INodeDirectoryWithQuota rootDir;
50  FSImage fsImage; 
51  private boolean ready = false;
52  // Metrics record
53  private MetricsRecord directoryMetrics = null;
54
55  /** Access an existing dfs name directory. */
56  FSDirectory(FSNamesystem ns, Configuration conf) {
57    this(new FSImage(), ns, conf);
58    fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
59                                FSImage.getCheckpointEditsDirs(conf, null));
60  }
61
62  FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
63    rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
64        ns.createFsOwnerPermissions(new FsPermission((short)0755)),
65        Integer.MAX_VALUE, -1);
66    this.fsImage = fsImage;
67    namesystem = ns;
68    initialize(conf);
69  }
70   
71  private void initialize(Configuration conf) {
72    MetricsContext metricsContext = MetricsUtil.getContext("dfs");
73    directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
74    directoryMetrics.setTag("sessionId", conf.get("session.id"));
75  }
76
77  void loadFSImage(Collection<File> dataDirs,
78                   Collection<File> editsDirs,
79                   StartupOption startOpt) throws IOException {
80    // format before starting up if requested
81    if (startOpt == StartupOption.FORMAT) {
82      fsImage.setStorageDirectories(dataDirs, editsDirs);
83      fsImage.format();
84      startOpt = StartupOption.REGULAR;
85    }
86    try {
87      if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
88        fsImage.saveFSImage();
89      }
90      FSEditLog editLog = fsImage.getEditLog();
91      assert editLog != null : "editLog must be initialized";
92      if (!editLog.isOpen())
93        editLog.open();
94      fsImage.setCheckpointDirectories(null, null);
95    } catch(IOException e) {
96      fsImage.close();
97      throw e;
98    }
99    synchronized (this) {
100      this.ready = true;
101      this.notifyAll();
102    }
103  }
104
105  private void incrDeletedFileCount(int count) {
106    directoryMetrics.incrMetric("files_deleted", count);
107    directoryMetrics.update();
108  }
109   
110  /**
111   * Shutdown the filestore
112   */
113  public void close() throws IOException {
114    fsImage.close();
115  }
116
117  /**
118   * Block until the object is ready to be used.
119   */
120  void waitForReady() {
121    if (!ready) {
122      synchronized (this) {
123        while (!ready) {
124          try {
125            this.wait(5000);
126          } catch (InterruptedException ie) {
127          }
128        }
129      }
130    }
131  }
132
133  /**
134   * Add the given filename to the fs.
135   */
136  INodeFileUnderConstruction addFile(String path, 
137                PermissionStatus permissions,
138                short replication,
139                long preferredBlockSize,
140                String clientName,
141                String clientMachine,
142                DatanodeDescriptor clientNode,
143                long generationStamp) 
144                throws IOException {
145    waitForReady();
146
147    // Always do an implicit mkdirs for parent directory tree.
148    long modTime = FSNamesystem.now();
149    if (!mkdirs(new Path(path).getParent().toString(), permissions, true,
150        modTime)) {
151      return null;
152    }
153    INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
154                                 permissions,replication,
155                                 preferredBlockSize, modTime, clientName, 
156                                 clientMachine, clientNode);
157    synchronized (rootDir) {
158      newNode = addNode(path, newNode, -1, false);
159    }
160    if (newNode == null) {
161      NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
162                                   +"failed to add "+path
163                                   +" to the file system");
164      return null;
165    }
166    // add create file record to log, record new generation stamp
167    fsImage.getEditLog().logOpenFile(path, newNode);
168
169    NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
170                                  +path+" is added to the file system");
171    return newNode;
172  }
173
174  /**
175   */
176  INode unprotectedAddFile( String path, 
177                            PermissionStatus permissions,
178                            Block[] blocks, 
179                            short replication,
180                            long modificationTime,
181                            long atime,
182                            long preferredBlockSize) {
183    INode newNode;
184    long diskspace = -1; // unknown
185    if (blocks == null)
186      newNode = new INodeDirectory(permissions, modificationTime);
187    else {
188      newNode = new INodeFile(permissions, blocks.length, replication,
189                              modificationTime, atime, preferredBlockSize);
190      diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
191    }
192    synchronized (rootDir) {
193      try {
194        newNode = addNode(path, newNode, diskspace, false);
195        if(newNode != null && blocks != null) {
196          int nrBlocks = blocks.length;
197          // Add file->block mapping
198          INodeFile newF = (INodeFile)newNode;
199          for (int i = 0; i < nrBlocks; i++) {
200            newF.setBlock(i, namesystem.blocksMap.addINode(blocks[i], newF));
201          }
202        }
203      } catch (IOException e) {
204        return null;
205      }
206      return newNode;
207    }
208  }
209
210  INodeDirectory addToParent( String src,
211                              INodeDirectory parentINode,
212                              PermissionStatus permissions,
213                              Block[] blocks, 
214                              short replication,
215                              long modificationTime,
216                              long atime,
217                              long nsQuota,
218                              long dsQuota,
219                              long preferredBlockSize) {
220    // NOTE: This does not update space counts for parents
221    // create new inode
222    INode newNode;
223    if (blocks == null) {
224      if (nsQuota >= 0 || dsQuota >= 0) {
225        newNode = new INodeDirectoryWithQuota(
226            permissions, modificationTime, nsQuota, dsQuota);
227      } else {
228        newNode = new INodeDirectory(permissions, modificationTime);
229      }
230    } else 
231      newNode = new INodeFile(permissions, blocks.length, replication,
232                              modificationTime, atime, preferredBlockSize);
233    // add new node to the parent
234    INodeDirectory newParent = null;
235    synchronized (rootDir) {
236      try {
237        newParent = rootDir.addToParent(src, newNode, parentINode, false);
238      } catch (FileNotFoundException e) {
239        return null;
240      }
241      if(newParent == null)
242        return null;
243      if(blocks != null) {
244        int nrBlocks = blocks.length;
245        // Add file->block mapping
246        INodeFile newF = (INodeFile)newNode;
247        for (int i = 0; i < nrBlocks; i++) {
248          newF.setBlock(i, namesystem.blocksMap.addINode(blocks[i], newF));
249        }
250      }
251    }
252    return newParent;
253  }
254
255  /**
256   * Add a block to the file. Returns a reference to the added block.
257   */
258  Block addBlock(String path, INode[] inodes, Block block) throws IOException {
259    waitForReady();
260
261    synchronized (rootDir) {
262      INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
263
264      // check quota limits and updated space consumed
265      updateCount(inodes, inodes.length-1, 0, 
266                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
267     
268      // associate the new list of blocks with this file
269      namesystem.blocksMap.addINode(block, fileNode);
270      BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
271      fileNode.addBlock(blockInfo);
272
273      NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
274                                    + path + " with " + block
275                                    + " block is added to the in-memory "
276                                    + "file system");
277    }
278    return block;
279  }
280
281  /**
282   * Persist the block list for the inode.
283   */
284  void persistBlocks(String path, INodeFileUnderConstruction file) 
285                     throws IOException {
286    waitForReady();
287
288    synchronized (rootDir) {
289      fsImage.getEditLog().logOpenFile(path, file);
290      NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
291                                    +path+" with "+ file.getBlocks().length 
292                                    +" blocks is persisted to the file system");
293    }
294  }
295
296  /**
297   * Close file.
298   */
299  void closeFile(String path, INodeFile file) throws IOException {
300    waitForReady();
301    synchronized (rootDir) {
302      // file is closed
303      fsImage.getEditLog().logCloseFile(path, file);
304      if (NameNode.stateChangeLog.isDebugEnabled()) {
305        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
306                                    +path+" with "+ file.getBlocks().length 
307                                    +" blocks is persisted to the file system");
308      }
309    }
310  }
311
312  /**
313   * Remove a block to the file.
314   */
315  boolean removeBlock(String path, INodeFileUnderConstruction fileNode, 
316                      Block block) throws IOException {
317    waitForReady();
318
319    synchronized (rootDir) {
320      // modify file-> block and blocksMap
321      fileNode.removeBlock(block);
322      namesystem.blocksMap.removeINode(block);
323      // If block is removed from blocksMap remove it from corruptReplicasMap
324      namesystem.corruptReplicas.removeFromCorruptReplicasMap(block);
325
326      // write modified block locations to log
327      fsImage.getEditLog().logOpenFile(path, fileNode);
328      NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
329                                    +path+" with "+block
330                                    +" block is added to the file system");
331    }
332    return true;
333  }
334
335  /**
336   * @see #unprotectedRenameTo(String, String, long)
337   */
338  boolean renameTo(String src, String dst) throws QuotaExceededException {
339    if (NameNode.stateChangeLog.isDebugEnabled()) {
340      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
341                                  +src+" to "+dst);
342    }
343    waitForReady();
344    long now = FSNamesystem.now();
345    if (!unprotectedRenameTo(src, dst, now))
346      return false;
347    fsImage.getEditLog().logRename(src, dst, now);
348    return true;
349  }
350
351  /** Change a path name
352   *
353   * @param src source path
354   * @param dst destination path
355   * @return true if rename succeeds; false otherwise
356   * @throws QuotaExceededException if the operation violates any quota limit
357   */
358  boolean unprotectedRenameTo(String src, String dst, long timestamp) 
359  throws QuotaExceededException {
360    synchronized (rootDir) {
361      INode[] srcInodes = rootDir.getExistingPathINodes(src);
362
363      // check the validation of the source
364      if (srcInodes[srcInodes.length-1] == null) {
365        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
366                                     +"failed to rename "+src+" to "+dst+ " because source does not exist");
367        return false;
368      } else if (srcInodes.length == 1) {
369        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
370            +"failed to rename "+src+" to "+dst+ " because source is the root");
371        return false;
372      }
373      if (isDir(dst)) {
374        dst += Path.SEPARATOR + new Path(src).getName();
375      }
376     
377      // remove source
378      INode srcChild = null;
379      try {
380        srcChild = removeChild(srcInodes, srcInodes.length-1);
381      } catch (IOException e) {
382        // srcChild == null; go to next if statement
383      }
384      if (srcChild == null) {
385        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
386            +"failed to rename "+src+" to "+dst+ " because the source can not be removed");
387        return false;
388      }
389
390      String srcChildName = srcChild.getLocalName();
391     
392      // check the validity of the destination
393      INode dstChild = null;
394      QuotaExceededException failureByQuota = null;
395
396      byte[][] dstComponents = INode.getPathComponents(dst);
397      INode[] dstInodes = new INode[dstComponents.length];
398      rootDir.getExistingPathINodes(dstComponents, dstInodes);
399      if (dstInodes[dstInodes.length-1] != null) { //check if destination exists
400        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
401                                     +"failed to rename "+src+" to "+dst+ 
402                                     " because destination exists");
403      } else if (dstInodes[dstInodes.length-2] == null) { // check if its parent exists
404        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
405            +"failed to rename "+src+" to "+dst+ 
406            " because destination's parent does not exists");
407      }
408      else {
409        // add to the destination
410        srcChild.setLocalName(dstComponents[dstInodes.length-1]);
411        try {
412          // add it to the namespace
413          dstChild = addChild(dstInodes, dstInodes.length-1, srcChild, false);
414        } catch (QuotaExceededException qe) {
415          failureByQuota = qe;
416        }
417      }
418      if (dstChild != null) {
419        if (NameNode.stateChangeLog.isDebugEnabled()) {
420          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
421            +src+" is renamed to "+dst);
422        }
423
424        // update modification time of dst and the parent of src
425        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
426        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
427        return true;
428      } else {
429        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
430            +"failed to rename "+src+" to "+dst);
431        try {
432          // put it back
433          srcChild.setLocalName(srcChildName);
434          addChild(srcInodes, srcInodes.length-1, srcChild, false);
435        } catch (IOException ignored) {}
436        if (failureByQuota != null) {
437          throw failureByQuota;
438        } else {
439          return false;
440        }
441      }
442    }
443  }
444
445  /**
446   * Set file replication
447   *
448   * @param src file name
449   * @param replication new replication
450   * @param oldReplication old replication - output parameter
451   * @return array of file blocks
452   * @throws IOException
453   */
454  Block[] setReplication(String src, 
455                         short replication,
456                         int[] oldReplication
457                         ) throws IOException {
458    waitForReady();
459    Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
460    if (fileBlocks != null)  // log replication change
461      fsImage.getEditLog().logSetReplication(src, replication);
462    return fileBlocks;
463  }
464
465  Block[] unprotectedSetReplication( String src, 
466                                     short replication,
467                                     int[] oldReplication
468                                     ) throws IOException {
469    if (oldReplication == null)
470      oldReplication = new int[1];
471    oldReplication[0] = -1;
472    Block[] fileBlocks = null;
473    synchronized(rootDir) {
474      INode[] inodes = rootDir.getExistingPathINodes(src);
475      INode inode = inodes[inodes.length - 1];
476      if (inode == null)
477        return null;
478      if (inode.isDirectory())
479        return null;
480      INodeFile fileNode = (INodeFile)inode;
481      oldReplication[0] = fileNode.getReplication();
482
483      // check disk quota
484      long dsDelta = (replication - oldReplication[0]) *
485           (fileNode.diskspaceConsumed()/oldReplication[0]);
486      updateCount(inodes, inodes.length-1, 0, dsDelta);
487
488      fileNode.setReplication(replication);
489      fileBlocks = fileNode.getBlocks();
490    }
491    return fileBlocks;
492  }
493
494  /**
495   * Get the blocksize of a file
496   * @param filename the filename
497   * @return the number of bytes
498   * @throws IOException if it is a directory or does not exist.
499   */
500  long getPreferredBlockSize(String filename) throws IOException {
501    synchronized (rootDir) {
502      INode fileNode = rootDir.getNode(filename);
503      if (fileNode == null) {
504        throw new IOException("Unknown file: " + filename);
505      }
506      if (fileNode.isDirectory()) {
507        throw new IOException("Getting block size of a directory: " + 
508                              filename);
509      }
510      return ((INodeFile)fileNode).getPreferredBlockSize();
511    }
512  }
513
514  boolean exists(String src) {
515    src = normalizePath(src);
516    synchronized(rootDir) {
517      INode inode = rootDir.getNode(src);
518      if (inode == null) {
519         return false;
520      }
521      return inode.isDirectory()? true: ((INodeFile)inode).getBlocks() != null;
522    }
523  }
524
525  void setPermission(String src, FsPermission permission
526      ) throws IOException {
527    unprotectedSetPermission(src, permission);
528    fsImage.getEditLog().logSetPermissions(src, permission);
529  }
530
531  void unprotectedSetPermission(String src, FsPermission permissions) throws FileNotFoundException {
532    synchronized(rootDir) {
533        INode inode = rootDir.getNode(src);
534        if(inode == null)
535            throw new FileNotFoundException("File does not exist: " + src);
536        inode.setPermission(permissions);
537    }
538  }
539
540  void setOwner(String src, String username, String groupname
541      ) throws IOException {
542    unprotectedSetOwner(src, username, groupname);
543    fsImage.getEditLog().logSetOwner(src, username, groupname);
544  }
545
546  void unprotectedSetOwner(String src, String username, String groupname) throws FileNotFoundException {
547    synchronized(rootDir) {
548      INode inode = rootDir.getNode(src);
549      if(inode == null)
550          throw new FileNotFoundException("File does not exist: " + src);
551      if (username != null) {
552        inode.setUser(username);
553      }
554      if (groupname != null) {
555        inode.setGroup(groupname);
556      }
557    }
558  }
559   
560  /**
561   * Remove the file from management, return blocks
562   */
563  INode delete(String src) {
564    if (NameNode.stateChangeLog.isDebugEnabled()) {
565      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
566    }
567    waitForReady();
568    long now = FSNamesystem.now();
569    INode deletedNode = unprotectedDelete(src, now);
570    if (deletedNode != null) {
571      fsImage.getEditLog().logDelete(src, now);
572    }
573    return deletedNode;
574  }
575 
576  /** Return if a directory is empty or not **/
577  boolean isDirEmpty(String src) {
578           boolean dirNotEmpty = true;
579    if (!isDir(src)) {
580      return true;
581    }
582    synchronized(rootDir) {
583      INode targetNode = rootDir.getNode(src);
584      assert targetNode != null : "should be taken care in isDir() above";
585      if (((INodeDirectory)targetNode).getChildren().size() != 0) {
586        dirNotEmpty = false;
587      }
588    }
589    return dirNotEmpty;
590  }
591 
592  /**
593   * Delete a path from the name space
594   * Update the count at each ancestor directory with quota
595   * @param src a string representation of a path to an inode
596   * @param modificationTime the time the inode is removed
597   * @param deletedBlocks the place holder for the blocks to be removed
598   * @return if the deletion succeeds
599   */ 
600  INode unprotectedDelete(String src, long modificationTime) {
601    src = normalizePath(src);
602
603    synchronized (rootDir) {
604      INode[] inodes =  rootDir.getExistingPathINodes(src);
605      INode targetNode = inodes[inodes.length-1];
606
607      if (targetNode == null) { // non-existent src
608        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
609            +"failed to remove "+src+" because it does not exist");
610        return null;
611      } else if (inodes.length == 1) { // src is the root
612        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
613            "failed to remove " + src +
614            " because the root is not allowed to be deleted");
615        return null;
616      } else {
617        try {
618          // Remove the node from the namespace
619          removeChild(inodes, inodes.length-1);
620          // set the parent's modification time
621          inodes[inodes.length-2].setModificationTime(modificationTime);
622          // GC all the blocks underneath the node.
623          ArrayList<Block> v = new ArrayList<Block>();
624          int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
625          incrDeletedFileCount(filesRemoved);
626          namesystem.removePathAndBlocks(src, v);
627          if (NameNode.stateChangeLog.isDebugEnabled()) {
628            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
629              +src+" is removed");
630          }
631          return targetNode;
632        } catch (IOException e) {
633          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
634              "failed to remove " + src + " because " + e.getMessage());
635          return null;
636        }
637      }
638    }
639  }
640
641  /**
642   * Replaces the specified inode with the specified one.
643   */
644  void replaceNode(String path, INodeFile oldnode, INodeFile newnode) 
645                                                   throws IOException {
646    replaceNode(path, oldnode, newnode, true);
647  }
648 
649  /**
650   * @see #replaceNode(String, INodeFile, INodeFile)
651   */
652  private void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
653                           boolean updateDiskspace) throws IOException {   
654    synchronized (rootDir) {
655      long dsOld = oldnode.diskspaceConsumed();
656     
657      //
658      // Remove the node from the namespace
659      //
660      if (!oldnode.removeNode()) {
661        NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
662                                     "failed to remove " + path);
663        throw new IOException("FSDirectory.replaceNode: " +
664                              "failed to remove " + path);
665      } 
666     
667      /* Currently oldnode and newnode are assumed to contain the same
668       * blocks. Otherwise, blocks need to be removed from the blocksMap.
669       */
670     
671      rootDir.addNode(path, newnode); 
672
673      //check if disk space needs to be updated.
674      long dsNew = 0;
675      if (updateDiskspace && (dsNew = newnode.diskspaceConsumed()) != dsOld) {
676        try {
677          updateSpaceConsumed(path, 0, dsNew-dsOld);
678        } catch (QuotaExceededException e) {
679          // undo
680          replaceNode(path, newnode, oldnode, false);
681          throw e;
682        }
683      }
684     
685      int index = 0;
686      for (Block b : newnode.getBlocks()) {
687        BlockInfo info = namesystem.blocksMap.addINode(b, newnode);
688        newnode.setBlock(index, info); // inode refers to the block in BlocksMap
689        index++;
690      }
691    }
692  }
693
694  /**
695   * Get a listing of files given path 'src'
696   *
697   * This function is admittedly very inefficient right now.  We'll
698   * make it better later.
699   */
700  FileStatus[] getListing(String src) {
701    String srcs = normalizePath(src);
702
703    synchronized (rootDir) {
704      INode targetNode = rootDir.getNode(srcs);
705      if (targetNode == null)
706        return null;
707      if (!targetNode.isDirectory()) {
708        return new FileStatus[]{createFileStatus(srcs, targetNode)};
709      }
710      List<INode> contents = ((INodeDirectory)targetNode).getChildren();
711      FileStatus listing[] = new FileStatus[contents.size()];
712      if(! srcs.endsWith(Path.SEPARATOR))
713        srcs += Path.SEPARATOR;
714      int i = 0;
715      for (INode cur : contents) {
716        listing[i] = createFileStatus(srcs+cur.getLocalName(), cur);
717        i++;
718      }
719      return listing;
720    }
721  }
722
723  /** Get the file info for a specific file.
724   * @param src The string representation of the path to the file
725   * @return object containing information regarding the file
726   *         or null if file not found
727   */
728  FileStatus getFileInfo(String src) {
729    String srcs = normalizePath(src);
730    synchronized (rootDir) {
731      INode targetNode = rootDir.getNode(srcs);
732      if (targetNode == null) {
733        return null;
734      }
735      else {
736        return createFileStatus(srcs, targetNode);
737      }
738    }
739  }
740
741  /**
742   * Get the blocks associated with the file.
743   */
744  Block[] getFileBlocks(String src) {
745    waitForReady();
746    synchronized (rootDir) {
747      INode targetNode = rootDir.getNode(src);
748      if (targetNode == null)
749        return null;
750      if(targetNode.isDirectory())
751        return null;
752      return ((INodeFile)targetNode).getBlocks();
753    }
754  }
755
756  /**
757   * Get {@link INode} associated with the file.
758   */
759  INodeFile getFileINode(String src) {
760    synchronized (rootDir) {
761      INode inode = rootDir.getNode(src);
762      if (inode == null || inode.isDirectory())
763        return null;
764      return (INodeFile)inode;
765    }
766  }
767
768  /**
769   * Retrieve the existing INodes along the given path.
770   *
771   * @param path the path to explore
772   * @return INodes array containing the existing INodes in the order they
773   *         appear when following the path from the root INode to the
774   *         deepest INodes. The array size will be the number of expected
775   *         components in the path, and non existing components will be
776   *         filled with null
777   *         
778   * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
779   */
780  INode[] getExistingPathINodes(String path) {
781    synchronized (rootDir){
782      return rootDir.getExistingPathINodes(path);
783    }
784  }
785 
786  /**
787   * Check whether the filepath could be created
788   */
789  boolean isValidToCreate(String src) {
790    String srcs = normalizePath(src);
791    synchronized (rootDir) {
792      if (srcs.startsWith("/") && 
793          !srcs.endsWith("/") && 
794          rootDir.getNode(srcs) == null) {
795        return true;
796      } else {
797        return false;
798      }
799    }
800  }
801
802  /**
803   * Check whether the path specifies a directory
804   */
805  boolean isDir(String src) {
806    synchronized (rootDir) {
807      INode node = rootDir.getNode(normalizePath(src));
808      return node != null && node.isDirectory();
809    }
810  }
811
812  /** Updates namespace and diskspace consumed for all
813   * directories until the parent directory of file represented by path.
814   *
815   * @param path path for the file.
816   * @param nsDelta the delta change of namespace
817   * @param dsDelta the delta change of diskspace
818   * @throws QuotaExceededException if the new count violates any quota limit
819   * @throws FileNotFound if path does not exist.
820   */
821  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
822                                         throws QuotaExceededException,
823                                                FileNotFoundException {
824    synchronized (rootDir) {
825      INode[] inodes = rootDir.getExistingPathINodes(path);
826      int len = inodes.length;
827      if (inodes[len - 1] == null) {
828        throw new FileNotFoundException(path + 
829                                        " does not exist under rootDir.");
830      }
831      updateCount(inodes, len-1, nsDelta, dsDelta);
832    }
833  }
834 
835  /** update count of each inode with quota
836   *
837   * @param inodes an array of inodes on a path
838   * @param numOfINodes the number of inodes to update starting from index 0
839   * @param nsDelta the delta change of namespace
840   * @param dsDelta the delta change of diskspace
841   * @throws QuotaExceededException if the new count violates any quota limit
842   */
843  private void updateCount(INode[] inodes, int numOfINodes, 
844                           long nsDelta, long dsDelta)
845                           throws QuotaExceededException {
846    if (!ready) {
847      //still intializing. do not check or update quotas.
848      return;
849    }
850    if (numOfINodes>inodes.length) {
851      numOfINodes = inodes.length;
852    }
853    // check existing components in the path 
854    int i=0;
855    try {
856      for(; i < numOfINodes; i++) {
857        if (inodes[i].isQuotaSet()) { // a directory with quota
858          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
859          node.updateNumItemsInTree(nsDelta, dsDelta);
860        }
861      }
862    } catch (QuotaExceededException e) {
863      e.setPathName(getFullPathName(inodes, i));
864      // undo updates
865      for( ; i-- > 0; ) {
866        try {
867          if (inodes[i].isQuotaSet()) { // a directory with quota
868            INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
869            node.updateNumItemsInTree(-nsDelta, -dsDelta);
870          }
871        } catch (IOException ingored) {
872        }
873      }
874      throw e;
875    }
876  }
877 
878  /** Return the name of the path represented by inodes at [0, pos] */
879  private static String getFullPathName(INode[] inodes, int pos) {
880    StringBuilder fullPathName = new StringBuilder();
881    for (int i=1; i<=pos; i++) {
882      fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
883    }
884    return fullPathName.toString();
885  }
886 
887  /**
888   * Create a directory
889   * If ancestor directories do not exist, automatically create them.
890
891   * @param src string representation of the path to the directory
892   * @param permissions the permission of the directory
893   * @param inheritPermission if the permission of the directory should inherit
894   *                          from its parent or not. The automatically created
895   *                          ones always inherit its permission from its parent
896   * @param now creation time
897   * @return true if the operation succeeds false otherwise
898   * @throws FileNotFoundException if an ancestor or itself is a file
899   * @throws QuotaExceededException if directory creation violates
900   *                                any quota limit
901   */
902  boolean mkdirs(String src, PermissionStatus permissions,
903      boolean inheritPermission, long now)
904      throws FileNotFoundException, QuotaExceededException {
905    src = normalizePath(src);
906    String[] names = INode.getPathNames(src);
907    byte[][] components = INode.getPathComponents(names);
908    INode[] inodes = new INode[components.length];
909
910    synchronized(rootDir) {
911      rootDir.getExistingPathINodes(components, inodes);
912
913      // find the index of the first null in inodes[]
914      StringBuilder pathbuilder = new StringBuilder();
915      int i = 1;
916      for(; i < inodes.length && inodes[i] != null; i++) {
917        pathbuilder.append(Path.SEPARATOR + names[i]);
918        if (!inodes[i].isDirectory()) {
919          throw new FileNotFoundException("Parent path is not a directory: "
920              + pathbuilder);
921        }
922      }
923
924      // create directories beginning from the first null index
925      for(; i < inodes.length; i++) {
926        pathbuilder.append(Path.SEPARATOR + names[i]);
927        String cur = pathbuilder.toString();
928        unprotectedMkdir(inodes, i, components[i], permissions,
929            inheritPermission || i != components.length-1, now);
930        if (inodes[i] == null) {
931          return false;
932        }
933        // Directory creation also count towards FilesCreated
934        // to match count of files_deleted metric.
935        if (namesystem != null)
936          NameNode.getNameNodeMetrics().numFilesCreated.inc();
937        fsImage.getEditLog().logMkDir(cur, inodes[i]);
938        NameNode.stateChangeLog.debug(
939            "DIR* FSDirectory.mkdirs: created directory " + cur);
940      }
941    }
942    return true;
943  }
944
945  /**
946   */
947  INode unprotectedMkdir(String src, PermissionStatus permissions,
948                          long timestamp) throws QuotaExceededException {
949    byte[][] components = INode.getPathComponents(src);
950    INode[] inodes = new INode[components.length];
951    synchronized (rootDir) {
952      rootDir.getExistingPathINodes(components, inodes);
953      unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
954          permissions, false, timestamp);
955      return inodes[inodes.length-1];
956    }
957  }
958
959  /** create a directory at index pos.
960   * The parent path to the directory is at [0, pos-1].
961   * All ancestors exist. Newly created one stored at index pos.
962   */
963  private void unprotectedMkdir(INode[] inodes, int pos,
964      byte[] name, PermissionStatus permission, boolean inheritPermission,
965      long timestamp) throws QuotaExceededException {
966    inodes[pos] = addChild(inodes, pos, 
967        new INodeDirectory(name, permission, timestamp),
968        inheritPermission );
969  }
970 
971  /** Add a node child to the namespace. The full path name of the node is src.
972   * childDiskspace should be -1, if unknown.
973   * QuotaExceededException is thrown if it violates quota limit */
974  private <T extends INode> T addNode(String src, T child, 
975        long childDiskspace, boolean inheritPermission) 
976  throws QuotaExceededException {
977    byte[][] components = INode.getPathComponents(src);
978    child.setLocalName(components[components.length-1]);
979    INode[] inodes = new INode[components.length];
980    synchronized (rootDir) {
981      rootDir.getExistingPathINodes(components, inodes);
982      return addChild(inodes, inodes.length-1, child, childDiskspace,
983                      inheritPermission);
984    }
985  }
986 
987  /** Add a node child to the inodes at index pos.
988   * Its ancestors are stored at [0, pos-1].
989   * QuotaExceededException is thrown if it violates quota limit */
990  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
991      boolean inheritPermission) throws QuotaExceededException {
992    return addChild(pathComponents, pos, child, -1, inheritPermission);
993  }
994 
995  /** Add a node child to the inodes at index pos.
996   * Its ancestors are stored at [0, pos-1].
997   * QuotaExceededException is thrown if it violates quota limit */
998  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
999       long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
1000    INode.DirCounts counts = new INode.DirCounts();
1001    child.spaceConsumedInTree(counts);
1002    if (childDiskspace < 0) {
1003      childDiskspace = counts.getDsCount();
1004    }
1005    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
1006    T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
1007        child, inheritPermission);
1008    if (addedNode == null) {
1009      updateCount(pathComponents, pos, 
1010                  -counts.getNsCount(), -childDiskspace);
1011    }
1012    return addedNode;
1013  }
1014 
1015  /** Remove an inode at index pos from the namespace.
1016   * Its ancestors are stored at [0, pos-1].
1017   * Count of each ancestor with quota is also updated.
1018   * Return the removed node; null if the removal fails.
1019   */
1020  private INode removeChild(INode[] pathComponents, int pos)
1021  throws QuotaExceededException {
1022    INode removedNode = 
1023      ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
1024    if (removedNode != null) {
1025      INode.DirCounts counts = new INode.DirCounts();
1026      removedNode.spaceConsumedInTree(counts);
1027      updateCount(pathComponents, pos,
1028                  -counts.getNsCount(), -counts.getDsCount());
1029    }
1030    return removedNode;
1031  }
1032 
1033  /**
1034   */
1035  String normalizePath(String src) {
1036    if (src.length() > 1 && src.endsWith("/")) {
1037      src = src.substring(0, src.length() - 1);
1038    }
1039    return src;
1040  }
1041
1042  ContentSummary getContentSummary(String src) throws IOException {
1043    String srcs = normalizePath(src);
1044    synchronized (rootDir) {
1045      INode targetNode = rootDir.getNode(srcs);
1046      if (targetNode == null) {
1047        throw new FileNotFoundException("File does not exist: " + srcs);
1048      }
1049      else {
1050        return targetNode.computeContentSummary();
1051      }
1052    }
1053  }
1054
1055  /** Update the count of each directory with quota in the namespace
1056   * A directory's count is defined as the total number inodes in the tree
1057   * rooted at the directory.
1058   *
1059   * This is an update of existing state of the filesystem and does not
1060   * throw QuotaExceededException.
1061   */
1062  void updateCountForINodeWithQuota() {
1063    updateCountForINodeWithQuota(rootDir, new INode.DirCounts(), 
1064                                 new ArrayList<INode>(50));
1065  }
1066 
1067  /**
1068   * Update the count of the directory if it has a quota and return the count
1069   *
1070   * This does not throw a QuotaExceededException. This is just an update
1071   * of of existing state and throwing QuotaExceededException does not help
1072   * with fixing the state, if there is a problem.
1073   *
1074   * @param dir the root of the tree that represents the directory
1075   * @param counters counters for name space and disk space
1076   * @param nodesInPath INodes for the each of components in the path.
1077   * @return the size of the tree
1078   */
1079  private static void updateCountForINodeWithQuota(INodeDirectory dir, 
1080                                               INode.DirCounts counts,
1081                                               ArrayList<INode> nodesInPath) {
1082    long parentNamespace = counts.nsCount;
1083    long parentDiskspace = counts.dsCount;
1084   
1085    counts.nsCount = 1L;//for self. should not call node.spaceConsumedInTree()
1086    counts.dsCount = 0L;
1087   
1088    /* We don't need nodesInPath if we could use 'parent' field in
1089     * INode. using 'parent' is not currently recommended. */
1090    nodesInPath.add(dir);
1091
1092    for (INode child : dir.getChildren()) {
1093      if (child.isDirectory()) {
1094        updateCountForINodeWithQuota((INodeDirectory)child, 
1095                                     counts, nodesInPath);
1096      } else { // reduce recursive calls
1097        counts.nsCount += 1;
1098        counts.dsCount += ((INodeFile)child).diskspaceConsumed();
1099      }
1100    }
1101     
1102    if (dir.isQuotaSet()) {
1103      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(counts.nsCount,
1104                                                      counts.dsCount);
1105
1106      // check if quota is violated for some reason.
1107      if ((dir.getNsQuota() >= 0 && counts.nsCount > dir.getNsQuota()) ||
1108          (dir.getDsQuota() >= 0 && counts.dsCount > dir.getDsQuota())) {
1109
1110        // can only happen because of a software bug. the bug should be fixed.
1111        StringBuilder path = new StringBuilder(512);
1112        for (INode n : nodesInPath) {
1113          path.append('/');
1114          path.append(n.getLocalName());
1115        }
1116       
1117        NameNode.LOG.warn("Quota violation in image for " + path + 
1118                          " (Namespace quota : " + dir.getNsQuota() +
1119                          " consumed : " + counts.nsCount + ")" +
1120                          " (Diskspace quota : " + dir.getDsQuota() +
1121                          " consumed : " + counts.dsCount + ").");
1122      }           
1123    }
1124     
1125    // pop
1126    nodesInPath.remove(nodesInPath.size()-1);
1127   
1128    counts.nsCount += parentNamespace;
1129    counts.dsCount += parentDiskspace;
1130  }
1131 
1132  /**
1133   * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
1134   * Sets quota for for a directory.
1135   * @returns INodeDirectory if any of the quotas have changed. null other wise.
1136   * @throws FileNotFoundException if the path does not exist or is a file
1137   * @throws QuotaExceededException if the directory tree size is
1138   *                                greater than the given quota
1139   */
1140  INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota) 
1141                       throws FileNotFoundException, QuotaExceededException {
1142    // sanity check
1143    if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET && 
1144         nsQuota < FSConstants.QUOTA_RESET) || 
1145        (dsQuota < 0 && dsQuota != FSConstants.QUOTA_DONT_SET && 
1146          dsQuota < FSConstants.QUOTA_RESET)) {
1147      throw new IllegalArgumentException("Illegal value for nsQuota or " +
1148                                         "dsQuota : " + nsQuota + " and " +
1149                                         dsQuota);
1150    }
1151   
1152    String srcs = normalizePath(src);
1153    INode[] inodes = rootDir.getExistingPathINodes(src);
1154    INode targetNode = inodes[inodes.length-1];
1155    if (targetNode == null) {
1156      throw new FileNotFoundException("Directory does not exist: " + srcs);
1157    } else if (!targetNode.isDirectory()) {
1158      throw new FileNotFoundException("Cannot set quota on a file: " + srcs); 
1159    } else { // a directory inode
1160      INodeDirectory dirNode = (INodeDirectory)targetNode;
1161      long oldNsQuota = dirNode.getNsQuota();
1162      long oldDsQuota = dirNode.getDsQuota();
1163      if (nsQuota == FSConstants.QUOTA_DONT_SET) {
1164        nsQuota = oldNsQuota;
1165      }
1166      if (dsQuota == FSConstants.QUOTA_DONT_SET) {
1167        dsQuota = oldDsQuota;
1168      }       
1169
1170      if (dirNode instanceof INodeDirectoryWithQuota) { 
1171        // a directory with quota; so set the quota to the new value
1172        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
1173      } else {
1174        // a non-quota directory; so replace it with a directory with quota
1175        INodeDirectoryWithQuota newNode = 
1176          new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
1177        // non-root directory node; parent != null
1178        INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
1179        dirNode = newNode;
1180        parent.replaceChild(newNode);
1181      }
1182      return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
1183    }
1184  }
1185 
1186  /**
1187   * See {@link ClientProtocol#setQuota(String, long, long)} for the
1188   * contract.
1189   * @see #unprotectedSetQuota(String, long, long)
1190   */
1191  void setQuota(String src, long nsQuota, long dsQuota) 
1192                throws FileNotFoundException, QuotaExceededException {
1193    synchronized (rootDir) {   
1194      INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
1195      if (dir != null) {
1196        fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
1197                                         dir.getDsQuota());
1198      }
1199    }
1200  }
1201 
1202  long totalInodes() {
1203    synchronized (rootDir) {
1204      return rootDir.numItemsInTree();
1205    }
1206  }
1207
1208  /**
1209   * Sets the access time on the file. Logs it in the transaction log
1210   */
1211  void setTimes(String src, INodeFile inode, long mtime, long atime, boolean force) 
1212                                                        throws IOException {
1213    if (unprotectedSetTimes(src, inode, mtime, atime, force)) {
1214      fsImage.getEditLog().logTimes(src, mtime, atime);
1215    }
1216  }
1217
1218  boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
1219                              throws IOException {
1220    INodeFile inode = getFileINode(src);
1221    return unprotectedSetTimes(src, inode, mtime, atime, force);
1222  }
1223
1224  private boolean unprotectedSetTimes(String src, INodeFile inode, long mtime,
1225                                      long atime, boolean force) throws IOException {
1226    boolean status = false;
1227    if (mtime != -1) {
1228      inode.setModificationTimeForce(mtime);
1229      status = true;
1230    }
1231    if (atime != -1) {
1232      long inodeTime = inode.getAccessTime();
1233
1234      // if the last access time update was within the last precision interval, then
1235      // no need to store access time
1236      if (atime <= inodeTime + namesystem.getAccessTimePrecision() && !force) {
1237        status =  false;
1238      } else {
1239        inode.setAccessTime(atime);
1240        status = true;
1241      }
1242    } 
1243    return status;
1244  }
1245 
1246  /**
1247   * Create FileStatus by file INode
1248   */
1249   private static FileStatus createFileStatus(String path, INode node) {
1250    // length is zero for directories
1251    return new FileStatus(node.isDirectory() ? 0 : node.computeContentSummary().getLength(), 
1252        node.isDirectory(), 
1253        node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
1254        node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
1255        node.getModificationTime(),
1256        node.getAccessTime(),
1257        node.getFsPermission(),
1258        node.getUserName(),
1259        node.getGroupName(),
1260        new Path(path));
1261  }
1262}
Note: See TracBrowser for help on using the repository browser.