source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 40.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs.server.namenode;
19
20import java.io.File;
21import java.io.FileOutputStream;
22import java.io.IOException;
23import java.util.ArrayList;
24import java.util.Arrays;
25import java.util.List;
26
27import javax.security.auth.login.LoginException;
28
29import org.apache.commons.logging.Log;
30import org.apache.commons.logging.LogFactory;
31import org.apache.commons.logging.impl.Log4JLogger;
32import org.apache.hadoop.conf.Configuration;
33import org.apache.hadoop.fs.permission.FsPermission;
34import org.apache.hadoop.hdfs.protocol.Block;
35import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
36import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
37import org.apache.hadoop.hdfs.protocol.FSConstants;
38import org.apache.hadoop.hdfs.protocol.LocatedBlock;
39import org.apache.hadoop.hdfs.server.datanode.DataNode;
40import org.apache.hadoop.hdfs.server.datanode.DataStorage;
41import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
42import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
43import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
44import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
45import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
46import org.apache.hadoop.net.DNS;
47import org.apache.hadoop.net.NetworkTopology;
48import org.apache.hadoop.security.UnixUserGroupInformation;
49import org.apache.hadoop.security.UserGroupInformation;
50import org.apache.hadoop.util.StringUtils;
51import org.apache.log4j.Level;
52
53/**
54 * Main class for a series of name-node benchmarks.
55 *
56 * Each benchmark measures throughput and average execution time
57 * of a specific name-node operation, e.g. file creation or block reports.
58 *
59 * The benchmark does not involve any other hadoop components
60 * except for the name-node. Each operation is executed
61 * by calling directly the respective name-node method.
62 * The name-node here is real all other components are simulated.
63 *
64 * Command line arguments for the benchmark include:<br>
65 * 1) total number of operations to be performed,<br>
66 * 2) number of threads to run these operations,<br>
67 * 3) followed by operation specific input parameters.
68 *
69 * Then the benchmark generates inputs for each thread so that the
70 * input generation overhead does not effect the resulting statistics.
71 * The number of operations performed by threads practically is the same.
72 * Precisely, the difference between the number of operations
73 * performed by any two threads does not exceed 1.
74 *
75 * Then the benchmark executes the specified number of operations using
76 * the specified number of threads and outputs the resulting stats.
77 */
78public class NNThroughputBenchmark {
79  private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
80  private static final int BLOCK_SIZE = 16;
81
82  static Configuration config;
83  static NameNode nameNode;
84
85  private final UserGroupInformation ugi;
86
87  NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
88    config = conf;
89    ugi = UnixUserGroupInformation.login(config);
90    UserGroupInformation.setCurrentUser(ugi);
91
92    // We do not need many handlers, since each thread simulates a handler
93    // by calling name-node methods directly
94    config.setInt("dfs.namenode.handler.count", 1);
95    // set exclude file
96    config.set("dfs.hosts.exclude", "${hadoop.tmp.dir}/dfs/hosts/exclude");
97    File excludeFile = new File(config.get("dfs.hosts.exclude", "exclude"));
98    if(! excludeFile.exists()) {
99      if(!excludeFile.getParentFile().mkdirs())
100        throw new IOException("NNThroughputBenchmark: cannot mkdir " + excludeFile);
101    }
102    new FileOutputStream(excludeFile).close();
103    // Start the NameNode
104    String[] argv = new String[] {};
105    nameNode = NameNode.createNameNode(argv, config);
106  }
107
108  void close() throws IOException {
109    nameNode.stop();
110  }
111
112  static void turnOffNameNodeLogging() {
113    // change log level to ERROR: NameNode.LOG & NameNode.stateChangeLog
114    ((Log4JLogger)NameNode.LOG).getLogger().setLevel(Level.ERROR);
115    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ERROR);
116    ((Log4JLogger)NetworkTopology.LOG).getLogger().setLevel(Level.ERROR);
117    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ERROR);
118    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.ERROR);
119    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ERROR);
120  }
121
122  /**
123   * Base class for collecting operation statistics.
124   *
125   * Overload this class in order to run statistics for a
126   * specific name-node operation.
127   */
128  abstract class OperationStatsBase {
129    protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
130    protected static final String OP_ALL_NAME = "all";
131    protected static final String OP_ALL_USAGE = "-op all " +
132                                  "<other ops options> [-keepResults]";
133
134    protected String baseDir;
135    protected short replication;
136    protected int  numThreads = 0;        // number of threads
137    protected int  numOpsRequired = 0;    // number of operations requested
138    protected int  numOpsExecuted = 0;    // number of operations executed
139    protected long cumulativeTime = 0;    // sum of times for each op
140    protected long elapsedTime = 0;       // time from start to finish
141    protected boolean keepResults = false;// don't clean base directory on exit
142
143    protected List<StatsDaemon> daemons;
144
145    /**
146     * Operation name.
147     */
148    abstract String getOpName();
149
150    /**
151     * Parse command line arguments.
152     *
153     * @param args arguments
154     * @throws IOException
155     */
156    abstract void parseArguments(List<String> args) throws IOException;
157
158    /**
159     * Generate inputs for each daemon thread.
160     *
161     * @param opsPerThread number of inputs for each thread.
162     * @throws IOException
163     */
164    abstract void generateInputs(int[] opsPerThread) throws IOException;
165
166    /**
167     * This corresponds to the arg1 argument of
168     * {@link #executeOp(int, int, String)}, which can have different meanings
169     * depending on the operation performed.
170     *
171     * @param daemonId
172     * @return the argument
173     */
174    abstract String getExecutionArgument(int daemonId);
175
176    /**
177     * Execute name-node operation.
178     *
179     * @param daemonId id of the daemon calling this method.
180     * @param inputIdx serial index of the operation called by the deamon.
181     * @param arg1 operation specific argument.
182     * @return time of the individual name-node call.
183     * @throws IOException
184     */
185    abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
186
187    /**
188     * Print the results of the benchmarking.
189     */
190    abstract void printResults();
191
192    OperationStatsBase() {
193      baseDir = BASE_DIR_NAME + "/" + getOpName();
194      replication = (short) config.getInt("dfs.replication", 3);
195      numOpsRequired = 10;
196      numThreads = 3;
197    }
198
199    void benchmark() throws IOException {
200      daemons = new ArrayList<StatsDaemon>();
201      long start = 0;
202      try {
203        numOpsExecuted = 0;
204        cumulativeTime = 0;
205        if(numThreads < 1)
206          return;
207        int tIdx = 0; // thread index < nrThreads
208        int opsPerThread[] = new int[numThreads];
209        for(int opsScheduled = 0; opsScheduled < numOpsRequired; 
210                                  opsScheduled += opsPerThread[tIdx++]) {
211          // execute  in a separate thread
212          opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx);
213          if(opsPerThread[tIdx] == 0)
214            opsPerThread[tIdx] = 1;
215        }
216        // if numThreads > numOpsRequired then the remaining threads will do nothing
217        for(; tIdx < numThreads; tIdx++)
218          opsPerThread[tIdx] = 0;
219        turnOffNameNodeLogging();
220        generateInputs(opsPerThread);
221        for(tIdx=0; tIdx < numThreads; tIdx++)
222          daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
223        start = System.currentTimeMillis();
224        LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s).");
225        for(StatsDaemon d : daemons)
226          d.start();
227      } finally {
228        while(isInPorgress()) {
229          // try {Thread.sleep(500);} catch (InterruptedException e) {}
230        }
231        elapsedTime = System.currentTimeMillis() - start;
232        for(StatsDaemon d : daemons) {
233          incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
234          // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted);
235        }
236      }
237    }
238
239    private boolean isInPorgress() {
240      for(StatsDaemon d : daemons)
241        if(d.isInProgress())
242          return true;
243      return false;
244    }
245
246    void cleanUp() throws IOException {
247      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
248      if(!keepResults)
249        nameNode.delete(getBaseDir(), true);
250    }
251
252    int getNumOpsExecuted() {
253      return numOpsExecuted;
254    }
255
256    long getCumulativeTime() {
257      return cumulativeTime;
258    }
259
260    long getElapsedTime() {
261      return elapsedTime;
262    }
263
264    long getAverageTime() {
265      return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
266    }
267
268    double getOpsPerSecond() {
269      return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime;
270    }
271
272    String getBaseDir() {
273      return baseDir;
274    }
275
276    String getClientName(int idx) {
277      return getOpName() + "-client-" + idx;
278    }
279
280    void incrementStats(int ops, long time) {
281      numOpsExecuted += ops;
282      cumulativeTime += time;
283    }
284
285    /**
286     * Parse first 2 arguments, corresponding to the "-op" option.
287     *
288     * @param args
289     * @return true if operation is all, which means that options not related
290     * to this operation should be ignored, or false otherwise, meaning
291     * that usage should be printed when an unrelated option is encountered.
292     * @throws IOException
293     */
294    protected boolean verifyOpArgument(List<String> args) {
295      if(args.size() < 2 || ! args.get(0).startsWith("-op"))
296        printUsage();
297      int krIndex = args.indexOf("-keepResults");
298      keepResults = (krIndex >= 0);
299      if(keepResults) {
300        args.remove(krIndex);
301      }
302      String type = args.get(1);
303      if(OP_ALL_NAME.equals(type)) {
304        type = getOpName();
305        return true;
306      }
307      if(!getOpName().equals(type))
308        printUsage();
309      return false;
310    }
311
312    void printStats() {
313      LOG.info("--- " + getOpName() + " stats  ---");
314      LOG.info("# operations: " + getNumOpsExecuted());
315      LOG.info("Elapsed Time: " + getElapsedTime());
316      LOG.info(" Ops per sec: " + getOpsPerSecond());
317      LOG.info("Average Time: " + getAverageTime());
318    }
319  }
320
321  /**
322   * One of the threads that perform stats operations.
323   */
324  private class StatsDaemon extends Thread {
325    private int daemonId;
326    private int opsPerThread;
327    private String arg1;      // argument passed to executeOp()
328    private volatile int  localNumOpsExecuted = 0;
329    private volatile long localCumulativeTime = 0;
330    private OperationStatsBase statsOp;
331
332    StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
333      this.daemonId = daemonId;
334      this.opsPerThread = nrOps;
335      this.statsOp = op;
336      setName(toString());
337    }
338
339    public void run() {
340      UserGroupInformation.setCurrentUser(ugi);
341      localNumOpsExecuted = 0;
342      localCumulativeTime = 0;
343      arg1 = statsOp.getExecutionArgument(daemonId);
344      try {
345        benchmarkOne();
346      } catch(IOException ex) {
347        LOG.error("StatsDaemon " + daemonId + " failed: \n" 
348            + StringUtils.stringifyException(ex));
349      }
350    }
351
352    public String toString() {
353      return "StatsDaemon-" + daemonId;
354    }
355
356    void benchmarkOne() throws IOException {
357      for(int idx = 0; idx < opsPerThread; idx++) {
358        long stat = statsOp.executeOp(daemonId, idx, arg1);
359        localNumOpsExecuted++;
360        localCumulativeTime += stat;
361      }
362    }
363
364    boolean isInProgress() {
365      return localNumOpsExecuted < opsPerThread;
366    }
367
368    /**
369     * Schedule to stop this daemon.
370     */
371    void terminate() {
372      opsPerThread = localNumOpsExecuted;
373    }
374  }
375
376  /**
377   * Clean all benchmark result directories.
378   */
379  class CleanAllStats extends OperationStatsBase {
380    // Operation types
381    static final String OP_CLEAN_NAME = "clean";
382    static final String OP_CLEAN_USAGE = "-op clean";
383
384    CleanAllStats(List<String> args) {
385      super();
386      parseArguments(args);
387      numOpsRequired = 1;
388      numThreads = 1;
389      keepResults = true;
390    }
391
392    String getOpName() {
393      return OP_CLEAN_NAME;
394    }
395
396    void parseArguments(List<String> args) {
397      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
398      if(args.size() > 2 && !ignoreUnrelatedOptions)
399        printUsage();
400    }
401
402    void generateInputs(int[] opsPerThread) throws IOException {
403      // do nothing
404    }
405
406    /**
407     * Does not require the argument
408     */
409    String getExecutionArgument(int daemonId) {
410      return null;
411    }
412
413    /**
414     * Remove entire benchmark directory.
415     */
416    long executeOp(int daemonId, int inputIdx, String ignore) 
417    throws IOException {
418      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
419      long start = System.currentTimeMillis();
420      nameNode.delete(BASE_DIR_NAME, true);
421      long end = System.currentTimeMillis();
422      return end-start;
423    }
424
425    void printResults() {
426      LOG.info("--- " + getOpName() + " inputs ---");
427      LOG.info("Remove directory " + BASE_DIR_NAME);
428      printStats();
429    }
430  }
431
432  /**
433   * File creation statistics.
434   *
435   * Each thread creates the same (+ or -1) number of files.
436   * File names are pre-generated during initialization.
437   * The created files do not have blocks.
438   */
439  class CreateFileStats extends OperationStatsBase {
440    // Operation types
441    static final String OP_CREATE_NAME = "create";
442    static final String OP_CREATE_USAGE = 
443      "-op create [-threads T] [-files N] [-filesPerDir P] [-close]";
444
445    protected FileNameGenerator nameGenerator;
446    protected String[][] fileNames;
447    private boolean closeUponCreate;
448
449    CreateFileStats(List<String> args) {
450      super();
451      parseArguments(args);
452    }
453
454    String getOpName() {
455      return OP_CREATE_NAME;
456    }
457
458    void parseArguments(List<String> args) {
459      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
460      int nrFilesPerDir = 4;
461      closeUponCreate = false;
462      for (int i = 2; i < args.size(); i++) {       // parse command line
463        if(args.get(i).equals("-files")) {
464          if(i+1 == args.size())  printUsage();
465          numOpsRequired = Integer.parseInt(args.get(++i));
466        } else if(args.get(i).equals("-threads")) {
467          if(i+1 == args.size())  printUsage();
468          numThreads = Integer.parseInt(args.get(++i));
469        } else if(args.get(i).equals("-filesPerDir")) {
470          if(i+1 == args.size())  printUsage();
471          nrFilesPerDir = Integer.parseInt(args.get(++i));
472        } else if(args.get(i).equals("-close")) {
473          closeUponCreate = true;
474        } else if(!ignoreUnrelatedOptions)
475          printUsage();
476      }
477      nameGenerator = new FileNameGenerator(getBaseDir(), nrFilesPerDir);
478    }
479
480    void generateInputs(int[] opsPerThread) throws IOException {
481      assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
482      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
483      // int generatedFileIdx = 0;
484      LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
485      fileNames = new String[numThreads][];
486      for(int idx=0; idx < numThreads; idx++) {
487        int threadOps = opsPerThread[idx];
488        fileNames[idx] = new String[threadOps];
489        for(int jdx=0; jdx < threadOps; jdx++)
490          fileNames[idx][jdx] = nameGenerator.
491                                  getNextFileName("ThroughputBench");
492      }
493    }
494
495    void dummyActionNoSynch(int daemonId, int fileIdx) {
496      for(int i=0; i < 2000; i++)
497        fileNames[daemonId][fileIdx].contains(""+i);
498    }
499
500    /**
501     * returns client name
502     */
503    String getExecutionArgument(int daemonId) {
504      return getClientName(daemonId);
505    }
506
507    /**
508     * Do file create.
509     */
510    long executeOp(int daemonId, int inputIdx, String clientName) 
511    throws IOException {
512      long start = System.currentTimeMillis();
513      // dummyActionNoSynch(fileIdx);
514      nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
515                      clientName, true, replication, BLOCK_SIZE);
516      long end = System.currentTimeMillis();
517      for(boolean written = !closeUponCreate; !written; 
518        written = nameNode.complete(fileNames[daemonId][inputIdx], clientName));
519      return end-start;
520    }
521
522    void printResults() {
523      LOG.info("--- " + getOpName() + " inputs ---");
524      LOG.info("nrFiles = " + numOpsRequired);
525      LOG.info("nrThreads = " + numThreads);
526      LOG.info("nrFilesPerDir = " + nameGenerator.getFilesPerDirectory());
527      printStats();
528    }
529  }
530
531  /**
532   * Open file statistics.
533   *
534   * Measure how many open calls (getBlockLocations())
535   * the name-node can handle per second.
536   */
537  class OpenFileStats extends CreateFileStats {
538    // Operation types
539    static final String OP_OPEN_NAME = "open";
540    static final String OP_USAGE_ARGS = 
541      " [-threads T] [-files N] [-filesPerDir P] [-useExisting]";
542    static final String OP_OPEN_USAGE = 
543      "-op " + OP_OPEN_NAME + OP_USAGE_ARGS;
544
545    private boolean useExisting;  // do not generate files, use existing ones
546
547    OpenFileStats(List<String> args) {
548      super(args);
549    }
550
551    String getOpName() {
552      return OP_OPEN_NAME;
553    }
554
555    void parseArguments(List<String> args) {
556      int ueIndex = args.indexOf("-useExisting");
557      useExisting = (ueIndex >= 0);
558      if(useExisting) {
559        args.remove(ueIndex);
560      }
561      super.parseArguments(args);
562    }
563
564    void generateInputs(int[] opsPerThread) throws IOException {
565      // create files using opsPerThread
566      String[] createArgs = new String[] {
567              "-op", "create", 
568              "-threads", String.valueOf(this.numThreads), 
569              "-files", String.valueOf(numOpsRequired),
570              "-filesPerDir", 
571              String.valueOf(nameGenerator.getFilesPerDirectory()),
572              "-close"};
573      CreateFileStats opCreate =  new CreateFileStats(Arrays.asList(createArgs));
574
575      if(!useExisting) {  // create files if they were not created before
576        opCreate.benchmark();
577        LOG.info("Created " + numOpsRequired + " files.");
578      } else {
579        LOG.info("useExisting = true. Assuming " 
580            + numOpsRequired + " files have been created before.");
581      }
582      // use the same files for open
583      super.generateInputs(opsPerThread);
584      if(nameNode.getFileInfo(opCreate.getBaseDir()) != null
585          && nameNode.getFileInfo(getBaseDir()) == null) {
586        nameNode.rename(opCreate.getBaseDir(), getBaseDir());
587      }
588      if(nameNode.getFileInfo(getBaseDir()) == null) {
589        throw new IOException(getBaseDir() + " does not exist.");
590      }
591    }
592
593    /**
594     * Do file open.
595     */
596    long executeOp(int daemonId, int inputIdx, String ignore) 
597    throws IOException {
598      long start = System.currentTimeMillis();
599      nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
600      long end = System.currentTimeMillis();
601      return end-start;
602    }
603  }
604
605  /**
606   * Delete file statistics.
607   *
608   * Measure how many delete calls the name-node can handle per second.
609   */
610  class DeleteFileStats extends OpenFileStats {
611    // Operation types
612    static final String OP_DELETE_NAME = "delete";
613    static final String OP_DELETE_USAGE = 
614      "-op " + OP_DELETE_NAME + OP_USAGE_ARGS;
615
616    DeleteFileStats(List<String> args) {
617      super(args);
618    }
619
620    String getOpName() {
621      return OP_DELETE_NAME;
622    }
623
624    long executeOp(int daemonId, int inputIdx, String ignore) 
625    throws IOException {
626      long start = System.currentTimeMillis();
627      nameNode.delete(fileNames[daemonId][inputIdx], false);
628      long end = System.currentTimeMillis();
629      return end-start;
630    }
631  }
632
633  /**
634   * Rename file statistics.
635   *
636   * Measure how many rename calls the name-node can handle per second.
637   */
638  class RenameFileStats extends OpenFileStats {
639    // Operation types
640    static final String OP_RENAME_NAME = "rename";
641    static final String OP_RENAME_USAGE = 
642      "-op " + OP_RENAME_NAME + OP_USAGE_ARGS;
643
644    protected String[][] destNames;
645
646    RenameFileStats(List<String> args) {
647      super(args);
648    }
649
650    String getOpName() {
651      return OP_RENAME_NAME;
652    }
653
654    void generateInputs(int[] opsPerThread) throws IOException {
655      super.generateInputs(opsPerThread);
656      destNames = new String[fileNames.length][];
657      for(int idx=0; idx < numThreads; idx++) {
658        int nrNames = fileNames[idx].length;
659        destNames[idx] = new String[nrNames];
660        for(int jdx=0; jdx < nrNames; jdx++)
661          destNames[idx][jdx] = fileNames[idx][jdx] + ".r";
662      }
663    }
664
665    long executeOp(int daemonId, int inputIdx, String ignore) 
666    throws IOException {
667      long start = System.currentTimeMillis();
668      nameNode.rename(fileNames[daemonId][inputIdx],
669                      destNames[daemonId][inputIdx]);
670      long end = System.currentTimeMillis();
671      return end-start;
672    }
673  }
674
675  /**
676   * Minimal data-node simulator.
677   */
678  private static class TinyDatanode implements Comparable<String> {
679    private static final long DF_CAPACITY = 100*1024*1024;
680    private static final long DF_USED = 0;
681   
682    NamespaceInfo nsInfo;
683    DatanodeRegistration dnRegistration;
684    Block[] blocks;
685    int nrBlocks; // actual number of blocks
686
687    /**
688     * Get data-node in the form
689     * <host name> : <port>
690     * where port is a 6 digit integer.
691     * This is necessary in order to provide lexocographic ordering.
692     * Host names are all the same, the ordering goes by port numbers.
693     */
694    private static String getNodeName(int port) throws IOException {
695      String machineName = DNS.getDefaultHost("default", "default");
696      String sPort = String.valueOf(100000 + port);
697      if(sPort.length() > 6)
698        throw new IOException("Too many data-nodes.");
699      return machineName + ":" + sPort;
700    }
701
702    TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
703      dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
704      this.blocks = new Block[blockCapacity];
705      this.nrBlocks = 0;
706    }
707
708    String getName() {
709      return dnRegistration.getName();
710    }
711
712    void register() throws IOException {
713      // get versions from the namenode
714      nsInfo = nameNode.versionRequest();
715      dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
716      DataNode.setNewStorageID(dnRegistration);
717      // register datanode
718      dnRegistration = nameNode.register(dnRegistration);
719    }
720
721    /**
722     * Send a heartbeat to the name-node.
723     * Ignore reply commands.
724     */
725    void sendHeartbeat() throws IOException {
726      // register datanode
727      DatanodeCommand[] cmds = nameNode.sendHeartbeat(
728          dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
729      if(cmds != null) {
730        for (DatanodeCommand cmd : cmds ) {
731          LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction());
732        }
733      }
734    }
735
736    boolean addBlock(Block blk) {
737      if(nrBlocks == blocks.length) {
738        LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
739        return false;
740      }
741      blocks[nrBlocks] = blk;
742      nrBlocks++;
743      return true;
744    }
745
746    void formBlockReport() {
747      // fill remaining slots with blocks that do not exist
748      for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
749        blocks[idx] = new Block(blocks.length - idx, 0, 0);
750    }
751
752    public int compareTo(String name) {
753      return getName().compareTo(name);
754    }
755
756    /**
757     * Send a heartbeat to the name-node and replicate blocks if requested.
758     */
759    int replicateBlocks() throws IOException {
760      // register datanode
761      DatanodeCommand[] cmds = nameNode.sendHeartbeat(
762          dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
763      if (cmds != null) {
764        for (DatanodeCommand cmd : cmds) {
765          if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
766            // Send a copy of a block to another datanode
767            BlockCommand bcmd = (BlockCommand)cmd;
768            return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
769          }
770        }
771      }
772      return 0;
773    }
774
775    /**
776     * Transfer blocks to another data-node.
777     * Just report on behalf of the other data-node
778     * that the blocks have been received.
779     */
780    private int transferBlocks( Block blocks[], 
781                                DatanodeInfo xferTargets[][] 
782                              ) throws IOException {
783      for(int i = 0; i < blocks.length; i++) {
784        DatanodeInfo blockTargets[] = xferTargets[i];
785        for(int t = 0; t < blockTargets.length; t++) {
786          DatanodeInfo dnInfo = blockTargets[t];
787          DatanodeRegistration receivedDNReg;
788          receivedDNReg = new DatanodeRegistration(dnInfo.getName());
789          receivedDNReg.setStorageInfo(
790                          new DataStorage(nsInfo, dnInfo.getStorageID()));
791          receivedDNReg.setInfoPort(dnInfo.getInfoPort());
792          nameNode.blockReceived( receivedDNReg, 
793                                  new Block[] {blocks[i]},
794                                  new String[] {DataNode.EMPTY_DEL_HINT});
795        }
796      }
797      return blocks.length;
798    }
799  }
800
801  /**
802   * Block report statistics.
803   *
804   * Each thread here represents its own data-node.
805   * Data-nodes send the same block report each time.
806   * The block report may contain missing or non-existing blocks.
807   */
808  class BlockReportStats extends OperationStatsBase {
809    static final String OP_BLOCK_REPORT_NAME = "blockReport";
810    static final String OP_BLOCK_REPORT_USAGE = 
811      "-op blockReport [-datanodes T] [-reports N] " +
812      "[-blocksPerReport B] [-blocksPerFile F]";
813
814    private int blocksPerReport;
815    private int blocksPerFile;
816    private TinyDatanode[] datanodes; // array of data-nodes sorted by name
817
818    BlockReportStats(List<String> args) {
819      super();
820      this.blocksPerReport = 100;
821      this.blocksPerFile = 10;
822      // set heartbeat interval to 3 min, so that expiration were 40 min
823      config.setLong("dfs.heartbeat.interval", 3 * 60);
824      parseArguments(args);
825      // adjust replication to the number of data-nodes
826      this.replication = (short)Math.min((int)replication, getNumDatanodes());
827    }
828
829    /**
830     * Each thread pretends its a data-node here.
831     */
832    private int getNumDatanodes() {
833      return numThreads;
834    }
835
836    String getOpName() {
837      return OP_BLOCK_REPORT_NAME;
838    }
839
840    void parseArguments(List<String> args) {
841      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
842      for (int i = 2; i < args.size(); i++) {       // parse command line
843        if(args.get(i).equals("-reports")) {
844          if(i+1 == args.size())  printUsage();
845          numOpsRequired = Integer.parseInt(args.get(++i));
846        } else if(args.get(i).equals("-datanodes")) {
847          if(i+1 == args.size())  printUsage();
848          numThreads = Integer.parseInt(args.get(++i));
849        } else if(args.get(i).equals("-blocksPerReport")) {
850          if(i+1 == args.size())  printUsage();
851          blocksPerReport = Integer.parseInt(args.get(++i));
852        } else if(args.get(i).equals("-blocksPerFile")) {
853          if(i+1 == args.size())  printUsage();
854          blocksPerFile = Integer.parseInt(args.get(++i));
855        } else if(!ignoreUnrelatedOptions)
856          printUsage();
857      }
858    }
859
860    void generateInputs(int[] ignore) throws IOException {
861      int nrDatanodes = getNumDatanodes();
862      int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes
863                                    / replication);
864      int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile);
865      datanodes = new TinyDatanode[nrDatanodes];
866      // create data-nodes
867      String prevDNName = "";
868      for(int idx=0; idx < nrDatanodes; idx++) {
869        datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
870        datanodes[idx].register();
871        assert datanodes[idx].getName().compareTo(prevDNName) > 0
872          : "Data-nodes must be sorted lexicographically.";
873        datanodes[idx].sendHeartbeat();
874        prevDNName = datanodes[idx].getName();
875      }
876
877      // create files
878      LOG.info("Creating " + nrFiles + " with " + blocksPerFile + " blocks each.");
879      FileNameGenerator nameGenerator;
880      nameGenerator = new FileNameGenerator(getBaseDir(), 100);
881      String clientName = getClientName(007);
882      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
883      for(int idx=0; idx < nrFiles; idx++) {
884        String fileName = nameGenerator.getNextFileName("ThroughputBench");
885        nameNode.create(fileName, FsPermission.getDefault(),
886                        clientName, true, replication, BLOCK_SIZE);
887        addBlocks(fileName, clientName);
888        nameNode.complete(fileName, clientName);
889      }
890      // prepare block reports
891      for(int idx=0; idx < nrDatanodes; idx++) {
892        datanodes[idx].formBlockReport();
893      }
894    }
895
896    private void addBlocks(String fileName, String clientName) throws IOException {
897      for(int jdx = 0; jdx < blocksPerFile; jdx++) {
898        LocatedBlock loc = nameNode.addBlock(fileName, clientName);
899        for(DatanodeInfo dnInfo : loc.getLocations()) {
900          int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
901          datanodes[dnIdx].addBlock(loc.getBlock());
902          nameNode.blockReceived(
903              datanodes[dnIdx].dnRegistration, 
904              new Block[] {loc.getBlock()},
905              new String[] {""});
906        }
907      }
908    }
909
910    /**
911     * Does not require the argument
912     */
913    String getExecutionArgument(int daemonId) {
914      return null;
915    }
916
917    long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
918      assert daemonId < numThreads : "Wrong daemonId.";
919      TinyDatanode dn = datanodes[daemonId];
920      long start = System.currentTimeMillis();
921      nameNode.blockReport(dn.dnRegistration,
922          BlockListAsLongs.convertToArrayLongs(dn.blocks));
923      long end = System.currentTimeMillis();
924      return end-start;
925    }
926
927    void printResults() {
928      String blockDistribution = "";
929      String delim = "(";
930      for(int idx=0; idx < getNumDatanodes(); idx++) {
931        blockDistribution += delim + datanodes[idx].nrBlocks;
932        delim = ", ";
933      }
934      blockDistribution += ")";
935      LOG.info("--- " + getOpName() + " inputs ---");
936      LOG.info("reports = " + numOpsRequired);
937      LOG.info("datanodes = " + numThreads + " " + blockDistribution);
938      LOG.info("blocksPerReport = " + blocksPerReport);
939      LOG.info("blocksPerFile = " + blocksPerFile);
940      printStats();
941    }
942  }   // end BlockReportStats
943
944  /**
945   * Measures how fast replication monitor can compute data-node work.
946   *
947   * It runs only one thread until no more work can be scheduled.
948   */
949  class ReplicationStats extends OperationStatsBase {
950    static final String OP_REPLICATION_NAME = "replication";
951    static final String OP_REPLICATION_USAGE = 
952      "-op replication [-datanodes T] [-nodesToDecommission D] " +
953      "[-nodeReplicationLimit C] [-totalBlocks B] [-replication R]";
954
955    private BlockReportStats blockReportObject;
956    private int numDatanodes;
957    private int nodesToDecommission;
958    private int nodeReplicationLimit;
959    private int totalBlocks;
960    private int numDecommissionedBlocks;
961    private int numPendingBlocks;
962
963    ReplicationStats(List<String> args) {
964      super();
965      numThreads = 1;
966      numDatanodes = 3;
967      nodesToDecommission = 1;
968      nodeReplicationLimit = 100;
969      totalBlocks = 100;
970      parseArguments(args);
971      // number of operations is 4 times the number of decommissioned
972      // blocks divided by the number of needed replications scanned
973      // by the replication monitor in one iteration
974      numOpsRequired = (totalBlocks*replication*nodesToDecommission*2)
975            / (numDatanodes*numDatanodes);
976
977      String[] blkReportArgs = {
978        "-op", "blockReport",
979        "-datanodes", String.valueOf(numDatanodes),
980        "-blocksPerReport", String.valueOf(totalBlocks*replication/numDatanodes),
981        "-blocksPerFile", String.valueOf(numDatanodes)};
982      blockReportObject = new BlockReportStats(Arrays.asList(blkReportArgs));
983      numDecommissionedBlocks = 0;
984      numPendingBlocks = 0;
985    }
986
987    String getOpName() {
988      return OP_REPLICATION_NAME;
989    }
990
991    void parseArguments(List<String> args) {
992      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
993      for (int i = 2; i < args.size(); i++) {       // parse command line
994        if(args.get(i).equals("-datanodes")) {
995          if(i+1 == args.size())  printUsage();
996          numDatanodes = Integer.parseInt(args.get(++i));
997        } else if(args.get(i).equals("-nodesToDecommission")) {
998          if(i+1 == args.size())  printUsage();
999          nodesToDecommission = Integer.parseInt(args.get(++i));
1000        } else if(args.get(i).equals("-nodeReplicationLimit")) {
1001          if(i+1 == args.size())  printUsage();
1002          nodeReplicationLimit = Integer.parseInt(args.get(++i));
1003        } else if(args.get(i).equals("-totalBlocks")) {
1004          if(i+1 == args.size())  printUsage();
1005          totalBlocks = Integer.parseInt(args.get(++i));
1006        } else if(args.get(i).equals("-replication")) {
1007          if(i+1 == args.size())  printUsage();
1008          replication = Short.parseShort(args.get(++i));
1009        } else if(!ignoreUnrelatedOptions)
1010          printUsage();
1011      }
1012    }
1013
1014    void generateInputs(int[] ignore) throws IOException {
1015      // start data-nodes; create a bunch of files; generate block reports.
1016      blockReportObject.generateInputs(ignore);
1017      // stop replication monitor
1018      nameNode.namesystem.replthread.interrupt();
1019      try {
1020        nameNode.namesystem.replthread.join();
1021      } catch(InterruptedException ei) {
1022        return;
1023      }
1024      // report blocks once
1025      int nrDatanodes = blockReportObject.getNumDatanodes();
1026      for(int idx=0; idx < nrDatanodes; idx++) {
1027        blockReportObject.executeOp(idx, 0, null);
1028      }
1029      // decommission data-nodes
1030      decommissionNodes();
1031      // set node replication limit
1032      nameNode.namesystem.setNodeReplicationLimit(nodeReplicationLimit);
1033    }
1034
1035    private void decommissionNodes() throws IOException {
1036      String excludeFN = config.get("dfs.hosts.exclude", "exclude");
1037      FileOutputStream excludeFile = new FileOutputStream(excludeFN);
1038      excludeFile.getChannel().truncate(0L);
1039      int nrDatanodes = blockReportObject.getNumDatanodes();
1040      numDecommissionedBlocks = 0;
1041      for(int i=0; i < nodesToDecommission; i++) {
1042        TinyDatanode dn = blockReportObject.datanodes[nrDatanodes-1-i];
1043        numDecommissionedBlocks += dn.nrBlocks;
1044        excludeFile.write(dn.getName().getBytes());
1045        excludeFile.write('\n');
1046        LOG.info("Datanode " + dn.getName() + " is decommissioned.");
1047      }
1048      excludeFile.close();
1049      nameNode.refreshNodes();
1050    }
1051
1052    /**
1053     * Does not require the argument
1054     */
1055    String getExecutionArgument(int daemonId) {
1056      return null;
1057    }
1058
1059    long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
1060      assert daemonId < numThreads : "Wrong daemonId.";
1061      long start = System.currentTimeMillis();
1062      // compute data-node work
1063      int work = nameNode.namesystem.computeDatanodeWork();
1064      long end = System.currentTimeMillis();
1065      numPendingBlocks += work;
1066      if(work == 0)
1067        daemons.get(daemonId).terminate();
1068      return end-start;
1069    }
1070
1071    void printResults() {
1072      String blockDistribution = "";
1073      String delim = "(";
1074      int totalReplicas = 0;
1075      for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
1076        totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
1077        blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
1078        delim = ", ";
1079      }
1080      blockDistribution += ")";
1081      LOG.info("--- " + getOpName() + " inputs ---");
1082      LOG.info("numOpsRequired = " + numOpsRequired);
1083      LOG.info("datanodes = " + numDatanodes + " " + blockDistribution);
1084      LOG.info("decommissioned datanodes = " + nodesToDecommission);
1085      LOG.info("datanode replication limit = " + nodeReplicationLimit);
1086      LOG.info("total blocks = " + totalBlocks);
1087      printStats();
1088      LOG.info("decommissioned blocks = " + numDecommissionedBlocks);
1089      LOG.info("pending replications = " + numPendingBlocks);
1090      LOG.info("replications per sec: " + getBlocksPerSecond());
1091    }
1092
1093    private double getBlocksPerSecond() {
1094      return elapsedTime == 0 ? 0 : 1000*(double)numPendingBlocks / elapsedTime;
1095    }
1096
1097  }   // end ReplicationStats
1098
1099  static void printUsage() {
1100    System.err.println("Usage: NNThroughputBenchmark"
1101        + "\n\t"    + OperationStatsBase.OP_ALL_USAGE
1102        + " | \n\t" + CreateFileStats.OP_CREATE_USAGE
1103        + " | \n\t" + OpenFileStats.OP_OPEN_USAGE
1104        + " | \n\t" + DeleteFileStats.OP_DELETE_USAGE
1105        + " | \n\t" + RenameFileStats.OP_RENAME_USAGE
1106        + " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE
1107        + " | \n\t" + ReplicationStats.OP_REPLICATION_USAGE
1108        + " | \n\t" + CleanAllStats.OP_CLEAN_USAGE
1109    );
1110    System.exit(-1);
1111  }
1112
1113  /**
1114   * Main method of the benchmark.
1115   * @param args command line parameters
1116   */
1117  public static void runBenchmark(Configuration conf, List<String> args) throws Exception {
1118    if(args.size() < 2 || ! args.get(0).startsWith("-op"))
1119      printUsage();
1120
1121    String type = args.get(1);
1122    boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
1123
1124    NNThroughputBenchmark bench = null;
1125    List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
1126    OperationStatsBase opStat = null;
1127    try {
1128      bench = new NNThroughputBenchmark(conf);
1129      if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
1130        opStat = bench.new CreateFileStats(args);
1131        ops.add(opStat);
1132      }
1133      if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
1134        opStat = bench.new OpenFileStats(args);
1135        ops.add(opStat);
1136      }
1137      if(runAll || DeleteFileStats.OP_DELETE_NAME.equals(type)) {
1138        opStat = bench.new DeleteFileStats(args);
1139        ops.add(opStat);
1140      }
1141      if(runAll || RenameFileStats.OP_RENAME_NAME.equals(type)) {
1142        opStat = bench.new RenameFileStats(args);
1143        ops.add(opStat);
1144      }
1145      if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
1146        opStat = bench.new BlockReportStats(args);
1147        ops.add(opStat);
1148      }
1149      if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) {
1150        opStat = bench.new ReplicationStats(args);
1151        ops.add(opStat);
1152      }
1153      if(runAll || CleanAllStats.OP_CLEAN_NAME.equals(type)) {
1154        opStat = bench.new CleanAllStats(args);
1155        ops.add(opStat);
1156      }
1157      if(ops.size() == 0)
1158        printUsage();
1159      // run each benchmark
1160      for(OperationStatsBase op : ops) {
1161        LOG.info("Starting benchmark: " + op.getOpName());
1162        op.benchmark();
1163        op.cleanUp();
1164      }
1165      // print statistics
1166      for(OperationStatsBase op : ops) {
1167        LOG.info("");
1168        op.printResults();
1169      }
1170    } catch(Exception e) {
1171      LOG.error(StringUtils.stringifyException(e));
1172      throw e;
1173    } finally {
1174      if(bench != null)
1175        bench.close();
1176    }
1177  }
1178
1179  public static void main(String[] args) throws Exception {
1180    runBenchmark(new Configuration(), 
1181                  new ArrayList<String>(Arrays.asList(args)));
1182  }
1183}
Note: See TracBrowser for help on using the repository browser.