source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 17.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs.loadGenerator;
20
21import java.io.IOException;
22import java.io.InputStream;
23import java.net.InetAddress;
24import java.net.UnknownHostException;
25import java.util.ArrayList;
26import java.util.Random;
27
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.conf.Configured;
30import org.apache.hadoop.fs.FSDataOutputStream;
31import org.apache.hadoop.fs.FileStatus;
32import org.apache.hadoop.fs.FileSystem;
33import org.apache.hadoop.fs.Path;
34import org.apache.hadoop.util.Tool;
35import org.apache.hadoop.util.ToolRunner;
36
37/** The load generator is a tool for testing NameNode behavior under
38 * different client loads.
39 * It allows the user to generate different mixes of read, write,
40 * and list requests by specifying the probabilities of read and
41 * write. The user controls the intensity of the load by
42 * adjusting parameters for the number of worker threads and the delay
43 * between operations. While load generators are running, the user
44 * can profile and monitor the running of the NameNode. When a load
45 * generator exits, it print some NameNode statistics like the average
46 * execution time of each kind of operations and the NameNode
47 * throughput.
48 *
49 * After command line argument parsing and data initialization,
50 * the load generator spawns the number of worker threads
51 * as specified by the user.
52 * Each thread sends a stream of requests to the NameNode.
53 * For each iteration, it first decides if it is going to read a file,
54 * create a file, or listing a directory following the read and write
55 * probabilities specified by the user.
56 * When reading, it randomly picks a file in the test space and reads
57 * the entire file. When writing, it randomly picks a directory in the
58 * test space and creates a file whose name consists of the current
59 * machine's host name and the thread id. The length of the file
60 * follows Gaussian distribution with an average size of 2 blocks and
61 * the standard deviation of 1 block. The new file is filled with 'a'.
62 * Immediately after the file creation completes, the file is deleted
63 * from the test space.
64 * While listing, it randomly picks a directory in the test space and
65 * list the directory content.
66 * Between two consecutive operations, the thread pauses for a random
67 * amount of time in the range of [0, maxDelayBetweenOps]
68 * if the specified max delay is not zero.
69 * All threads are stopped when the specified elapsed time is passed.
70 * Before exiting, the program prints the average execution for
71 * each kind of NameNode operations, and the number of requests
72 * served by the NameNode.
73 *
74 * The synopsis of the command is
75 * java LoadGenerator
76 *   -readProbability <read probability>: read probability [0, 1]
77 *                                        with a default value of 0.3333.
78 *   -writeProbability <write probability>: write probability [0, 1]
79 *                                         with a default value of 0.3333.
80 *   -root <root>: test space with a default value of /testLoadSpace
81 *   -maxDelayBetweenOps <maxDelayBetweenOpsInMillis>:
82 *      Max delay in the unit of milliseconds between two operations with a
83 *      default value of 0 indicating no delay.
84 *   -numOfThreads <numOfThreads>:
85 *      number of threads to spawn with a default value of 200.
86 *   -elapsedTime <elapsedTimeInSecs>:
87 *      the elapsed time of program with a default value of 0
88 *      indicating running forever
89 *   -startTime <startTimeInMillis> : when the threads start to run.
90 */
91public class LoadGenerator extends Configured implements Tool {
92  private volatile boolean shouldRun = true;
93  private Path root = DataGenerator.DEFAULT_ROOT;
94  private FileSystem fs;
95  private int maxDelayBetweenOps = 0;
96  private int numOfThreads = 200;
97  private double readPr = 0.3333;
98  private double writePr = 0.3333;
99  private long elapsedTime = 0;
100  private long startTime = System.currentTimeMillis()+10000;
101  final static private int BLOCK_SIZE = 10;
102  private ArrayList<String> files = new ArrayList<String>();  // a table of file names
103  private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
104  private Random r = null;
105  final private static String USAGE = "java LoadGenerator\n" +
106        "-readProbability <read probability>\n" +
107    "-writeProbability <write probability>\n" +
108    "-root <root>\n" +
109    "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" +
110    "-numOfThreads <numOfThreads>\n" +
111    "-elapsedTime <elapsedTimeInSecs>\n" +
112    "-startTime <startTimeInMillis>";
113  final private String hostname;
114 
115  /** Constructor */
116  public LoadGenerator() throws IOException, UnknownHostException {
117    InetAddress addr = InetAddress.getLocalHost();
118    hostname = addr.getHostName();
119  }
120
121  private final static int OPEN = 0;
122  private final static int LIST = 1;
123  private final static int CREATE = 2;
124  private final static int WRITE_CLOSE = 3;
125  private final static int DELETE = 4;
126  private final static int TOTAL_OP_TYPES =5;
127  private long [] executionTime = new long[TOTAL_OP_TYPES];
128  private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
129 
130  /** A thread sends a stream of requests to the NameNode.
131   * At each iteration, it first decides if it is going to read a file,
132   * create a file, or listing a directory following the read
133   * and write probabilities.
134   * When reading, it randomly picks a file in the test space and reads
135   * the entire file. When writing, it randomly picks a directory in the
136   * test space and creates a file whose name consists of the current
137   * machine's host name and the thread id. The length of the file
138   * follows Gaussian distribution with an average size of 2 blocks and
139   * the standard deviation of 1 block. The new file is filled with 'a'.
140   * Immediately after the file creation completes, the file is deleted
141   * from the test space.
142   * While listing, it randomly picks a directory in the test space and
143   * list the directory content.
144   * Between two consecutive operations, the thread pauses for a random
145   * amount of time in the range of [0, maxDelayBetweenOps]
146   * if the specified max delay is not zero.
147   * A thread runs for the specified elapsed time if the time isn't zero.
148   * Otherwise, it runs forever.
149   */
150  private class DFSClientThread extends Thread {
151    private int id;
152    private long [] executionTime = new long[TOTAL_OP_TYPES];
153    private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
154    private byte[] buffer = new byte[1024];
155   
156    private DFSClientThread(int id) {
157      this.id = id;
158    }
159   
160    /** Main loop
161     * Each iteration decides what's the next operation and then pauses.
162     */
163    public void run() {
164      try {
165        while (shouldRun) {
166          nextOp();
167          delay();
168        }
169      } catch (Exception ioe) {
170        System.err.println(ioe.getLocalizedMessage());
171        ioe.printStackTrace();
172      }
173    }
174   
175    /** Let the thread pause for a random amount of time in the range of
176     * [0, maxDelayBetweenOps] if the delay is not zero. Otherwise, no pause.
177     */
178    private void delay() throws InterruptedException {
179      if (maxDelayBetweenOps>0) {
180        int delay = r.nextInt(maxDelayBetweenOps);
181        Thread.sleep(delay);
182      }
183    }
184   
185    /** Perform the next operation.
186     *
187     * Depending on the read and write probabilities, the next
188     * operation could be either read, write, or list.
189     */
190    private void nextOp() throws IOException {
191      double rn = r.nextDouble();
192      if (rn < readPr) {
193        read();
194      } else if (rn < readPr+writePr) {
195        write();
196      } else {
197        list();
198      }
199    }
200   
201    /** Read operation randomly picks a file in the test space and reads
202     * the entire file */
203    private void read() throws IOException {
204      String fileName = files.get(r.nextInt(files.size()));
205      long startTime = System.currentTimeMillis();
206      InputStream in = fs.open(new Path(fileName));
207      executionTime[OPEN] += (System.currentTimeMillis()-startTime);
208      totalNumOfOps[OPEN]++;
209      while (in.read(buffer) != -1) {}
210      in.close();
211    }
212   
213    /** The write operation randomly picks a directory in the
214     * test space and creates a file whose name consists of the current
215     * machine's host name and the thread id. The length of the file
216     * follows Gaussian distribution with an average size of 2 blocks and
217     * the standard deviation of 1 block. The new file is filled with 'a'.
218     * Immediately after the file creation completes, the file is deleted
219     * from the test space.
220     */
221    private void write() throws IOException {
222      String dirName = dirs.get(r.nextInt(dirs.size()));
223      Path file = new Path(dirName, hostname+id);
224      double fileSize = 0;
225      while ((fileSize = r.nextGaussian()+2)<=0) {}
226      genFile(file, (long)(fileSize*BLOCK_SIZE));
227      long startTime = System.currentTimeMillis();
228      fs.delete(file, true);
229      executionTime[DELETE] += (System.currentTimeMillis()-startTime);
230      totalNumOfOps[DELETE]++;
231    }
232   
233    /** The list operation randomly picks a directory in the test space and
234     * list the directory content.
235     */
236    private void list() throws IOException {
237      String dirName = dirs.get(r.nextInt(dirs.size()));
238      long startTime = System.currentTimeMillis();
239      fs.listStatus(new Path(dirName));
240      executionTime[LIST] += (System.currentTimeMillis()-startTime);
241      totalNumOfOps[LIST]++;
242    }
243  }
244 
245  /** Main function:
246   * It first initializes data by parsing the command line arguments.
247   * It then starts the number of DFSClient threads as specified by
248   * the user.
249   * It stops all the threads when the specified elapsed time is passed.
250   * Before exiting, it prints the average execution for
251   * each operation and operation throughput.
252   */
253  public int run(String[] args) throws Exception {
254    int exitCode = init(args);
255    if (exitCode != 0) {
256      return exitCode;
257    }
258   
259    barrier();
260   
261    DFSClientThread[] threads = new DFSClientThread[numOfThreads];
262    for (int i=0; i<numOfThreads; i++) {
263      threads[i] = new DFSClientThread(i); 
264      threads[i].start();
265    }
266    if (elapsedTime>0) {
267      Thread.sleep(elapsedTime*1000);
268      shouldRun = false;
269    } 
270    for (DFSClientThread thread : threads) {
271      thread.join();
272      for (int i=0; i<TOTAL_OP_TYPES; i++) {
273        executionTime[i] += thread.executionTime[i];
274        totalNumOfOps[i] += thread.totalNumOfOps[i];
275      }
276    }
277    long totalOps = 0;
278    for (int i=0; i<TOTAL_OP_TYPES; i++) {
279      totalOps += totalNumOfOps[i];
280    }
281   
282    if (totalNumOfOps[OPEN] != 0) {
283      System.out.println("Average open execution time: " + 
284          (double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms");
285    }
286    if (totalNumOfOps[LIST] != 0) {
287      System.out.println("Average list execution time: " + 
288          (double)executionTime[LIST]/totalNumOfOps[LIST] + "ms");
289    }
290    if (totalNumOfOps[DELETE] != 0) {
291      System.out.println("Average deletion execution time: " + 
292          (double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms");
293      System.out.println("Average create execution time: " + 
294          (double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms");
295      System.out.println("Average write_close execution time: " + 
296          (double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms");
297    }
298    if (elapsedTime != 0) { 
299      System.out.println("Average operations per second: " + 
300          (double)totalOps/elapsedTime +"ops/s");
301    }
302    System.out.println();
303    return exitCode;
304  }
305
306  /** Parse the command line arguments and initialize the data */
307  private int init(String[] args) throws IOException {
308    try {
309      fs = FileSystem.get(getConf());
310    } catch (IOException ioe) {
311      System.err.println("Can not initialize the file system: " + 
312          ioe.getLocalizedMessage());
313      return -1;
314    }
315    int hostHashCode = hostname.hashCode();
316    try {
317      for (int i = 0; i < args.length; i++) { // parse command line
318        if (args[i].equals("-readProbability")) {
319          readPr = Double.parseDouble(args[++i]);
320          if (readPr<0 || readPr>1) {
321            System.err.println( 
322                "The read probability must be [0, 1]: " + readPr);
323            return -1;
324          }
325        } else if (args[i].equals("-writeProbability")) {
326          writePr = Double.parseDouble(args[++i]);
327          if (writePr<0 || writePr>1) {
328            System.err.println( 
329                "The write probability must be [0, 1]: " + writePr);
330            return -1;
331          }
332        } else if (args[i].equals("-root")) {
333          root = new Path(args[++i]);
334        } else if (args[i].equals("-maxDelayBetweenOps")) {
335          maxDelayBetweenOps = Integer.parseInt(args[++i]); // in milliseconds
336        } else if (args[i].equals("-numOfThreads")) {
337          numOfThreads = Integer.parseInt(args[++i]);
338          if (numOfThreads <= 0) {
339            System.err.println(
340                "Number of threads must be positive: " + numOfThreads);
341            return -1;
342          }
343        } else if (args[i].equals("-startTime")) {
344          startTime = Long.parseLong(args[++i]);
345        } else if (args[i].equals("-elapsedTime")) {
346          elapsedTime = Long.parseLong(args[++i]);
347        } else if (args[i].equals("-seed")) {
348          r = new Random(Long.parseLong(args[++i])+hostHashCode);
349        } else {
350          System.err.println(USAGE);
351          ToolRunner.printGenericCommandUsage(System.err);
352          return -1;
353        }
354      }
355    } catch (NumberFormatException e) {
356      System.err.println("Illegal parameter: " + e.getLocalizedMessage());
357      System.err.println(USAGE);
358      return -1;
359    }
360
361    if (readPr+writePr <0 || readPr+writePr>1) {
362      System.err.println(
363          "The sum of read probability and write probability must be [0, 1]: " +
364          readPr + " "+writePr);
365      return -1;
366    }
367   
368    if (r==null) {
369      r = new Random(System.currentTimeMillis()+hostHashCode);
370    }
371   
372    return initFileDirTables();
373  }
374 
375  /** Create a table that contains all directories under root and
376   * another table that contains all files under root.
377   */
378  private int initFileDirTables() {
379    try {
380      initFileDirTables(root);
381    } catch (IOException e) {
382      System.err.println(e.getLocalizedMessage());
383      e.printStackTrace();
384      return -1;
385    }
386    if (dirs.isEmpty()) {
387      System.err.println("The test space " + root + " is empty");
388      return -1;
389    }
390    if (files.isEmpty()) {
391      System.err.println("The test space " + root + 
392          " does not have any file");
393      return -1;
394    }
395    return 0;
396  }
397 
398  /** Create a table that contains all directories under the specified path and
399   * another table that contains all files under the specified path and
400   * whose name starts with "_file_".
401   */
402  private void initFileDirTables(Path path) throws IOException {
403    FileStatus[] stats = fs.listStatus(path);
404    if (stats != null) { 
405      for (FileStatus stat : stats) {
406        if (stat.isDir()) {
407          dirs.add(stat.getPath().toString());
408          initFileDirTables(stat.getPath());
409        } else {
410          Path filePath = stat.getPath();
411          if (filePath.getName().startsWith(StructureGenerator.FILE_NAME_PREFIX)) {
412            files.add(filePath.toString());
413          }
414        }
415      }
416    }
417  }
418 
419  /** Returns when the current number of seconds from the epoch equals
420   * the command line argument given by <code>-startTime</code>.
421   * This allows multiple instances of this program, running on clock
422   * synchronized nodes, to start at roughly the same time.
423   */
424  private void barrier() {
425    long sleepTime;
426    while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
427      try {
428        Thread.sleep(sleepTime);
429      } catch (InterruptedException ex) {
430      }
431    }
432  }
433
434  /** Create a file with a length of <code>fileSize</code>.
435   * The file is filled with 'a'.
436   */
437  private void genFile(Path file, long fileSize) throws IOException {
438    long startTime = System.currentTimeMillis();
439    FSDataOutputStream out = fs.create(file, true, 
440        getConf().getInt("io.file.buffer.size", 4096),
441        (short)getConf().getInt("dfs.replication", 3),
442        fs.getDefaultBlockSize());
443    executionTime[CREATE] += (System.currentTimeMillis()-startTime);
444    totalNumOfOps[CREATE]++;
445
446    for (long i=0; i<fileSize; i++) {
447      out.writeByte('a');
448    }
449    startTime = System.currentTimeMillis();
450    out.close();
451    executionTime[WRITE_CLOSE] += (System.currentTimeMillis()-startTime);
452    totalNumOfOps[WRITE_CLOSE]++;
453  }
454 
455  /** Main program
456   *
457   * @param args command line arguments
458   * @throws Exception
459   */
460  public static void main(String[] args) throws Exception {
461    int res = ToolRunner.run(new Configuration(),
462        new LoadGenerator(), args);
463    System.exit(res);
464  }
465
466}
Note: See TracBrowser for help on using the repository browser.