[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.fs.s3native; |
---|
| 20 | |
---|
| 21 | import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER; |
---|
| 22 | |
---|
| 23 | import java.io.BufferedInputStream; |
---|
| 24 | import java.io.ByteArrayInputStream; |
---|
| 25 | import java.io.File; |
---|
| 26 | import java.io.FileInputStream; |
---|
| 27 | import java.io.IOException; |
---|
| 28 | import java.io.InputStream; |
---|
| 29 | import java.net.URI; |
---|
| 30 | |
---|
| 31 | import org.apache.hadoop.conf.Configuration; |
---|
| 32 | import org.apache.hadoop.fs.s3.S3Credentials; |
---|
| 33 | import org.apache.hadoop.fs.s3.S3Exception; |
---|
| 34 | import org.jets3t.service.S3ObjectsChunk; |
---|
| 35 | import org.jets3t.service.S3Service; |
---|
| 36 | import org.jets3t.service.S3ServiceException; |
---|
| 37 | import org.jets3t.service.impl.rest.httpclient.RestS3Service; |
---|
| 38 | import org.jets3t.service.model.S3Bucket; |
---|
| 39 | import org.jets3t.service.model.S3Object; |
---|
| 40 | import org.jets3t.service.security.AWSCredentials; |
---|
| 41 | |
---|
| 42 | class Jets3tNativeFileSystemStore implements NativeFileSystemStore { |
---|
| 43 | |
---|
| 44 | private S3Service s3Service; |
---|
| 45 | private S3Bucket bucket; |
---|
| 46 | |
---|
| 47 | public void initialize(URI uri, Configuration conf) throws IOException { |
---|
| 48 | S3Credentials s3Credentials = new S3Credentials(); |
---|
| 49 | s3Credentials.initialize(uri, conf); |
---|
| 50 | try { |
---|
| 51 | AWSCredentials awsCredentials = |
---|
| 52 | new AWSCredentials(s3Credentials.getAccessKey(), |
---|
| 53 | s3Credentials.getSecretAccessKey()); |
---|
| 54 | this.s3Service = new RestS3Service(awsCredentials); |
---|
| 55 | } catch (S3ServiceException e) { |
---|
| 56 | if (e.getCause() instanceof IOException) { |
---|
| 57 | throw (IOException) e.getCause(); |
---|
| 58 | } |
---|
| 59 | throw new S3Exception(e); |
---|
| 60 | } |
---|
| 61 | bucket = new S3Bucket(uri.getHost()); |
---|
| 62 | } |
---|
| 63 | |
---|
| 64 | public void storeFile(String key, File file, byte[] md5Hash) |
---|
| 65 | throws IOException { |
---|
| 66 | |
---|
| 67 | BufferedInputStream in = null; |
---|
| 68 | try { |
---|
| 69 | in = new BufferedInputStream(new FileInputStream(file)); |
---|
| 70 | S3Object object = new S3Object(key); |
---|
| 71 | object.setDataInputStream(in); |
---|
| 72 | object.setContentType("binary/octet-stream"); |
---|
| 73 | object.setContentLength(file.length()); |
---|
| 74 | if (md5Hash != null) { |
---|
| 75 | object.setMd5Hash(md5Hash); |
---|
| 76 | } |
---|
| 77 | s3Service.putObject(bucket, object); |
---|
| 78 | } catch (S3ServiceException e) { |
---|
| 79 | if (e.getCause() instanceof IOException) { |
---|
| 80 | throw (IOException) e.getCause(); |
---|
| 81 | } |
---|
| 82 | throw new S3Exception(e); |
---|
| 83 | } finally { |
---|
| 84 | if (in != null) { |
---|
| 85 | try { |
---|
| 86 | in.close(); |
---|
| 87 | } catch (IOException e) { |
---|
| 88 | // ignore |
---|
| 89 | } |
---|
| 90 | } |
---|
| 91 | } |
---|
| 92 | } |
---|
| 93 | |
---|
| 94 | public void storeEmptyFile(String key) throws IOException { |
---|
| 95 | try { |
---|
| 96 | S3Object object = new S3Object(key); |
---|
| 97 | object.setDataInputStream(new ByteArrayInputStream(new byte[0])); |
---|
| 98 | object.setContentType("binary/octet-stream"); |
---|
| 99 | object.setContentLength(0); |
---|
| 100 | s3Service.putObject(bucket, object); |
---|
| 101 | } catch (S3ServiceException e) { |
---|
| 102 | if (e.getCause() instanceof IOException) { |
---|
| 103 | throw (IOException) e.getCause(); |
---|
| 104 | } |
---|
| 105 | throw new S3Exception(e); |
---|
| 106 | } |
---|
| 107 | } |
---|
| 108 | |
---|
| 109 | public FileMetadata retrieveMetadata(String key) throws IOException { |
---|
| 110 | try { |
---|
| 111 | S3Object object = s3Service.getObjectDetails(bucket, key); |
---|
| 112 | return new FileMetadata(key, object.getContentLength(), |
---|
| 113 | object.getLastModifiedDate().getTime()); |
---|
| 114 | } catch (S3ServiceException e) { |
---|
| 115 | // Following is brittle. Is there a better way? |
---|
| 116 | if (e.getMessage().contains("ResponseCode=404")) { |
---|
| 117 | return null; |
---|
| 118 | } |
---|
| 119 | if (e.getCause() instanceof IOException) { |
---|
| 120 | throw (IOException) e.getCause(); |
---|
| 121 | } |
---|
| 122 | throw new S3Exception(e); |
---|
| 123 | } |
---|
| 124 | } |
---|
| 125 | |
---|
| 126 | public InputStream retrieve(String key) throws IOException { |
---|
| 127 | try { |
---|
| 128 | S3Object object = s3Service.getObject(bucket, key); |
---|
| 129 | return object.getDataInputStream(); |
---|
| 130 | } catch (S3ServiceException e) { |
---|
| 131 | if ("NoSuchKey".equals(e.getS3ErrorCode())) { |
---|
| 132 | return null; |
---|
| 133 | } |
---|
| 134 | if (e.getCause() instanceof IOException) { |
---|
| 135 | throw (IOException) e.getCause(); |
---|
| 136 | } |
---|
| 137 | throw new S3Exception(e); |
---|
| 138 | } |
---|
| 139 | } |
---|
| 140 | |
---|
| 141 | public InputStream retrieve(String key, long byteRangeStart) |
---|
| 142 | throws IOException { |
---|
| 143 | try { |
---|
| 144 | S3Object object = s3Service.getObject(bucket, key, null, null, null, |
---|
| 145 | null, byteRangeStart, null); |
---|
| 146 | return object.getDataInputStream(); |
---|
| 147 | } catch (S3ServiceException e) { |
---|
| 148 | if ("NoSuchKey".equals(e.getS3ErrorCode())) { |
---|
| 149 | return null; |
---|
| 150 | } |
---|
| 151 | if (e.getCause() instanceof IOException) { |
---|
| 152 | throw (IOException) e.getCause(); |
---|
| 153 | } |
---|
| 154 | throw new S3Exception(e); |
---|
| 155 | } |
---|
| 156 | } |
---|
| 157 | |
---|
| 158 | public PartialListing list(String prefix, int maxListingLength) |
---|
| 159 | throws IOException { |
---|
| 160 | return list(prefix, maxListingLength, null); |
---|
| 161 | } |
---|
| 162 | |
---|
| 163 | public PartialListing list(String prefix, int maxListingLength, |
---|
| 164 | String priorLastKey) throws IOException { |
---|
| 165 | |
---|
| 166 | return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey); |
---|
| 167 | } |
---|
| 168 | |
---|
| 169 | public PartialListing listAll(String prefix, int maxListingLength, |
---|
| 170 | String priorLastKey) throws IOException { |
---|
| 171 | |
---|
| 172 | return list(prefix, null, maxListingLength, priorLastKey); |
---|
| 173 | } |
---|
| 174 | |
---|
| 175 | private PartialListing list(String prefix, String delimiter, |
---|
| 176 | int maxListingLength, String priorLastKey) throws IOException { |
---|
| 177 | try { |
---|
| 178 | if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) { |
---|
| 179 | prefix += PATH_DELIMITER; |
---|
| 180 | } |
---|
| 181 | S3ObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(), |
---|
| 182 | prefix, delimiter, maxListingLength, priorLastKey); |
---|
| 183 | |
---|
| 184 | FileMetadata[] fileMetadata = |
---|
| 185 | new FileMetadata[chunk.getObjects().length]; |
---|
| 186 | for (int i = 0; i < fileMetadata.length; i++) { |
---|
| 187 | S3Object object = chunk.getObjects()[i]; |
---|
| 188 | fileMetadata[i] = new FileMetadata(object.getKey(), |
---|
| 189 | object.getContentLength(), object.getLastModifiedDate().getTime()); |
---|
| 190 | } |
---|
| 191 | return new PartialListing(chunk.getPriorLastKey(), fileMetadata, |
---|
| 192 | chunk.getCommonPrefixes()); |
---|
| 193 | } catch (S3ServiceException e) { |
---|
| 194 | if (e.getCause() instanceof IOException) { |
---|
| 195 | throw (IOException) e.getCause(); |
---|
| 196 | } |
---|
| 197 | throw new S3Exception(e); |
---|
| 198 | } |
---|
| 199 | } |
---|
| 200 | |
---|
| 201 | public void delete(String key) throws IOException { |
---|
| 202 | try { |
---|
| 203 | s3Service.deleteObject(bucket, key); |
---|
| 204 | } catch (S3ServiceException e) { |
---|
| 205 | if (e.getCause() instanceof IOException) { |
---|
| 206 | throw (IOException) e.getCause(); |
---|
| 207 | } |
---|
| 208 | throw new S3Exception(e); |
---|
| 209 | } |
---|
| 210 | } |
---|
| 211 | |
---|
| 212 | public void rename(String srcKey, String dstKey) throws IOException { |
---|
| 213 | try { |
---|
| 214 | s3Service.moveObject(bucket.getName(), srcKey, bucket.getName(), |
---|
| 215 | new S3Object(dstKey), false); |
---|
| 216 | } catch (S3ServiceException e) { |
---|
| 217 | if (e.getCause() instanceof IOException) { |
---|
| 218 | throw (IOException) e.getCause(); |
---|
| 219 | } |
---|
| 220 | throw new S3Exception(e); |
---|
| 221 | } |
---|
| 222 | } |
---|
| 223 | |
---|
| 224 | public void purge(String prefix) throws IOException { |
---|
| 225 | try { |
---|
| 226 | S3Object[] objects = s3Service.listObjects(bucket, prefix, null); |
---|
| 227 | for (int i = 0; i < objects.length; i++) { |
---|
| 228 | s3Service.deleteObject(bucket, objects[i].getKey()); |
---|
| 229 | } |
---|
| 230 | } catch (S3ServiceException e) { |
---|
| 231 | if (e.getCause() instanceof IOException) { |
---|
| 232 | throw (IOException) e.getCause(); |
---|
| 233 | } |
---|
| 234 | throw new S3Exception(e); |
---|
| 235 | } |
---|
| 236 | } |
---|
| 237 | |
---|
| 238 | public void dump() throws IOException { |
---|
| 239 | StringBuilder sb = new StringBuilder("S3 Native Filesystem, "); |
---|
| 240 | sb.append(bucket.getName()).append("\n"); |
---|
| 241 | try { |
---|
| 242 | S3Object[] objects = s3Service.listObjects(bucket); |
---|
| 243 | for (int i = 0; i < objects.length; i++) { |
---|
| 244 | sb.append(objects[i].getKey()).append("\n"); |
---|
| 245 | } |
---|
| 246 | } catch (S3ServiceException e) { |
---|
| 247 | if (e.getCause() instanceof IOException) { |
---|
| 248 | throw (IOException) e.getCause(); |
---|
| 249 | } |
---|
| 250 | throw new S3Exception(e); |
---|
| 251 | } |
---|
| 252 | System.out.println(sb); |
---|
| 253 | } |
---|
| 254 | |
---|
| 255 | } |
---|