1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.hdfs.server.datanode; |
---|
19 | |
---|
20 | import java.io.IOException; |
---|
21 | import java.io.InputStream; |
---|
22 | import java.io.OutputStream; |
---|
23 | import java.util.Arrays; |
---|
24 | import java.util.HashMap; |
---|
25 | import java.util.Random; |
---|
26 | |
---|
27 | import javax.management.NotCompliantMBeanException; |
---|
28 | import javax.management.ObjectName; |
---|
29 | import javax.management.StandardMBean; |
---|
30 | |
---|
31 | import org.apache.hadoop.conf.Configurable; |
---|
32 | import org.apache.hadoop.conf.Configuration; |
---|
33 | import org.apache.hadoop.hdfs.protocol.Block; |
---|
34 | import org.apache.hadoop.hdfs.protocol.FSConstants; |
---|
35 | import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; |
---|
36 | import org.apache.hadoop.metrics.util.MBeanUtil; |
---|
37 | import org.apache.hadoop.util.DataChecksum; |
---|
38 | import org.apache.hadoop.util.DiskChecker.DiskErrorException; |
---|
39 | |
---|
40 | /** |
---|
41 | * This class implements a simulated FSDataset. |
---|
42 | * |
---|
43 | * Blocks that are created are recorded but their data (plus their CRCs) are |
---|
44 | * discarded. |
---|
45 | * Fixed data is returned when blocks are read; a null CRC meta file is |
---|
46 | * created for such data. |
---|
47 | * |
---|
48 | * This FSDataset does not remember any block information across its |
---|
49 | * restarts; it does however offer an operation to inject blocks |
---|
50 | * (see TestInjectionForSimulatedStorage |
---|
51 | * for a usage example of injection). |
---|
52 | * |
---|
53 | * Note the synchronization is coarse grained - it is at each method. |
---|
54 | */ |
---|
55 | |
---|
56 | public class SimulatedFSDataset implements FSConstants, FSDatasetInterface, Configurable{ |
---|
57 | |
---|
// Configuration key "dfs.datanode.simulateddatastorage". It is not read inside
// this class; presumably callers use it to switch simulated storage on - confirm.
public static final String CONFIG_PROPERTY_SIMULATED =
    "dfs.datanode.simulateddatastorage";
// Configuration key read by setConf() to size the simulated storage, in bytes.
public static final String CONFIG_PROPERTY_CAPACITY =
    "dfs.datanode.simulateddatastorage.capacity";

public static final long DEFAULT_CAPACITY = 2L<<40; // 2^41 bytes, i.e. 2 terabytes
public static final byte DEFAULT_DATABYTE = 9; // byte value served when block data is read
byte simulatedDataByte = DEFAULT_DATABYTE; // NOTE(review): never read in this class
Configuration conf = null; // assigned by setConf()
---|
67 | |
---|
68 | static byte[] nullCrcFileData; |
---|
69 | { |
---|
70 | DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum. |
---|
71 | CHECKSUM_NULL, 16*1024 ); |
---|
72 | byte[] nullCrcHeader = checksum.getHeader(); |
---|
73 | nullCrcFileData = new byte[2 + nullCrcHeader.length]; |
---|
74 | nullCrcFileData[0] = (byte) ((FSDataset.METADATA_VERSION >>> 8) & 0xff); |
---|
75 | nullCrcFileData[1] = (byte) (FSDataset.METADATA_VERSION & 0xff); |
---|
76 | for (int i = 0; i < nullCrcHeader.length; i++) { |
---|
77 | nullCrcFileData[i+2] = nullCrcHeader[i]; |
---|
78 | } |
---|
79 | } |
---|
80 | |
---|
81 | private class BInfo { // information about a single block |
---|
82 | Block theBlock; |
---|
83 | private boolean finalized = false; // if not finalized => ongoing creation |
---|
84 | SimulatedOutputStream oStream = null; |
---|
85 | BInfo(Block b, boolean forWriting) throws IOException { |
---|
86 | theBlock = new Block(b); |
---|
87 | if (theBlock.getNumBytes() < 0) { |
---|
88 | theBlock.setNumBytes(0); |
---|
89 | } |
---|
90 | if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may |
---|
91 | // be more - we find out at finalize |
---|
92 | DataNode.LOG.warn("Lack of free storage on a block alloc"); |
---|
93 | throw new IOException("Creating block, no free space available"); |
---|
94 | } |
---|
95 | |
---|
96 | if (forWriting) { |
---|
97 | finalized = false; |
---|
98 | oStream = new SimulatedOutputStream(); |
---|
99 | } else { |
---|
100 | finalized = true; |
---|
101 | oStream = null; |
---|
102 | } |
---|
103 | } |
---|
104 | |
---|
105 | synchronized long getGenerationStamp() { |
---|
106 | return theBlock.getGenerationStamp(); |
---|
107 | } |
---|
108 | |
---|
109 | synchronized void updateBlock(Block b) { |
---|
110 | theBlock.setGenerationStamp(b.getGenerationStamp()); |
---|
111 | setlength(b.getNumBytes()); |
---|
112 | } |
---|
113 | |
---|
114 | synchronized long getlength() { |
---|
115 | if (!finalized) { |
---|
116 | return oStream.getLength(); |
---|
117 | } else { |
---|
118 | return theBlock.getNumBytes(); |
---|
119 | } |
---|
120 | } |
---|
121 | |
---|
122 | synchronized void setlength(long length) { |
---|
123 | if (!finalized) { |
---|
124 | oStream.setLength(length); |
---|
125 | } else { |
---|
126 | theBlock.setNumBytes(length); |
---|
127 | } |
---|
128 | } |
---|
129 | |
---|
130 | synchronized SimulatedInputStream getIStream() throws IOException { |
---|
131 | if (!finalized) { |
---|
132 | // throw new IOException("Trying to read an unfinalized block"); |
---|
133 | return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE); |
---|
134 | } else { |
---|
135 | return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE); |
---|
136 | } |
---|
137 | } |
---|
138 | |
---|
139 | synchronized void finalizeBlock(long finalSize) throws IOException { |
---|
140 | if (finalized) { |
---|
141 | throw new IOException( |
---|
142 | "Finalizing a block that has already been finalized" + |
---|
143 | theBlock.getBlockId()); |
---|
144 | } |
---|
145 | if (oStream == null) { |
---|
146 | DataNode.LOG.error("Null oStream on unfinalized block - bug"); |
---|
147 | throw new IOException("Unexpected error on finalize"); |
---|
148 | } |
---|
149 | |
---|
150 | if (oStream.getLength() != finalSize) { |
---|
151 | DataNode.LOG.warn("Size passed to finalize (" + finalSize + |
---|
152 | ")does not match what was written:" + oStream.getLength()); |
---|
153 | throw new IOException( |
---|
154 | "Size passed to finalize does not match the amount of data written"); |
---|
155 | } |
---|
156 | // We had allocated the expected length when block was created; |
---|
157 | // adjust if necessary |
---|
158 | long extraLen = finalSize - theBlock.getNumBytes(); |
---|
159 | if (extraLen > 0) { |
---|
160 | if (!storage.alloc(extraLen)) { |
---|
161 | DataNode.LOG.warn("Lack of free storage on a block alloc"); |
---|
162 | throw new IOException("Creating block, no free space available"); |
---|
163 | } |
---|
164 | } else { |
---|
165 | storage.free(-extraLen); |
---|
166 | } |
---|
167 | theBlock.setNumBytes(finalSize); |
---|
168 | |
---|
169 | finalized = true; |
---|
170 | oStream = null; |
---|
171 | return; |
---|
172 | } |
---|
173 | |
---|
174 | SimulatedInputStream getMetaIStream() { |
---|
175 | return new SimulatedInputStream(nullCrcFileData); |
---|
176 | } |
---|
177 | |
---|
178 | synchronized boolean isFinalized() { |
---|
179 | return finalized; |
---|
180 | } |
---|
181 | } |
---|
182 | |
---|
183 | static private class SimulatedStorage { |
---|
184 | private long capacity; // in bytes |
---|
185 | private long used; // in bytes |
---|
186 | |
---|
187 | synchronized long getFree() { |
---|
188 | return capacity - used; |
---|
189 | } |
---|
190 | |
---|
191 | synchronized long getCapacity() { |
---|
192 | return capacity; |
---|
193 | } |
---|
194 | |
---|
195 | synchronized long getUsed() { |
---|
196 | return used; |
---|
197 | } |
---|
198 | |
---|
199 | synchronized boolean alloc(long amount) { |
---|
200 | if (getFree() >= amount) { |
---|
201 | used += amount; |
---|
202 | return true; |
---|
203 | } else { |
---|
204 | return false; |
---|
205 | } |
---|
206 | } |
---|
207 | |
---|
208 | synchronized void free(long amount) { |
---|
209 | used -= amount; |
---|
210 | } |
---|
211 | |
---|
212 | SimulatedStorage(long cap) { |
---|
213 | capacity = cap; |
---|
214 | used = 0; |
---|
215 | } |
---|
216 | } |
---|
217 | |
---|
// Every block known to this dataset, keyed by Block. Created in setConf()
// and replaced by a merged copy in injectBlocks().
private HashMap<Block, BInfo> blockMap = null;
// Space accounting for the simulated storage; created in setConf().
private SimulatedStorage storage = null;
// Taken from the "StorageId" configuration property in setConf(), with a
// randomly suffixed fallback when the property is absent.
private String storageId;
---|
221 | |
---|
/**
 * Creates a simulated dataset and initializes it from the configuration by
 * delegating to setConf().
 *
 * @param conf configuration to initialize from
 * @throws IOException declared on the signature; no statement in this
 *         constructor throws it directly
 */
