source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 18.9 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Random;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

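// A minimal usage sketch (illustrative assumption only - the configuration keys come from
// the constants defined in this class, but how a DataNode or MiniDFSCluster picks them up
// differs across Hadoop versions):
//
//   Configuration conf = new Configuration();
//   conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
//   conf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, 10L * 1024 * 1024);
//   FSDatasetInterface fsdataset = new SimulatedFSDataset(conf);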
/**
 * This class implements a simulated FSDataset.
 *
 * Blocks that are created are recorded, but their data (and their CRCs) is
 * discarded.
 * Fixed data is returned when blocks are read; a null CRC meta file is
 * created for such data.
 *
 * This FSDataset does not remember any block information across
 * restarts; it does, however, offer an operation to inject blocks
 * (see TestInjectionForSimulatedStorage for a usage example of injection).
 *
 * Note that synchronization is coarse grained - it is done per method.
 */

public class SimulatedFSDataset implements FSConstants, FSDatasetInterface, Configurable {

  public static final String CONFIG_PROPERTY_SIMULATED =
                                    "dfs.datanode.simulateddatastorage";
  public static final String CONFIG_PROPERTY_CAPACITY =
                            "dfs.datanode.simulateddatastorage.capacity";

  public static final long DEFAULT_CAPACITY = 2L<<40; // 2 terabytes
  public static final byte DEFAULT_DATABYTE = 9; // byte value returned for simulated block data
  byte simulatedDataByte = DEFAULT_DATABYTE;
  Configuration conf = null;

  static byte[] nullCrcFileData;
  {
    DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
                              CHECKSUM_NULL, 16*1024 );
    byte[] nullCrcHeader = checksum.getHeader();
    nullCrcFileData = new byte[2 + nullCrcHeader.length];
    nullCrcFileData[0] = (byte) ((FSDataset.METADATA_VERSION >>> 8) & 0xff);
    nullCrcFileData[1] = (byte) (FSDataset.METADATA_VERSION & 0xff);
    for (int i = 0; i < nullCrcHeader.length; i++) {
      nullCrcFileData[i+2] = nullCrcHeader[i];
    }
  }

  private class BInfo { // information about a single block
    Block theBlock;
    private boolean finalized = false; // if not finalized => ongoing creation
    SimulatedOutputStream oStream = null;
    BInfo(Block b, boolean forWriting) throws IOException {
      theBlock = new Block(b);
      if (theBlock.getNumBytes() < 0) {
        theBlock.setNumBytes(0);
      }
      if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may
                                                    // be more - we find out at finalize
        DataNode.LOG.warn("Lack of free storage on a block alloc");
        throw new IOException("Creating block, no free space available");
      }

      if (forWriting) {
        finalized = false;
        oStream = new SimulatedOutputStream();
      } else {
        finalized = true;
        oStream = null;
      }
    }

    synchronized long getGenerationStamp() {
      return theBlock.getGenerationStamp();
    }

    synchronized void updateBlock(Block b) {
      theBlock.setGenerationStamp(b.getGenerationStamp());
      setlength(b.getNumBytes());
    }

    synchronized long getlength() {
      if (!finalized) {
        return oStream.getLength();
      } else {
        return theBlock.getNumBytes();
      }
    }

    synchronized void setlength(long length) {
      if (!finalized) {
        oStream.setLength(length);
      } else {
        theBlock.setNumBytes(length);
      }
    }

    synchronized SimulatedInputStream getIStream() throws IOException {
      if (!finalized) {
        // throw new IOException("Trying to read an unfinalized block");
        return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE);
      } else {
        return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE);
      }
    }

    synchronized void finalizeBlock(long finalSize) throws IOException {
      if (finalized) {
        throw new IOException(
            "Finalizing a block that has already been finalized " +
            theBlock.getBlockId());
      }
      if (oStream == null) {
        DataNode.LOG.error("Null oStream on unfinalized block - bug");
        throw new IOException("Unexpected error on finalize");
      }

      if (oStream.getLength() != finalSize) {
        DataNode.LOG.warn("Size passed to finalize (" + finalSize +
                    ") does not match what was written: " + oStream.getLength());
        throw new IOException(
          "Size passed to finalize does not match the amount of data written");
      }
      // We had allocated the expected length when block was created;
      // adjust if necessary
      long extraLen = finalSize - theBlock.getNumBytes();
      if (extraLen > 0) {
        if (!storage.alloc(extraLen)) {
          DataNode.LOG.warn("Lack of free storage on a block alloc");
          throw new IOException("Creating block, no free space available");
        }
      } else {
        storage.free(-extraLen);
      }
      theBlock.setNumBytes(finalSize);

      finalized = true;
      oStream = null;
      return;
    }

    SimulatedInputStream getMetaIStream() {
      return new SimulatedInputStream(nullCrcFileData);
    }

    synchronized boolean isFinalized() {
      return finalized;
    }
  }

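  /** Tracks the simulated capacity and the space used, in bytes, for this dataset. */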
  static private class SimulatedStorage {
    private long capacity;  // in bytes
    private long used;      // in bytes

    synchronized long getFree() {
      return capacity - used;
    }

    synchronized long getCapacity() {
      return capacity;
    }

    synchronized long getUsed() {
      return used;
    }

    synchronized boolean alloc(long amount) {
      if (getFree() >= amount) {
        used += amount;
        return true;
      } else {
        return false;
      }
    }

    synchronized void free(long amount) {
      used -= amount;
    }

    SimulatedStorage(long cap) {
      capacity = cap;
      used = 0;
    }
  }

  private HashMap<Block, BInfo> blockMap = null;
  private SimulatedStorage storage = null;
  private String storageId;

  public SimulatedFSDataset(Configuration conf) throws IOException {
    setConf(conf);
  }

  private SimulatedFSDataset() { // real construction happens when setConf() is called
  }

  public Configuration getConf() {
    return conf;
  }

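  /**
   * Acts as the real initializer: reads the storage id and the simulated capacity
   * from the configuration, registers the MBean, and creates an empty block map.
   */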
  public void setConf(Configuration iconf) {
    conf = iconf;
    storageId = conf.get("StorageId", "unknownStorageId" +
                                        new Random().nextInt());
    registerMBean(storageId);
    storage = new SimulatedStorage(
        conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
    //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() +
    //    "Used = " + getDfsUsed() + "Free =" + getRemaining());

    blockMap = new HashMap<Block,BInfo>();
  }

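  /**
   * Injects the given blocks into the dataset as already-finalized blocks.
   * The whole list is rejected if any entry is null or already present.
   */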
  public synchronized void injectBlocks(Block[] injectBlocks)
                                            throws IOException {
    if (injectBlocks != null) {
      for (Block b: injectBlocks) { // if any block in the list is bad, reject the whole list
        if (b == null) {
          throw new NullPointerException("Null blocks in block list");
        }
        if (isValidBlock(b)) {
          throw new IOException("Block already exists in block list");
        }
      }
      HashMap<Block, BInfo> oldBlockMap = blockMap;
      blockMap =
          new HashMap<Block,BInfo>(injectBlocks.length + oldBlockMap.size());
      blockMap.putAll(oldBlockMap);
      for (Block b: injectBlocks) {
        BInfo binfo = new BInfo(b, false);
        blockMap.put(b, binfo);
      }
    }
  }

  public synchronized void finalizeBlock(Block b) throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("Finalizing a non existing block " + b);
    }
    binfo.finalizeBlock(b.getNumBytes());
  }

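  /** Discards a block that is still being written; finalized blocks are left untouched. */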
  public synchronized void unfinalizeBlock(Block b) throws IOException {
    if (isBeingWritten(b)) {
      blockMap.remove(b);
    }
  }

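  /** Reports only the finalized blocks; blocks still being written are omitted. */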
  public synchronized Block[] getBlockReport() {
    Block[] blockTable = new Block[blockMap.size()];
    int count = 0;
    for (BInfo b : blockMap.values()) {
      if (b.isFinalized()) {
        blockTable[count++] = b.theBlock;
      }
    }
    if (count != blockTable.length) {
      blockTable = Arrays.copyOf(blockTable, count);
    }
    return blockTable;
  }

  public long getCapacity() throws IOException {
    return storage.getCapacity();
  }

  public long getDfsUsed() throws IOException {
    return storage.getUsed();
  }

  public long getRemaining() throws IOException {
    return storage.getFree();
  }

  public synchronized long getLength(Block b) throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("Getting length of a non-existing block " + b);
    }
    return binfo.getlength();
  }

  /** {@inheritDoc} */
  public Block getStoredBlock(long blkid) throws IOException {
    Block b = new Block(blkid);
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      return null;
    }
    b.setGenerationStamp(binfo.getGenerationStamp());
    b.setNumBytes(binfo.getlength());
    return b;
  }

  /** {@inheritDoc} */
  public void updateBlock(Block oldblock, Block newblock) throws IOException {
    BInfo binfo = blockMap.get(newblock);
    if (binfo == null) {
      throw new IOException("BInfo not found, b=" + newblock);
    }
    binfo.updateBlock(newblock);
  }

  public synchronized void invalidate(Block[] invalidBlks) throws IOException {
    boolean error = false;
    if (invalidBlks == null) {
      return;
    }
    for (Block b: invalidBlks) {
      if (b == null) {
        continue;
      }
      BInfo binfo = blockMap.get(b);
      if (binfo == null) {
        error = true;
        DataNode.LOG.warn("Invalidate: Missing block");
        continue;
      }
      storage.free(binfo.getlength());
      blockMap.remove(b);
    }
    if (error) {
      throw new IOException("Invalidate: Missing blocks.");
    }
  }

360
361  public synchronized boolean isValidBlock(Block b) {
362    // return (blockMap.containsKey(b));
363    BInfo binfo = blockMap.get(b);
364    if (binfo == null) {
365      return false;
366    }
367    return binfo.isFinalized();
368  }
369
370  /* check if a block is created but not finalized */
371  private synchronized boolean isBeingWritten(Block b) {
372    BInfo binfo = blockMap.get(b);
373    if (binfo == null) {
374      return false;
375    }
376    return !binfo.isFinalized(); 
377  }
378 
379  public String toString() {
380    return getStorageInfo();
381  }
382
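  /**
   * Starts a new (unfinalized) block and returns simulated data and CRC output
   * streams for it. Fails if the block is already finalized or being written.
   */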
  public synchronized BlockWriteStreams writeToBlock(Block b,
                                            boolean isRecovery)
                                            throws IOException {
    if (isValidBlock(b)) {
      throw new BlockAlreadyExistsException("Block " + b +
          " is valid, and cannot be written to.");
    }
    if (isBeingWritten(b)) {
      throw new BlockAlreadyExistsException("Block " + b +
          " is being written, and cannot be written to.");
    }
    BInfo binfo = new BInfo(b, true);
    blockMap.put(b, binfo);
    SimulatedOutputStream crcStream = new SimulatedOutputStream();
    return new BlockWriteStreams(binfo.oStream, crcStream);
  }

  public synchronized InputStream getBlockInputStream(Block b)
                                            throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("No such Block " + b );
    }

    //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
    return binfo.getIStream();
  }

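  /** Returns an input stream over the block's simulated data, positioned at seekOffset. */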
  public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
                              throws IOException {
    InputStream result = getBlockInputStream(b);
    result.skip(seekOffset);
    return result;
  }

  /** Not supported */
  public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
      ) throws IOException {
    throw new IOException("Not supported");
  }

  /** No-op */
  public void validateBlockMetadata(Block b) {
  }

  /**
   * Returns metaData of block b as an input stream
   * @param b - the block for which the metadata is desired
   * @return metaData of block b as an input stream
   * @throws IOException - block does not exist or problems accessing
   *  the meta file
   */
  private synchronized InputStream getMetaDataInStream(Block b)
                                              throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("No such Block " + b );
    }
    if (!binfo.finalized) {
      throw new IOException("Block " + b +
          " is being written, its meta cannot be read");
    }
    return binfo.getMetaIStream();
  }

  public synchronized long getMetaDataLength(Block b) throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("No such Block " + b );
    }
    if (!binfo.finalized) {
      throw new IOException("Block " + b +
          " is being written, its meta length cannot be read");
    }
    return binfo.getMetaIStream().getLength();
  }

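  /** Returns the block's null-CRC metadata as a stream, together with its length. */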
  public MetaDataInputStream getMetaDataInputStream(Block b)
      throws IOException {
    return new MetaDataInputStream(getMetaDataInStream(b),
                                   getMetaDataLength(b));
  }

  public synchronized boolean metaFileExists(Block b) throws IOException {
    if (!isValidBlock(b)) {
      throw new IOException("Block " + b +
          " is not a valid block; no meta file exists.");
    }
    return true; // crc exists for all valid blocks
  }

  public void checkDataDir() throws DiskErrorException {
    // nothing to check for simulated data set
  }

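  /**
   * There is no real channel in the simulated dataset, so the current block
   * length stands in for the channel position.
   */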
  public synchronized long getChannelPosition(Block b,
                                              BlockWriteStreams stream)
                                              throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("No such Block " + b );
    }
    return binfo.getlength();
  }

  public synchronized void setChannelPosition(Block b, BlockWriteStreams stream,
                                              long dataOffset, long ckOffset)
                                              throws IOException {
    BInfo binfo = blockMap.get(b);
    if (binfo == null) {
      throw new IOException("No such Block " + b );
    }
    binfo.setlength(dataOffset);
  }

  /**
   * Simulated input and output streams
   *
   */
  static private class SimulatedInputStream extends java.io.InputStream {

    byte theRepeatedData = 7;
    long length; // bytes
    int currentPos = 0;
    byte[] data = null;

    /**
     * An input stream of size l that returns a repeated byte
     * @param l length of the stream in bytes
     * @param iRepeatedData the byte returned on every read
     */
    SimulatedInputStream(long l, byte iRepeatedData) {
      length = l;
      theRepeatedData = iRepeatedData;
    }

    /**
     * An input stream over the supplied data
     *
     * @param iData the data backing the stream
     */
    SimulatedInputStream(byte[] iData) {
      data = iData;
      length = data.length;
    }

    /**
     *
     * @return the length of the input stream
     */
    long getLength() {
      return length;
    }

    @Override
    public int read() throws IOException {
      if (currentPos >= length)
        return -1;
      if (data != null) {
        // mask to honor the InputStream contract of returning 0..255
        return data[currentPos++] & 0xff;
      } else {
        currentPos++;
        return theRepeatedData;
      }
    }

    @Override
    public int read(byte[] b) throws IOException {

      if (b == null) {
        throw new NullPointerException();
      }
      if (b.length == 0) {
        return 0;
      }
      if (currentPos >= length) { // EOF
        return -1;
      }
      int bytesRead = (int) Math.min(b.length, length-currentPos);
      if (data != null) {
        System.arraycopy(data, currentPos, b, 0, bytesRead);
      } else { // fill with the repeated data byte
        Arrays.fill(b, 0, bytesRead, theRepeatedData);
      }
      currentPos += bytesRead;
      return bytesRead;
    }
  }

  /**
   * This class implements an output stream that merely throws its data away, but records its
   * length.
   *
   */
  static private class SimulatedOutputStream extends OutputStream {
    long length = 0;

    /**
     * Constructor for SimulatedOutputStream
     */
    SimulatedOutputStream() {
    }

    /**
     *
     * @return the length of the data created so far.
     */
    long getLength() {
      return length;
    }

    /**
     * Sets the length of the data written so far.
     */
    void setLength(long length) {
      this.length = length;
    }

    @Override
    public void write(int arg0) throws IOException {
      length++;
    }

    @Override
    public void write(byte[] b) throws IOException {
      length += b.length;
    }

    @Override
    public void write(byte[] b,
              int off,
              int len) throws IOException {
      length += len;
    }
  }

  private ObjectName mbeanName;

  /**
   * Register the FSDataset MBean using the name
   *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
   * We use the storage id for the MBean name since a minicluster within a single
   * Java VM may have multiple Simulated Datanodes.
   */
  void registerMBean(final String storageId) {
    // We wrap to bypass the standard MBean naming convention.
    // This wrapping can be removed in Java 6, as it is more flexible in
    // package naming for MBeans and their implementations.
    StandardMBean bean;

    try {
      bean = new StandardMBean(this, FSDatasetMBean.class);
      mbeanName = MBeanUtil.registerMBean("DataNode",
          "FSDatasetState-" + storageId, bean);
    } catch (NotCompliantMBeanException e) {
      e.printStackTrace();
    }

    DataNode.LOG.info("Registered FSDatasetStatusMBean");
  }

  public void shutdown() {
    if (mbeanName != null)
      MBeanUtil.unregisterMBean(mbeanName);
  }

  public String getStorageInfo() {
    return "Simulated FSDataset-" + storageId;
  }
}