source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/NNBench.java @ 120

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

/**
 * This program executes a specified operation that applies load to
 * the NameNode.
 *
 * When run simultaneously on multiple nodes, this program functions
 * as a stress-test and benchmark for the NameNode, especially when
 * the number of bytes written to each file is small.
 *
 * Valid operations are:
 *   create_write
 *   open_read
 *   rename
 *   delete
 *
 * NOTE: The open_read, rename and delete operations assume that the files
 *       they operate on are already available. The create_write operation
 *       must be run before running the other operations.
 */

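// Illustrative invocation (an assumption about a typical 0.20-era setup, not
// part of this file): the benchmark is usually launched through the test
// jar's driver, with the create_write phase run first, e.g.
//
//   hadoop jar hadoop-0.20.1-test.jar nnbench -operation create_write \
//       -maps 12 -reduces 6 -blockSize 1 -bytesToWrite 0 \
//       -numberOfFiles 1000 -replicationFactorPerFile 3 \
//       -baseDir /benchmarks/NNBench
//
// followed by open_read, rename or delete runs against the same -baseDir.
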
public class NNBench {
  private static final Log LOG = LogFactory.getLog(
          "org.apache.hadoop.hdfs.NNBench");
  
  protected static String CONTROL_DIR_NAME = "control";
  protected static String OUTPUT_DIR_NAME = "output";
  protected static String DATA_DIR_NAME = "data";
  protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
  protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
  
  public static String operation = "none";
  public static long numberOfMaps = 1l; // default is 1
  public static long numberOfReduces = 1l; // default is 1
  public static long startTime = 
          System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min
  public static long blockSize = 1l; // default is 1
  public static int bytesToWrite = 0; // default is 0
  public static long bytesPerChecksum = 1l; // default is 1
  public static long numberOfFiles = 1l; // default is 1
  public static short replicationFactorPerFile = 1; // default is 1
  public static String baseDir = "/benchmarks/NNBench"; // default
  public static boolean readFileAfterOpen = false; // default is to not read
  
  // Supported operations
  private static final String OP_CREATE_WRITE = "create_write";
  private static final String OP_OPEN_READ = "open_read";
  private static final String OP_RENAME = "rename";
  private static final String OP_DELETE = "delete";
  
  // To display in the format that matches the NN and DN log format
  // Example: 2007-10-26 00:01:19,853
  static SimpleDateFormat sdf = 
          new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");

  private static Configuration config = new Configuration();
  
  /**
   * Clean up the files before a test run
   *
   * @throws IOException on error
   */
  private static void cleanupBeforeTestrun() throws IOException {
    FileSystem tempFS = FileSystem.get(config);
    
    // Delete the data directory only if it is the create/write operation
    if (operation.equals(OP_CREATE_WRITE)) {
      LOG.info("Deleting data directory");
      tempFS.delete(new Path(baseDir, DATA_DIR_NAME), true);
    }
    tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME), true);
    tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME), true);
  }
  
  /**
   * Create control files before a test run.
   * Number of files created is equal to the number of maps specified
   *
   * @throws IOException on error
   */
  private static void createControlFiles() throws IOException {
    FileSystem tempFS = FileSystem.get(config);
    LOG.info("Creating " + numberOfMaps + " control files");

    for (int i = 0; i < numberOfMaps; i++) {
      String strFileName = "NNBench_Controlfile_" + i;
      Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
              strFileName);

      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
                LongWritable.class, CompressionType.NONE);
        writer.append(new Text(strFileName), new LongWritable(0l));
      } catch (Exception e) {
        throw new IOException(e.getLocalizedMessage());
      } finally {
        if (writer != null) {
          writer.close();
        }
        writer = null;
      }
    }
  }

  /**
   * Display version
   */
  private static void displayVersion() {
    System.out.println(NNBENCH_VERSION);
  }
  
  /**
   * Display usage
   */
  private static void displayUsage() {
    String usage =
      "Usage: nnbench <options>\n" +
      "Options:\n" +
      "\t-operation <Available operations are " + OP_CREATE_WRITE + " " +
      OP_OPEN_READ + " " + OP_RENAME + " " + OP_DELETE + ". " +
      "This option is mandatory>\n" +
      "\t * NOTE: The open_read, rename and delete operations assume " +
      "that the files they operate on are already available. " +
      "The create_write operation must be run before running the " +
      "other operations.\n" +
      "\t-maps <number of maps. default is 1. This is not mandatory>\n" +
      "\t-reduces <number of reduces. default is 1. This is not mandatory>\n" +
      "\t-startTime <time to start, given in seconds from the epoch. " +
      "Make sure this is far enough into the future, so all maps " +
      "(operations) will start at the same time>. " +
      "default is launch time + 2 mins. This is not mandatory\n" +
      "\t-blockSize <Block size in bytes. default is 1. " +
      "This is not mandatory>\n" +
      "\t-bytesToWrite <Bytes to write. default is 0. " +
      "This is not mandatory>\n" +
      "\t-bytesPerChecksum <Bytes per checksum for the files. default is 1. " +
      "This is not mandatory>\n" +
      "\t-numberOfFiles <number of files to create. default is 1. " +
      "This is not mandatory>\n" +
      "\t-replicationFactorPerFile <Replication factor for the files." +
      " default is 1. This is not mandatory>\n" +
      "\t-baseDir <base DFS path. default is /benchmarks/NNBench. " +
      "This is not mandatory>\n" +
      "\t-readFileAfterOpen <true or false. if true, it reads the file and " +
      "reports the average time to read. This is valid with the open_read " +
      "operation. default is false. This is not mandatory>\n" +
      "\t-help: Display the help statement\n";

    System.out.println(usage);
  }

  /**
   * Check for arguments and fail if the values are not specified
   */
  public static void checkArgs(final int index, final int length) {
    if (index == length) {
      displayUsage();
      System.exit(-1);
    }
  }
  
  /**
   * Parse input arguments
   *
   * @param args Command line inputs
   */
  public static void parseInputs(final String[] args) {
    // If there are no command line arguments, exit
    if (args.length == 0) {
      displayUsage();
      System.exit(-1);
    }
    
    // Parse command line args
    for (int i = 0; i < args.length; i++) {
      if (args[i].equals("-operation")) {
        checkArgs(i + 1, args.length);
        operation = args[++i];
      } else if (args[i].equals("-maps")) {
        checkArgs(i + 1, args.length);
        numberOfMaps = Long.parseLong(args[++i]);
      } else if (args[i].equals("-reduces")) {
        checkArgs(i + 1, args.length);
        numberOfReduces = Long.parseLong(args[++i]);
      } else if (args[i].equals("-startTime")) {
        checkArgs(i + 1, args.length);
        startTime = Long.parseLong(args[++i]) * 1000;
      } else if (args[i].equals("-blockSize")) {
        checkArgs(i + 1, args.length);
        blockSize = Long.parseLong(args[++i]);
      } else if (args[i].equals("-bytesToWrite")) {
        checkArgs(i + 1, args.length);
        bytesToWrite = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-bytesPerChecksum")) {
        checkArgs(i + 1, args.length);
        bytesPerChecksum = Long.parseLong(args[++i]);
      } else if (args[i].equals("-numberOfFiles")) {
        checkArgs(i + 1, args.length);
        numberOfFiles = Long.parseLong(args[++i]);
      } else if (args[i].equals("-replicationFactorPerFile")) {
        checkArgs(i + 1, args.length);
        replicationFactorPerFile = Short.parseShort(args[++i]);
      } else if (args[i].equals("-baseDir")) {
        checkArgs(i + 1, args.length);
        baseDir = args[++i];
      } else if (args[i].equals("-readFileAfterOpen")) {
        checkArgs(i + 1, args.length);
        readFileAfterOpen = Boolean.parseBoolean(args[++i]);
      } else if (args[i].equals("-help")) {
        displayUsage();
        System.exit(-1);
      }
    }
    
    LOG.info("Test Inputs: ");
    LOG.info("           Test Operation: " + operation);
    LOG.info("               Start time: " + sdf.format(new Date(startTime)));
    LOG.info("           Number of maps: " + numberOfMaps);
    LOG.info("        Number of reduces: " + numberOfReduces);
    LOG.info("               Block Size: " + blockSize);
    LOG.info("           Bytes to write: " + bytesToWrite);
    LOG.info("       Bytes per checksum: " + bytesPerChecksum);
    LOG.info("          Number of files: " + numberOfFiles);
    LOG.info("       Replication factor: " + replicationFactorPerFile);
    LOG.info("                 Base dir: " + baseDir);
    LOG.info("     Read file after open: " + readFileAfterOpen);
    
    // Set user-defined parameters, so the map method can access the values
    config.set("test.nnbench.operation", operation);
    config.setLong("test.nnbench.maps", numberOfMaps);
    config.setLong("test.nnbench.reduces", numberOfReduces);
    config.setLong("test.nnbench.starttime", startTime);
    config.setLong("test.nnbench.blocksize", blockSize);
    config.setInt("test.nnbench.bytestowrite", bytesToWrite);
    config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
    config.setLong("test.nnbench.numberoffiles", numberOfFiles);
    config.setInt("test.nnbench.replicationfactor", 
            (int) replicationFactorPerFile);
    config.set("test.nnbench.basedir", baseDir);
    config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);

    config.set("test.nnbench.datadir.name", DATA_DIR_NAME);
    config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
    config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
  }
  
  /**
   * Analyze the results
   *
   * @throws IOException on error
   */
  private static void analyzeResults() throws IOException {
    final FileSystem fs = FileSystem.get(config);
    Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
            "part-00000");

    DataInputStream in = new DataInputStream(fs.open(reduceFile));
    BufferedReader lines = new BufferedReader(new InputStreamReader(in));

    long totalTimeAL1 = 0l;
    long totalTimeAL2 = 0l;
    long totalTimeTPmS = 0l;
    long lateMaps = 0l;
    long numOfExceptions = 0l;
    long successfulFileOps = 0l;
    
    long mapStartTimeTPmS = 0l;
    long mapEndTimeTPmS = 0l;
    
    String resultTPSLine1 = null;
    String resultTPSLine2 = null;
    String resultALLine1 = null;
    String resultALLine2 = null;
    
    String line;
    while ((line = lines.readLine()) != null) {
      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
      String attr = tokens.nextToken();
      if (attr.endsWith(":totalTimeAL1")) {
        totalTimeAL1 = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":totalTimeAL2")) {
        totalTimeAL2 = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":totalTimeTPmS")) {
        totalTimeTPmS = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":latemaps")) {
        lateMaps = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":numOfExceptions")) {
        numOfExceptions = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":successfulFileOps")) {
        successfulFileOps = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":mapStartTimeTPmS")) {
        mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
      } else if (attr.endsWith(":mapEndTimeTPmS")) {
        mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
      }
    }
    
    // Average latency is the average time to perform 'n' number of
    // operations, n being the number of files
    double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps;
    double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps;
    
    // The time it takes for the longest running map is measured. Using that,
    // cluster transactions per second is calculated. It includes time to
    // retry any of the failed operations
    double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS);
    double totalTimeTPS = (longestMapTimeTPmS == 0) ?
            (1000 * successfulFileOps) :
            (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS;
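
    // Worked example (illustrative numbers, not output from a real run):
    // with successfulFileOps = 1000 and a longest-map window of 4000 ms,
    // totalTimeTPS is 1000 * 1000 / 4000 = 250 transactions per second; for
    // create_write the figure reported below doubles this to 500, since each
    // file is counted as a create plus a close.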

    // The time it takes to perform 'n' operations is calculated (in ms),
    // n being the number of files. Using that time, the average execution
    // time is calculated. It includes time to retry any of the
    // failed operations
    double AverageExecutionTime = (totalTimeTPmS == 0) ?
        (double) successfulFileOps :
        (double) totalTimeTPmS / (double) successfulFileOps;

    if (operation.equals(OP_CREATE_WRITE)) {
      // For create/write/close, it is treated as two transactions,
      // since a file create from a client perspective involves create and close
      resultTPSLine1 = "               TPS: Create/Write/Close: " + 
        (int) (totalTimeTPS * 2);
      resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " + 
        AverageExecutionTime;
      resultALLine1 = "            Avg Lat (ms): Create/Write: " + avgLatency1;
      resultALLine2 = "                   Avg Lat (ms): Close: " + avgLatency2;
    } else if (operation.equals(OP_OPEN_READ)) {
      resultTPSLine1 = "                        TPS: Open/Read: " + 
        (int) totalTimeTPS;
      resultTPSLine2 = "         Avg Exec time (ms): Open/Read: " + 
        AverageExecutionTime;
      resultALLine1 = "                    Avg Lat (ms): Open: " + avgLatency1;
      if (readFileAfterOpen) {
        resultALLine2 = "                  Avg Lat (ms): Read: " + avgLatency2;
      }
    } else if (operation.equals(OP_RENAME)) {
      resultTPSLine1 = "                           TPS: Rename: " + 
        (int) totalTimeTPS;
      resultTPSLine2 = "            Avg Exec time (ms): Rename: " + 
        AverageExecutionTime;
      resultALLine1 = "                  Avg Lat (ms): Rename: " + avgLatency1;
    } else if (operation.equals(OP_DELETE)) {
      resultTPSLine1 = "                           TPS: Delete: " + 
        (int) totalTimeTPS;
      resultTPSLine2 = "            Avg Exec time (ms): Delete: " + 
        AverageExecutionTime;
      resultALLine1 = "                  Avg Lat (ms): Delete: " + avgLatency1;
    }
    
    String[] resultLines = {
      "-------------- NNBench -------------- : ",
      "                               Version: " + NNBENCH_VERSION,
      "                           Date & time: " + sdf.format(new Date(
              System.currentTimeMillis())),
      "",
      "                        Test Operation: " + operation,
      "                            Start time: " + 
        sdf.format(new Date(startTime)),
      "                           Maps to run: " + numberOfMaps,
      "                        Reduces to run: " + numberOfReduces,
      "                    Block Size (bytes): " + blockSize,
      "                        Bytes to write: " + bytesToWrite,
      "                    Bytes per checksum: " + bytesPerChecksum,
      "                       Number of files: " + numberOfFiles,
      "                    Replication factor: " + replicationFactorPerFile,
      "            Successful file operations: " + successfulFileOps,
      "",
      "        # maps that missed the barrier: " + lateMaps,
      "                          # exceptions: " + numOfExceptions,
      "",
      resultTPSLine1,
      resultTPSLine2,
      resultALLine1,
      resultALLine2,
      "",
      "                 RAW DATA: AL Total #1: " + totalTimeAL1,
      "                 RAW DATA: AL Total #2: " + totalTimeAL2,
      "              RAW DATA: TPS Total (ms): " + totalTimeTPmS,
      "       RAW DATA: Longest Map Time (ms): " + longestMapTimeTPmS,
      "                   RAW DATA: Late maps: " + lateMaps,
      "             RAW DATA: # of exceptions: " + numOfExceptions,
      "" };

    PrintStream res = new PrintStream(new FileOutputStream(
            new File(DEFAULT_RES_FILE_NAME), true));
    
    // Write to a file and also dump to the log
    for (int i = 0; i < resultLines.length; i++) {
      LOG.info(resultLines[i]);
      res.println(resultLines[i]);
    }
    res.close();
  }
  
  /**
   * Run the test
   *
   * @throws IOException on error
   */
  public static void runTests() throws IOException {
    config.setLong("io.bytes.per.checksum", bytesPerChecksum);
    
    JobConf job = new JobConf(config, NNBench.class);

    job.setJobName("NNBench-" + operation);
    FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    
    // Explicitly set number of max map attempts to 1, so a failed map is
    // not re-run and its operations are not counted twice
    job.setMaxMapAttempts(1);
    
    // Explicitly turn off speculative execution, so duplicate map tasks do
    // not skew the measurements
    job.setSpeculativeExecution(false);

    job.setMapperClass(NNBenchMapper.class);
    job.setReducerClass(NNBenchReducer.class);

    FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks((int) numberOfReduces);
    JobClient.runJob(job);
  }
  
  /**
   * Validate the inputs
   */
  public static void validateInputs() {
    // If it is not one of the four operations, then fail
    if (!operation.equals(OP_CREATE_WRITE) &&
            !operation.equals(OP_OPEN_READ) &&
            !operation.equals(OP_RENAME) &&
            !operation.equals(OP_DELETE)) {
      System.err.println("Error: Unknown operation: " + operation);
      displayUsage();
      System.exit(-1);
    }
    
    // If number of maps is a negative number, then fail
    // Hadoop allows the number of maps to be 0
    if (numberOfMaps < 0) {
      System.err.println("Error: Number of maps cannot be a negative number");
      displayUsage();
      System.exit(-1);
    }
    
    // If number of reduces is a negative number or 0, then fail
    if (numberOfReduces <= 0) {
      System.err.println("Error: Number of reduces must be a positive number");
      displayUsage();
      System.exit(-1);
    }

    // If blocksize is a negative number or 0, then fail
    if (blockSize <= 0) {
      System.err.println("Error: Block size must be a positive number");
      displayUsage();
      System.exit(-1);
    }
    
    // If bytes to write is a negative number, then fail
    if (bytesToWrite < 0) {
      System.err.println("Error: Bytes to write cannot be a negative number");
      displayUsage();
      System.exit(-1);
    }
    
    // If bytes per checksum is a negative number or 0, then fail;
    // it is also used as a divisor below
    if (bytesPerChecksum <= 0) {
      System.err.println("Error: Bytes per checksum must be a positive number");
      displayUsage();
      System.exit(-1);
    }
    
    // If number of files is a negative number, then fail
    if (numberOfFiles < 0) {
      System.err.println("Error: Number of files cannot be a negative number");
      displayUsage();
      System.exit(-1);
    }
    
    // If replication factor is a negative number, then fail
    if (replicationFactorPerFile < 0) {
      System.err.println("Error: Replication factor cannot be a negative number");
      displayUsage();
      System.exit(-1);
    }
    
    // If block size is not a multiple of bytes per checksum, fail
    if (blockSize % bytesPerChecksum != 0) {
      System.err.println("Error: Block size in bytes must be a multiple of " +
              "bytes per checksum");
      displayUsage();
      System.exit(-1);
    }
  }

  /**
   * Main method for running the NNBench benchmarks
   *
   * @throws IOException indicates a problem with test startup
   */
  public static void main(String[] args) throws IOException {
    // Display the application version string
    displayVersion();

    // Parse the inputs
    parseInputs(args);
    
    // Validate inputs
    validateInputs();
    
    // Clean up files before the test run
    cleanupBeforeTestrun();
    
    // Create control files before test run
    createControlFiles();

    // Run the tests as a map reduce job
    runTests();
    
    // Analyze results
    analyzeResults();
  }

  /**
   * Mapper class
   */
  static class NNBenchMapper extends Configured
          implements Mapper<Text, LongWritable, Text, Text> {
    FileSystem filesystem = null;
    private String hostName = null;

    long numberOfFiles = 1l;
    long blkSize = 1l;
    short replFactor = 1;
    int bytesToWrite = 0;
    String baseDir = null;
    String dataDirName = null;
    String op = null;
    boolean readFile = false;
    final int MAX_OPERATION_EXCEPTIONS = 1000;
    
    // Data to collect from the operation
    int numOfExceptions = 0;
    long startTimeAL = 0l;
    long totalTimeAL1 = 0l;
    long totalTimeAL2 = 0l;
    long successfulFileOps = 0l;
    
    /**
     * Constructor
     */
    public NNBenchMapper() {
    }
    
    /**
     * Mapper setup: cache the FileSystem handle and the local hostname
     * once per task
     */
    public void configure(JobConf conf) {
      setConf(conf);
      
      try {
        filesystem = FileSystem.get(conf);
      } catch (Exception e) {
        throw new RuntimeException("Cannot get file system.", e);
      }
      
      try {
        hostName = InetAddress.getLocalHost().getHostName();
      } catch (Exception e) {
        throw new RuntimeException("Error getting hostname", e);
      }
    }
    
    /**
     * Mapper cleanup: nothing to release
     */
    public void close() throws IOException {
    }
    
    /**
     * Returns when the current number of seconds from the epoch equals
     * the command line argument given by <code>-startTime</code>.
     * This allows multiple instances of this program, running on clock
     * synchronized nodes, to start at roughly the same time.
     */
    private boolean barrier() {
      long startTime = getConf().getLong("test.nnbench.starttime", 0l);
      long currentTime = System.currentTimeMillis();
      long sleepTime = startTime - currentTime;
      boolean retVal = false;
      
      // If the sleep time is greater than 0, then sleep and return
      if (sleepTime > 0) {
        LOG.info("Waiting in barrier for: " + sleepTime + " ms");
        
        try {
          Thread.sleep(sleepTime);
          retVal = true;
        } catch (Exception e) {
          retVal = false;
        }
      }
      
      return retVal;
    }
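
    // Illustrative timing (an assumption about typical use, not taken from
    // this file): launching with, say, -startTime $(($(date +%s) + 180)) from
    // a POSIX shell gives the framework about three minutes to schedule every
    // map before the barrier releases. A map scheduled after that instant
    // sees sleepTime <= 0, returns false here, and is counted under the
    // "l:latemaps" key instead of performing its operations.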
    
    /**
     * Map method
     */
    public void map(Text key, 
            LongWritable value,
            OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
      Configuration conf = filesystem.getConf();
      
      numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l);
      blkSize = conf.getLong("test.nnbench.blocksize", 1l);
      replFactor = (short) (conf.getInt("test.nnbench.replicationfactor", 1));
      bytesToWrite = conf.getInt("test.nnbench.bytestowrite", 0);
      baseDir = conf.get("test.nnbench.basedir");
      dataDirName = conf.get("test.nnbench.datadir.name");
      op = conf.get("test.nnbench.operation");
      readFile = conf.getBoolean("test.nnbench.readFileAfterOpen", false);
      
      long totalTimeTPmS = 0l;
      long startTimeTPmS = 0l;
      long endTimeTPms = 0l;
      
      numOfExceptions = 0;
      startTimeAL = 0l;
      totalTimeAL1 = 0l;
      totalTimeAL2 = 0l;
      successfulFileOps = 0l;
      
      if (barrier()) {
        if (op.equals(OP_CREATE_WRITE)) {
          startTimeTPmS = System.currentTimeMillis();
          doCreateWriteOp("file_" + hostName + "_", output, reporter);
        } else if (op.equals(OP_OPEN_READ)) {
          startTimeTPmS = System.currentTimeMillis();
          doOpenReadOp("file_" + hostName + "_", output, reporter);
        } else if (op.equals(OP_RENAME)) {
          startTimeTPmS = System.currentTimeMillis();
          doRenameOp("file_" + hostName + "_", output, reporter);
        } else if (op.equals(OP_DELETE)) {
          startTimeTPmS = System.currentTimeMillis();
          doDeleteOp("file_" + hostName + "_", output, reporter);
        }
        
        endTimeTPms = System.currentTimeMillis();
        totalTimeTPmS = endTimeTPms - startTimeTPmS;
      } else {
        output.collect(new Text("l:latemaps"), new Text("1"));
      }
      
      // collect after the map end time is measured
      output.collect(new Text("l:totalTimeAL1"), 
          new Text(String.valueOf(totalTimeAL1)));
      output.collect(new Text("l:totalTimeAL2"), 
          new Text(String.valueOf(totalTimeAL2)));
      output.collect(new Text("l:numOfExceptions"), 
          new Text(String.valueOf(numOfExceptions)));
      output.collect(new Text("l:successfulFileOps"), 
          new Text(String.valueOf(successfulFileOps)));
      output.collect(new Text("l:totalTimeTPmS"), 
          new Text(String.valueOf(totalTimeTPmS)));
      output.collect(new Text("min:mapStartTimeTPmS"), 
          new Text(String.valueOf(startTimeTPmS)));
      output.collect(new Text("max:mapEndTimeTPmS"), 
          new Text(String.valueOf(endTimeTPms)));
    }
    
    /**
     * Create and Write operation.
     */
    private void doCreateWriteOp(String name,
            OutputCollector<Text, Text> output,
            Reporter reporter) {
      FSDataOutputStream out = null;
      byte[] buffer = new byte[bytesToWrite];
      
      for (long l = 0l; l < numberOfFiles; l++) {
        Path filePath = new Path(new Path(baseDir, dataDirName), 
                name + "_" + l);

        boolean successfulOp = false;
        while (!successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
          try {
            // Set up timer for measuring AL (transaction #1)
            startTimeAL = System.currentTimeMillis();
            // Create the file
            // Use a buffer size of 512
            out = filesystem.create(filePath, 
                    true, 
                    512, 
                    replFactor, 
                    blkSize);
            out.write(buffer);
            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);

            // Close the file / file output stream
            // Set up timers for measuring AL (transaction #2)
            startTimeAL = System.currentTimeMillis();
            out.close();
            
            totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
            successfulOp = true;
            successfulFileOps++;

            reporter.setStatus("Finish " + l + " files");
          } catch (IOException e) {
            LOG.info("Exception recorded in op: Create/Write/Close " + e);
            numOfExceptions++;
          }
        }
      }
    }
    
    /**
     * Open operation
     */
    private void doOpenReadOp(String name,
            OutputCollector<Text, Text> output,
            Reporter reporter) {
      FSDataInputStream input = null;
      byte[] buffer = new byte[bytesToWrite];
      
      for (long l = 0l; l < numberOfFiles; l++) {
        Path filePath = new Path(new Path(baseDir, dataDirName), 
                name + "_" + l);

        boolean successfulOp = false;
        while (!successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
          try {
            // Set up timer for measuring AL
            startTimeAL = System.currentTimeMillis();
            input = filesystem.open(filePath);
            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
            
            // If the file needs to be read (specified at command line)
            if (readFile) {
              startTimeAL = System.currentTimeMillis();
              input.readFully(buffer);

              totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
            }
            input.close();
            successfulOp = true;
            successfulFileOps++;

            reporter.setStatus("Finish " + l + " files");
          } catch (IOException e) {
            LOG.info("Exception recorded in op: OpenRead " + e);
            numOfExceptions++;
          }
        }
      }
    }
    
    /**
     * Rename operation
     */
    private void doRenameOp(String name,
            OutputCollector<Text, Text> output,
            Reporter reporter) {
      for (long l = 0l; l < numberOfFiles; l++) {
        Path filePath = new Path(new Path(baseDir, dataDirName), 
                name + "_" + l);
        Path filePathR = new Path(new Path(baseDir, dataDirName), 
                name + "_r_" + l);

        boolean successfulOp = false;
        while (!successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
          try {
            // Set up timer for measuring AL
            startTimeAL = System.currentTimeMillis();
            filesystem.rename(filePath, filePathR);
            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
            
            successfulOp = true;
            successfulFileOps++;

            reporter.setStatus("Finish " + l + " files");
          } catch (IOException e) {
            LOG.info("Exception recorded in op: Rename " + e);
            numOfExceptions++;
          }
        }
      }
    }
    
    /**
     * Delete operation
     */
    private void doDeleteOp(String name,
            OutputCollector<Text, Text> output,
            Reporter reporter) {
      for (long l = 0l; l < numberOfFiles; l++) {
        Path filePath = new Path(new Path(baseDir, dataDirName), 
                name + "_" + l);
        
        boolean successfulOp = false;
        while (!successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
          try {
            // Set up timer for measuring AL
            startTimeAL = System.currentTimeMillis();
            filesystem.delete(filePath, true);
            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
            
            successfulOp = true;
            successfulFileOps++;

            reporter.setStatus("Finish " + l + " files");
          } catch (IOException e) {
            LOG.info("Exception recorded in op: Delete " + e);
            numOfExceptions++;
          }
        }
      }
    }
  }
  
  /**
   * Reducer class
   */
  static class NNBenchReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {

    protected String hostName;

    public NNBenchReducer() {
      LOG.info("Starting NNBenchReducer !!!");
      try {
        hostName = java.net.InetAddress.getLocalHost().getHostName();
      } catch (Exception e) {
        hostName = "localhost";
      }
      LOG.info("Starting NNBenchReducer on " + hostName);
    }

    /**
     * Reduce method
     */
    public void reduce(Text key, 
                       Iterator<Text> values,
                       OutputCollector<Text, Text> output, 
                       Reporter reporter
                       ) throws IOException {
      String field = key.toString();
      
      reporter.setStatus("starting " + field + " ::host = " + hostName);
      
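      // The key prefix, written by the mappers above, selects the
      // aggregation: "l:" values are summed across maps, while "min:" and
      // "max:" keep the smallest and largest value reported by any map
      // (the minimum comparison ignores zero values).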
      // sum long values
      if (field.startsWith("l:")) {
        long lSum = 0;
        while (values.hasNext()) {
          lSum += Long.parseLong(values.next().toString());
        }
        output.collect(key, new Text(String.valueOf(lSum)));
      }
      
      if (field.startsWith("min:")) {
        long minVal = -1;
        while (values.hasNext()) {
          long value = Long.parseLong(values.next().toString());
          
          if (minVal == -1) {
            minVal = value;
          } else {
            if (value != 0 && value < minVal) {
              minVal = value;
            }
          }
        }
        output.collect(key, new Text(String.valueOf(minVal)));
      }
      
      if (field.startsWith("max:")) {
        long maxVal = -1;
        while (values.hasNext()) {
          long value = Long.parseLong(values.next().toString());
          
          if (maxVal == -1) {
            maxVal = value;
          } else {
            if (value > maxVal) {
              maxVal = value;
            }
          }
        }
        output.collect(key, new Text(String.valueOf(maxVal)));
      }
      
      reporter.setStatus("finished " + field + " ::host = " + hostName);
    }
  }
}