public SimulatedFSDataset(Configuration conf) throws IOException {
  this.setConf(conf);
}
---|
225 | |
---|
/**
 * Creates an uninitialized instance. The fields used by every other method
 * (storage, blockMap, storageId) are only set once setConf() is called.
 */
private SimulatedFSDataset() {
}
---|
228 | |
---|
229 | public Configuration getConf() { |
---|
230 | return conf; |
---|
231 | } |
---|
232 | |
---|
233 | public void setConf(Configuration iconf) { |
---|
234 | conf = iconf; |
---|
235 | storageId = conf.get("StorageId", "unknownStorageId" + |
---|
236 | new Random().nextInt()); |
---|
237 | registerMBean(storageId); |
---|
238 | storage = new SimulatedStorage( |
---|
239 | conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); |
---|
240 | //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + |
---|
241 | // "Used = " + getDfsUsed() + "Free =" + getRemaining()); |
---|
242 | |
---|
243 | blockMap = new HashMap<Block,BInfo>(); |
---|
244 | } |
---|
245 | |
---|
246 | public synchronized void injectBlocks(Block[] injectBlocks) |
---|
247 | throws IOException { |
---|
248 | if (injectBlocks != null) { |
---|
249 | for (Block b: injectBlocks) { // if any blocks in list is bad, reject list |
---|
250 | if (b == null) { |
---|
251 | throw new NullPointerException("Null blocks in block list"); |
---|
252 | } |
---|
253 | if (isValidBlock(b)) { |
---|
254 | throw new IOException("Block already exists in block list"); |
---|
255 | } |
---|
256 | } |
---|
257 | HashMap<Block, BInfo> oldBlockMap = blockMap; |
---|
258 | blockMap = |
---|
259 | new HashMap<Block,BInfo>(injectBlocks.length + oldBlockMap.size()); |
---|
260 | blockMap.putAll(oldBlockMap); |
---|
261 | for (Block b: injectBlocks) { |
---|
262 | BInfo binfo = new BInfo(b, false); |
---|
263 | blockMap.put(b, binfo); |
---|
264 | } |
---|
265 | } |
---|
266 | } |
---|
267 | |
---|
268 | public synchronized void finalizeBlock(Block b) throws IOException { |
---|
269 | BInfo binfo = blockMap.get(b); |
---|
270 | if (binfo == null) { |
---|
271 | throw new IOException("Finalizing a non existing block " + b); |
---|
272 | } |
---|
273 | binfo.finalizeBlock(b.getNumBytes()); |
---|
274 | |
---|
275 | } |
---|
276 | |
---|
277 | public synchronized void unfinalizeBlock(Block b) throws IOException { |
---|
278 | if (isBeingWritten(b)) { |
---|
279 | blockMap.remove(b); |
---|
280 | } |
---|
281 | } |
---|
282 | |
---|
283 | public synchronized Block[] getBlockReport() { |
---|
284 | Block[] blockTable = new Block[blockMap.size()]; |
---|
285 | int count = 0; |
---|
286 | for (BInfo b : blockMap.values()) { |
---|
287 | if (b.isFinalized()) { |
---|
288 | blockTable[count++] = b.theBlock; |
---|
289 | } |
---|
290 | } |
---|
291 | if (count != blockTable.length) { |
---|
292 | blockTable = Arrays.copyOf(blockTable, count); |
---|
293 | } |
---|
294 | return blockTable; |
---|
295 | } |
---|
296 | |
---|
297 | public long getCapacity() throws IOException { |
---|
298 | return storage.getCapacity(); |
---|
299 | } |
---|
300 | |
---|
301 | public long getDfsUsed() throws IOException { |
---|
302 | return storage.getUsed(); |
---|
303 | } |
---|
304 | |
---|
305 | public long getRemaining() throws IOException { |
---|
306 | return storage.getFree(); |
---|
307 | } |
---|
308 | |
---|
309 | public synchronized long getLength(Block b) throws IOException { |
---|
310 | BInfo binfo = blockMap.get(b); |
---|
311 | if (binfo == null) { |
---|
312 | throw new IOException("Finalizing a non existing block " + b); |
---|
313 | } |
---|
314 | return binfo.getlength(); |
---|
315 | } |
---|
316 | |
---|
317 | /** {@inheritDoc} */ |
---|
318 | public Block getStoredBlock(long blkid) throws IOException { |
---|
319 | Block b = new Block(blkid); |
---|
320 | BInfo binfo = blockMap.get(b); |
---|
321 | if (binfo == null) { |
---|
322 | return null; |
---|
323 | } |
---|
324 | b.setGenerationStamp(binfo.getGenerationStamp()); |
---|
325 | b.setNumBytes(binfo.getlength()); |
---|
326 | return b; |
---|
327 | } |
---|
328 | |
---|
329 | /** {@inheritDoc} */ |
---|
330 | public void updateBlock(Block oldblock, Block newblock) throws IOException { |
---|
331 | BInfo binfo = blockMap.get(newblock); |
---|
332 | if (binfo == null) { |
---|
333 | throw new IOException("BInfo not found, b=" + newblock); |
---|
334 | } |
---|
335 | binfo.updateBlock(newblock); |
---|
336 | } |
---|
337 | |
---|
338 | public synchronized void invalidate(Block[] invalidBlks) throws IOException { |
---|
339 | boolean error = false; |
---|
340 | if (invalidBlks == null) { |
---|
341 | return; |
---|
342 | } |
---|
343 | for (Block b: invalidBlks) { |
---|
344 | if (b == null) { |
---|
345 | continue; |
---|
346 | } |
---|
347 | BInfo binfo = blockMap.get(b); |
---|
348 | if (binfo == null) { |
---|
349 | error = true; |
---|
350 | DataNode.LOG.warn("Invalidate: Missing block"); |
---|
351 | continue; |
---|
352 | } |
---|
353 | storage.free(binfo.getlength()); |
---|
354 | blockMap.remove(b); |
---|
355 | } |
---|
356 | if (error) { |
---|
357 | throw new IOException("Invalidate: Missing blocks."); |
---|
358 | } |
---|
359 | } |
---|
360 | |
---|
361 | public synchronized boolean isValidBlock(Block b) { |
---|
362 | // return (blockMap.containsKey(b)); |
---|
363 | BInfo binfo = blockMap.get(b); |
---|
364 | if (binfo == null) { |
---|
365 | return false; |
---|
366 | } |
---|
367 | return binfo.isFinalized(); |
---|
368 | } |
---|
369 | |
---|
370 | /* check if a block is created but not finalized */ |
---|
371 | private synchronized boolean isBeingWritten(Block b) { |
---|
372 | BInfo binfo = blockMap.get(b); |
---|
373 | if (binfo == null) { |
---|
374 | return false; |
---|
375 | } |
---|
376 | return !binfo.isFinalized(); |
---|
377 | } |
---|
378 | |
---|
379 | public String toString() { |
---|
380 | return getStorageInfo(); |
---|
381 | } |
---|
382 | |
---|
383 | public synchronized BlockWriteStreams writeToBlock(Block b, |
---|
384 | boolean isRecovery) |
---|
385 | throws IOException { |
---|
386 | if (isValidBlock(b)) { |
---|
387 | throw new BlockAlreadyExistsException("Block " + b + |
---|
388 | " is valid, and cannot be written to."); |
---|
389 | } |
---|
390 | if (isBeingWritten(b)) { |
---|
391 | throw new BlockAlreadyExistsException("Block " + b + |
---|
392 | " is being written, and cannot be written to."); |
---|
393 | } |
---|
394 | BInfo binfo = new BInfo(b, true); |
---|
395 | blockMap.put(b, binfo); |
---|
396 | SimulatedOutputStream crcStream = new SimulatedOutputStream(); |
---|
397 | return new BlockWriteStreams(binfo.oStream, crcStream); |
---|
398 | } |
---|
399 | |
---|
400 | public synchronized InputStream getBlockInputStream(Block b) |
---|
401 | throws IOException { |
---|
402 | BInfo binfo = blockMap.get(b); |
---|
403 | if (binfo == null) { |
---|
404 | throw new IOException("No such Block " + b ); |
---|
405 | } |
---|
406 | |
---|
407 | //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len); |
---|
408 | return binfo.getIStream(); |
---|
409 | } |
---|
410 | |
---|
411 | public synchronized InputStream getBlockInputStream(Block b, long seekOffset) |
---|
412 | throws IOException { |
---|
413 | InputStream result = getBlockInputStream(b); |
---|
414 | result.skip(seekOffset); |
---|
415 | return result; |
---|
416 | } |
---|
417 | |
---|
/**
 * Not supported by the simulated dataset.
 *
 * @throws IOException always, with the message "Not supported"
 */
