source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/InMemoryFileSystem.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.fs;
19
20import java.io.FileNotFoundException;
21import java.io.IOException;
22import java.io.OutputStream;
23import java.net.URI;
24import java.util.*;
25
26import org.apache.hadoop.fs.permission.FsPermission;
27import org.apache.hadoop.io.DataInputBuffer;
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.util.Progressable;
30
31/** An implementation of the in-memory filesystem. This implementation assumes
32 * that the file lengths are known ahead of time and the total lengths of all
33 * the files is below a certain number (like 100 MB, configurable). Use the API
34 * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of
35 * the API for reserving space in the FS. The uri of this filesystem starts with
36 * ramfs:// .
37 */
38@Deprecated
39public class InMemoryFileSystem extends ChecksumFileSystem {
40  private static class RawInMemoryFileSystem extends FileSystem {
41    private URI uri;
42    private long fsSize;
43    private volatile long totalUsed;
44    private Path staticWorkingDir;
45 
46    //pathToFileAttribs is the final place where a file is put after it is closed
47    private Map<String, FileAttributes> pathToFileAttribs =
48      new HashMap<String, FileAttributes>();
49 
50    //tempFileAttribs is a temp place which is updated while reserving memory for
51    //files we are going to create. It is read in the createRaw method and the
52    //temp key/value is discarded. If the file makes it to "close", then it
53    //ends up being in the pathToFileAttribs map.
54    private Map<String, FileAttributes> tempFileAttribs =
55      new HashMap<String, FileAttributes>();
56 
57    public RawInMemoryFileSystem() {
58      setConf(new Configuration());
59    }
60
61    public RawInMemoryFileSystem(URI uri, Configuration conf) {
62      initialize(uri, conf);
63    }
64 
65    //inherit javadoc
66    public void initialize(URI uri, Configuration conf) {
67      setConf(conf);
68      int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
69      this.fsSize = size * 1024L * 1024L;
70      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
71      String path = this.uri.getPath();
72      if (path.length() == 0) {
73        path = Path.CUR_DIR;
74      }
75      this.staticWorkingDir = new Path(path);
76      LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + 
77               " of size (in bytes): " + fsSize);
78    }
79
80    //inherit javadoc
81    public URI getUri() {
82      return uri;
83    }
84
85    private class InMemoryInputStream extends FSInputStream {
86      private DataInputBuffer din = new DataInputBuffer();
87      private FileAttributes fAttr;
88   
89      public InMemoryInputStream(Path f) throws IOException {
90        synchronized (RawInMemoryFileSystem.this) {
91          fAttr = pathToFileAttribs.get(getPath(f));
92          if (fAttr == null) { 
93            throw new FileNotFoundException("File " + f + " does not exist");
94          }                           
95          din.reset(fAttr.data, 0, fAttr.size);
96        }
97      }
98   
99      public long getPos() throws IOException {
100        return din.getPosition();
101      }
102   
103      public void seek(long pos) throws IOException {
104        if ((int)pos > fAttr.size)
105          throw new IOException("Cannot seek after EOF");
106        din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
107      }
108   
109      public boolean seekToNewSource(long targetPos) throws IOException {
110        return false;
111      }
112
113      public int available() throws IOException {
114        return din.available(); 
115      }
116      public boolean markSupport() { return false; }
117
118      public int read() throws IOException {
119        return din.read();
120      }
121
122      public int read(byte[] b, int off, int len) throws IOException {
123        return din.read(b, off, len);
124      }
125   
126      public long skip(long n) throws IOException { return din.skip(n); }
127    }
128
129    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
130      return new FSDataInputStream(new InMemoryInputStream(f));
131    }
132
133    private class InMemoryOutputStream extends OutputStream {
134      private int count;
135      private FileAttributes fAttr;
136      private Path f;
137   
138      public InMemoryOutputStream(Path f, FileAttributes fAttr) 
139        throws IOException {
140        this.fAttr = fAttr;
141        this.f = f;
142      }
143   
144      public long getPos() throws IOException {
145        return count;
146      }
147   
148      public void close() throws IOException {
149        synchronized (RawInMemoryFileSystem.this) {
150          pathToFileAttribs.put(getPath(f), fAttr);
151        }
152      }
153   
154      public void write(byte[] b, int off, int len) throws IOException {
155        if ((off < 0) || (off > b.length) || (len < 0) ||
156            ((off + len) > b.length) || ((off + len) < 0)) {
157          throw new IndexOutOfBoundsException();
158        } else if (len == 0) {
159          return;
160        }
161        int newcount = count + len;
162        if (newcount > fAttr.size) {
163          throw new IOException("Insufficient space");
164        }
165        System.arraycopy(b, off, fAttr.data, count, len);
166        count = newcount;
167      }
168   
169      public void write(int b) throws IOException {
170        int newcount = count + 1;
171        if (newcount > fAttr.size) {
172          throw new IOException("Insufficient space");
173        }
174        fAttr.data[count] = (byte)b;
175        count = newcount;
176      }
177    }
178 
179    /** This optional operation is not yet supported. */
180    public FSDataOutputStream append(Path f, int bufferSize,
181        Progressable progress) throws IOException {
182      throw new IOException("Not supported");
183    }
184
185    /**
186     * @param permission Currently ignored.
187     */
188    public FSDataOutputStream create(Path f, FsPermission permission,
189                                     boolean overwrite, int bufferSize,
190                                     short replication, long blockSize, Progressable progress)
191      throws IOException {
192      synchronized (this) {
193        if (exists(f) && !overwrite) {
194          throw new IOException("File already exists:"+f);
195        }
196        FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
197        if (fAttr != null)
198          return create(f, fAttr);
199        return null;
200      }
201    }
202 
203    public FSDataOutputStream create(Path f, FileAttributes fAttr)
204      throws IOException {
205      // the path is not added into the filesystem (in the pathToFileAttribs
206      // map) until close is called on the outputstream that this method is
207      // going to return
208      // Create an output stream out of data byte array
209      return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr), 
210                                    statistics);
211    }
212
213    public void close() throws IOException {
214      super.close();
215      synchronized (this) {
216        if (pathToFileAttribs != null) { 
217          pathToFileAttribs.clear();
218        }
219        pathToFileAttribs = null;
220        if (tempFileAttribs != null) {
221          tempFileAttribs.clear();
222        }
223        tempFileAttribs = null;
224      }
225    }
226
227    public boolean setReplication(Path src, short replication)
228      throws IOException {
229      return true;
230    }
231
232    public boolean rename(Path src, Path dst) throws IOException {
233      synchronized (this) {
234        if (exists(dst)) {
235          throw new IOException ("Path " + dst + " already exists");
236        }
237        FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
238        if (fAttr == null) return false;
239        pathToFileAttribs.put(getPath(dst), fAttr);
240        return true;
241      }
242    }
243   
244    @Deprecated
245    public boolean delete(Path f) throws IOException {
246      return delete(f, true);
247    }
248   
249    public boolean delete(Path f, boolean recursive) throws IOException {
250      synchronized (this) {
251        FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
252        if (fAttr != null) {
253          fAttr.data = null;
254          totalUsed -= fAttr.size;
255          return true;
256        }
257        return false;
258      }
259    }
260 
261    /**
262     * Directory operations are not supported
263     */
264    public FileStatus[] listStatus(Path f) throws IOException {
265      return null;
266    }
267
268    public void setWorkingDirectory(Path new_dir) {
269      staticWorkingDir = new_dir;
270    }
271 
272    public Path getWorkingDirectory() {
273      return staticWorkingDir;
274    }
275
276    /**
277     * @param permission Currently ignored.
278     */
279    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
280      return true;
281    }
282 
283    public FileStatus getFileStatus(Path f) throws IOException {
284      synchronized (this) {
285        FileAttributes attr = pathToFileAttribs.get(getPath(f));
286        if (attr==null) {
287          throw new FileNotFoundException("File " + f + " does not exist.");
288        }
289        return new InMemoryFileStatus(f.makeQualified(this), attr);
290      }
291    }
292 
293    /** Some APIs exclusively for InMemoryFileSystem */
294
295    /** Register a path with its size. */
296    public boolean reserveSpace(Path f, long size) {
297      synchronized (this) {
298        if (!canFitInMemory(size))
299          return false;
300        FileAttributes fileAttr;
301        try {
302          fileAttr = new FileAttributes((int)size);
303        } catch (OutOfMemoryError o) {
304          return false;
305        }
306        totalUsed += size;
307        tempFileAttribs.put(getPath(f), fileAttr);
308        return true;
309      }
310    }
311    public void unreserveSpace(Path f) {
312      synchronized (this) {
313        FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
314        if (fAttr != null) {
315          fAttr.data = null;
316          totalUsed -= fAttr.size;
317        }
318      }
319    }
320 
321    /** This API getClosedFiles could have been implemented over listPathsRaw
322     * but it is an overhead to maintain directory structures for this impl of
323     * the in-memory fs.
324     */
325    public Path[] getFiles(PathFilter filter) {
326      synchronized (this) {
327        List<String> closedFilesList = new ArrayList<String>();
328        synchronized (pathToFileAttribs) {
329          Set paths = pathToFileAttribs.keySet();
330          if (paths == null || paths.isEmpty()) {
331            return new Path[0];
332          }
333          Iterator iter = paths.iterator();
334          while (iter.hasNext()) {
335            String f = (String)iter.next();
336            if (filter.accept(new Path(f))) {
337              closedFilesList.add(f);
338            }
339          }
340        }
341        String [] names = 
342          closedFilesList.toArray(new String[closedFilesList.size()]);
343        Path [] results = new Path[names.length];
344        for (int i = 0; i < names.length; i++) {
345          results[i] = new Path(names[i]);
346        }
347        return results;
348      }
349    }
350 
351    public int getNumFiles(PathFilter filter) {
352      return getFiles(filter).length;
353    }
354
355    public long getFSSize() {
356      return fsSize;
357    }
358 
359    public float getPercentUsed() {
360      if (fsSize > 0)
361        return (float)totalUsed/fsSize;
362      else return 0.1f;
363    }
364 
365    /**
366     * @TODO: Fix for Java6?
367     * As of Java5 it is safe to assume that if the file can fit
368     * in-memory then its file-size is less than Integer.MAX_VALUE.
369     */ 
370    private boolean canFitInMemory(long size) {
371      if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
372        return true;
373      }
374      return false;
375    }
376 
377    private String getPath(Path f) {
378      return f.toUri().getPath();
379    }
380 
381    private static class FileAttributes {
382      private byte[] data;
383      private int size;
384   
385      public FileAttributes(int size) {
386        this.size = size;
387        this.data = new byte[size];
388      }
389    }
390
391    private class InMemoryFileStatus extends FileStatus {
392      InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
393        super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
394      }
395    }
396  }
397   
398  public InMemoryFileSystem() {
399    super(new RawInMemoryFileSystem());
400  }
401   
402  public InMemoryFileSystem(URI uri, Configuration conf) {
403    super(new RawInMemoryFileSystem(uri, conf));
404  }
405   
406  /**
407   * Register a file with its size. This will also register a checksum for the
408   * file that the user is trying to create. This is required since none of
409   * the FileSystem APIs accept the size of the file as argument. But since it
410   * is required for us to apriori know the size of the file we are going to
411   * create, the user must call this method for each file he wants to create
412   * and reserve memory for that file. We either succeed in reserving memory
413   * for both the main file and the checksum file and return true, or return
414   * false.
415   */
416  public boolean reserveSpaceWithCheckSum(Path f, long size) {
417    RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
418    synchronized(mfs) {
419      boolean b = mfs.reserveSpace(f, size);
420      if (b) {
421        long checksumSize = getChecksumFileLength(f, size);
422        b = mfs.reserveSpace(getChecksumFile(f), checksumSize);
423        if (!b) {
424          mfs.unreserveSpace(f);
425        }
426      }
427      return b;
428    }
429  }
430
431  public Path[] getFiles(PathFilter filter) {
432    return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
433  }
434   
435  public int getNumFiles(PathFilter filter) {
436    return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
437  }
438
439  public long getFSSize() {
440    return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
441  }
442   
443  public float getPercentUsed() {
444    return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
445  }
446}
Note: See TracBrowser for help on using the repository browser.