source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 11.1 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs.s3;
20
21import java.io.FileNotFoundException;
22import java.io.IOException;
23import java.net.URI;
24import java.util.ArrayList;
25import java.util.HashMap;
26import java.util.List;
27import java.util.Map;
28import java.util.concurrent.TimeUnit;
29
30import org.apache.hadoop.conf.Configuration;
31import org.apache.hadoop.fs.FSDataInputStream;
32import org.apache.hadoop.fs.FSDataOutputStream;
33import org.apache.hadoop.fs.FileStatus;
34import org.apache.hadoop.fs.FileSystem;
35import org.apache.hadoop.fs.Path;
36import org.apache.hadoop.fs.permission.FsPermission;
37import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
38import org.apache.hadoop.io.retry.RetryPolicies;
39import org.apache.hadoop.io.retry.RetryPolicy;
40import org.apache.hadoop.io.retry.RetryProxy;
41import org.apache.hadoop.util.Progressable;
42
43/**
44 * <p>
45 * A block-based {@link FileSystem} backed by
46 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
47 * </p>
48 * @see NativeS3FileSystem
49 */
50public class S3FileSystem extends FileSystem {
51
52  private URI uri;
53
54  private FileSystemStore store;
55
56  private Path workingDir;
57
58  public S3FileSystem() {
59    // set store in initialize()
60  }
61 
62  public S3FileSystem(FileSystemStore store) {
63    this.store = store;
64  }
65
66  @Override
67  public URI getUri() {
68    return uri;
69  }
70
71  @Override
72  public void initialize(URI uri, Configuration conf) throws IOException {
73    super.initialize(uri, conf);
74    if (store == null) {
75      store = createDefaultStore(conf);
76    }
77    store.initialize(uri, conf);
78    setConf(conf);
79    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());   
80    this.workingDir =
81      new Path("/user", System.getProperty("user.name")).makeQualified(this);
82  } 
83
84  private static FileSystemStore createDefaultStore(Configuration conf) {
85    FileSystemStore store = new Jets3tFileSystemStore();
86   
87    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
88                                                                               conf.getInt("fs.s3.maxRetries", 4),
89                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
90    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
91      new HashMap<Class<? extends Exception>, RetryPolicy>();
92    exceptionToPolicyMap.put(IOException.class, basePolicy);
93    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
94   
95    RetryPolicy methodPolicy = RetryPolicies.retryByException(
96                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
97    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
98    methodNameToPolicyMap.put("storeBlock", methodPolicy);
99    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
100   
101    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
102                                               store, methodNameToPolicyMap);
103  }
104 
105  @Override
106  public String getName() {
107    return getUri().toString();
108  }
109
110  @Override
111  public Path getWorkingDirectory() {
112    return workingDir;
113  }
114
115  @Override
116  public void setWorkingDirectory(Path dir) {
117    workingDir = makeAbsolute(dir);
118  }
119
120  private Path makeAbsolute(Path path) {
121    if (path.isAbsolute()) {
122      return path;
123    }
124    return new Path(workingDir, path);
125  }
126
127  /**
128   * @param permission Currently ignored.
129   */
130  @Override
131  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
132    Path absolutePath = makeAbsolute(path);
133    List<Path> paths = new ArrayList<Path>();
134    do {
135      paths.add(0, absolutePath);
136      absolutePath = absolutePath.getParent();
137    } while (absolutePath != null);
138   
139    boolean result = true;
140    for (Path p : paths) {
141      result &= mkdir(p);
142    }
143    return result;
144  }
145 
146  private boolean mkdir(Path path) throws IOException {
147    Path absolutePath = makeAbsolute(path);
148    INode inode = store.retrieveINode(absolutePath);
149    if (inode == null) {
150      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
151    } else if (inode.isFile()) {
152      throw new IOException(String.format(
153          "Can't make directory for path %s since it is a file.",
154          absolutePath));
155    }
156    return true;
157  }
158
159  @Override
160  public boolean isFile(Path path) throws IOException {
161    INode inode = store.retrieveINode(makeAbsolute(path));
162    if (inode == null) {
163      return false;
164    }
165    return inode.isFile();
166  }
167
168  private INode checkFile(Path path) throws IOException {
169    INode inode = store.retrieveINode(makeAbsolute(path));
170    if (inode == null) {
171      throw new IOException("No such file.");
172    }
173    if (inode.isDirectory()) {
174      throw new IOException("Path " + path + " is a directory.");
175    }
176    return inode;
177  }
178
179  @Override
180  public FileStatus[] listStatus(Path f) throws IOException {
181    Path absolutePath = makeAbsolute(f);
182    INode inode = store.retrieveINode(absolutePath);
183    if (inode == null) {
184      return null;
185    }
186    if (inode.isFile()) {
187      return new FileStatus[] {
188        new S3FileStatus(f.makeQualified(this), inode)
189      };
190    }
191    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
192    for (Path p : store.listSubPaths(absolutePath)) {
193      ret.add(getFileStatus(p.makeQualified(this)));
194    }
195    return ret.toArray(new FileStatus[0]);
196  }
197
198  /** This optional operation is not yet supported. */
199  public FSDataOutputStream append(Path f, int bufferSize,
200      Progressable progress) throws IOException {
201    throw new IOException("Not supported");
202  }
203
204  /**
205   * @param permission Currently ignored.
206   */
207  @Override
208  public FSDataOutputStream create(Path file, FsPermission permission,
209      boolean overwrite, int bufferSize,
210      short replication, long blockSize, Progressable progress)
211    throws IOException {
212
213    INode inode = store.retrieveINode(makeAbsolute(file));
214    if (inode != null) {
215      if (overwrite) {
216        delete(file);
217      } else {
218        throw new IOException("File already exists: " + file);
219      }
220    } else {
221      Path parent = file.getParent();
222      if (parent != null) {
223        if (!mkdirs(parent)) {
224          throw new IOException("Mkdirs failed to create " + parent.toString());
225        }
226      }     
227    }
228    return new FSDataOutputStream
229        (new S3OutputStream(getConf(), store, makeAbsolute(file),
230                            blockSize, progress, bufferSize),
231         statistics);
232  }
233
234  @Override
235  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
236    INode inode = checkFile(path);
237    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
238                                                   statistics));
239  }
240
241  @Override
242  public boolean rename(Path src, Path dst) throws IOException {
243    Path absoluteSrc = makeAbsolute(src);
244    INode srcINode = store.retrieveINode(absoluteSrc);
245    if (srcINode == null) {
246      // src path doesn't exist
247      return false; 
248    }
249    Path absoluteDst = makeAbsolute(dst);
250    INode dstINode = store.retrieveINode(absoluteDst);
251    if (dstINode != null && dstINode.isDirectory()) {
252      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
253      dstINode = store.retrieveINode(absoluteDst);
254    }
255    if (dstINode != null) {
256      // dst path already exists - can't overwrite
257      return false;
258    }
259    Path dstParent = absoluteDst.getParent();
260    if (dstParent != null) {
261      INode dstParentINode = store.retrieveINode(dstParent);
262      if (dstParentINode == null || dstParentINode.isFile()) {
263        // dst parent doesn't exist or is a file
264        return false;
265      }
266    }
267    return renameRecursive(absoluteSrc, absoluteDst);
268  }
269 
270  private boolean renameRecursive(Path src, Path dst) throws IOException {
271    INode srcINode = store.retrieveINode(src);
272    store.storeINode(dst, srcINode);
273    store.deleteINode(src);
274    if (srcINode.isDirectory()) {
275      for (Path oldSrc : store.listDeepSubPaths(src)) {
276        INode inode = store.retrieveINode(oldSrc);
277        if (inode == null) {
278          return false;
279        }
280        String oldSrcPath = oldSrc.toUri().getPath();
281        String srcPath = src.toUri().getPath();
282        String dstPath = dst.toUri().getPath();
283        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
284        store.storeINode(newDst, inode);
285        store.deleteINode(oldSrc);
286      }
287    }
288    return true;
289  }
290
291  public boolean delete(Path path, boolean recursive) throws IOException {
292   Path absolutePath = makeAbsolute(path);
293   INode inode = store.retrieveINode(absolutePath);
294   if (inode == null) {
295     return false;
296   }
297   if (inode.isFile()) {
298     store.deleteINode(absolutePath);
299     for (Block block: inode.getBlocks()) {
300       store.deleteBlock(block);
301     }
302   } else {
303     FileStatus[] contents = listStatus(absolutePath);
304     if (contents == null) {
305       return false;
306     }
307     if ((contents.length !=0) && (!recursive)) {
308       throw new IOException("Directory " + path.toString() 
309           + " is not empty.");
310     }
311     for (FileStatus p:contents) {
312       if (!delete(p.getPath(), recursive)) {
313         return false;
314       }
315     }
316     store.deleteINode(absolutePath);
317   }
318   return true;
319  }
320 
321  @Override
322  @Deprecated
323  public boolean delete(Path path) throws IOException {
324    return delete(path, true);
325  }
326
327  /**
328   * FileStatus for S3 file systems.
329   */
330  @Override
331  public FileStatus getFileStatus(Path f)  throws IOException {
332    INode inode = store.retrieveINode(makeAbsolute(f));
333    if (inode == null) {
334      throw new FileNotFoundException(f + ": No such file or directory.");
335    }
336    return new S3FileStatus(f.makeQualified(this), inode);
337  }
338
339  // diagnostic methods
340
341  void dump() throws IOException {
342    store.dump();
343  }
344
345  void purge() throws IOException {
346    store.purge();
347  }
348
349  private static class S3FileStatus extends FileStatus {
350
351    S3FileStatus(Path f, INode inode) throws IOException {
352      super(findLength(inode), inode.isDirectory(), 1,
353            findBlocksize(inode), 0, f);
354    }
355
356    private static long findLength(INode inode) {
357      if (!inode.isDirectory()) {
358        long length = 0L;
359        for (Block block : inode.getBlocks()) {
360          length += block.getLength();
361        }
362        return length;
363      }
364      return 0;
365    }
366
367    private static long findBlocksize(INode inode) {
368      final Block[] ret = inode.getBlocks();
369      return ret == null ? 0L : ret[0].getLength();
370    }
371  }
372}
Note: See TracBrowser for help on using the repository browser.