1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.fs; |
---|
19 | |
---|
20 | import java.io.FileNotFoundException; |
---|
21 | import java.io.IOException; |
---|
22 | import java.io.OutputStream; |
---|
23 | import java.net.URI; |
---|
24 | import java.util.*; |
---|
25 | |
---|
26 | import org.apache.hadoop.fs.permission.FsPermission; |
---|
27 | import org.apache.hadoop.io.DataInputBuffer; |
---|
28 | import org.apache.hadoop.conf.Configuration; |
---|
29 | import org.apache.hadoop.util.Progressable; |
---|
30 | |
---|
31 | /** An implementation of the in-memory filesystem. This implementation assumes |
---|
32 | * that the file lengths are known ahead of time and the total lengths of all |
---|
33 | * the files is below a certain number (like 100 MB, configurable). Use the API |
---|
34 | * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of |
---|
35 | * the API for reserving space in the FS. The uri of this filesystem starts with |
---|
36 | * ramfs:// . |
---|
37 | */ |
---|
38 | @Deprecated |
---|
39 | public class InMemoryFileSystem extends ChecksumFileSystem { |
---|
40 | private static class RawInMemoryFileSystem extends FileSystem { |
---|
41 | private URI uri; |
---|
42 | private long fsSize; |
---|
43 | private volatile long totalUsed; |
---|
44 | private Path staticWorkingDir; |
---|
45 | |
---|
46 | //pathToFileAttribs is the final place where a file is put after it is closed |
---|
47 | private Map<String, FileAttributes> pathToFileAttribs = |
---|
48 | new HashMap<String, FileAttributes>(); |
---|
49 | |
---|
50 | //tempFileAttribs is a temp place which is updated while reserving memory for |
---|
51 | //files we are going to create. It is read in the createRaw method and the |
---|
52 | //temp key/value is discarded. If the file makes it to "close", then it |
---|
53 | //ends up being in the pathToFileAttribs map. |
---|
54 | private Map<String, FileAttributes> tempFileAttribs = |
---|
55 | new HashMap<String, FileAttributes>(); |
---|
56 | |
---|
57 | public RawInMemoryFileSystem() { |
---|
58 | setConf(new Configuration()); |
---|
59 | } |
---|
60 | |
---|
61 | public RawInMemoryFileSystem(URI uri, Configuration conf) { |
---|
62 | initialize(uri, conf); |
---|
63 | } |
---|
64 | |
---|
65 | //inherit javadoc |
---|
66 | public void initialize(URI uri, Configuration conf) { |
---|
67 | setConf(conf); |
---|
68 | int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100")); |
---|
69 | this.fsSize = size * 1024L * 1024L; |
---|
70 | this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); |
---|
71 | String path = this.uri.getPath(); |
---|
72 | if (path.length() == 0) { |
---|
73 | path = Path.CUR_DIR; |
---|
74 | } |
---|
75 | this.staticWorkingDir = new Path(path); |
---|
76 | LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + |
---|
77 | " of size (in bytes): " + fsSize); |
---|
78 | } |
---|
79 | |
---|
80 | //inherit javadoc |
---|
81 | public URI getUri() { |
---|
82 | return uri; |
---|
83 | } |
---|
84 | |
---|
85 | private class InMemoryInputStream extends FSInputStream { |
---|
86 | private DataInputBuffer din = new DataInputBuffer(); |
---|
87 | private FileAttributes fAttr; |
---|
88 | |
---|
89 | public InMemoryInputStream(Path f) throws IOException { |
---|
90 | synchronized (RawInMemoryFileSystem.this) { |
---|
91 | fAttr = pathToFileAttribs.get(getPath(f)); |
---|
92 | if (fAttr == null) { |
---|
93 | throw new FileNotFoundException("File " + f + " does not exist"); |
---|
94 | } |
---|
95 | din.reset(fAttr.data, 0, fAttr.size); |
---|
96 | } |
---|
97 | } |
---|
98 | |
---|
99 | public long getPos() throws IOException { |
---|
100 | return din.getPosition(); |
---|
101 | } |
---|
102 | |
---|
103 | public void seek(long pos) throws IOException { |
---|
104 | if ((int)pos > fAttr.size) |
---|
105 | throw new IOException("Cannot seek after EOF"); |
---|
106 | din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos); |
---|
107 | } |
---|
108 | |
---|
109 | public boolean seekToNewSource(long targetPos) throws IOException { |
---|
110 | return false; |
---|
111 | } |
---|
112 | |
---|
113 | public int available() throws IOException { |
---|
114 | return din.available(); |
---|
115 | } |
---|
116 | public boolean markSupport() { return false; } |
---|
117 | |
---|
118 | public int read() throws IOException { |
---|
119 | return din.read(); |
---|
120 | } |
---|
121 | |
---|
122 | public int read(byte[] b, int off, int len) throws IOException { |
---|
123 | return din.read(b, off, len); |
---|
124 | } |
---|
125 | |
---|
126 | public long skip(long n) throws IOException { return din.skip(n); } |
---|
127 | } |
---|
128 | |
---|
129 | public FSDataInputStream open(Path f, int bufferSize) throws IOException { |
---|
130 | return new FSDataInputStream(new InMemoryInputStream(f)); |
---|
131 | } |
---|
132 | |
---|
133 | private class InMemoryOutputStream extends OutputStream { |
---|
134 | private int count; |
---|
135 | private FileAttributes fAttr; |
---|
136 | private Path f; |
---|
137 | |
---|
138 | public InMemoryOutputStream(Path f, FileAttributes fAttr) |
---|
139 | throws IOException { |
---|
140 | this.fAttr = fAttr; |
---|
141 | this.f = f; |
---|
142 | } |
---|
143 | |
---|
144 | public long getPos() throws IOException { |
---|
145 | return count; |
---|
146 | } |
---|
147 | |
---|
148 | public void close() throws IOException { |
---|
149 | synchronized (RawInMemoryFileSystem.this) { |
---|
150 | pathToFileAttribs.put(getPath(f), fAttr); |
---|
151 | } |
---|
152 | } |
---|
153 | |
---|
154 | public void write(byte[] b, int off, int len) throws IOException { |
---|
155 | if ((off < 0) || (off > b.length) || (len < 0) || |
---|
156 | ((off + len) > b.length) || ((off + len) < 0)) { |
---|
157 | throw new IndexOutOfBoundsException(); |
---|
158 | } else if (len == 0) { |
---|
159 | return; |
---|
160 | } |
---|
161 | int newcount = count + len; |
---|
162 | if (newcount > fAttr.size) { |
---|
163 | throw new IOException("Insufficient space"); |
---|
164 | } |
---|
165 | System.arraycopy(b, off, fAttr.data, count, len); |
---|
166 | count = newcount; |
---|
167 | } |
---|
168 | |
---|
169 | public void write(int b) throws IOException { |
---|
170 | int newcount = count + 1; |
---|
171 | if (newcount > fAttr.size) { |
---|
172 | throw new IOException("Insufficient space"); |
---|
173 | } |
---|
174 | fAttr.data[count] = (byte)b; |
---|
175 | count = newcount; |
---|
176 | } |
---|
177 | } |
---|
178 | |
---|
179 | /** This optional operation is not yet supported. */ |
---|
180 | public FSDataOutputStream append(Path f, int bufferSize, |
---|
181 | Progressable progress) throws IOException { |
---|
182 | throw new IOException("Not supported"); |
---|
183 | } |
---|
184 | |
---|
185 | /** |
---|
186 | * @param permission Currently ignored. |
---|
187 | */ |
---|
188 | public FSDataOutputStream create(Path f, FsPermission permission, |
---|
189 | boolean overwrite, int bufferSize, |
---|
190 | short replication, long blockSize, Progressable progress) |
---|
191 | throws IOException { |
---|
192 | synchronized (this) { |
---|
193 | if (exists(f) && !overwrite) { |
---|
194 | throw new IOException("File already exists:"+f); |
---|
195 | } |
---|
196 | FileAttributes fAttr = tempFileAttribs.remove(getPath(f)); |
---|
197 | if (fAttr != null) |
---|
198 | return create(f, fAttr); |
---|
199 | return null; |
---|
200 | } |
---|
201 | } |
---|
202 | |
---|
203 | public FSDataOutputStream create(Path f, FileAttributes fAttr) |
---|
204 | throws IOException { |
---|
205 | // the path is not added into the filesystem (in the pathToFileAttribs |
---|
206 | // map) until close is called on the outputstream that this method is |
---|
207 | // going to return |
---|
208 | // Create an output stream out of data byte array |
---|
209 | return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr), |
---|
210 | statistics); |
---|
211 | } |
---|
212 | |
---|
213 | public void close() throws IOException { |
---|
214 | super.close(); |
---|
215 | synchronized (this) { |
---|
216 | if (pathToFileAttribs != null) { |
---|
217 | pathToFileAttribs.clear(); |
---|
218 | } |
---|
219 | pathToFileAttribs = null; |
---|
220 | if (tempFileAttribs != null) { |
---|
221 | tempFileAttribs.clear(); |
---|
222 | } |
---|
223 | tempFileAttribs = null; |
---|
224 | } |
---|
225 | } |
---|
226 | |
---|
227 | public boolean setReplication(Path src, short replication) |
---|
228 | throws IOException { |
---|
229 | return true; |
---|
230 | } |
---|
231 | |
---|
232 | public boolean rename(Path src, Path dst) throws IOException { |
---|
233 | synchronized (this) { |
---|
234 | if (exists(dst)) { |
---|
235 | throw new IOException ("Path " + dst + " already exists"); |
---|
236 | } |
---|
237 | FileAttributes fAttr = pathToFileAttribs.remove(getPath(src)); |
---|
238 | if (fAttr == null) return false; |
---|
239 | pathToFileAttribs.put(getPath(dst), fAttr); |
---|
240 | return true; |
---|
241 | } |
---|
242 | } |
---|
243 | |
---|
244 | @Deprecated |
---|
245 | public boolean delete(Path f) throws IOException { |
---|
246 | return delete(f, true); |
---|
247 | } |
---|
248 | |
---|
249 | public boolean delete(Path f, boolean recursive) throws IOException { |
---|
250 | synchronized (this) { |
---|
251 | FileAttributes fAttr = pathToFileAttribs.remove(getPath(f)); |
---|
252 | if (fAttr != null) { |
---|
253 | fAttr.data = null; |
---|
254 | totalUsed -= fAttr.size; |
---|
255 | return true; |
---|
256 | } |
---|
257 | return false; |
---|
258 | } |
---|
259 | } |
---|
260 | |
---|
261 | /** |
---|
262 | * Directory operations are not supported |
---|
263 | */ |
---|
264 | public FileStatus[] listStatus(Path f) throws IOException { |
---|
265 | return null; |
---|
266 | } |
---|
267 | |
---|
268 | public void setWorkingDirectory(Path new_dir) { |
---|
269 | staticWorkingDir = new_dir; |
---|
270 | } |
---|
271 | |
---|
272 | public Path getWorkingDirectory() { |
---|
273 | return staticWorkingDir; |
---|
274 | } |
---|
275 | |
---|
276 | /** |
---|
277 | * @param permission Currently ignored. |
---|
278 | */ |
---|
279 | public boolean mkdirs(Path f, FsPermission permission) throws IOException { |
---|
280 | return true; |
---|
281 | } |
---|
282 | |
---|
283 | public FileStatus getFileStatus(Path f) throws IOException { |
---|
284 | synchronized (this) { |
---|
285 | FileAttributes attr = pathToFileAttribs.get(getPath(f)); |
---|
286 | if (attr==null) { |
---|
287 | throw new FileNotFoundException("File " + f + " does not exist."); |
---|
288 | } |
---|
289 | return new InMemoryFileStatus(f.makeQualified(this), attr); |
---|
290 | } |
---|
291 | } |
---|
292 | |
---|
293 | /** Some APIs exclusively for InMemoryFileSystem */ |
---|
294 | |
---|
295 | /** Register a path with its size. */ |
---|
296 | public boolean reserveSpace(Path f, long size) { |
---|
297 | synchronized (this) { |
---|
298 | if (!canFitInMemory(size)) |
---|
299 | return false; |
---|
300 | FileAttributes fileAttr; |
---|
301 | try { |
---|
302 | fileAttr = new FileAttributes((int)size); |
---|
303 | } catch (OutOfMemoryError o) { |
---|
304 | return false; |
---|
305 | } |
---|
306 | totalUsed += size; |
---|
307 | tempFileAttribs.put(getPath(f), fileAttr); |
---|
308 | return true; |
---|
309 | } |
---|
310 | } |
---|
311 | public void unreserveSpace(Path f) { |
---|
312 | synchronized (this) { |
---|
313 | FileAttributes fAttr = tempFileAttribs.remove(getPath(f)); |
---|
314 | if (fAttr != null) { |
---|
315 | fAttr.data = null; |
---|
316 | totalUsed -= fAttr.size; |
---|
317 | } |
---|
318 | } |
---|
319 | } |
---|
320 | |
---|
321 | /** This API getClosedFiles could have been implemented over listPathsRaw |
---|
322 | * but it is an overhead to maintain directory structures for this impl of |
---|
323 | * the in-memory fs. |
---|
324 | */ |
---|
325 | public Path[] getFiles(PathFilter filter) { |
---|
326 | synchronized (this) { |
---|
327 | List<String> closedFilesList = new ArrayList<String>(); |
---|
328 | synchronized (pathToFileAttribs) { |
---|
329 | Set paths = pathToFileAttribs.keySet(); |
---|
330 | if (paths == null || paths.isEmpty()) { |
---|
331 | return new Path[0]; |
---|
332 | } |
---|
333 | Iterator iter = paths.iterator(); |
---|
334 | while (iter.hasNext()) { |
---|
335 | String f = (String)iter.next(); |
---|
336 | if (filter.accept(new Path(f))) { |
---|
337 | closedFilesList.add(f); |
---|
338 | } |
---|
339 | } |
---|
340 | } |
---|
341 | String [] names = |
---|
342 | closedFilesList.toArray(new String[closedFilesList.size()]); |
---|
343 | Path [] results = new Path[names.length]; |
---|
344 | for (int i = 0; i < names.length; i++) { |
---|
345 | results[i] = new Path(names[i]); |
---|
346 | } |
---|
347 | return results; |
---|
348 | } |
---|
349 | } |
---|
350 | |
---|
351 | public int getNumFiles(PathFilter filter) { |
---|
352 | return getFiles(filter).length; |
---|
353 | } |
---|
354 | |
---|
355 | public long getFSSize() { |
---|
356 | return fsSize; |
---|
357 | } |
---|
358 | |
---|
359 | public float getPercentUsed() { |
---|
360 | if (fsSize > 0) |
---|
361 | return (float)totalUsed/fsSize; |
---|
362 | else return 0.1f; |
---|
363 | } |
---|
364 | |
---|
365 | /** |
---|
366 | * @TODO: Fix for Java6? |
---|
367 | * As of Java5 it is safe to assume that if the file can fit |
---|
368 | * in-memory then its file-size is less than Integer.MAX_VALUE. |
---|
369 | */ |
---|
370 | private boolean canFitInMemory(long size) { |
---|
371 | if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) { |
---|
372 | return true; |
---|
373 | } |
---|
374 | return false; |
---|
375 | } |
---|
376 | |
---|
377 | private String getPath(Path f) { |
---|
378 | return f.toUri().getPath(); |
---|
379 | } |
---|
380 | |
---|
381 | private static class FileAttributes { |
---|
382 | private byte[] data; |
---|
383 | private int size; |
---|
384 | |
---|
385 | public FileAttributes(int size) { |
---|
386 | this.size = size; |
---|
387 | this.data = new byte[size]; |
---|
388 | } |
---|
389 | } |
---|
390 | |
---|
391 | private class InMemoryFileStatus extends FileStatus { |
---|
392 | InMemoryFileStatus(Path f, FileAttributes attr) throws IOException { |
---|
393 | super(attr.size, false, 1, getDefaultBlockSize(), 0, f); |
---|
394 | } |
---|
395 | } |
---|
396 | } |
---|
397 | |
---|
398 | public InMemoryFileSystem() { |
---|
399 | super(new RawInMemoryFileSystem()); |
---|
400 | } |
---|
401 | |
---|
402 | public InMemoryFileSystem(URI uri, Configuration conf) { |
---|
403 | super(new RawInMemoryFileSystem(uri, conf)); |
---|
404 | } |
---|
405 | |
---|
406 | /** |
---|
407 | * Register a file with its size. This will also register a checksum for the |
---|
408 | * file that the user is trying to create. This is required since none of |
---|
409 | * the FileSystem APIs accept the size of the file as argument. But since it |
---|
410 | * is required for us to apriori know the size of the file we are going to |
---|
411 | * create, the user must call this method for each file he wants to create |
---|
412 | * and reserve memory for that file. We either succeed in reserving memory |
---|
413 | * for both the main file and the checksum file and return true, or return |
---|
414 | * false. |
---|
415 | */ |
---|
416 | public boolean reserveSpaceWithCheckSum(Path f, long size) { |
---|
417 | RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem(); |
---|
418 | synchronized(mfs) { |
---|
419 | boolean b = mfs.reserveSpace(f, size); |
---|
420 | if (b) { |
---|
421 | long checksumSize = getChecksumFileLength(f, size); |
---|
422 | b = mfs.reserveSpace(getChecksumFile(f), checksumSize); |
---|
423 | if (!b) { |
---|
424 | mfs.unreserveSpace(f); |
---|
425 | } |
---|
426 | } |
---|
427 | return b; |
---|
428 | } |
---|
429 | } |
---|
430 | |
---|
431 | public Path[] getFiles(PathFilter filter) { |
---|
432 | return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter); |
---|
433 | } |
---|
434 | |
---|
435 | public int getNumFiles(PathFilter filter) { |
---|
436 | return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter); |
---|
437 | } |
---|
438 | |
---|
439 | public long getFSSize() { |
---|
440 | return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize(); |
---|
441 | } |
---|
442 | |
---|
443 | public float getPercentUsed() { |
---|
444 | return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed(); |
---|
445 | } |
---|
446 | } |
---|