public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
    ) throws IOException {
  throw new IOException("Not supported");
}
---|
423 | |
---|
/** No-op: this implementation performs no validation for block b. */
public void validateBlockMetadata(Block b) {
}
---|
427 | |
---|
428 | /** |
---|
429 | * Returns metaData of block b as an input stream |
---|
430 | * @param b - the block for which the metadata is desired |
---|
431 | * @return metaData of block b as an input stream |
---|
432 | * @throws IOException - block does not exist or problems accessing |
---|
433 | * the meta file |
---|
434 | */ |
---|
435 | private synchronized InputStream getMetaDataInStream(Block b) |
---|
436 | throws IOException { |
---|
437 | BInfo binfo = blockMap.get(b); |
---|
438 | if (binfo == null) { |
---|
439 | throw new IOException("No such Block " + b ); |
---|
440 | } |
---|
441 | if (!binfo.finalized) { |
---|
442 | throw new IOException("Block " + b + |
---|
443 | " is being written, its meta cannot be read"); |
---|
444 | } |
---|
445 | return binfo.getMetaIStream(); |
---|
446 | } |
---|
447 | |
---|
448 | public synchronized long getMetaDataLength(Block b) throws IOException { |
---|
449 | BInfo binfo = blockMap.get(b); |
---|
450 | if (binfo == null) { |
---|
451 | throw new IOException("No such Block " + b ); |
---|
452 | } |
---|
453 | if (!binfo.finalized) { |
---|
454 | throw new IOException("Block " + b + |
---|
455 | " is being written, its metalength cannot be read"); |
---|
456 | } |
---|
457 | return binfo.getMetaIStream().getLength(); |
---|
458 | } |
---|
459 | |
---|
460 | public MetaDataInputStream getMetaDataInputStream(Block b) |
---|
461 | throws IOException { |
---|
462 | |
---|
463 | return new MetaDataInputStream(getMetaDataInStream(b), |
---|
464 | getMetaDataLength(b)); |
---|
465 | } |
---|
466 | |
---|
467 | public synchronized boolean metaFileExists(Block b) throws IOException { |
---|
468 | if (!isValidBlock(b)) { |
---|
469 | throw new IOException("Block " + b + |
---|
470 | " is valid, and cannot be written to."); |
---|
471 | } |
---|
472 | return true; // crc exists for all valid blocks |
---|
473 | } |
---|
474 | |
---|
/**
 * No-op: this implementation performs no check and never throws
 * DiskErrorException.
 */
