source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 17.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs.s3native;
20
21import java.io.BufferedOutputStream;
22import java.io.File;
23import java.io.FileNotFoundException;
24import java.io.FileOutputStream;
25import java.io.IOException;
26import java.io.InputStream;
27import java.io.OutputStream;
28import java.net.URI;
29import java.security.DigestOutputStream;
30import java.security.MessageDigest;
31import java.security.NoSuchAlgorithmException;
32import java.util.ArrayList;
33import java.util.HashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Set;
37import java.util.TreeSet;
38import java.util.concurrent.TimeUnit;
39
40import org.apache.commons.logging.Log;
41import org.apache.commons.logging.LogFactory;
42import org.apache.hadoop.conf.Configuration;
43import org.apache.hadoop.fs.BufferedFSInputStream;
44import org.apache.hadoop.fs.FSDataInputStream;
45import org.apache.hadoop.fs.FSDataOutputStream;
46import org.apache.hadoop.fs.FSInputStream;
47import org.apache.hadoop.fs.FileStatus;
48import org.apache.hadoop.fs.FileSystem;
49import org.apache.hadoop.fs.Path;
50import org.apache.hadoop.fs.permission.FsPermission;
51import org.apache.hadoop.fs.s3.S3Exception;
52import org.apache.hadoop.io.retry.RetryPolicies;
53import org.apache.hadoop.io.retry.RetryPolicy;
54import org.apache.hadoop.io.retry.RetryProxy;
55import org.apache.hadoop.util.Progressable;
56
57/**
58 * <p>
59 * A {@link FileSystem} for reading and writing files stored on
60 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
61 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
62 * stores files on S3 in their
63 * native form so they can be read by other S3 tools.
64 * </p>
65 * @see org.apache.hadoop.fs.s3.S3FileSystem
66 */
67public class NativeS3FileSystem extends FileSystem {
68 
69  public static final Log LOG = 
70    LogFactory.getLog(NativeS3FileSystem.class);
71 
72  private static final String FOLDER_SUFFIX = "_$folder$";
73  private static final long MAX_S3_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
74  static final String PATH_DELIMITER = Path.SEPARATOR;
75  private static final int S3_MAX_LISTING_LENGTH = 1000;
76 
77  private class NativeS3FsInputStream extends FSInputStream {
78   
79    private InputStream in;
80    private final String key;
81    private long pos = 0;
82   
83    public NativeS3FsInputStream(InputStream in, String key) {
84      this.in = in;
85      this.key = key;
86    }
87   
88    public synchronized int read() throws IOException {
89      int result = in.read();
90      if (result != -1) {
91        pos++;
92      }
93      return result;
94    }
95    public synchronized int read(byte[] b, int off, int len)
96      throws IOException {
97     
98      int result = in.read(b, off, len);
99      if (result > 0) {
100        pos += result;
101      }
102      return result;
103    }
104
105    public void close() throws IOException {
106      in.close();
107    }
108
109    public synchronized void seek(long pos) throws IOException {
110      in.close();
111      in = store.retrieve(key, pos);
112      this.pos = pos;
113    }
114    public synchronized long getPos() throws IOException {
115      return pos;
116    }
117    public boolean seekToNewSource(long targetPos) throws IOException {
118      return false;
119    }
120  }
121 
122  private class NativeS3FsOutputStream extends OutputStream {
123   
124    private Configuration conf;
125    private String key;
126    private File backupFile;
127    private OutputStream backupStream;
128    private MessageDigest digest;
129    private boolean closed;
130   
131    public NativeS3FsOutputStream(Configuration conf,
132        NativeFileSystemStore store, String key, Progressable progress,
133        int bufferSize) throws IOException {
134      this.conf = conf;
135      this.key = key;
136      this.backupFile = newBackupFile();
137      try {
138        this.digest = MessageDigest.getInstance("MD5");
139        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
140            new FileOutputStream(backupFile), this.digest));
141      } catch (NoSuchAlgorithmException e) {
142        LOG.warn("Cannot load MD5 digest algorithm," +
143            "skipping message integrity check.", e);
144        this.backupStream = new BufferedOutputStream(
145            new FileOutputStream(backupFile));
146      }
147    }
148
149    private File newBackupFile() throws IOException {
150      File dir = new File(conf.get("fs.s3.buffer.dir"));
151      if (!dir.mkdirs() && !dir.exists()) {
152        throw new IOException("Cannot create S3 buffer directory: " + dir);
153      }
154      File result = File.createTempFile("output-", ".tmp", dir);
155      result.deleteOnExit();
156      return result;
157    }
158   
159    @Override
160    public void flush() throws IOException {
161      backupStream.flush();
162    }
163   
164    @Override
165    public synchronized void close() throws IOException {
166      if (closed) {
167        return;
168      }
169
170      backupStream.close();
171     
172      try {
173        byte[] md5Hash = digest == null ? null : digest.digest();
174        store.storeFile(key, backupFile, md5Hash);
175      } finally {
176        if (!backupFile.delete()) {
177          LOG.warn("Could not delete temporary s3n file: " + backupFile);
178        }
179        super.close();
180        closed = true;
181      } 
182
183    }
184
185    @Override
186    public void write(int b) throws IOException {
187      backupStream.write(b);
188    }
189
190    @Override
191    public void write(byte[] b, int off, int len) throws IOException {
192      backupStream.write(b, off, len);
193    }
194   
195   
196  }
197 
198  private URI uri;
199  private NativeFileSystemStore store;
200  private Path workingDir;
201 
202  public NativeS3FileSystem() {
203    // set store in initialize()
204  }
205 
206  public NativeS3FileSystem(NativeFileSystemStore store) {
207    this.store = store;
208  }
209 
210  @Override
211  public void initialize(URI uri, Configuration conf) throws IOException {
212    super.initialize(uri, conf);
213    if (store == null) {
214      store = createDefaultStore(conf);
215    }
216    store.initialize(uri, conf);
217    setConf(conf);
218    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
219    this.workingDir =
220      new Path("/user", System.getProperty("user.name")).makeQualified(this);
221  }
222 
223  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
224    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
225   
226    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
227        conf.getInt("fs.s3.maxRetries", 4),
228        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
229    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
230      new HashMap<Class<? extends Exception>, RetryPolicy>();
231    exceptionToPolicyMap.put(IOException.class, basePolicy);
232    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
233   
234    RetryPolicy methodPolicy = RetryPolicies.retryByException(
235        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
236    Map<String, RetryPolicy> methodNameToPolicyMap =
237      new HashMap<String, RetryPolicy>();
238    methodNameToPolicyMap.put("storeFile", methodPolicy);
239   
240    return (NativeFileSystemStore)
241      RetryProxy.create(NativeFileSystemStore.class, store,
242          methodNameToPolicyMap);
243  }
244 
245  private static String pathToKey(Path path) {
246    if (!path.isAbsolute()) {
247      throw new IllegalArgumentException("Path must be absolute: " + path);
248    }
249    return path.toUri().getPath().substring(1); // remove initial slash
250  }
251 
252  private static Path keyToPath(String key) {
253    return new Path("/" + key);
254  }
255 
256  private Path makeAbsolute(Path path) {
257    if (path.isAbsolute()) {
258      return path;
259    }
260    return new Path(workingDir, path);
261  }
262
263  /** This optional operation is not yet supported. */
264  public FSDataOutputStream append(Path f, int bufferSize,
265      Progressable progress) throws IOException {
266    throw new IOException("Not supported");
267  }
268 
269  @Override
270  public FSDataOutputStream create(Path f, FsPermission permission,
271      boolean overwrite, int bufferSize, short replication, long blockSize,
272      Progressable progress) throws IOException {
273
274    if (exists(f) && !overwrite) {
275      throw new IOException("File already exists:"+f);
276    }
277    Path absolutePath = makeAbsolute(f);
278    String key = pathToKey(absolutePath);
279    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
280        key, progress, bufferSize), statistics);
281  }
282 
283  @Override
284  @Deprecated
285  public boolean delete(Path path) throws IOException {
286    return delete(path, true);
287  }
288
289  @Override
290  public boolean delete(Path f, boolean recursive) throws IOException {
291    FileStatus status;
292    try {
293      status = getFileStatus(f);
294    } catch (FileNotFoundException e) {
295      return false;
296    }
297    Path absolutePath = makeAbsolute(f);
298    String key = pathToKey(absolutePath);
299    if (status.isDir()) {
300      FileStatus[] contents = listStatus(f);
301      if (!recursive && contents.length > 0) {
302        throw new IOException("Directory " + f.toString() + " is not empty.");
303      }
304      for (FileStatus p : contents) {
305        if (!delete(p.getPath(), recursive)) {
306          return false;
307        }
308      }
309      store.delete(key + FOLDER_SUFFIX);
310    } else {
311      store.delete(key);
312    }
313    return true;
314  }
315
316  @Override
317  public FileStatus getFileStatus(Path f) throws IOException {
318   
319    Path absolutePath = makeAbsolute(f);
320    String key = pathToKey(absolutePath);
321   
322    if (key.length() == 0) { // root always exists
323      return newDirectory(absolutePath);
324    }
325   
326    FileMetadata meta = store.retrieveMetadata(key);
327    if (meta != null) {
328      return newFile(meta, absolutePath);
329    }
330    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
331      return newDirectory(absolutePath);
332    }
333   
334    PartialListing listing = store.list(key, 1);
335    if (listing.getFiles().length > 0 ||
336        listing.getCommonPrefixes().length > 0) {
337      return newDirectory(absolutePath);
338    }
339   
340    throw new FileNotFoundException(absolutePath +
341        ": No such file or directory.");
342   
343  }
344
345  @Override
346  public URI getUri() {
347    return uri;
348  }
349
350  /**
351   * <p>
352   * If <code>f</code> is a file, this method will make a single call to S3.
353   * If <code>f</code> is a directory, this method will make a maximum of
354   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
355   * files and directories contained directly in <code>f</code>.
356   * </p>
357   */
358  @Override
359  public FileStatus[] listStatus(Path f) throws IOException {
360
361    Path absolutePath = makeAbsolute(f);
362    String key = pathToKey(absolutePath);
363   
364    if (key.length() > 0) {
365      FileMetadata meta = store.retrieveMetadata(key);
366      if (meta != null) {
367        return new FileStatus[] { newFile(meta, absolutePath) };
368      }
369    }
370   
371    URI pathUri = absolutePath.toUri();
372    Set<FileStatus> status = new TreeSet<FileStatus>();
373    String priorLastKey = null;
374    do {
375      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, 
376          priorLastKey);
377      for (FileMetadata fileMetadata : listing.getFiles()) {
378        Path subpath = keyToPath(fileMetadata.getKey());
379        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
380        if (relativePath.endsWith(FOLDER_SUFFIX)) {
381          status.add(newDirectory(new Path(absolutePath,
382              relativePath.substring(0,
383                  relativePath.indexOf(FOLDER_SUFFIX)))));
384        } else {
385          status.add(newFile(fileMetadata, subpath));
386        }
387      }
388      for (String commonPrefix : listing.getCommonPrefixes()) {
389        Path subpath = keyToPath(commonPrefix);
390        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
391        status.add(newDirectory(new Path(absolutePath, relativePath)));
392      }
393      priorLastKey = listing.getPriorLastKey();
394    } while (priorLastKey != null);
395   
396    if (status.isEmpty() &&
397        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
398      return null;
399    }
400   
401    return status.toArray(new FileStatus[0]);
402  }
403 
404  private FileStatus newFile(FileMetadata meta, Path path) {
405    return new FileStatus(meta.getLength(), false, 1, MAX_S3_FILE_SIZE,
406        meta.getLastModified(), path.makeQualified(this));
407  }
408 
409  private FileStatus newDirectory(Path path) {
410    return new FileStatus(0, true, 1, MAX_S3_FILE_SIZE, 0,
411        path.makeQualified(this));
412  }
413
414  @Override
415  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
416    Path absolutePath = makeAbsolute(f);
417    List<Path> paths = new ArrayList<Path>();
418    do {
419      paths.add(0, absolutePath);
420      absolutePath = absolutePath.getParent();
421    } while (absolutePath != null);
422   
423    boolean result = true;
424    for (Path path : paths) {
425      result &= mkdir(path);
426    }
427    return result;
428  }
429 
430  private boolean mkdir(Path f) throws IOException {
431    try {
432      FileStatus fileStatus = getFileStatus(f);
433      if (!fileStatus.isDir()) {
434        throw new IOException(String.format(
435            "Can't make directory for path %s since it is a file.", f));
436
437      }
438    } catch (FileNotFoundException e) {
439      String key = pathToKey(f) + FOLDER_SUFFIX;
440      store.storeEmptyFile(key);   
441    }
442    return true;
443  }
444
445  @Override
446  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
447    if (!exists(f)) {
448      throw new FileNotFoundException(f.toString());
449    }
450    Path absolutePath = makeAbsolute(f);
451    String key = pathToKey(absolutePath);
452    return new FSDataInputStream(new BufferedFSInputStream(
453        new NativeS3FsInputStream(store.retrieve(key), key), bufferSize));
454  }
455 
456  // rename() and delete() use this method to ensure that the parent directory
457  // of the source does not vanish.
458  private void createParent(Path path) throws IOException {
459      Path parent = path.getParent();
460      if (parent != null) {
461          String key = pathToKey(makeAbsolute(parent));
462          if (key.length() > 0) {
463              store.storeEmptyFile(key + FOLDER_SUFFIX);
464          }
465      }
466  }
467 
468  private boolean existsAndIsFile(Path f) throws IOException {
469   
470    Path absolutePath = makeAbsolute(f);
471    String key = pathToKey(absolutePath);
472   
473    if (key.length() == 0) {
474        return false;
475    }
476   
477    FileMetadata meta = store.retrieveMetadata(key);
478    if (meta != null) {
479        // S3 object with given key exists, so this is a file
480        return true;
481    }
482   
483    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
484        // Signifies empty directory
485        return false;
486    }
487   
488    PartialListing listing = store.list(key, 1, null);
489    if (listing.getFiles().length > 0 ||
490        listing.getCommonPrefixes().length > 0) {
491        // Non-empty directory
492        return false;
493    }
494   
495    throw new FileNotFoundException(absolutePath +
496        ": No such file or directory");
497}
498
499
500  @Override
501  public boolean rename(Path src, Path dst) throws IOException {
502
503    String srcKey = pathToKey(makeAbsolute(src));
504
505    if (srcKey.length() == 0) {
506      // Cannot rename root of file system
507      return false;
508    }
509
510    // Figure out the final destination
511    String dstKey;
512    try {
513      boolean dstIsFile = existsAndIsFile(dst);
514      if (dstIsFile) {
515        // Attempting to overwrite a file using rename()
516        return false;
517      } else {
518        // Move to within the existent directory
519        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
520      }
521    } catch (FileNotFoundException e) {
522      // dst doesn't exist, so we can proceed
523      dstKey = pathToKey(makeAbsolute(dst));
524      try {
525        if (!getFileStatus(dst.getParent()).isDir()) {
526          return false; // parent dst is a file
527        }
528      } catch (FileNotFoundException ex) {
529        return false; // parent dst does not exist
530      }
531    }
532
533    try {
534      boolean srcIsFile = existsAndIsFile(src);
535      if (srcIsFile) {
536        store.rename(srcKey, dstKey);
537      } else {
538        // Move the folder object
539        store.delete(srcKey + FOLDER_SUFFIX);
540        store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
541
542        // Move everything inside the folder
543        String priorLastKey = null;
544        do {
545          PartialListing listing = store.listAll(srcKey, S3_MAX_LISTING_LENGTH,
546              priorLastKey);
547          for (FileMetadata file : listing.getFiles()) {
548            store.rename(file.getKey(), dstKey
549                + file.getKey().substring(srcKey.length()));
550          }
551          priorLastKey = listing.getPriorLastKey();
552        } while (priorLastKey != null);
553      }
554
555      createParent(src);
556      return true;
557
558    } catch (FileNotFoundException e) {
559      // Source file does not exist;
560      return false;
561    }
562  }
563
564
565  /**
566   * Set the working directory to the given directory.
567   */
568  @Override
569  public void setWorkingDirectory(Path newDir) {
570    workingDir = newDir;
571  }
572 
573  @Override
574  public Path getWorkingDirectory() {
575    return workingDir;
576  }
577
578}
Note: See TracBrowser for help on using the repository browser.