[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one or more |
---|
| 3 | * contributor license agreements. See the NOTICE file distributed with this |
---|
| 4 | * work for additional information regarding copyright ownership. The ASF |
---|
| 5 | * licenses this file to you under the Apache License, Version 2.0 (the |
---|
| 6 | * "License"); you may not use this file except in compliance with the License. |
---|
| 7 | * You may obtain a copy of the License at |
---|
| 8 | * |
---|
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 10 | * |
---|
| 11 | * Unless required by applicable law or agreed to in writing, software |
---|
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
---|
| 13 | * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
---|
| 14 | * License for the specific language governing permissions and limitations under |
---|
| 15 | * the License. |
---|
| 16 | */ |
---|
| 17 | |
---|
| 18 | package org.apache.hadoop.io.file.tfile; |
---|
| 19 | |
---|
| 20 | import java.io.DataInputStream; |
---|
| 21 | import java.io.DataOutputStream; |
---|
| 22 | import java.io.EOFException; |
---|
| 23 | import java.io.IOException; |
---|
| 24 | import java.util.Random; |
---|
| 25 | |
---|
| 26 | import junit.framework.Assert; |
---|
| 27 | import junit.framework.TestCase; |
---|
| 28 | |
---|
| 29 | import org.apache.hadoop.conf.Configuration; |
---|
| 30 | import org.apache.hadoop.fs.FSDataOutputStream; |
---|
| 31 | import org.apache.hadoop.fs.FileSystem; |
---|
| 32 | import org.apache.hadoop.fs.Path; |
---|
| 33 | import org.apache.hadoop.io.WritableUtils; |
---|
| 34 | import org.apache.hadoop.io.file.tfile.TFile.Reader; |
---|
| 35 | import org.apache.hadoop.io.file.tfile.TFile.Writer; |
---|
| 36 | import org.apache.hadoop.io.file.tfile.TFile.Reader.Location; |
---|
| 37 | import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner; |
---|
| 38 | |
---|
| 39 | /** |
---|
| 40 | * |
---|
| 41 | * Byte-array test cases using the GZ compression codec; also the base class of |
---|
| 42 | * the test cases for the NONE and LZO compression codecs. |
---|
| 43 | * |
---|
| 44 | */ |
---|
| 45 | public class TestTFileByteArrays extends TestCase { |
---|
| 46 | private static String ROOT = |
---|
| 47 | System.getProperty("test.build.data", "/tmp/tfile-test"); |
---|
| 48 | private final static int BLOCK_SIZE = 512; |
---|
| 49 | private final static int BUF_SIZE = 64; |
---|
| 50 | private final static int K = 1024; |
---|
| 51 | protected boolean skip = false; |
---|
| 52 | |
---|
| 53 | private static final String KEY = "key"; |
---|
| 54 | private static final String VALUE = "value"; |
---|
| 55 | |
---|
| 56 | private FileSystem fs; |
---|
| 57 | private Configuration conf; |
---|
| 58 | private Path path; |
---|
| 59 | private FSDataOutputStream out; |
---|
| 60 | private Writer writer; |
---|
| 61 | |
---|
| 62 | private String compression = Compression.Algorithm.GZ.getName(); |
---|
| 63 | private String comparator = "memcmp"; |
---|
| 64 | private String outputFile = "TFileTestByteArrays"; |
---|
| 65 | /* |
---|
| 66 | * pre-sampled numbers of records in one block, based on the |
---|
| 67 | * generated key and value strings |
---|
| 68 | */ |
---|
| 69 | // private int records1stBlock = 4314; |
---|
| 70 | // private int records2ndBlock = 4108; |
---|
| 71 | private int records1stBlock = 4480; |
---|
| 72 | private int records2ndBlock = 4263; |
---|
| 73 | |
---|
| 74 | public void init(String compression, String comparator, String outputFile, |
---|
| 75 | int numRecords1stBlock, int numRecords2ndBlock) { |
---|
| 76 | this.compression = compression; |
---|
| 77 | this.comparator = comparator; |
---|
| 78 | this.outputFile = outputFile; |
---|
| 79 | this.records1stBlock = numRecords1stBlock; |
---|
| 80 | this.records2ndBlock = numRecords2ndBlock; |
---|
| 81 | } |
---|
| 82 | |
---|
| 83 | @Override |
---|
| 84 | public void setUp() throws IOException { |
---|
| 85 | conf = new Configuration(); |
---|
| 86 | path = new Path(ROOT, outputFile); |
---|
| 87 | fs = path.getFileSystem(conf); |
---|
| 88 | out = fs.create(path); |
---|
| 89 | writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf); |
---|
| 90 | } |
---|
| 91 | |
---|
| 92 | @Override |
---|
| 93 | public void tearDown() throws IOException { |
---|
| 94 | if (!skip) |
---|
| 95 | fs.delete(path, true); |
---|
| 96 | } |
---|
| 97 | |
---|
| 98 | public void testNoDataEntry() throws IOException { |
---|
| 99 | if (skip) |
---|
| 100 | return; |
---|
| 101 | closeOutput(); |
---|
| 102 | |
---|
| 103 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 104 | Assert.assertTrue(reader.isSorted()); |
---|
| 105 | Scanner scanner = reader.createScanner(); |
---|
| 106 | Assert.assertTrue(scanner.atEnd()); |
---|
| 107 | scanner.close(); |
---|
| 108 | reader.close(); |
---|
| 109 | } |
---|
| 110 | |
---|
| 111 | public void testOneDataEntry() throws IOException { |
---|
| 112 | if (skip) |
---|
| 113 | return; |
---|
| 114 | writeRecords(1); |
---|
| 115 | readRecords(1); |
---|
| 116 | |
---|
| 117 | checkBlockIndex(1, 0, 0); |
---|
| 118 | readValueBeforeKey(1, 0); |
---|
| 119 | readKeyWithoutValue(1, 0); |
---|
| 120 | readValueWithoutKey(1, 0); |
---|
| 121 | readKeyManyTimes(1, 0); |
---|
| 122 | } |
---|
| 123 | |
---|
| 124 | public void testTwoDataEntries() throws IOException { |
---|
| 125 | if (skip) |
---|
| 126 | return; |
---|
| 127 | writeRecords(2); |
---|
| 128 | readRecords(2); |
---|
| 129 | } |
---|
| 130 | |
---|
| 131 | /** |
---|
| 132 | * Fill up exactly one block. |
---|
| 133 | * |
---|
| 134 | * @throws IOException |
---|
| 135 | */ |
---|
| 136 | public void testOneBlock() throws IOException { |
---|
| 137 | if (skip) |
---|
| 138 | return; |
---|
| 139 | // just under one block |
---|
| 140 | writeRecords(records1stBlock); |
---|
| 141 | readRecords(records1stBlock); |
---|
| 142 | // last key should be in the first block (block 0) |
---|
| 143 | checkBlockIndex(records1stBlock, records1stBlock - 1, 0); |
---|
| 144 | } |
---|
| 145 | |
---|
| 146 | /** |
---|
| 147 | * One block plus one record. |
---|
| 148 | * |
---|
| 149 | * @throws IOException |
---|
| 150 | */ |
---|
| 151 | public void testOneBlockPlusOneEntry() throws IOException { |
---|
| 152 | if (skip) |
---|
| 153 | return; |
---|
| 154 | writeRecords(records1stBlock + 1); |
---|
| 155 | readRecords(records1stBlock + 1); |
---|
| 156 | checkBlockIndex(records1stBlock + 1, records1stBlock - 1, 0); |
---|
| 157 | checkBlockIndex(records1stBlock + 1, records1stBlock, 1); |
---|
| 158 | } |
---|
| 159 | |
---|
| 160 | public void testTwoBlocks() throws IOException { |
---|
| 161 | if (skip) |
---|
| 162 | return; |
---|
| 163 | writeRecords(records1stBlock + 5); |
---|
| 164 | readRecords(records1stBlock + 5); |
---|
| 165 | checkBlockIndex(records1stBlock + 5, records1stBlock + 4, 1); |
---|
| 166 | } |
---|
| 167 | |
---|
| 168 | public void testThreeBlocks() throws IOException { |
---|
| 169 | if (skip) |
---|
| 170 | return; |
---|
| 171 | writeRecords(2 * records1stBlock + 5); |
---|
| 172 | readRecords(2 * records1stBlock + 5); |
---|
| 173 | |
---|
| 174 | checkBlockIndex(2 * records1stBlock + 5, 2 * records1stBlock + 4, 2); |
---|
| 175 | // 1st key in file |
---|
| 176 | readValueBeforeKey(2 * records1stBlock + 5, 0); |
---|
| 177 | readKeyWithoutValue(2 * records1stBlock + 5, 0); |
---|
| 178 | readValueWithoutKey(2 * records1stBlock + 5, 0); |
---|
| 179 | readKeyManyTimes(2 * records1stBlock + 5, 0); |
---|
| 180 | // last key in file |
---|
| 181 | readValueBeforeKey(2 * records1stBlock + 5, 2 * records1stBlock + 4); |
---|
| 182 | readKeyWithoutValue(2 * records1stBlock + 5, 2 * records1stBlock + 4); |
---|
| 183 | readValueWithoutKey(2 * records1stBlock + 5, 2 * records1stBlock + 4); |
---|
| 184 | readKeyManyTimes(2 * records1stBlock + 5, 2 * records1stBlock + 4); |
---|
| 185 | |
---|
| 186 | // 1st key in mid block, verify block indexes then read |
---|
| 187 | checkBlockIndex(2 * records1stBlock + 5, records1stBlock - 1, 0); |
---|
| 188 | checkBlockIndex(2 * records1stBlock + 5, records1stBlock, 1); |
---|
| 189 | readValueBeforeKey(2 * records1stBlock + 5, records1stBlock); |
---|
| 190 | readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock); |
---|
| 191 | readValueWithoutKey(2 * records1stBlock + 5, records1stBlock); |
---|
| 192 | readKeyManyTimes(2 * records1stBlock + 5, records1stBlock); |
---|
| 193 | |
---|
| 194 | // last key in mid block, verify block indexes then read |
---|
| 195 | checkBlockIndex(2 * records1stBlock + 5, records1stBlock + records2ndBlock |
---|
| 196 | - 1, 1); |
---|
| 197 | checkBlockIndex(2 * records1stBlock + 5, records1stBlock + records2ndBlock, |
---|
| 198 | 2); |
---|
| 199 | readValueBeforeKey(2 * records1stBlock + 5, records1stBlock |
---|
| 200 | + records2ndBlock - 1); |
---|
| 201 | readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock |
---|
| 202 | + records2ndBlock - 1); |
---|
| 203 | readValueWithoutKey(2 * records1stBlock + 5, records1stBlock |
---|
| 204 | + records2ndBlock - 1); |
---|
| 205 | readKeyManyTimes(2 * records1stBlock + 5, records1stBlock + records2ndBlock |
---|
| 206 | - 1); |
---|
| 207 | |
---|
| 208 | // mid in mid block |
---|
| 209 | readValueBeforeKey(2 * records1stBlock + 5, records1stBlock + 10); |
---|
| 210 | readKeyWithoutValue(2 * records1stBlock + 5, records1stBlock + 10); |
---|
| 211 | readValueWithoutKey(2 * records1stBlock + 5, records1stBlock + 10); |
---|
| 212 | readKeyManyTimes(2 * records1stBlock + 5, records1stBlock + 10); |
---|
| 213 | } |
---|
| 214 | |
---|
| 215 | Location locate(Scanner scanner, byte[] key) throws IOException { |
---|
| 216 | if (scanner.seekTo(key) == true) { |
---|
| 217 | return scanner.currentLocation; |
---|
| 218 | } |
---|
| 219 | return scanner.endLocation; |
---|
| 220 | } |
---|
| 221 | |
---|
| 222 | public void testLocate() throws IOException { |
---|
| 223 | if (skip) |
---|
| 224 | return; |
---|
| 225 | writeRecords(3 * records1stBlock); |
---|
| 226 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 227 | Scanner scanner = reader.createScanner(); |
---|
| 228 | Location loc2 = |
---|
| 229 | locate(scanner, composeSortedKey(KEY, 3 * records1stBlock, 2) |
---|
| 230 | .getBytes()); |
---|
| 231 | Location locLastIn1stBlock = |
---|
| 232 | locate(scanner, composeSortedKey(KEY, 3 * records1stBlock, |
---|
| 233 | records1stBlock - 1).getBytes()); |
---|
| 234 | Location locFirstIn2ndBlock = |
---|
| 235 | locate(scanner, composeSortedKey(KEY, 3 * records1stBlock, |
---|
| 236 | records1stBlock).getBytes()); |
---|
| 237 | Location locX = locate(scanner, "keyX".getBytes()); |
---|
| 238 | Assert.assertEquals(scanner.endLocation, locX); |
---|
| 239 | scanner.close(); |
---|
| 240 | reader.close(); |
---|
| 241 | } |
---|
| 242 | |
---|
| 243 | public void testFailureWriterNotClosed() throws IOException { |
---|
| 244 | if (skip) |
---|
| 245 | return; |
---|
| 246 | Reader reader = null; |
---|
| 247 | try { |
---|
| 248 | reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 249 | Assert.fail("Cannot read before closing the writer."); |
---|
| 250 | } |
---|
| 251 | catch (IOException e) { |
---|
| 252 | // noop, expecting exceptions |
---|
| 253 | } |
---|
| 254 | finally { |
---|
| 255 | if (reader != null) { |
---|
| 256 | reader.close(); |
---|
| 257 | } |
---|
| 258 | } |
---|
| 259 | } |
---|
| 260 | |
---|
| 261 | public void testFailureWriteMetaBlocksWithSameName() throws IOException { |
---|
| 262 | if (skip) |
---|
| 263 | return; |
---|
| 264 | writer.append("keyX".getBytes(), "valueX".getBytes()); |
---|
| 265 | |
---|
| 266 | // create a new metablock |
---|
| 267 | DataOutputStream outMeta = |
---|
| 268 | writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); |
---|
| 269 | outMeta.write(123); |
---|
| 270 | outMeta.write("foo".getBytes()); |
---|
| 271 | outMeta.close(); |
---|
| 272 | // add the same metablock |
---|
| 273 | try { |
---|
| 274 | DataOutputStream outMeta2 = |
---|
| 275 | writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); |
---|
| 276 | Assert.fail("Cannot create metablocks with the same name."); |
---|
| 277 | } |
---|
| 278 | catch (Exception e) { |
---|
| 279 | // noop, expecting exceptions |
---|
| 280 | } |
---|
| 281 | closeOutput(); |
---|
| 282 | } |
---|
| 283 | |
---|
| 284 | public void testFailureGetNonExistentMetaBlock() throws IOException { |
---|
| 285 | if (skip) |
---|
| 286 | return; |
---|
| 287 | writer.append("keyX".getBytes(), "valueX".getBytes()); |
---|
| 288 | |
---|
| 289 | // create a new metablock |
---|
| 290 | DataOutputStream outMeta = |
---|
| 291 | writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); |
---|
| 292 | outMeta.write(123); |
---|
| 293 | outMeta.write("foo".getBytes()); |
---|
| 294 | outMeta.close(); |
---|
| 295 | closeOutput(); |
---|
| 296 | |
---|
| 297 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 298 | DataInputStream mb = reader.getMetaBlock("testX"); |
---|
| 299 | Assert.assertNotNull(mb); |
---|
| 300 | mb.close(); |
---|
| 301 | try { |
---|
| 302 | DataInputStream mbBad = reader.getMetaBlock("testY"); |
---|
| 303 | Assert.assertNull(mbBad); |
---|
| 304 | Assert.fail("Error on handling non-existent metablocks."); |
---|
| 305 | } |
---|
| 306 | catch (Exception e) { |
---|
| 307 | // noop, expecting exceptions |
---|
| 308 | } |
---|
| 309 | reader.close(); |
---|
| 310 | } |
---|
| 311 | |
---|
| 312 | public void testFailureWriteRecordAfterMetaBlock() throws IOException { |
---|
| 313 | if (skip) |
---|
| 314 | return; |
---|
| 315 | // write a key/value first |
---|
| 316 | writer.append("keyX".getBytes(), "valueX".getBytes()); |
---|
| 317 | // create a new metablock |
---|
| 318 | DataOutputStream outMeta = |
---|
| 319 | writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); |
---|
| 320 | outMeta.write(123); |
---|
| 321 | outMeta.write("dummy".getBytes()); |
---|
| 322 | outMeta.close(); |
---|
| 323 | // add more key/value |
---|
| 324 | try { |
---|
| 325 | writer.append("keyY".getBytes(), "valueY".getBytes()); |
---|
| 326 | Assert.fail("Cannot add key/value after start adding meta blocks."); |
---|
| 327 | } |
---|
| 328 | catch (Exception e) { |
---|
| 329 | // noop, expecting exceptions |
---|
| 330 | } |
---|
| 331 | closeOutput(); |
---|
| 332 | } |
---|
| 333 | |
---|
| 334 | public void testFailureReadValueManyTimes() throws IOException { |
---|
| 335 | if (skip) |
---|
| 336 | return; |
---|
| 337 | writeRecords(5); |
---|
| 338 | |
---|
| 339 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 340 | Scanner scanner = reader.createScanner(); |
---|
| 341 | |
---|
| 342 | byte[] vbuf = new byte[BUF_SIZE]; |
---|
| 343 | int vlen = scanner.entry().getValueLength(); |
---|
| 344 | scanner.entry().getValue(vbuf); |
---|
| 345 | Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0); |
---|
| 346 | try { |
---|
| 347 | scanner.entry().getValue(vbuf); |
---|
| 348 | Assert.fail("Cannot get the value mlutiple times."); |
---|
| 349 | } |
---|
| 350 | catch (Exception e) { |
---|
| 351 | // noop, expecting exceptions |
---|
| 352 | } |
---|
| 353 | |
---|
| 354 | scanner.close(); |
---|
| 355 | reader.close(); |
---|
| 356 | } |
---|
| 357 | |
---|
| 358 | public void testFailureBadCompressionCodec() throws IOException { |
---|
| 359 | if (skip) |
---|
| 360 | return; |
---|
| 361 | closeOutput(); |
---|
| 362 | out = fs.create(path); |
---|
| 363 | try { |
---|
| 364 | writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf); |
---|
| 365 | Assert.fail("Error on handling invalid compression codecs."); |
---|
| 366 | } |
---|
| 367 | catch (Exception e) { |
---|
| 368 | // noop, expecting exceptions |
---|
| 369 | // e.printStackTrace(); |
---|
| 370 | } |
---|
| 371 | } |
---|
| 372 | |
---|
| 373 | public void testFailureOpenEmptyFile() throws IOException { |
---|
| 374 | if (skip) |
---|
| 375 | return; |
---|
| 376 | closeOutput(); |
---|
| 377 | // create an absolutely empty file |
---|
| 378 | path = new Path(fs.getWorkingDirectory(), outputFile); |
---|
| 379 | out = fs.create(path); |
---|
| 380 | out.close(); |
---|
| 381 | try { |
---|
| 382 | Reader reader = |
---|
| 383 | new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 384 | Assert.fail("Error on handling empty files."); |
---|
| 385 | } |
---|
| 386 | catch (EOFException e) { |
---|
| 387 | // noop, expecting exceptions |
---|
| 388 | } |
---|
| 389 | } |
---|
| 390 | |
---|
| 391 | public void testFailureOpenRandomFile() throws IOException { |
---|
| 392 | if (skip) |
---|
| 393 | return; |
---|
| 394 | closeOutput(); |
---|
| 395 | // create an random file |
---|
| 396 | path = new Path(fs.getWorkingDirectory(), outputFile); |
---|
| 397 | out = fs.create(path); |
---|
| 398 | Random rand = new Random(); |
---|
| 399 | byte[] buf = new byte[K]; |
---|
| 400 | // fill with > 1MB data |
---|
| 401 | for (int nx = 0; nx < K + 2; nx++) { |
---|
| 402 | rand.nextBytes(buf); |
---|
| 403 | out.write(buf); |
---|
| 404 | } |
---|
| 405 | out.close(); |
---|
| 406 | try { |
---|
| 407 | Reader reader = |
---|
| 408 | new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 409 | Assert.fail("Error on handling random files."); |
---|
| 410 | } |
---|
| 411 | catch (IOException e) { |
---|
| 412 | // noop, expecting exceptions |
---|
| 413 | } |
---|
| 414 | } |
---|
| 415 | |
---|
| 416 | public void testFailureKeyLongerThan64K() throws IOException { |
---|
| 417 | if (skip) |
---|
| 418 | return; |
---|
| 419 | byte[] buf = new byte[64 * K + 1]; |
---|
| 420 | Random rand = new Random(); |
---|
| 421 | rand.nextBytes(buf); |
---|
| 422 | try { |
---|
| 423 | writer.append(buf, "valueX".getBytes()); |
---|
| 424 | } |
---|
| 425 | catch (IndexOutOfBoundsException e) { |
---|
| 426 | // noop, expecting exceptions |
---|
| 427 | } |
---|
| 428 | closeOutput(); |
---|
| 429 | } |
---|
| 430 | |
---|
| 431 | public void testFailureOutOfOrderKeys() throws IOException { |
---|
| 432 | if (skip) |
---|
| 433 | return; |
---|
| 434 | try { |
---|
| 435 | writer.append("keyM".getBytes(), "valueM".getBytes()); |
---|
| 436 | writer.append("keyA".getBytes(), "valueA".getBytes()); |
---|
| 437 | Assert.fail("Error on handling out of order keys."); |
---|
| 438 | } |
---|
| 439 | catch (Exception e) { |
---|
| 440 | // noop, expecting exceptions |
---|
| 441 | // e.printStackTrace(); |
---|
| 442 | } |
---|
| 443 | |
---|
| 444 | closeOutput(); |
---|
| 445 | } |
---|
| 446 | |
---|
| 447 | public void testFailureNegativeOffset() throws IOException { |
---|
| 448 | if (skip) |
---|
| 449 | return; |
---|
| 450 | try { |
---|
| 451 | writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6); |
---|
| 452 | Assert.fail("Error on handling negative offset."); |
---|
| 453 | } |
---|
| 454 | catch (Exception e) { |
---|
| 455 | // noop, expecting exceptions |
---|
| 456 | } |
---|
| 457 | closeOutput(); |
---|
| 458 | } |
---|
| 459 | |
---|
| 460 | public void testFailureNegativeOffset_2() throws IOException { |
---|
| 461 | if (skip) |
---|
| 462 | return; |
---|
| 463 | closeOutput(); |
---|
| 464 | |
---|
| 465 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 466 | Scanner scanner = reader.createScanner(); |
---|
| 467 | try { |
---|
| 468 | scanner.lowerBound("keyX".getBytes(), -1, 4); |
---|
| 469 | Assert.fail("Error on handling negative offset."); |
---|
| 470 | } |
---|
| 471 | catch (Exception e) { |
---|
| 472 | // noop, expecting exceptions |
---|
| 473 | } |
---|
| 474 | finally { |
---|
| 475 | reader.close(); |
---|
| 476 | scanner.close(); |
---|
| 477 | } |
---|
| 478 | closeOutput(); |
---|
| 479 | } |
---|
| 480 | |
---|
| 481 | public void testFailureNegativeLength() throws IOException { |
---|
| 482 | if (skip) |
---|
| 483 | return; |
---|
| 484 | try { |
---|
| 485 | writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6); |
---|
| 486 | Assert.fail("Error on handling negative length."); |
---|
| 487 | } |
---|
| 488 | catch (Exception e) { |
---|
| 489 | // noop, expecting exceptions |
---|
| 490 | } |
---|
| 491 | closeOutput(); |
---|
| 492 | } |
---|
| 493 | |
---|
| 494 | public void testFailureNegativeLength_2() throws IOException { |
---|
| 495 | if (skip) |
---|
| 496 | return; |
---|
| 497 | closeOutput(); |
---|
| 498 | |
---|
| 499 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 500 | Scanner scanner = reader.createScanner(); |
---|
| 501 | try { |
---|
| 502 | scanner.lowerBound("keyX".getBytes(), 0, -1); |
---|
| 503 | Assert.fail("Error on handling negative length."); |
---|
| 504 | } |
---|
| 505 | catch (Exception e) { |
---|
| 506 | // noop, expecting exceptions |
---|
| 507 | } |
---|
| 508 | finally { |
---|
| 509 | scanner.close(); |
---|
| 510 | reader.close(); |
---|
| 511 | } |
---|
| 512 | closeOutput(); |
---|
| 513 | } |
---|
| 514 | |
---|
| 515 | public void testFailureNegativeLength_3() throws IOException { |
---|
| 516 | if (skip) |
---|
| 517 | return; |
---|
| 518 | writeRecords(3); |
---|
| 519 | |
---|
| 520 | Reader reader = |
---|
| 521 | new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 522 | Scanner scanner = reader.createScanner(); |
---|
| 523 | try { |
---|
| 524 | // test negative array offset |
---|
| 525 | try { |
---|
| 526 | scanner.seekTo("keyY".getBytes(), -1, 4); |
---|
| 527 | Assert.fail("Failed to handle negative offset."); |
---|
| 528 | } catch (Exception e) { |
---|
| 529 | // noop, expecting exceptions |
---|
| 530 | } |
---|
| 531 | |
---|
| 532 | // test negative array length |
---|
| 533 | try { |
---|
| 534 | scanner.seekTo("keyY".getBytes(), 0, -2); |
---|
| 535 | Assert.fail("Failed to handle negative key length."); |
---|
| 536 | } catch (Exception e) { |
---|
| 537 | // noop, expecting exceptions |
---|
| 538 | } |
---|
| 539 | } finally { |
---|
| 540 | reader.close(); |
---|
| 541 | scanner.close(); |
---|
| 542 | } |
---|
| 543 | } |
---|
| 544 | |
---|
| 545 | public void testFailureCompressionNotWorking() throws IOException { |
---|
| 546 | if (skip) |
---|
| 547 | return; |
---|
| 548 | long rawDataSize = writeRecords(10 * records1stBlock, false); |
---|
| 549 | if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) { |
---|
| 550 | Assert.assertTrue(out.getPos() < rawDataSize); |
---|
| 551 | } |
---|
| 552 | closeOutput(); |
---|
| 553 | } |
---|
| 554 | |
---|
| 555 | public void testFailureFileWriteNotAt0Position() throws IOException { |
---|
| 556 | if (skip) |
---|
| 557 | return; |
---|
| 558 | closeOutput(); |
---|
| 559 | out = fs.create(path); |
---|
| 560 | out.write(123); |
---|
| 561 | |
---|
| 562 | try { |
---|
| 563 | writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf); |
---|
| 564 | Assert.fail("Failed to catch file write not at position 0."); |
---|
| 565 | } |
---|
| 566 | catch (Exception e) { |
---|
| 567 | // noop, expecting exceptions |
---|
| 568 | } |
---|
| 569 | closeOutput(); |
---|
| 570 | } |
---|
| 571 | |
---|
| 572 | private long writeRecords(int count) throws IOException { |
---|
| 573 | return writeRecords(count, true); |
---|
| 574 | } |
---|
| 575 | |
---|
| 576 | private long writeRecords(int count, boolean close) throws IOException { |
---|
| 577 | long rawDataSize = writeRecords(writer, count); |
---|
| 578 | if (close) { |
---|
| 579 | closeOutput(); |
---|
| 580 | } |
---|
| 581 | return rawDataSize; |
---|
| 582 | } |
---|
| 583 | |
---|
| 584 | static long writeRecords(Writer writer, int count) throws IOException { |
---|
| 585 | long rawDataSize = 0; |
---|
| 586 | int nx; |
---|
| 587 | for (nx = 0; nx < count; nx++) { |
---|
| 588 | byte[] key = composeSortedKey(KEY, count, nx).getBytes(); |
---|
| 589 | byte[] value = (VALUE + nx).getBytes(); |
---|
| 590 | writer.append(key, value); |
---|
| 591 | rawDataSize += |
---|
| 592 | WritableUtils.getVIntSize(key.length) + key.length |
---|
| 593 | + WritableUtils.getVIntSize(value.length) + value.length; |
---|
| 594 | } |
---|
| 595 | return rawDataSize; |
---|
| 596 | } |
---|
| 597 | |
---|
| 598 | /** |
---|
| 599 | * Insert some leading 0's in front of the value, to make the keys sorted. |
---|
| 600 | * |
---|
| 601 | * @param prefix |
---|
| 602 | * @param total |
---|
| 603 | * @param value |
---|
| 604 | * @return |
---|
| 605 | */ |
---|
| 606 | static String composeSortedKey(String prefix, int total, int value) { |
---|
| 607 | return String.format("%s%010d", prefix, value); |
---|
| 608 | } |
---|
| 609 | |
---|
| 610 | /** |
---|
| 611 | * Calculate how many digits are in the 10-based integer. |
---|
| 612 | * |
---|
| 613 | * @param value |
---|
| 614 | * @return |
---|
| 615 | */ |
---|
| 616 | private static int numberDigits(int value) { |
---|
| 617 | int digits = 0; |
---|
| 618 | while ((value = value / 10) > 0) { |
---|
| 619 | digits++; |
---|
| 620 | } |
---|
| 621 | return digits; |
---|
| 622 | } |
---|
| 623 | |
---|
| 624 | private void readRecords(int count) throws IOException { |
---|
| 625 | readRecords(fs, path, count, conf); |
---|
| 626 | } |
---|
| 627 | |
---|
| 628 | static void readRecords(FileSystem fs, Path path, int count, |
---|
| 629 | Configuration conf) throws IOException { |
---|
| 630 | Reader reader = |
---|
| 631 | new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 632 | Scanner scanner = reader.createScanner(); |
---|
| 633 | |
---|
| 634 | try { |
---|
| 635 | for (int nx = 0; nx < count; nx++, scanner.advance()) { |
---|
| 636 | Assert.assertFalse(scanner.atEnd()); |
---|
| 637 | // Assert.assertTrue(scanner.next()); |
---|
| 638 | |
---|
| 639 | byte[] kbuf = new byte[BUF_SIZE]; |
---|
| 640 | int klen = scanner.entry().getKeyLength(); |
---|
| 641 | scanner.entry().getKey(kbuf); |
---|
| 642 | Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY, |
---|
| 643 | count, nx)); |
---|
| 644 | |
---|
| 645 | byte[] vbuf = new byte[BUF_SIZE]; |
---|
| 646 | int vlen = scanner.entry().getValueLength(); |
---|
| 647 | scanner.entry().getValue(vbuf); |
---|
| 648 | Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + nx); |
---|
| 649 | } |
---|
| 650 | |
---|
| 651 | Assert.assertTrue(scanner.atEnd()); |
---|
| 652 | Assert.assertFalse(scanner.advance()); |
---|
| 653 | } |
---|
| 654 | finally { |
---|
| 655 | scanner.close(); |
---|
| 656 | reader.close(); |
---|
| 657 | } |
---|
| 658 | } |
---|
| 659 | |
---|
| 660 | private void checkBlockIndex(int count, int recordIndex, |
---|
| 661 | int blockIndexExpected) throws IOException { |
---|
| 662 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 663 | Scanner scanner = reader.createScanner(); |
---|
| 664 | scanner.seekTo(composeSortedKey(KEY, count, recordIndex).getBytes()); |
---|
| 665 | Assert.assertEquals(blockIndexExpected, scanner.currentLocation |
---|
| 666 | .getBlockIndex()); |
---|
| 667 | scanner.close(); |
---|
| 668 | reader.close(); |
---|
| 669 | } |
---|
| 670 | |
---|
| 671 | private void readValueBeforeKey(int count, int recordIndex) |
---|
| 672 | throws IOException { |
---|
| 673 | Reader reader = |
---|
| 674 | new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 675 | Scanner scanner = |
---|
| 676 | reader.createScanner(composeSortedKey(KEY, count, recordIndex) |
---|
| 677 | .getBytes(), null); |
---|
| 678 | |
---|
| 679 | try { |
---|
| 680 | byte[] vbuf = new byte[BUF_SIZE]; |
---|
| 681 | int vlen = scanner.entry().getValueLength(); |
---|
| 682 | scanner.entry().getValue(vbuf); |
---|
| 683 | Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + recordIndex); |
---|
| 684 | |
---|
| 685 | byte[] kbuf = new byte[BUF_SIZE]; |
---|
| 686 | int klen = scanner.entry().getKeyLength(); |
---|
| 687 | scanner.entry().getKey(kbuf); |
---|
| 688 | Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY, |
---|
| 689 | count, recordIndex)); |
---|
| 690 | } |
---|
| 691 | finally { |
---|
| 692 | scanner.close(); |
---|
| 693 | reader.close(); |
---|
| 694 | } |
---|
| 695 | } |
---|
| 696 | |
---|
| 697 | private void readKeyWithoutValue(int count, int recordIndex) |
---|
| 698 | throws IOException { |
---|
| 699 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 700 | Scanner scanner = |
---|
| 701 | reader.createScanner(composeSortedKey(KEY, count, recordIndex) |
---|
| 702 | .getBytes(), null); |
---|
| 703 | |
---|
| 704 | try { |
---|
| 705 | // read the indexed key |
---|
| 706 | byte[] kbuf1 = new byte[BUF_SIZE]; |
---|
| 707 | int klen1 = scanner.entry().getKeyLength(); |
---|
| 708 | scanner.entry().getKey(kbuf1); |
---|
| 709 | Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, |
---|
| 710 | count, recordIndex)); |
---|
| 711 | |
---|
| 712 | if (scanner.advance() && !scanner.atEnd()) { |
---|
| 713 | // read the next key following the indexed |
---|
| 714 | byte[] kbuf2 = new byte[BUF_SIZE]; |
---|
| 715 | int klen2 = scanner.entry().getKeyLength(); |
---|
| 716 | scanner.entry().getKey(kbuf2); |
---|
| 717 | Assert.assertEquals(new String(kbuf2, 0, klen2), composeSortedKey(KEY, |
---|
| 718 | count, recordIndex + 1)); |
---|
| 719 | } |
---|
| 720 | } |
---|
| 721 | finally { |
---|
| 722 | scanner.close(); |
---|
| 723 | reader.close(); |
---|
| 724 | } |
---|
| 725 | } |
---|
| 726 | |
---|
| 727 | private void readValueWithoutKey(int count, int recordIndex) |
---|
| 728 | throws IOException { |
---|
| 729 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 730 | |
---|
| 731 | Scanner scanner = |
---|
| 732 | reader.createScanner(composeSortedKey(KEY, count, recordIndex) |
---|
| 733 | .getBytes(), null); |
---|
| 734 | |
---|
| 735 | byte[] vbuf1 = new byte[BUF_SIZE]; |
---|
| 736 | int vlen1 = scanner.entry().getValueLength(); |
---|
| 737 | scanner.entry().getValue(vbuf1); |
---|
| 738 | Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex); |
---|
| 739 | |
---|
| 740 | if (scanner.advance() && !scanner.atEnd()) { |
---|
| 741 | byte[] vbuf2 = new byte[BUF_SIZE]; |
---|
| 742 | int vlen2 = scanner.entry().getValueLength(); |
---|
| 743 | scanner.entry().getValue(vbuf2); |
---|
| 744 | Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE |
---|
| 745 | + (recordIndex + 1)); |
---|
| 746 | } |
---|
| 747 | |
---|
| 748 | scanner.close(); |
---|
| 749 | reader.close(); |
---|
| 750 | } |
---|
| 751 | |
---|
| 752 | private void readKeyManyTimes(int count, int recordIndex) throws IOException { |
---|
| 753 | Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); |
---|
| 754 | |
---|
| 755 | Scanner scanner = |
---|
| 756 | reader.createScanner(composeSortedKey(KEY, count, recordIndex) |
---|
| 757 | .getBytes(), null); |
---|
| 758 | |
---|
| 759 | // read the indexed key |
---|
| 760 | byte[] kbuf1 = new byte[BUF_SIZE]; |
---|
| 761 | int klen1 = scanner.entry().getKeyLength(); |
---|
| 762 | scanner.entry().getKey(kbuf1); |
---|
| 763 | Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, |
---|
| 764 | count, recordIndex)); |
---|
| 765 | |
---|
| 766 | klen1 = scanner.entry().getKeyLength(); |
---|
| 767 | scanner.entry().getKey(kbuf1); |
---|
| 768 | Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, |
---|
| 769 | count, recordIndex)); |
---|
| 770 | |
---|
| 771 | klen1 = scanner.entry().getKeyLength(); |
---|
| 772 | scanner.entry().getKey(kbuf1); |
---|
| 773 | Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, |
---|
| 774 | count, recordIndex)); |
---|
| 775 | |
---|
| 776 | scanner.close(); |
---|
| 777 | reader.close(); |
---|
| 778 | } |
---|
| 779 | |
---|
| 780 | private void closeOutput() throws IOException { |
---|
| 781 | if (writer != null) { |
---|
| 782 | writer.close(); |
---|
| 783 | writer = null; |
---|
| 784 | } |
---|
| 785 | if (out != null) { |
---|
| 786 | out.close(); |
---|
| 787 | out = null; |
---|
| 788 | } |
---|
| 789 | } |
---|
| 790 | } |
---|