public void checkDataDir() throws DiskErrorException {
  // nothing to check for simulated data set
}
---|
478 | |
---|
479 | public synchronized long getChannelPosition(Block b, |
---|
480 | BlockWriteStreams stream) |
---|
481 | throws IOException { |
---|
482 | BInfo binfo = blockMap.get(b); |
---|
483 | if (binfo == null) { |
---|
484 | throw new IOException("No such Block " + b ); |
---|
485 | } |
---|
486 | return binfo.getlength(); |
---|
487 | } |
---|
488 | |
---|
489 | public synchronized void setChannelPosition(Block b, BlockWriteStreams stream, |
---|
490 | long dataOffset, long ckOffset) |
---|
491 | throws IOException { |
---|
492 | BInfo binfo = blockMap.get(b); |
---|
493 | if (binfo == null) { |
---|
494 | throw new IOException("No such Block " + b ); |
---|
495 | } |
---|
496 | binfo.setlength(dataOffset); |
---|
497 | } |
---|
498 | |
---|
499 | /** |
---|
500 | * Simulated input and output streams |
---|
501 | * |
---|
502 | */ |
---|
503 | static private class SimulatedInputStream extends java.io.InputStream { |
---|
504 | |
---|
505 | |
---|
506 | byte theRepeatedData = 7; |
---|
507 | long length; // bytes |
---|
508 | int currentPos = 0; |
---|
509 | byte[] data = null; |
---|
510 | |
---|
511 | /** |
---|
512 | * An input stream of size l with repeated bytes |
---|
513 | * @param l |
---|
514 | * @param iRepeatedData |
---|
515 | */ |
---|
516 | SimulatedInputStream(long l, byte iRepeatedData) { |
---|
517 | length = l; |
---|
518 | theRepeatedData = iRepeatedData; |
---|
519 | } |
---|
520 | |
---|
521 | /** |
---|
522 | * An input stream of of the supplied data |
---|
523 | * |
---|
524 | * @param iData |
---|
525 | */ |
---|
526 | SimulatedInputStream(byte[] iData) { |
---|
527 | data = iData; |
---|
528 | length = data.length; |
---|
529 | |
---|
530 | } |
---|
531 | |
---|
532 | /** |
---|
533 | * |
---|
534 | * @return the lenght of the input stream |
---|
535 | */ |
---|
536 | long getLength() { |
---|
537 | return length; |
---|
538 | } |
---|
539 | |
---|
540 | @Override |
---|
541 | public int read() throws IOException { |
---|
542 | if (currentPos >= length) |
---|
543 | return -1; |
---|
544 | if (data !=null) { |
---|
545 | return data[currentPos++]; |
---|
546 | } else { |
---|
547 | currentPos++; |
---|
548 | return theRepeatedData; |
---|
549 | } |
---|
550 | } |
---|
551 | |
---|
552 | @Override |
---|
553 | public int read(byte[] b) throws IOException { |
---|
554 | |
---|
555 | if (b == null) { |
---|
556 | throw new NullPointerException(); |
---|
557 | } |
---|
558 | if (b.length == 0) { |
---|
559 | return 0; |
---|
560 | } |
---|
561 | if (currentPos >= length) { // EOF |
---|
562 | return -1; |
---|
563 | } |
---|
564 | int bytesRead = (int) Math.min(b.length, length-currentPos); |
---|
565 | if (data != null) { |
---|
566 | System.arraycopy(data, currentPos, b, 0, bytesRead); |
---|
567 | } else { // all data is zero |
---|
568 | for (int i : b) { |
---|
569 | b[i] = theRepeatedData; |
---|
570 | } |
---|
571 | } |
---|
572 | currentPos += bytesRead; |
---|
573 | return bytesRead; |
---|
574 | } |
---|
575 | } |
---|
576 | |
---|
577 | /** |
---|
578 | * This class implements an output stream that merely throws its data away, but records its |
---|
579 | * length. |
---|
580 | * |
---|
581 | */ |
---|
582 | static private class SimulatedOutputStream extends OutputStream { |
---|
583 | long length = 0; |
---|
584 | |
---|
585 | /** |
---|
586 | * constructor for Simulated Output Steram |
---|
587 | */ |
---|
588 | SimulatedOutputStream() { |
---|
589 | } |
---|
590 | |
---|
591 | /** |
---|
592 | * |
---|
593 | * @return the length of the data created so far. |
---|
594 | */ |
---|
595 | long getLength() { |
---|
596 | return length; |
---|
597 | } |
---|
598 | |
---|
599 | /** |
---|
600 | */ |
---|
601 | void setLength(long length) { |
---|
602 | this.length = length; |
---|
603 | } |
---|
604 | |
---|
605 | @Override |
---|
606 | public void write(int arg0) throws IOException { |
---|
607 | length++; |
---|
608 | } |
---|
609 | |
---|
610 | @Override |
---|
611 | public void write(byte[] b) throws IOException { |
---|
612 | length += b.length; |
---|
613 | } |
---|
614 | |
---|
615 | @Override |
---|
616 | public void write(byte[] b, |
---|
617 | int off, |
---|
618 | int len) throws IOException { |
---|
619 | length += len; |
---|
620 | } |
---|
621 | } |
---|
622 | |
---|
// Name under which the MBean was registered by registerMBean(); shutdown()
// unregisters it when it is non-null.
private ObjectName mbeanName;
---|
624 | |
---|
625 | |
---|
626 | |
---|
627 | /** |
---|
628 | * Register the FSDataset MBean using the name |
---|
629 | * "hadoop:service=DataNode,name=FSDatasetState-<storageid>" |
---|
630 | * We use storage id for MBean name since a minicluster within a single |
---|
631 | * Java VM may have multiple Simulated Datanodes. |
---|
632 | */ |
---|
633 | void registerMBean(final String storageId) { |
---|
634 | // We wrap to bypass standard mbean naming convetion. |
---|
635 | // This wraping can be removed in java 6 as it is more flexible in |
---|
636 | // package naming for mbeans and their impl. |
---|
637 | StandardMBean bean; |
---|
638 | |
---|
639 | try { |
---|
640 | bean = new StandardMBean(this,FSDatasetMBean.class); |
---|
641 | mbeanName = MBeanUtil.registerMBean("DataNode", |
---|
642 | "FSDatasetState-" + storageId, bean); |
---|
643 | } catch (NotCompliantMBeanException e) { |
---|
644 | e.printStackTrace(); |
---|
645 | } |
---|
646 | |
---|
647 | DataNode.LOG.info("Registered FSDatasetStatusMBean"); |
---|
648 | } |
---|
649 | |
---|
650 | public void shutdown() { |
---|
651 | if (mbeanName != null) |
---|
652 | MBeanUtil.unregisterMBean(mbeanName); |
---|
653 | } |
---|
654 | |
---|
655 | public String getStorageInfo() { |
---|
656 | return "Simulated FSDataset-" + storageId; |
---|
657 | } |
---|
658 | } |
---|