source: proiecte/HadoopJUnit/hadoop-0.20.1/src/core/org/apache/hadoop/fs/s3/S3InputStream.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 4.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs.s3;
20
21import java.io.DataInputStream;
22import java.io.File;
23import java.io.FileInputStream;
24import java.io.IOException;
25
26import org.apache.hadoop.conf.Configuration;
27import org.apache.hadoop.fs.FSInputStream;
28import org.apache.hadoop.fs.FileSystem;
29
30class S3InputStream extends FSInputStream {
31
32  private FileSystemStore store;
33
34  private Block[] blocks;
35
36  private boolean closed;
37
38  private long fileLength;
39
40  private long pos = 0;
41
42  private File blockFile;
43 
44  private DataInputStream blockStream;
45
46  private long blockEnd = -1;
47 
48  private FileSystem.Statistics stats;
49
50  @Deprecated
51  public S3InputStream(Configuration conf, FileSystemStore store,
52                       INode inode) {
53    this(conf, store, inode, null);
54  }
55
56  public S3InputStream(Configuration conf, FileSystemStore store,
57                       INode inode, FileSystem.Statistics stats) {
58   
59    this.store = store;
60    this.stats = stats;
61    this.blocks = inode.getBlocks();
62    for (Block block : blocks) {
63      this.fileLength += block.getLength();
64    }
65  }
66
67  @Override
68  public synchronized long getPos() throws IOException {
69    return pos;
70  }
71
72  @Override
73  public synchronized int available() throws IOException {
74    return (int) (fileLength - pos);
75  }
76
77  @Override
78  public synchronized void seek(long targetPos) throws IOException {
79    if (targetPos > fileLength) {
80      throw new IOException("Cannot seek after EOF");
81    }
82    pos = targetPos;
83    blockEnd = -1;
84  }
85
86  @Override
87  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
88    return false;
89  }
90
91  @Override
92  public synchronized int read() throws IOException {
93    if (closed) {
94      throw new IOException("Stream closed");
95    }
96    int result = -1;
97    if (pos < fileLength) {
98      if (pos > blockEnd) {
99        blockSeekTo(pos);
100      }
101      result = blockStream.read();
102      if (result >= 0) {
103        pos++;
104      }
105    }
106    if (stats != null & result >= 0) {
107      stats.incrementBytesRead(1);
108    }
109    return result;
110  }
111
112  @Override
113  public synchronized int read(byte buf[], int off, int len) throws IOException {
114    if (closed) {
115      throw new IOException("Stream closed");
116    }
117    if (pos < fileLength) {
118      if (pos > blockEnd) {
119        blockSeekTo(pos);
120      }
121      int realLen = Math.min(len, (int) (blockEnd - pos + 1));
122      int result = blockStream.read(buf, off, realLen);
123      if (result >= 0) {
124        pos += result;
125      }
126      if (stats != null && result > 0) {
127        stats.incrementBytesRead(result);
128      }
129      return result;
130    }
131    return -1;
132  }
133
134  private synchronized void blockSeekTo(long target) throws IOException {
135    //
136    // Compute desired block
137    //
138    int targetBlock = -1;
139    long targetBlockStart = 0;
140    long targetBlockEnd = 0;
141    for (int i = 0; i < blocks.length; i++) {
142      long blockLength = blocks[i].getLength();
143      targetBlockEnd = targetBlockStart + blockLength - 1;
144
145      if (target >= targetBlockStart && target <= targetBlockEnd) {
146        targetBlock = i;
147        break;
148      } else {
149        targetBlockStart = targetBlockEnd + 1;
150      }
151    }
152    if (targetBlock < 0) {
153      throw new IOException(
154                            "Impossible situation: could not find target position " + target);
155    }
156    long offsetIntoBlock = target - targetBlockStart;
157
158    // read block blocks[targetBlock] from position offsetIntoBlock
159
160    this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock);
161
162    this.pos = target;
163    this.blockEnd = targetBlockEnd;
164    this.blockStream = new DataInputStream(new FileInputStream(blockFile));
165
166  }
167
168  @Override
169  public void close() throws IOException {
170    if (closed) {
171      return;
172    }
173    if (blockStream != null) {
174      blockStream.close();
175      blockStream = null;
176    }
177    if (blockFile != null) {
178      blockFile.delete();
179    }
180    super.close();
181    closed = true;
182  }
183
184  /**
185   * We don't support marks.
186   */
187  @Override
188  public boolean markSupported() {
189    return false;
190  }
191
192  @Override
193  public void mark(int readLimit) {
194    // Do nothing
195  }
196
197  @Override
198  public void reset() throws IOException {
199    throw new IOException("Mark not supported");
200  }
201
202}
Note: See TracBrowser for help on using the repository browser.