source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/HarFileSystem.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 27.2 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;

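// A HAR archive is addressed through a har:// URI layered over the
// underlying filesystem's URI. A minimal usage sketch (the hostname and
// archive path below are hypothetical; resolution of the har scheme to
// this class assumes the stock fs.har.impl configuration):
//
//   Configuration conf = new Configuration();
//   Path harRoot = new Path("har://hdfs-namenode:8020/user/foo/data.har");
//   FileSystem harFs = harRoot.getFileSystem(conf);  // a HarFileSystem
//   FSDataInputStream in = harFs.open(new Path(harRoot, "file-in-archive"));
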
/**
 * This is an implementation of the Hadoop Archive
 * Filesystem. This archive Filesystem has index files
 * of the form _index* and has contents of the form
 * part-*. The index files store the indexes of the
 * real files. The index files are of the form _masterindex
 * and _index. The master index is a level of indirection
 * into the index file to make the lookups faster. The index
 * file is sorted by the hash code of the paths that it contains,
 * and the master index contains pointers to the positions in
 * the index for ranges of hash codes.
 */

public class HarFileSystem extends FilterFileSystem {
  public static final int VERSION = 1;
  // uri representation of this Har filesystem
  private URI uri;
  // the version of this har filesystem
  private int version;
  // underlying uri
  private URI underLyingURI;
  // the top level path of the archive
  // in the underlying file system
  private Path archivePath;
  // the masterIndex of the archive
  private Path masterIndex;
  // the index file
  private Path archiveIndex;
  // the har auth
  private String harAuth;

  /**
   * Public default constructor for HarFileSystem.
   */
  public HarFileSystem() {
  }

  /**
   * Constructor to create a HarFileSystem with an
   * underlying filesystem.
   * @param fs the underlying filesystem
   */
  public HarFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Initialize a Har filesystem per har archive. The
   * archive home directory is the top level directory
   * in the filesystem that contains the HAR archive.
   * Be careful with this method: you do not want to go
   * on creating new Filesystem instances per call to
   * path.getFileSystem().
   * The har URI is of the form
   * har://underlyingfsscheme-host:port/archivepath
   * or
   * har:///archivepath. The latter form uses the default
   * filesystem from the configuration as the underlying filesystem.
   */
  public void initialize(URI name, Configuration conf) throws IOException {
    // decode the name
    underLyingURI = decodeHarURI(name, conf);
    // we got the right har Path; now check if this is
    // truly a har filesystem
    Path harPath = archivePath(new Path(name.toString()));
    if (harPath == null) {
      throw new IOException("Invalid path for the Har Filesystem. " +
                           name.toString());
    }
    if (fs == null) {
      fs = FileSystem.get(underLyingURI, conf);
    }
    this.uri = harPath.toUri();
    this.archivePath = new Path(this.uri.getPath());
    this.harAuth = getHarAuth(this.underLyingURI);
    // check that the underlying fs contains
    // the index files
    this.masterIndex = new Path(archivePath, "_masterindex");
    this.archiveIndex = new Path(archivePath, "_index");
    if (!fs.exists(masterIndex) || !fs.exists(archiveIndex)) {
      throw new IOException("Invalid path for the Har Filesystem. " +
          "No index file in " + harPath);
    }
    try {
      this.version = getHarVersion();
    } catch(IOException io) {
      throw new IOException("Unable to " +
          "read the version of the Har file system: " + this.archivePath);
    }
    if (this.version != HarFileSystem.VERSION) {
      throw new IOException("Invalid version " +
          this.version + " expected " + HarFileSystem.VERSION);
    }
  }

  // get the version of the filesystem from the masterindex file
  // the version is currently not useful since it's the first version
  // of archives
  public int getHarVersion() throws IOException {
    FSDataInputStream masterIn = fs.open(masterIndex);
    LineReader lmaster = new LineReader(masterIn, getConf());
    Text line = new Text();
    lmaster.readLine(line);
    try {
      masterIn.close();
    } catch(IOException e) {
      // disregard it.
      // it's a read.
    }
    String versionLine = line.toString();
    String[] arr = versionLine.split(" ");
    int version = Integer.parseInt(arr[0]);
    return version;
  }
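
  // For reference: the first line of _masterindex holds the version
  // number, and the following lines (parsed in fileStatusInIndex below)
  // are, illustratively, of the form
  //
  //   1
  //   <startHash> <endHash> <startOffset> <endOffset>
  //   ...
  //
  // where each range line points at the byte range of _index that covers
  // paths whose hashes fall in [startHash, endHash].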

  /*
   * find the ancestor path that is the
   * archive path. The deepest ancestor
   * (including p itself) that ends with .har
   * is the path that will be returned.
   */
  private Path archivePath(Path p) {
    Path retPath = null;
    Path tmp = p;
    for (int i=0; i< p.depth(); i++) {
      if (tmp.toString().endsWith(".har")) {
        retPath = tmp;
        break;
      }
      tmp = tmp.getParent();
    }
    return retPath;
  }

  /**
   * decode the raw URI to get the underlying URI
   * @param rawURI raw Har URI
   * @param conf the configuration; supplies the default filesystem
   * @return filtered URI of the underlying fileSystem
   */
  private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
    String tmpAuth = rawURI.getAuthority();
    // no authority given, so use the default
    // filesystem in the config as the underlying
    // filesystem and return its uri
    if (tmpAuth == null) {
      return FileSystem.getDefaultUri(conf);
    }
    String host = rawURI.getHost();
    if (host == null) {
      // getHost() is null when the authority is not a parseable host name
      throw new IOException("URI: " + rawURI + " is an invalid Har URI.");
    }
    String[] str = host.split("-", 2);
    if (str[0] == null) {
      throw new IOException("URI: " + rawURI + " is an invalid Har URI.");
    }
    String underLyingScheme = str[0];
    String underLyingHost = (str.length > 1)? str[1]:null;
    int underLyingPort = rawURI.getPort();
    String auth = (underLyingHost == null && underLyingPort == -1)?
                  null:(underLyingHost+":"+underLyingPort);
    URI tmp = null;
    if (rawURI.getQuery() != null) {
      // query component not allowed
      throw new IOException("query component in Path not supported  " + rawURI);
    }
    try {
      tmp = new URI(underLyingScheme, auth, rawURI.getPath(),
            rawURI.getQuery(), rawURI.getFragment());
    } catch (URISyntaxException e) {
      // should not happen; the components come from a valid URI
    }
    return tmp;
  }
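
  // Illustrative mappings performed by decodeHarURI (hostnames and paths
  // here are made up):
  //
  //   har://hdfs-nn.example.com:8020/user/foo/a.har
  //       ->  hdfs://nn.example.com:8020/user/foo/a.har
  //   har:///user/foo/a.har
  //       ->  the default filesystem URI from the configuration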

  /**
   * return the top level archive.
   */
  public Path getWorkingDirectory() {
    return new Path(uri.toString());
  }

  /**
   * Create a har specific auth of the form
   * underlyingscheme-host:port
   * @param underLyingUri the uri of the underlying
   * filesystem
   * @return har specific auth
   */
  private String getHarAuth(URI underLyingUri) {
    String auth = underLyingUri.getScheme() + "-";
    if (underLyingUri.getHost() != null) {
      auth += underLyingUri.getHost() + ":";
      if (underLyingUri.getPort() != -1) {
        auth +=  underLyingUri.getPort();
      }
    }
    else {
      auth += ":";
    }
    return auth;
  }
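
  // For example (hypothetical host), an underlying uri of
  // hdfs://namenode:8020 yields the har auth "hdfs-namenode:8020",
  // while a host-less uri such as file:/// yields "file-:".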

  /**
   * Returns the uri of this filesystem.
   * The uri is of the form
   * har://underlyingfsscheme-host:port/pathintheunderlyingfs
   */
  @Override
  public URI getUri() {
    return this.uri;
  }

  /**
   * Returns the path inside the har filesystem,
   * i.e. the path relative to the archive root,
   * rooted at Path.SEPARATOR.
   * @param path the fully qualified path in the har filesystem.
   * @return relative path in the filesystem.
   */
  private Path getPathInHar(Path path) {
    Path harPath = new Path(path.toUri().getPath());
    if (archivePath.compareTo(harPath) == 0)
      return new Path(Path.SEPARATOR);
    Path tmp = new Path(harPath.getName());
    Path parent = harPath.getParent();
    while (!(parent.compareTo(archivePath) == 0)) {
      if (parent.toString().equals(Path.SEPARATOR)) {
        tmp = null;
        break;
      }
      tmp = new Path(parent.getName(), tmp);
      parent = parent.getParent();
    }
    if (tmp != null)
      tmp = new Path(Path.SEPARATOR, tmp);
    return tmp;
  }
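
  // For example (hypothetical paths), with archivePath /user/foo/a.har:
  //   getPathInHar(/user/foo/a.har/dir/file)  ->  /dir/file
  //   getPathInHar(/user/foo/a.har)           ->  /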

  // the relative path of p: basically
  // getting rid of the leading /. Parsing and doing
  // string manipulation is not good, so
  // just use the path api to do it.
  private Path makeRelative(String initial, Path p) {
    Path root = new Path(Path.SEPARATOR);
    if (root.compareTo(p) == 0)
      return new Path(initial);
    Path retPath = new Path(p.getName());
    Path parent = p.getParent();
    for (int i=0; i < p.depth()-1; i++) {
      retPath = new Path(parent.getName(), retPath);
      parent = parent.getParent();
    }
    return new Path(initial, retPath.toString());
  }
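
  // For example (made-up values),
  //   makeRelative("har://hdfs-nn:8020/u/a.har", new Path("/dir/file"))
  // resolves to har://hdfs-nn:8020/u/a.har/dir/file.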

  /* this makes a path qualified in the har filesystem
   * (non-Javadoc)
   * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
   * org.apache.hadoop.fs.Path)
   */
  @Override
  public Path makeQualified(Path path) {
    // make sure that we just get the
    // path component
    Path fsPath = path;
    if (!path.isAbsolute()) {
      fsPath = new Path(archivePath, path);
    }

    URI tmpURI = fsPath.toUri();
    fsPath = new Path(tmpURI.getPath());
    // change this to Har uri
    URI tmp = null;
    try {
      tmp = new URI(uri.getScheme(), harAuth, fsPath.toString(),
                    tmpURI.getQuery(), tmpURI.getFragment());
    } catch(URISyntaxException ue) {
      LOG.error("Error in URI ", ue);
    }
    if (tmp != null) {
      return new Path(tmp.toString());
    }
    return null;
  }

  /**
   * get block locations from the underlying fs
   * @param file the input filestatus to get block locations
   * @param start the start in the file
   * @param len the length in the file
   * @return block locations for this segment of file
   * @throws IOException
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
      long len) throws IOException {
    // need to look up the file in the underlying fs
    // look up the index

    // make sure this is a part of this har filesystem
    Path p = makeQualified(file.getPath());
    Path harPath = getPathInHar(p);
    String line = fileStatusInIndex(harPath);
    if (line == null)  {
      throw new FileNotFoundException("File " + file.getPath() + " not found");
    }
    HarStatus harStatus = new HarStatus(line);
    if (harStatus.isDir()) {
      return new BlockLocation[0];
    }
    FileStatus fsFile = fs.getFileStatus(new Path(archivePath,
        harStatus.getPartName()));
    BlockLocation[] rawBlocks = fs.getFileBlockLocations(fsFile,
        harStatus.getStartIndex() + start, len);
    return fakeBlockLocations(rawBlocks, harStatus.getStartIndex());
  }

  /**
   * fake the rawblocks since MapReduce uses the block offsets to
   * do some computations regarding the blocks
   * @param rawBlocks the raw blocks returned by the filesystem
   * @param startIndex the start of the archived file in the part file
   * @return faked blocks with changed offsets.
   */
  private BlockLocation[] fakeBlockLocations(BlockLocation[] rawBlocks,
                  long startIndex) {
    for (BlockLocation block : rawBlocks) {
      long rawOffset = block.getOffset();
      block.setOffset(rawOffset - startIndex);
    }
    return rawBlocks;
  }
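
  // For instance (made-up numbers), a block reported at offset 134217728
  // of the part file, for a file archived at startIndex 134217000, is
  // re-reported at offset 728 relative to the start of the archived file.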

  /**
   * the hash of the path p inside
   * the filesystem
   * @param p the path in the harfilesystem
   * @return the hash code of the path.
   */
  public static int getHarHash(Path p) {
    return (p.toString().hashCode() & 0x7fffffff);
  }

  static class Store {
    public Store() {
      begin = end = startHash = endHash = 0;
    }
    public Store(long begin, long end, int startHash, int endHash) {
      this.begin = begin;
      this.end = end;
      this.startHash = startHash;
      this.endHash = endHash;
    }
    public long begin;
    public long end;
    public int startHash;
    public int endHash;
  }

  // make sure that this harPath is relative to the har filesystem
  // this only works for relative paths. This returns the line matching
  // the file in the index. Returns null if there is no matching
  // filename in the index file.
  private String fileStatusInIndex(Path harPath) throws IOException {
    // read the index file
    int hashCode = getHarHash(harPath);
    // get the master index to find the pos
    // in the index file
    FSDataInputStream in = fs.open(masterIndex);
    FileStatus masterStat = fs.getFileStatus(masterIndex);
    LineReader lin = new LineReader(in, getConf());
    Text line = new Text();
    long read = lin.readLine(line);
    // ignore the first line. this is the header of the index files
    String[] readStr = null;
    List<Store> stores = new ArrayList<Store>();
    while(read < masterStat.getLen()) {
      int b = lin.readLine(line);
      read += b;
      readStr = line.toString().split(" ");
      int startHash = Integer.parseInt(readStr[0]);
      int endHash  = Integer.parseInt(readStr[1]);
      if (startHash <= hashCode && hashCode <= endHash) {
        stores.add(new Store(Long.parseLong(readStr[2]),
            Long.parseLong(readStr[3]), startHash,
            endHash));
      }
      line.clear();
    }
    try {
      lin.close();
    } catch(IOException io) {
      // do nothing, just a read.
    }
    FSDataInputStream aIn = fs.open(archiveIndex);
    LineReader aLin = new LineReader(aIn, getConf());
    String retStr = null;
    // now start reading the real index file
    read = 0;
    for (Store s: stores) {
      aIn.seek(s.begin);
      while (read + s.begin < s.end) {
        int tmp = aLin.readLine(line);
        read += tmp;
        String lineFeed = line.toString();
        String[] parsed = lineFeed.split(" ");
        if (harPath.compareTo(new Path(parsed[0])) == 0) {
          // bingo!
          retStr = lineFeed;
          break;
        }
        line.clear();
      }
      if (retStr != null)
        break;
    }
    try {
      aIn.close();
    } catch(IOException io) {
      // do nothing
    }
    return retStr;
  }

  // a single line parser for hadoop archives status
  // stored in a single line in the index files
  // the format is of the form
  // filename "dir"/"file" partFileName startIndex length
  // <space separated children>
  private static class HarStatus {
    boolean isDir;
    String name;
    List<String> children;
    String partName;
    long startIndex;
    long length;
    public HarStatus(String harString) {
      String[] splits = harString.split(" ");
      this.name = splits[0];
      this.isDir = "dir".equals(splits[1]);
      // this is equal to "none" if it's a directory
      this.partName = splits[2];
      this.startIndex = Long.parseLong(splits[3]);
      this.length = Long.parseLong(splits[4]);
      if (isDir) {
        children = new ArrayList<String>();
        for (int i = 5; i < splits.length; i++) {
          children.add(splits[i]);
        }
      }
    }
    public boolean isDir() {
      return isDir;
    }

    public String getName() {
      return name;
    }

    public List<String> getChildren() {
      return children;
    }
    public String getFileName() {
      return name;
    }
    public String getPartName() {
      return partName;
    }
    public long getStartIndex() {
      return startIndex;
    }
    public long getLength() {
      return length;
    }
  }
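
  // Illustrative _index lines in the format parsed above (paths and
  // numbers are made up):
  //
  //   /dir dir none 0 0 file1 file2
  //   /dir/file1 file part-0 0 1024
  //   /dir/file2 file part-0 1024 2048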

  /**
   * return the filestatus of files in har archive.
   * The permissions returned are those of the archive
   * index files. The permissions are not persisted
   * while creating a hadoop archive.
   * @param f the path in har filesystem
   * @return filestatus.
   * @throws IOException
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
    // get the fs DataInputStream for the underlying file
    // look up the index.
    Path p = makeQualified(f);
    Path harPath = getPathInHar(p);
    if (harPath == null) {
      throw new IOException("Invalid file name: " + f + " in " + uri);
    }
    String readStr = fileStatusInIndex(harPath);
    if (readStr == null) {
      throw new FileNotFoundException("File: " +  f + " does not exist in " + uri);
    }
    HarStatus hstatus = new HarStatus(readStr);
    return new FileStatus(hstatus.isDir()?0:hstatus.getLength(), hstatus.isDir(),
        (int)archiveStatus.getReplication(), archiveStatus.getBlockSize(),
        archiveStatus.getModificationTime(), archiveStatus.getAccessTime(),
        new FsPermission(
        archiveStatus.getPermission()), archiveStatus.getOwner(),
        archiveStatus.getGroup(),
        makeRelative(this.uri.toString(), new Path(hstatus.name)));
  }

  /**
   * Returns a har input stream which fakes end of
   * file. It reads the index files to get the part
   * file name and the size and start of the file.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    // get the fs DataInputStream for the underlying file
    // look up the index.
    Path p = makeQualified(f);
    Path harPath = getPathInHar(p);
    if (harPath == null) {
      throw new IOException("Invalid file name: " + f + " in " + uri);
    }
    String readStr = fileStatusInIndex(harPath);
    if (readStr == null) {
      throw new FileNotFoundException(f + ": not found in " + archivePath);
    }
    HarStatus hstatus = new HarStatus(readStr);
    // we got it.. woo hooo!!!
    if (hstatus.isDir()) {
      throw new FileNotFoundException(f + ": not a file in " +
                archivePath);
    }
    return new HarFSDataInputStream(fs, new Path(archivePath,
        hstatus.getPartName()),
        hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
  }

  /*
   * create throws an exception in Har filesystem.
   * The archive once created cannot be changed.
   */
  public FSDataOutputStream create(Path f, int bufferSize)
                                    throws IOException {
    throw new IOException("Har: Create not allowed");
  }

  public FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException {
    throw new IOException("Har: create not allowed.");
  }

  @Override
  public void close() throws IOException {
    if (fs != null) {
      try {
        fs.close();
      } catch(IOException ie) {
        // this might already be closed
        // ignore
      }
    }
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean setReplication(Path src, short replication) throws IOException{
    throw new IOException("Har: setreplication not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    throw new IOException("Har: delete not allowed");
  }

  /**
   * listStatus returns the children of a directory
   * after looking up the index files.
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    // need to see if the path is in the index file
    // get the filestatus of the archive directory
    // we will create fake filestatuses to return
    // to the client
    List<FileStatus> statuses = new ArrayList<FileStatus>();
    FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
    Path tmpPath = makeQualified(f);
    Path harPath = getPathInHar(tmpPath);
    String readStr = fileStatusInIndex(harPath);
    if (readStr == null) {
      throw new FileNotFoundException("File " + f + " not found in " + archivePath);
    }
    HarStatus hstatus = new HarStatus(readStr);
    if (!hstatus.isDir())
      statuses.add(new FileStatus(hstatus.getLength(),
          hstatus.isDir(),
          archiveStatus.getReplication(), archiveStatus.getBlockSize(),
          archiveStatus.getModificationTime(), archiveStatus.getAccessTime(),
          new FsPermission(archiveStatus.getPermission()),
          archiveStatus.getOwner(), archiveStatus.getGroup(),
          makeRelative(this.uri.toString(), new Path(hstatus.name))));
    else
      for (String child: hstatus.children) {
        FileStatus tmp = getFileStatus(new Path(tmpPath, child));
        statuses.add(tmp);
      }
    return statuses.toArray(new FileStatus[statuses.size()]);
  }

  /**
   * return the top level archive path.
   */
  public Path getHomeDirectory() {
    return new Path(uri.toString());
  }

  public void setWorkingDirectory(Path newDir) {
    // does nothing.
  }

  /**
   * Not implemented.
   */
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    throw new IOException("Har: mkdirs not allowed");
  }

  /**
   * Not implemented.
   */
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws
        IOException {
    throw new IOException("Har: copyfromlocalfile not allowed");
  }

  /**
   * copies the file in the har filesystem to a local file.
   */
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
  }

  /**
   * Not implemented.
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    throw new IOException("Har: startLocalOutput not allowed");
  }

  /**
   * Not implemented.
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    throw new IOException("Har: completeLocalOutput not allowed");
  }

  /**
   * Not implemented.
   */
  public void setOwner(Path p, String username, String groupname)
    throws IOException {
    throw new IOException("Har: setowner not allowed");
  }

  /**
   * Not implemented.
   */
  public void setPermission(Path p, FsPermission permission)
    throws IOException {
    throw new IOException("Har: setPermission not allowed");
  }

  /**
   * Hadoop archives input stream. This input stream fakes EOF
   * since archive files are part of bigger part files.
   */
  private static class HarFSDataInputStream extends FSDataInputStream {
    /**
     * Create an input stream that fakes all the reads/positions/seeking.
     */
    private static class HarFsInputStream extends FSInputStream {
      private long position, start, end;
      // The underlying data input stream that the
      // underlying filesystem will return.
      private FSDataInputStream underLyingStream;
      // one byte buffer
      private byte[] oneBytebuff = new byte[1];
      HarFsInputStream(FileSystem fs, Path path, long start,
          long length, int bufferSize) throws IOException {
        underLyingStream = fs.open(path, bufferSize);
        underLyingStream.seek(start);
        // the start of this file in the part file
        this.start = start;
        // the position pointer in the part file
        this.position = start;
        // the end pointer in the part file
        this.end = start + length;
      }
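
      // For example (made-up numbers): for a file archived at start=1000
      // with length=500, reads are confined to part-file bytes
      // [1000, 1500), and getPos() reports 0 through 500 to the caller.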

      public synchronized int available() throws IOException {
        long remaining = end - underLyingStream.getPos();
        if (remaining > (long)Integer.MAX_VALUE) {
          return Integer.MAX_VALUE;
        }
        return (int) remaining;
      }

      public synchronized void close() throws IOException {
        underLyingStream.close();
        super.close();
      }

      // not implemented
      @Override
      public void mark(int readLimit) {
        // do nothing
      }

      /**
       * reset is not implemented
       */
      public void reset() throws IOException {
        throw new IOException("reset not implemented.");
      }

      public synchronized int read() throws IOException {
        int ret = read(oneBytebuff, 0, 1);
        return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
      }

      public synchronized int read(byte[] b) throws IOException {
        // read(byte[], int, int) already advances position,
        // so do not add ret to position a second time here.
        return read(b, 0, b.length);
      }

      /**
       * reads up to len bytes, but never past the faked EOF.
       */
      public synchronized int read(byte[] b, int offset, int len)
        throws IOException {
        int newlen = len;
        int ret = -1;
        if (position + len > end) {
          newlen = (int) (end - position);
        }
        // end case
        if (newlen == 0)
          return ret;
        ret = underLyingStream.read(b, offset, newlen);
        // only advance for bytes actually read, not for the -1 EOF marker
        if (ret > 0) {
          position += ret;
        }
        return ret;
      }

      public synchronized long skip(long n) throws IOException {
        long tmpN = n;
        if (tmpN > 0) {
          if (position + tmpN > end) {
            tmpN = end - position;
          }
          underLyingStream.seek(tmpN + position);
          position += tmpN;
          return tmpN;
        }
        return (tmpN < 0)? -1 : 0;
      }

      public synchronized long getPos() throws IOException {
        return (position - start);
      }

      public synchronized void seek(long pos) throws IOException {
        if (pos < 0 || (start + pos > end)) {
          throw new IOException("Failed to seek: EOF");
        }
        position = start + pos;
        underLyingStream.seek(position);
      }

      public boolean seekToNewSource(long targetPos) throws IOException {
        // do not need to implement this;
        // hdfs itself does seekToNewSource
        // while reading.
        return false;
      }

      /**
       * Implement positioned readable.
       */
      public int read(long pos, byte[] b, int offset, int length)
      throws IOException {
        int nlength = length;
        if (start + nlength + pos > end) {
          nlength = (int) (end - (start + pos));
        }
        return underLyingStream.read(pos + start, b, offset, nlength);
      }

      /**
       * positioned readable again.
       */
      public void readFully(long pos, byte[] b, int offset, int length)
      throws IOException {
        if (start + length + pos > end) {
          throw new IOException("Not enough bytes to read.");
        }
        underLyingStream.readFully(pos + start, b, offset, length);
      }

      public void readFully(long pos, byte[] b) throws IOException {
        readFully(pos, b, 0, b.length);
      }

    }

    /**
     * constructor for har input stream.
     * @param fs the underlying filesystem
     * @param p the path in the underlying filesystem
     * @param start the start position in the part file
     * @param length the length of valid data in the part file
     * @param bufsize the buffer size
     * @throws IOException
     */
    public HarFSDataInputStream(FileSystem fs, Path p, long start,
        long length, int bufsize) throws IOException {
        super(new HarFsInputStream(fs, p, start, length, bufsize));
    }

    /**
     * constructor for har input stream.
     * @param fs the underlying filesystem
     * @param p the path in the underlying file system
     * @param start the start position in the part file
     * @param length the length of valid data in the part file.
     * @throws IOException
     */
    public HarFSDataInputStream(FileSystem fs, Path p, long start, long length)
      throws IOException {
        super(new HarFsInputStream(fs, p, start, length, 0));
    }
  }
}