[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.fs.s3; |
---|
| 20 | |
---|
| 21 | import java.io.FileNotFoundException; |
---|
| 22 | import java.io.IOException; |
---|
| 23 | import java.net.URI; |
---|
| 24 | import java.util.ArrayList; |
---|
| 25 | import java.util.HashMap; |
---|
| 26 | import java.util.List; |
---|
| 27 | import java.util.Map; |
---|
| 28 | import java.util.concurrent.TimeUnit; |
---|
| 29 | |
---|
| 30 | import org.apache.hadoop.conf.Configuration; |
---|
| 31 | import org.apache.hadoop.fs.FSDataInputStream; |
---|
| 32 | import org.apache.hadoop.fs.FSDataOutputStream; |
---|
| 33 | import org.apache.hadoop.fs.FileStatus; |
---|
| 34 | import org.apache.hadoop.fs.FileSystem; |
---|
| 35 | import org.apache.hadoop.fs.Path; |
---|
| 36 | import org.apache.hadoop.fs.permission.FsPermission; |
---|
| 37 | import org.apache.hadoop.fs.s3native.NativeS3FileSystem; |
---|
| 38 | import org.apache.hadoop.io.retry.RetryPolicies; |
---|
| 39 | import org.apache.hadoop.io.retry.RetryPolicy; |
---|
| 40 | import org.apache.hadoop.io.retry.RetryProxy; |
---|
| 41 | import org.apache.hadoop.util.Progressable; |
---|
| 42 | |
---|
| 43 | /** |
---|
| 44 | * <p> |
---|
| 45 | * A block-based {@link FileSystem} backed by |
---|
| 46 | * <a href="http://aws.amazon.com/s3">Amazon S3</a>. |
---|
| 47 | * </p> |
---|
| 48 | * @see NativeS3FileSystem |
---|
| 49 | */ |
---|
| 50 | public class S3FileSystem extends FileSystem { |
---|
| 51 | |
---|
| 52 | private URI uri; |
---|
| 53 | |
---|
| 54 | private FileSystemStore store; |
---|
| 55 | |
---|
| 56 | private Path workingDir; |
---|
| 57 | |
---|
/**
 * Creates a file system without a backing store. The store is created
 * later, when {@link #initialize(URI, Configuration)} is called.
 */
public S3FileSystem() {
  super();
  // the store is assigned in initialize()
}
---|
| 61 | |
---|
/**
 * Creates a file system that uses the supplied store.
 *
 * @param store the store to use; when it is null,
 *              {@link #initialize(URI, Configuration)} creates a default one
 */
public S3FileSystem(FileSystemStore store) {
  super();
  this.store = store;
}
---|
| 65 | |
---|
| 66 | @Override |
---|
| 67 | public URI getUri() { |
---|
| 68 | return uri; |
---|
| 69 | } |
---|
| 70 | |
---|
| 71 | @Override |
---|
| 72 | public void initialize(URI uri, Configuration conf) throws IOException { |
---|
| 73 | super.initialize(uri, conf); |
---|
| 74 | if (store == null) { |
---|
| 75 | store = createDefaultStore(conf); |
---|
| 76 | } |
---|
| 77 | store.initialize(uri, conf); |
---|
| 78 | setConf(conf); |
---|
| 79 | this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); |
---|
| 80 | this.workingDir = |
---|
| 81 | new Path("/user", System.getProperty("user.name")).makeQualified(this); |
---|
| 82 | } |
---|
| 83 | |
---|
| 84 | private static FileSystemStore createDefaultStore(Configuration conf) { |
---|
| 85 | FileSystemStore store = new Jets3tFileSystemStore(); |
---|
| 86 | |
---|
| 87 | RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( |
---|
| 88 | conf.getInt("fs.s3.maxRetries", 4), |
---|
| 89 | conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); |
---|
| 90 | Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = |
---|
| 91 | new HashMap<Class<? extends Exception>, RetryPolicy>(); |
---|
| 92 | exceptionToPolicyMap.put(IOException.class, basePolicy); |
---|
| 93 | exceptionToPolicyMap.put(S3Exception.class, basePolicy); |
---|
| 94 | |
---|
| 95 | RetryPolicy methodPolicy = RetryPolicies.retryByException( |
---|
| 96 | RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); |
---|
| 97 | Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); |
---|
| 98 | methodNameToPolicyMap.put("storeBlock", methodPolicy); |
---|
| 99 | methodNameToPolicyMap.put("retrieveBlock", methodPolicy); |
---|
| 100 | |
---|
| 101 | return (FileSystemStore) RetryProxy.create(FileSystemStore.class, |
---|
| 102 | store, methodNameToPolicyMap); |
---|
| 103 | } |
---|
| 104 | |
---|
| 105 | @Override |
---|
| 106 | public String getName() { |
---|
| 107 | return getUri().toString(); |
---|
| 108 | } |
---|
| 109 | |
---|
| 110 | @Override |
---|
| 111 | public Path getWorkingDirectory() { |
---|
| 112 | return workingDir; |
---|
| 113 | } |
---|
| 114 | |
---|
| 115 | @Override |
---|
| 116 | public void setWorkingDirectory(Path dir) { |
---|
| 117 | workingDir = makeAbsolute(dir); |
---|
| 118 | } |
---|
| 119 | |
---|
| 120 | private Path makeAbsolute(Path path) { |
---|
| 121 | if (path.isAbsolute()) { |
---|
| 122 | return path; |
---|
| 123 | } |
---|
| 124 | return new Path(workingDir, path); |
---|
| 125 | } |
---|
| 126 | |
---|
| 127 | /** |
---|
| 128 | * @param permission Currently ignored. |
---|
| 129 | */ |
---|
| 130 | @Override |
---|
| 131 | public boolean mkdirs(Path path, FsPermission permission) throws IOException { |
---|
| 132 | Path absolutePath = makeAbsolute(path); |
---|
| 133 | List<Path> paths = new ArrayList<Path>(); |
---|
| 134 | do { |
---|
| 135 | paths.add(0, absolutePath); |
---|
| 136 | absolutePath = absolutePath.getParent(); |
---|
| 137 | } while (absolutePath != null); |
---|
| 138 | |
---|
| 139 | boolean result = true; |
---|
| 140 | for (Path p : paths) { |
---|
| 141 | result &= mkdir(p); |
---|
| 142 | } |
---|
| 143 | return result; |
---|
| 144 | } |
---|
| 145 | |
---|
| 146 | private boolean mkdir(Path path) throws IOException { |
---|
| 147 | Path absolutePath = makeAbsolute(path); |
---|
| 148 | INode inode = store.retrieveINode(absolutePath); |
---|
| 149 | if (inode == null) { |
---|
| 150 | store.storeINode(absolutePath, INode.DIRECTORY_INODE); |
---|
| 151 | } else if (inode.isFile()) { |
---|
| 152 | throw new IOException(String.format( |
---|
| 153 | "Can't make directory for path %s since it is a file.", |
---|
| 154 | absolutePath)); |
---|
| 155 | } |
---|
| 156 | return true; |
---|
| 157 | } |
---|
| 158 | |
---|
| 159 | @Override |
---|
| 160 | public boolean isFile(Path path) throws IOException { |
---|
| 161 | INode inode = store.retrieveINode(makeAbsolute(path)); |
---|
| 162 | if (inode == null) { |
---|
| 163 | return false; |
---|
| 164 | } |
---|
| 165 | return inode.isFile(); |
---|
| 166 | } |
---|
| 167 | |
---|
| 168 | private INode checkFile(Path path) throws IOException { |
---|
| 169 | INode inode = store.retrieveINode(makeAbsolute(path)); |
---|
| 170 | if (inode == null) { |
---|
| 171 | throw new IOException("No such file."); |
---|
| 172 | } |
---|
| 173 | if (inode.isDirectory()) { |
---|
| 174 | throw new IOException("Path " + path + " is a directory."); |
---|
| 175 | } |
---|
| 176 | return inode; |
---|
| 177 | } |
---|
| 178 | |
---|
| 179 | @Override |
---|
| 180 | public FileStatus[] listStatus(Path f) throws IOException { |
---|
| 181 | Path absolutePath = makeAbsolute(f); |
---|
| 182 | INode inode = store.retrieveINode(absolutePath); |
---|
| 183 | if (inode == null) { |
---|
| 184 | return null; |
---|
| 185 | } |
---|
| 186 | if (inode.isFile()) { |
---|
| 187 | return new FileStatus[] { |
---|
| 188 | new S3FileStatus(f.makeQualified(this), inode) |
---|
| 189 | }; |
---|
| 190 | } |
---|
| 191 | ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); |
---|
| 192 | for (Path p : store.listSubPaths(absolutePath)) { |
---|
| 193 | ret.add(getFileStatus(p.makeQualified(this))); |
---|
| 194 | } |
---|
| 195 | return ret.toArray(new FileStatus[0]); |
---|
| 196 | } |
---|
| 197 | |
---|
| 198 | /** This optional operation is not yet supported. */ |
---|
| 199 | public FSDataOutputStream append(Path f, int bufferSize, |
---|
| 200 | Progressable progress) throws IOException { |
---|
| 201 | throw new IOException("Not supported"); |
---|
| 202 | } |
---|
| 203 | |
---|
| 204 | /** |
---|
| 205 | * @param permission Currently ignored. |
---|
| 206 | */ |
---|
| 207 | @Override |
---|
| 208 | public FSDataOutputStream create(Path file, FsPermission permission, |
---|
| 209 | boolean overwrite, int bufferSize, |
---|
| 210 | short replication, long blockSize, Progressable progress) |
---|
| 211 | throws IOException { |
---|
| 212 | |
---|
| 213 | INode inode = store.retrieveINode(makeAbsolute(file)); |
---|
| 214 | if (inode != null) { |
---|
| 215 | if (overwrite) { |
---|
| 216 | delete(file); |
---|
| 217 | } else { |
---|
| 218 | throw new IOException("File already exists: " + file); |
---|
| 219 | } |
---|
| 220 | } else { |
---|
| 221 | Path parent = file.getParent(); |
---|
| 222 | if (parent != null) { |
---|
| 223 | if (!mkdirs(parent)) { |
---|
| 224 | throw new IOException("Mkdirs failed to create " + parent.toString()); |
---|
| 225 | } |
---|
| 226 | } |
---|
| 227 | } |
---|
| 228 | return new FSDataOutputStream |
---|
| 229 | (new S3OutputStream(getConf(), store, makeAbsolute(file), |
---|
| 230 | blockSize, progress, bufferSize), |
---|
| 231 | statistics); |
---|
| 232 | } |
---|
| 233 | |
---|
| 234 | @Override |
---|
| 235 | public FSDataInputStream open(Path path, int bufferSize) throws IOException { |
---|
| 236 | INode inode = checkFile(path); |
---|
| 237 | return new FSDataInputStream(new S3InputStream(getConf(), store, inode, |
---|
| 238 | statistics)); |
---|
| 239 | } |
---|
| 240 | |
---|
| 241 | @Override |
---|
| 242 | public boolean rename(Path src, Path dst) throws IOException { |
---|
| 243 | Path absoluteSrc = makeAbsolute(src); |
---|
| 244 | INode srcINode = store.retrieveINode(absoluteSrc); |
---|
| 245 | if (srcINode == null) { |
---|
| 246 | // src path doesn't exist |
---|
| 247 | return false; |
---|
| 248 | } |
---|
| 249 | Path absoluteDst = makeAbsolute(dst); |
---|
| 250 | INode dstINode = store.retrieveINode(absoluteDst); |
---|
| 251 | if (dstINode != null && dstINode.isDirectory()) { |
---|
| 252 | absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); |
---|
| 253 | dstINode = store.retrieveINode(absoluteDst); |
---|
| 254 | } |
---|
| 255 | if (dstINode != null) { |
---|
| 256 | // dst path already exists - can't overwrite |
---|
| 257 | return false; |
---|
| 258 | } |
---|
| 259 | Path dstParent = absoluteDst.getParent(); |
---|
| 260 | if (dstParent != null) { |
---|
| 261 | INode dstParentINode = store.retrieveINode(dstParent); |
---|
| 262 | if (dstParentINode == null || dstParentINode.isFile()) { |
---|
| 263 | // dst parent doesn't exist or is a file |
---|
| 264 | return false; |
---|
| 265 | } |
---|
| 266 | } |
---|
| 267 | return renameRecursive(absoluteSrc, absoluteDst); |
---|
| 268 | } |
---|
| 269 | |
---|
| 270 | private boolean renameRecursive(Path src, Path dst) throws IOException { |
---|
| 271 | INode srcINode = store.retrieveINode(src); |
---|
| 272 | store.storeINode(dst, srcINode); |
---|
| 273 | store.deleteINode(src); |
---|
| 274 | if (srcINode.isDirectory()) { |
---|
| 275 | for (Path oldSrc : store.listDeepSubPaths(src)) { |
---|
| 276 | INode inode = store.retrieveINode(oldSrc); |
---|
| 277 | if (inode == null) { |
---|
| 278 | return false; |
---|
| 279 | } |
---|
| 280 | String oldSrcPath = oldSrc.toUri().getPath(); |
---|
| 281 | String srcPath = src.toUri().getPath(); |
---|
| 282 | String dstPath = dst.toUri().getPath(); |
---|
| 283 | Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); |
---|
| 284 | store.storeINode(newDst, inode); |
---|
| 285 | store.deleteINode(oldSrc); |
---|
| 286 | } |
---|
| 287 | } |
---|
| 288 | return true; |
---|
| 289 | } |
---|
| 290 | |
---|
| 291 | public boolean delete(Path path, boolean recursive) throws IOException { |
---|
| 292 | Path absolutePath = makeAbsolute(path); |
---|
| 293 | INode inode = store.retrieveINode(absolutePath); |
---|
| 294 | if (inode == null) { |
---|
| 295 | return false; |
---|
| 296 | } |
---|
| 297 | if (inode.isFile()) { |
---|
| 298 | store.deleteINode(absolutePath); |
---|
| 299 | for (Block block: inode.getBlocks()) { |
---|
| 300 | store.deleteBlock(block); |
---|
| 301 | } |
---|
| 302 | } else { |
---|
| 303 | FileStatus[] contents = listStatus(absolutePath); |
---|
| 304 | if (contents == null) { |
---|
| 305 | return false; |
---|
| 306 | } |
---|
| 307 | if ((contents.length !=0) && (!recursive)) { |
---|
| 308 | throw new IOException("Directory " + path.toString() |
---|
| 309 | + " is not empty."); |
---|
| 310 | } |
---|
| 311 | for (FileStatus p:contents) { |
---|
| 312 | if (!delete(p.getPath(), recursive)) { |
---|
| 313 | return false; |
---|
| 314 | } |
---|
| 315 | } |
---|
| 316 | store.deleteINode(absolutePath); |
---|
| 317 | } |
---|
| 318 | return true; |
---|
| 319 | } |
---|
| 320 | |
---|
| 321 | @Override |
---|
| 322 | @Deprecated |
---|
| 323 | public boolean delete(Path path) throws IOException { |
---|
| 324 | return delete(path, true); |
---|
| 325 | } |
---|
| 326 | |
---|
| 327 | /** |
---|
| 328 | * FileStatus for S3 file systems. |
---|
| 329 | */ |
---|
| 330 | @Override |
---|
| 331 | public FileStatus getFileStatus(Path f) throws IOException { |
---|
| 332 | INode inode = store.retrieveINode(makeAbsolute(f)); |
---|
| 333 | if (inode == null) { |
---|
| 334 | throw new FileNotFoundException(f + ": No such file or directory."); |
---|
| 335 | } |
---|
| 336 | return new S3FileStatus(f.makeQualified(this), inode); |
---|
| 337 | } |
---|
| 338 | |
---|
| 339 | // diagnostic methods |
---|
| 340 | |
---|
| 341 | void dump() throws IOException { |
---|
| 342 | store.dump(); |
---|
| 343 | } |
---|
| 344 | |
---|
| 345 | void purge() throws IOException { |
---|
| 346 | store.purge(); |
---|
| 347 | } |
---|
| 348 | |
---|
| 349 | private static class S3FileStatus extends FileStatus { |
---|
| 350 | |
---|
| 351 | S3FileStatus(Path f, INode inode) throws IOException { |
---|
| 352 | super(findLength(inode), inode.isDirectory(), 1, |
---|
| 353 | findBlocksize(inode), 0, f); |
---|
| 354 | } |
---|
| 355 | |
---|
| 356 | private static long findLength(INode inode) { |
---|
| 357 | if (!inode.isDirectory()) { |
---|
| 358 | long length = 0L; |
---|
| 359 | for (Block block : inode.getBlocks()) { |
---|
| 360 | length += block.getLength(); |
---|
| 361 | } |
---|
| 362 | return length; |
---|
| 363 | } |
---|
| 364 | return 0; |
---|
| 365 | } |
---|
| 366 | |
---|
| 367 | private static long findBlocksize(INode inode) { |
---|
| 368 | final Block[] ret = inode.getBlocks(); |
---|
| 369 | return ret == null ? 0L : ret[0].getLength(); |
---|
| 370 | } |
---|
| 371 | } |
---|
| 372 | } |
---|