1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one or more |
---|
3 | * contributor license agreements. See the NOTICE file distributed with this |
---|
4 | * work for additional information regarding copyright ownership. The ASF |
---|
5 | * licenses this file to you under the Apache License, Version 2.0 (the |
---|
6 | * "License"); you may not use this file except in compliance with the License. |
---|
7 | * You may obtain a copy of the License at |
---|
8 | * |
---|
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
10 | * |
---|
11 | * Unless required by applicable law or agreed to in writing, software |
---|
12 | * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
---|
13 | * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
---|
14 | * License for the specific language governing permissions and limitations under |
---|
15 | * the License. |
---|
16 | */ |
---|
17 | |
---|
18 | package org.apache.hadoop.io.file.tfile; |
---|
19 | |
---|
20 | import java.io.IOException; |
---|
21 | import java.util.Random; |
---|
22 | import java.util.StringTokenizer; |
---|
23 | |
---|
24 | import junit.framework.TestCase; |
---|
25 | |
---|
26 | import org.apache.commons.cli.CommandLine; |
---|
27 | import org.apache.commons.cli.CommandLineParser; |
---|
28 | import org.apache.commons.cli.GnuParser; |
---|
29 | import org.apache.commons.cli.HelpFormatter; |
---|
30 | import org.apache.commons.cli.Option; |
---|
31 | import org.apache.commons.cli.OptionBuilder; |
---|
32 | import org.apache.commons.cli.Options; |
---|
33 | import org.apache.commons.cli.ParseException; |
---|
34 | import org.apache.hadoop.conf.Configuration; |
---|
35 | import org.apache.hadoop.fs.FSDataInputStream; |
---|
36 | import org.apache.hadoop.fs.FSDataOutputStream; |
---|
37 | import org.apache.hadoop.fs.FileSystem; |
---|
38 | import org.apache.hadoop.fs.Path; |
---|
39 | import org.apache.hadoop.io.BytesWritable; |
---|
40 | import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG; |
---|
41 | import org.apache.hadoop.io.file.tfile.TFile.Reader; |
---|
42 | import org.apache.hadoop.io.file.tfile.TFile.Writer; |
---|
43 | import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner; |
---|
44 | |
---|
45 | /** |
---|
46 | * test the performance for seek. |
---|
47 | * |
---|
48 | */ |
---|
49 | public class TestTFileSeek extends TestCase { |
---|
50 | private MyOptions options; |
---|
51 | private Configuration conf; |
---|
52 | private Path path; |
---|
53 | private FileSystem fs; |
---|
54 | private NanoTimer timer; |
---|
55 | private Random rng; |
---|
56 | private DiscreteRNG keyLenGen; |
---|
57 | private KVGenerator kvGen; |
---|
58 | |
---|
59 | @Override |
---|
60 | public void setUp() throws IOException { |
---|
61 | if (options == null) { |
---|
62 | options = new MyOptions(new String[0]); |
---|
63 | } |
---|
64 | |
---|
65 | conf = new Configuration(); |
---|
66 | conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize); |
---|
67 | conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize); |
---|
68 | path = new Path(new Path(options.rootDir), options.file); |
---|
69 | fs = path.getFileSystem(conf); |
---|
70 | timer = new NanoTimer(false); |
---|
71 | rng = new Random(options.seed); |
---|
72 | keyLenGen = |
---|
73 | new RandomDistribution.Zipf(new Random(rng.nextLong()), |
---|
74 | options.minKeyLen, options.maxKeyLen, 1.2); |
---|
75 | DiscreteRNG valLenGen = |
---|
76 | new RandomDistribution.Flat(new Random(rng.nextLong()), |
---|
77 | options.minValLength, options.maxValLength); |
---|
78 | DiscreteRNG wordLenGen = |
---|
79 | new RandomDistribution.Flat(new Random(rng.nextLong()), |
---|
80 | options.minWordLen, options.maxWordLen); |
---|
81 | kvGen = |
---|
82 | new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, |
---|
83 | options.dictSize); |
---|
84 | } |
---|
85 | |
---|
86 | @Override |
---|
87 | public void tearDown() throws IOException { |
---|
88 | fs.delete(path, true); |
---|
89 | } |
---|
90 | |
---|
91 | private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) |
---|
92 | throws IOException { |
---|
93 | if (fs.exists(name)) { |
---|
94 | fs.delete(name, true); |
---|
95 | } |
---|
96 | FSDataOutputStream fout = fs.create(name); |
---|
97 | return fout; |
---|
98 | } |
---|
99 | |
---|
100 | private void createTFile() throws IOException { |
---|
101 | long totalBytes = 0; |
---|
102 | FSDataOutputStream fout = createFSOutput(path, fs); |
---|
103 | try { |
---|
104 | Writer writer = |
---|
105 | new Writer(fout, options.minBlockSize, options.compress, "memcmp", |
---|
106 | conf); |
---|
107 | try { |
---|
108 | BytesWritable key = new BytesWritable(); |
---|
109 | BytesWritable val = new BytesWritable(); |
---|
110 | timer.start(); |
---|
111 | for (long i = 0; true; ++i) { |
---|
112 | if (i % 1000 == 0) { // test the size for every 1000 rows. |
---|
113 | if (fs.getFileStatus(path).getLen() >= options.fileSize) { |
---|
114 | break; |
---|
115 | } |
---|
116 | } |
---|
117 | kvGen.next(key, val, false); |
---|
118 | writer.append(key.get(), 0, key.getSize(), val.get(), 0, val |
---|
119 | .getSize()); |
---|
120 | totalBytes += key.getSize(); |
---|
121 | totalBytes += val.getSize(); |
---|
122 | } |
---|
123 | timer.stop(); |
---|
124 | } |
---|
125 | finally { |
---|
126 | writer.close(); |
---|
127 | } |
---|
128 | } |
---|
129 | finally { |
---|
130 | fout.close(); |
---|
131 | } |
---|
132 | double duration = (double)timer.read()/1000; // in us. |
---|
133 | long fsize = fs.getFileStatus(path).getLen(); |
---|
134 | |
---|
135 | System.out.printf( |
---|
136 | "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n", |
---|
137 | timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes |
---|
138 | / duration); |
---|
139 | System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n", |
---|
140 | timer.toString(), (double) fsize / 1024 / 1024, fsize / duration); |
---|
141 | } |
---|
142 | |
---|
143 | public void seekTFile() throws IOException { |
---|
144 | int miss = 0; |
---|
145 | long totalBytes = 0; |
---|
146 | FSDataInputStream fsdis = fs.open(path); |
---|
147 | Reader reader = |
---|
148 | new Reader(fsdis, fs.getFileStatus(path).getLen(), conf); |
---|
149 | KeySampler kSampler = |
---|
150 | new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(), |
---|
151 | keyLenGen); |
---|
152 | Scanner scanner = reader.createScanner(); |
---|
153 | BytesWritable key = new BytesWritable(); |
---|
154 | BytesWritable val = new BytesWritable(); |
---|
155 | timer.reset(); |
---|
156 | timer.start(); |
---|
157 | for (int i = 0; i < options.seekCount; ++i) { |
---|
158 | kSampler.next(key); |
---|
159 | scanner.lowerBound(key.get(), 0, key.getSize()); |
---|
160 | if (!scanner.atEnd()) { |
---|
161 | scanner.entry().get(key, val); |
---|
162 | totalBytes += key.getSize(); |
---|
163 | totalBytes += val.getSize(); |
---|
164 | } |
---|
165 | else { |
---|
166 | ++miss; |
---|
167 | } |
---|
168 | } |
---|
169 | timer.stop(); |
---|
170 | double duration = (double) timer.read() / 1000; // in us. |
---|
171 | System.out.printf( |
---|
172 | "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n", |
---|
173 | timer.toString(), NanoTimer.nanoTimeToString(timer.read() |
---|
174 | / options.seekCount), options.seekCount - miss, miss, |
---|
175 | (double) totalBytes / 1024 / (options.seekCount - miss)); |
---|
176 | |
---|
177 | } |
---|
178 | |
---|
179 | public void testSeeks() throws IOException { |
---|
180 | String[] supported = TFile.getSupportedCompressionAlgorithms(); |
---|
181 | boolean proceed = false; |
---|
182 | for (String c : supported) { |
---|
183 | if (c.equals(options.compress)) { |
---|
184 | proceed = true; |
---|
185 | break; |
---|
186 | } |
---|
187 | } |
---|
188 | |
---|
189 | if (!proceed) { |
---|
190 | System.out.println("Skipped for " + options.compress); |
---|
191 | return; |
---|
192 | } |
---|
193 | |
---|
194 | if (options.doCreate()) { |
---|
195 | createTFile(); |
---|
196 | } |
---|
197 | |
---|
198 | if (options.doRead()) { |
---|
199 | seekTFile(); |
---|
200 | } |
---|
201 | } |
---|
202 | |
---|
203 | private static class IntegerRange { |
---|
204 | private final int from, to; |
---|
205 | |
---|
206 | public IntegerRange(int from, int to) { |
---|
207 | this.from = from; |
---|
208 | this.to = to; |
---|
209 | } |
---|
210 | |
---|
211 | public static IntegerRange parse(String s) throws ParseException { |
---|
212 | StringTokenizer st = new StringTokenizer(s, " \t,"); |
---|
213 | if (st.countTokens() != 2) { |
---|
214 | throw new ParseException("Bad integer specification: " + s); |
---|
215 | } |
---|
216 | int from = Integer.parseInt(st.nextToken()); |
---|
217 | int to = Integer.parseInt(st.nextToken()); |
---|
218 | return new IntegerRange(from, to); |
---|
219 | } |
---|
220 | |
---|
221 | public int from() { |
---|
222 | return from; |
---|
223 | } |
---|
224 | |
---|
225 | public int to() { |
---|
226 | return to; |
---|
227 | } |
---|
228 | } |
---|
229 | |
---|
230 | private static class MyOptions { |
---|
231 | // hard coded constants |
---|
232 | int dictSize = 1000; |
---|
233 | int minWordLen = 5; |
---|
234 | int maxWordLen = 20; |
---|
235 | int osInputBufferSize = 64 * 1024; |
---|
236 | int osOutputBufferSize = 64 * 1024; |
---|
237 | int fsInputBufferSizeNone = 0; |
---|
238 | int fsInputBufferSizeLzo = 0; |
---|
239 | int fsInputBufferSizeGz = 0; |
---|
240 | int fsOutputBufferSizeNone = 1; |
---|
241 | int fsOutputBufferSizeLzo = 1; |
---|
242 | int fsOutputBufferSizeGz = 1; |
---|
243 | |
---|
244 | String rootDir = |
---|
245 | System.getProperty("test.build.data", "/tmp/tfile-test"); |
---|
246 | String file = "TestTFileSeek"; |
---|
247 | String compress = "gz"; |
---|
248 | int minKeyLen = 10; |
---|
249 | int maxKeyLen = 50; |
---|
250 | int minValLength = 100; |
---|
251 | int maxValLength = 200; |
---|
252 | int minBlockSize = 64 * 1024; |
---|
253 | int fsOutputBufferSize = 1; |
---|
254 | int fsInputBufferSize = 0; |
---|
255 | long fileSize = 3 * 1024 * 1024; |
---|
256 | long seekCount = 1000; |
---|
257 | long seed; |
---|
258 | |
---|
259 | static final int OP_CREATE = 1; |
---|
260 | static final int OP_READ = 2; |
---|
261 | int op = OP_CREATE | OP_READ; |
---|
262 | |
---|
263 | boolean proceed = false; |
---|
264 | |
---|
265 | public MyOptions(String[] args) { |
---|
266 | seed = System.nanoTime(); |
---|
267 | |
---|
268 | try { |
---|
269 | Options opts = buildOptions(); |
---|
270 | CommandLineParser parser = new GnuParser(); |
---|
271 | CommandLine line = parser.parse(opts, args, true); |
---|
272 | processOptions(line, opts); |
---|
273 | validateOptions(); |
---|
274 | } |
---|
275 | catch (ParseException e) { |
---|
276 | System.out.println(e.getMessage()); |
---|
277 | System.out.println("Try \"--help\" option for details."); |
---|
278 | setStopProceed(); |
---|
279 | } |
---|
280 | } |
---|
281 | |
---|
282 | public boolean proceed() { |
---|
283 | return proceed; |
---|
284 | } |
---|
285 | |
---|
286 | private Options buildOptions() { |
---|
287 | Option compress = |
---|
288 | OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]") |
---|
289 | .hasArg().withDescription("compression scheme").create('c'); |
---|
290 | |
---|
291 | Option fileSize = |
---|
292 | OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB") |
---|
293 | .hasArg().withDescription("target size of the file (in MB).") |
---|
294 | .create('s'); |
---|
295 | |
---|
296 | Option fsInputBufferSz = |
---|
297 | OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size") |
---|
298 | .hasArg().withDescription( |
---|
299 | "size of the file system input buffer (in bytes).").create( |
---|
300 | 'i'); |
---|
301 | |
---|
302 | Option fsOutputBufferSize = |
---|
303 | OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size") |
---|
304 | .hasArg().withDescription( |
---|
305 | "size of the file system output buffer (in bytes).").create( |
---|
306 | 'o'); |
---|
307 | |
---|
308 | Option keyLen = |
---|
309 | OptionBuilder |
---|
310 | .withLongOpt("key-length") |
---|
311 | .withArgName("min,max") |
---|
312 | .hasArg() |
---|
313 | .withDescription( |
---|
314 | "the length range of the key (in bytes)") |
---|
315 | .create('k'); |
---|
316 | |
---|
317 | Option valueLen = |
---|
318 | OptionBuilder |
---|
319 | .withLongOpt("value-length") |
---|
320 | .withArgName("min,max") |
---|
321 | .hasArg() |
---|
322 | .withDescription( |
---|
323 | "the length range of the value (in bytes)") |
---|
324 | .create('v'); |
---|
325 | |
---|
326 | Option blockSz = |
---|
327 | OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg() |
---|
328 | .withDescription("minimum block size (in KB)").create('b'); |
---|
329 | |
---|
330 | Option seed = |
---|
331 | OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg() |
---|
332 | .withDescription("specify the seed").create('S'); |
---|
333 | |
---|
334 | Option operation = |
---|
335 | OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg() |
---|
336 | .withDescription( |
---|
337 | "action: seek-only, create-only, seek-after-create").create( |
---|
338 | 'x'); |
---|
339 | |
---|
340 | Option rootDir = |
---|
341 | OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg() |
---|
342 | .withDescription( |
---|
343 | "specify root directory where files will be created.") |
---|
344 | .create('r'); |
---|
345 | |
---|
346 | Option file = |
---|
347 | OptionBuilder.withLongOpt("file").withArgName("name").hasArg() |
---|
348 | .withDescription("specify the file name to be created or read.") |
---|
349 | .create('f'); |
---|
350 | |
---|
351 | Option seekCount = |
---|
352 | OptionBuilder |
---|
353 | .withLongOpt("seek") |
---|
354 | .withArgName("count") |
---|
355 | .hasArg() |
---|
356 | .withDescription( |
---|
357 | "specify how many seek operations we perform (requires -x r or -x rw.") |
---|
358 | .create('n'); |
---|
359 | |
---|
360 | Option help = |
---|
361 | OptionBuilder.withLongOpt("help").hasArg(false).withDescription( |
---|
362 | "show this screen").create("h"); |
---|
363 | |
---|
364 | return new Options().addOption(compress).addOption(fileSize).addOption( |
---|
365 | fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen) |
---|
366 | .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption( |
---|
367 | operation).addOption(seekCount).addOption(file).addOption(help); |
---|
368 | |
---|
369 | } |
---|
370 | |
---|
371 | private void processOptions(CommandLine line, Options opts) |
---|
372 | throws ParseException { |
---|
373 | // --help -h and --version -V must be processed first. |
---|
374 | if (line.hasOption('h')) { |
---|
375 | HelpFormatter formatter = new HelpFormatter(); |
---|
376 | System.out.println("TFile and SeqFile benchmark."); |
---|
377 | System.out.println(); |
---|
378 | formatter.printHelp(100, |
---|
379 | "java ... TestTFileSeqFileComparison [options]", |
---|
380 | "\nSupported options:", opts, ""); |
---|
381 | return; |
---|
382 | } |
---|
383 | |
---|
384 | if (line.hasOption('c')) { |
---|
385 | compress = line.getOptionValue('c'); |
---|
386 | } |
---|
387 | |
---|
388 | if (line.hasOption('d')) { |
---|
389 | dictSize = Integer.parseInt(line.getOptionValue('d')); |
---|
390 | } |
---|
391 | |
---|
392 | if (line.hasOption('s')) { |
---|
393 | fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024; |
---|
394 | } |
---|
395 | |
---|
396 | if (line.hasOption('i')) { |
---|
397 | fsInputBufferSize = Integer.parseInt(line.getOptionValue('i')); |
---|
398 | } |
---|
399 | |
---|
400 | if (line.hasOption('o')) { |
---|
401 | fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o')); |
---|
402 | } |
---|
403 | |
---|
404 | if (line.hasOption('n')) { |
---|
405 | seekCount = Integer.parseInt(line.getOptionValue('n')); |
---|
406 | } |
---|
407 | |
---|
408 | if (line.hasOption('k')) { |
---|
409 | IntegerRange ir = IntegerRange.parse(line.getOptionValue('k')); |
---|
410 | minKeyLen = ir.from(); |
---|
411 | maxKeyLen = ir.to(); |
---|
412 | } |
---|
413 | |
---|
414 | if (line.hasOption('v')) { |
---|
415 | IntegerRange ir = IntegerRange.parse(line.getOptionValue('v')); |
---|
416 | minValLength = ir.from(); |
---|
417 | maxValLength = ir.to(); |
---|
418 | } |
---|
419 | |
---|
420 | if (line.hasOption('b')) { |
---|
421 | minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024; |
---|
422 | } |
---|
423 | |
---|
424 | if (line.hasOption('r')) { |
---|
425 | rootDir = line.getOptionValue('r'); |
---|
426 | } |
---|
427 | |
---|
428 | if (line.hasOption('f')) { |
---|
429 | file = line.getOptionValue('f'); |
---|
430 | } |
---|
431 | |
---|
432 | if (line.hasOption('S')) { |
---|
433 | seed = Long.parseLong(line.getOptionValue('S')); |
---|
434 | } |
---|
435 | |
---|
436 | if (line.hasOption('x')) { |
---|
437 | String strOp = line.getOptionValue('x'); |
---|
438 | if (strOp.equals("r")) { |
---|
439 | op = OP_READ; |
---|
440 | } |
---|
441 | else if (strOp.equals("w")) { |
---|
442 | op = OP_CREATE; |
---|
443 | } |
---|
444 | else if (strOp.equals("rw")) { |
---|
445 | op = OP_CREATE | OP_READ; |
---|
446 | } |
---|
447 | else { |
---|
448 | throw new ParseException("Unknown action specifier: " + strOp); |
---|
449 | } |
---|
450 | } |
---|
451 | |
---|
452 | proceed = true; |
---|
453 | } |
---|
454 | |
---|
455 | private void validateOptions() throws ParseException { |
---|
456 | if (!compress.equals("none") && !compress.equals("lzo") |
---|
457 | && !compress.equals("gz")) { |
---|
458 | throw new ParseException("Unknown compression scheme: " + compress); |
---|
459 | } |
---|
460 | |
---|
461 | if (minKeyLen >= maxKeyLen) { |
---|
462 | throw new ParseException( |
---|
463 | "Max key length must be greater than min key length."); |
---|
464 | } |
---|
465 | |
---|
466 | if (minValLength >= maxValLength) { |
---|
467 | throw new ParseException( |
---|
468 | "Max value length must be greater than min value length."); |
---|
469 | } |
---|
470 | |
---|
471 | if (minWordLen >= maxWordLen) { |
---|
472 | throw new ParseException( |
---|
473 | "Max word length must be greater than min word length."); |
---|
474 | } |
---|
475 | return; |
---|
476 | } |
---|
477 | |
---|
478 | private void setStopProceed() { |
---|
479 | proceed = false; |
---|
480 | } |
---|
481 | |
---|
482 | public boolean doCreate() { |
---|
483 | return (op & OP_CREATE) != 0; |
---|
484 | } |
---|
485 | |
---|
486 | public boolean doRead() { |
---|
487 | return (op & OP_READ) != 0; |
---|
488 | } |
---|
489 | } |
---|
490 | |
---|
491 | public static void main(String[] argv) throws IOException { |
---|
492 | TestTFileSeek testCase = new TestTFileSeek(); |
---|
493 | MyOptions options = new MyOptions(argv); |
---|
494 | |
---|
495 | if (options.proceed == false) { |
---|
496 | return; |
---|
497 | } |
---|
498 | |
---|
499 | testCase.options = options; |
---|
500 | testCase.setUp(); |
---|
501 | testCase.testSeeks(); |
---|
502 | testCase.tearDown(); |
---|
503 | } |
---|
504 | } |
---|