1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.fs.loadGenerator; |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.io.InputStream; |
---|
23 | import java.net.InetAddress; |
---|
24 | import java.net.UnknownHostException; |
---|
25 | import java.util.ArrayList; |
---|
26 | import java.util.Random; |
---|
27 | |
---|
28 | import org.apache.hadoop.conf.Configuration; |
---|
29 | import org.apache.hadoop.conf.Configured; |
---|
30 | import org.apache.hadoop.fs.FSDataOutputStream; |
---|
31 | import org.apache.hadoop.fs.FileStatus; |
---|
32 | import org.apache.hadoop.fs.FileSystem; |
---|
33 | import org.apache.hadoop.fs.Path; |
---|
34 | import org.apache.hadoop.util.Tool; |
---|
35 | import org.apache.hadoop.util.ToolRunner; |
---|
36 | |
---|
37 | /** The load generator is a tool for testing NameNode behavior under |
---|
38 | * different client loads. |
---|
39 | * It allows the user to generate different mixes of read, write, |
---|
40 | * and list requests by specifying the probabilities of read and |
---|
41 | * write. The user controls the intensity of the load by |
---|
42 | * adjusting parameters for the number of worker threads and the delay |
---|
43 | * between operations. While load generators are running, the user |
---|
44 | * can profile and monitor the running of the NameNode. When a load |
---|
45 | * generator exits, it print some NameNode statistics like the average |
---|
46 | * execution time of each kind of operations and the NameNode |
---|
47 | * throughput. |
---|
48 | * |
---|
49 | * After command line argument parsing and data initialization, |
---|
50 | * the load generator spawns the number of worker threads |
---|
51 | * as specified by the user. |
---|
52 | * Each thread sends a stream of requests to the NameNode. |
---|
53 | * For each iteration, it first decides if it is going to read a file, |
---|
54 | * create a file, or listing a directory following the read and write |
---|
55 | * probabilities specified by the user. |
---|
56 | * When reading, it randomly picks a file in the test space and reads |
---|
57 | * the entire file. When writing, it randomly picks a directory in the |
---|
58 | * test space and creates a file whose name consists of the current |
---|
59 | * machine's host name and the thread id. The length of the file |
---|
60 | * follows Gaussian distribution with an average size of 2 blocks and |
---|
61 | * the standard deviation of 1 block. The new file is filled with 'a'. |
---|
62 | * Immediately after the file creation completes, the file is deleted |
---|
63 | * from the test space. |
---|
64 | * While listing, it randomly picks a directory in the test space and |
---|
65 | * list the directory content. |
---|
66 | * Between two consecutive operations, the thread pauses for a random |
---|
67 | * amount of time in the range of [0, maxDelayBetweenOps] |
---|
68 | * if the specified max delay is not zero. |
---|
69 | * All threads are stopped when the specified elapsed time is passed. |
---|
70 | * Before exiting, the program prints the average execution for |
---|
71 | * each kind of NameNode operations, and the number of requests |
---|
72 | * served by the NameNode. |
---|
73 | * |
---|
74 | * The synopsis of the command is |
---|
75 | * java LoadGenerator |
---|
76 | * -readProbability <read probability>: read probability [0, 1] |
---|
77 | * with a default value of 0.3333. |
---|
78 | * -writeProbability <write probability>: write probability [0, 1] |
---|
79 | * with a default value of 0.3333. |
---|
80 | * -root <root>: test space with a default value of /testLoadSpace |
---|
81 | * -maxDelayBetweenOps <maxDelayBetweenOpsInMillis>: |
---|
82 | * Max delay in the unit of milliseconds between two operations with a |
---|
83 | * default value of 0 indicating no delay. |
---|
84 | * -numOfThreads <numOfThreads>: |
---|
85 | * number of threads to spawn with a default value of 200. |
---|
86 | * -elapsedTime <elapsedTimeInSecs>: |
---|
87 | * the elapsed time of program with a default value of 0 |
---|
88 | * indicating running forever |
---|
89 | * -startTime <startTimeInMillis> : when the threads start to run. |
---|
90 | */ |
---|
91 | public class LoadGenerator extends Configured implements Tool { |
---|
92 | private volatile boolean shouldRun = true; |
---|
93 | private Path root = DataGenerator.DEFAULT_ROOT; |
---|
94 | private FileSystem fs; |
---|
95 | private int maxDelayBetweenOps = 0; |
---|
96 | private int numOfThreads = 200; |
---|
97 | private double readPr = 0.3333; |
---|
98 | private double writePr = 0.3333; |
---|
99 | private long elapsedTime = 0; |
---|
100 | private long startTime = System.currentTimeMillis()+10000; |
---|
101 | final static private int BLOCK_SIZE = 10; |
---|
102 | private ArrayList<String> files = new ArrayList<String>(); // a table of file names |
---|
103 | private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names |
---|
104 | private Random r = null; |
---|
105 | final private static String USAGE = "java LoadGenerator\n" + |
---|
106 | "-readProbability <read probability>\n" + |
---|
107 | "-writeProbability <write probability>\n" + |
---|
108 | "-root <root>\n" + |
---|
109 | "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" + |
---|
110 | "-numOfThreads <numOfThreads>\n" + |
---|
111 | "-elapsedTime <elapsedTimeInSecs>\n" + |
---|
112 | "-startTime <startTimeInMillis>"; |
---|
113 | final private String hostname; |
---|
114 | |
---|
115 | /** Constructor */ |
---|
116 | public LoadGenerator() throws IOException, UnknownHostException { |
---|
117 | InetAddress addr = InetAddress.getLocalHost(); |
---|
118 | hostname = addr.getHostName(); |
---|
119 | } |
---|
120 | |
---|
121 | private final static int OPEN = 0; |
---|
122 | private final static int LIST = 1; |
---|
123 | private final static int CREATE = 2; |
---|
124 | private final static int WRITE_CLOSE = 3; |
---|
125 | private final static int DELETE = 4; |
---|
126 | private final static int TOTAL_OP_TYPES =5; |
---|
127 | private long [] executionTime = new long[TOTAL_OP_TYPES]; |
---|
128 | private long [] totalNumOfOps = new long[TOTAL_OP_TYPES]; |
---|
129 | |
---|
130 | /** A thread sends a stream of requests to the NameNode. |
---|
131 | * At each iteration, it first decides if it is going to read a file, |
---|
132 | * create a file, or listing a directory following the read |
---|
133 | * and write probabilities. |
---|
134 | * When reading, it randomly picks a file in the test space and reads |
---|
135 | * the entire file. When writing, it randomly picks a directory in the |
---|
136 | * test space and creates a file whose name consists of the current |
---|
137 | * machine's host name and the thread id. The length of the file |
---|
138 | * follows Gaussian distribution with an average size of 2 blocks and |
---|
139 | * the standard deviation of 1 block. The new file is filled with 'a'. |
---|
140 | * Immediately after the file creation completes, the file is deleted |
---|
141 | * from the test space. |
---|
142 | * While listing, it randomly picks a directory in the test space and |
---|
143 | * list the directory content. |
---|
144 | * Between two consecutive operations, the thread pauses for a random |
---|
145 | * amount of time in the range of [0, maxDelayBetweenOps] |
---|
146 | * if the specified max delay is not zero. |
---|
147 | * A thread runs for the specified elapsed time if the time isn't zero. |
---|
148 | * Otherwise, it runs forever. |
---|
149 | */ |
---|
150 | private class DFSClientThread extends Thread { |
---|
151 | private int id; |
---|
152 | private long [] executionTime = new long[TOTAL_OP_TYPES]; |
---|
153 | private long [] totalNumOfOps = new long[TOTAL_OP_TYPES]; |
---|
154 | private byte[] buffer = new byte[1024]; |
---|
155 | |
---|
156 | private DFSClientThread(int id) { |
---|
157 | this.id = id; |
---|
158 | } |
---|
159 | |
---|
160 | /** Main loop |
---|
161 | * Each iteration decides what's the next operation and then pauses. |
---|
162 | */ |
---|
163 | public void run() { |
---|
164 | try { |
---|
165 | while (shouldRun) { |
---|
166 | nextOp(); |
---|
167 | delay(); |
---|
168 | } |
---|
169 | } catch (Exception ioe) { |
---|
170 | System.err.println(ioe.getLocalizedMessage()); |
---|
171 | ioe.printStackTrace(); |
---|
172 | } |
---|
173 | } |
---|
174 | |
---|
175 | /** Let the thread pause for a random amount of time in the range of |
---|
176 | * [0, maxDelayBetweenOps] if the delay is not zero. Otherwise, no pause. |
---|
177 | */ |
---|
178 | private void delay() throws InterruptedException { |
---|
179 | if (maxDelayBetweenOps>0) { |
---|
180 | int delay = r.nextInt(maxDelayBetweenOps); |
---|
181 | Thread.sleep(delay); |
---|
182 | } |
---|
183 | } |
---|
184 | |
---|
185 | /** Perform the next operation. |
---|
186 | * |
---|
187 | * Depending on the read and write probabilities, the next |
---|
188 | * operation could be either read, write, or list. |
---|
189 | */ |
---|
190 | private void nextOp() throws IOException { |
---|
191 | double rn = r.nextDouble(); |
---|
192 | if (rn < readPr) { |
---|
193 | read(); |
---|
194 | } else if (rn < readPr+writePr) { |
---|
195 | write(); |
---|
196 | } else { |
---|
197 | list(); |
---|
198 | } |
---|
199 | } |
---|
200 | |
---|
201 | /** Read operation randomly picks a file in the test space and reads |
---|
202 | * the entire file */ |
---|
203 | private void read() throws IOException { |
---|
204 | String fileName = files.get(r.nextInt(files.size())); |
---|
205 | long startTime = System.currentTimeMillis(); |
---|
206 | InputStream in = fs.open(new Path(fileName)); |
---|
207 | executionTime[OPEN] += (System.currentTimeMillis()-startTime); |
---|
208 | totalNumOfOps[OPEN]++; |
---|
209 | while (in.read(buffer) != -1) {} |
---|
210 | in.close(); |
---|
211 | } |
---|
212 | |
---|
213 | /** The write operation randomly picks a directory in the |
---|
214 | * test space and creates a file whose name consists of the current |
---|
215 | * machine's host name and the thread id. The length of the file |
---|
216 | * follows Gaussian distribution with an average size of 2 blocks and |
---|
217 | * the standard deviation of 1 block. The new file is filled with 'a'. |
---|
218 | * Immediately after the file creation completes, the file is deleted |
---|
219 | * from the test space. |
---|
220 | */ |
---|
221 | private void write() throws IOException { |
---|
222 | String dirName = dirs.get(r.nextInt(dirs.size())); |
---|
223 | Path file = new Path(dirName, hostname+id); |
---|
224 | double fileSize = 0; |
---|
225 | while ((fileSize = r.nextGaussian()+2)<=0) {} |
---|
226 | genFile(file, (long)(fileSize*BLOCK_SIZE)); |
---|
227 | long startTime = System.currentTimeMillis(); |
---|
228 | fs.delete(file, true); |
---|
229 | executionTime[DELETE] += (System.currentTimeMillis()-startTime); |
---|
230 | totalNumOfOps[DELETE]++; |
---|
231 | } |
---|
232 | |
---|
233 | /** The list operation randomly picks a directory in the test space and |
---|
234 | * list the directory content. |
---|
235 | */ |
---|
236 | private void list() throws IOException { |
---|
237 | String dirName = dirs.get(r.nextInt(dirs.size())); |
---|
238 | long startTime = System.currentTimeMillis(); |
---|
239 | fs.listStatus(new Path(dirName)); |
---|
240 | executionTime[LIST] += (System.currentTimeMillis()-startTime); |
---|
241 | totalNumOfOps[LIST]++; |
---|
242 | } |
---|
243 | } |
---|
244 | |
---|
245 | /** Main function: |
---|
246 | * It first initializes data by parsing the command line arguments. |
---|
247 | * It then starts the number of DFSClient threads as specified by |
---|
248 | * the user. |
---|
249 | * It stops all the threads when the specified elapsed time is passed. |
---|
250 | * Before exiting, it prints the average execution for |
---|
251 | * each operation and operation throughput. |
---|
252 | */ |
---|
253 | public int run(String[] args) throws Exception { |
---|
254 | int exitCode = init(args); |
---|
255 | if (exitCode != 0) { |
---|
256 | return exitCode; |
---|
257 | } |
---|
258 | |
---|
259 | barrier(); |
---|
260 | |
---|
261 | DFSClientThread[] threads = new DFSClientThread[numOfThreads]; |
---|
262 | for (int i=0; i<numOfThreads; i++) { |
---|
263 | threads[i] = new DFSClientThread(i); |
---|
264 | threads[i].start(); |
---|
265 | } |
---|
266 | if (elapsedTime>0) { |
---|
267 | Thread.sleep(elapsedTime*1000); |
---|
268 | shouldRun = false; |
---|
269 | } |
---|
270 | for (DFSClientThread thread : threads) { |
---|
271 | thread.join(); |
---|
272 | for (int i=0; i<TOTAL_OP_TYPES; i++) { |
---|
273 | executionTime[i] += thread.executionTime[i]; |
---|
274 | totalNumOfOps[i] += thread.totalNumOfOps[i]; |
---|
275 | } |
---|
276 | } |
---|
277 | long totalOps = 0; |
---|
278 | for (int i=0; i<TOTAL_OP_TYPES; i++) { |
---|
279 | totalOps += totalNumOfOps[i]; |
---|
280 | } |
---|
281 | |
---|
282 | if (totalNumOfOps[OPEN] != 0) { |
---|
283 | System.out.println("Average open execution time: " + |
---|
284 | (double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms"); |
---|
285 | } |
---|
286 | if (totalNumOfOps[LIST] != 0) { |
---|
287 | System.out.println("Average list execution time: " + |
---|
288 | (double)executionTime[LIST]/totalNumOfOps[LIST] + "ms"); |
---|
289 | } |
---|
290 | if (totalNumOfOps[DELETE] != 0) { |
---|
291 | System.out.println("Average deletion execution time: " + |
---|
292 | (double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms"); |
---|
293 | System.out.println("Average create execution time: " + |
---|
294 | (double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms"); |
---|
295 | System.out.println("Average write_close execution time: " + |
---|
296 | (double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms"); |
---|
297 | } |
---|
298 | if (elapsedTime != 0) { |
---|
299 | System.out.println("Average operations per second: " + |
---|
300 | (double)totalOps/elapsedTime +"ops/s"); |
---|
301 | } |
---|
302 | System.out.println(); |
---|
303 | return exitCode; |
---|
304 | } |
---|
305 | |
---|
306 | /** Parse the command line arguments and initialize the data */ |
---|
307 | private int init(String[] args) throws IOException { |
---|
308 | try { |
---|
309 | fs = FileSystem.get(getConf()); |
---|
310 | } catch (IOException ioe) { |
---|
311 | System.err.println("Can not initialize the file system: " + |
---|
312 | ioe.getLocalizedMessage()); |
---|
313 | return -1; |
---|
314 | } |
---|
315 | int hostHashCode = hostname.hashCode(); |
---|
316 | try { |
---|
317 | for (int i = 0; i < args.length; i++) { // parse command line |
---|
318 | if (args[i].equals("-readProbability")) { |
---|
319 | readPr = Double.parseDouble(args[++i]); |
---|
320 | if (readPr<0 || readPr>1) { |
---|
321 | System.err.println( |
---|
322 | "The read probability must be [0, 1]: " + readPr); |
---|
323 | return -1; |
---|
324 | } |
---|
325 | } else if (args[i].equals("-writeProbability")) { |
---|
326 | writePr = Double.parseDouble(args[++i]); |
---|
327 | if (writePr<0 || writePr>1) { |
---|
328 | System.err.println( |
---|
329 | "The write probability must be [0, 1]: " + writePr); |
---|
330 | return -1; |
---|
331 | } |
---|
332 | } else if (args[i].equals("-root")) { |
---|
333 | root = new Path(args[++i]); |
---|
334 | } else if (args[i].equals("-maxDelayBetweenOps")) { |
---|
335 | maxDelayBetweenOps = Integer.parseInt(args[++i]); // in milliseconds |
---|
336 | } else if (args[i].equals("-numOfThreads")) { |
---|
337 | numOfThreads = Integer.parseInt(args[++i]); |
---|
338 | if (numOfThreads <= 0) { |
---|
339 | System.err.println( |
---|
340 | "Number of threads must be positive: " + numOfThreads); |
---|
341 | return -1; |
---|
342 | } |
---|
343 | } else if (args[i].equals("-startTime")) { |
---|
344 | startTime = Long.parseLong(args[++i]); |
---|
345 | } else if (args[i].equals("-elapsedTime")) { |
---|
346 | elapsedTime = Long.parseLong(args[++i]); |
---|
347 | } else if (args[i].equals("-seed")) { |
---|
348 | r = new Random(Long.parseLong(args[++i])+hostHashCode); |
---|
349 | } else { |
---|
350 | System.err.println(USAGE); |
---|
351 | ToolRunner.printGenericCommandUsage(System.err); |
---|
352 | return -1; |
---|
353 | } |
---|
354 | } |
---|
355 | } catch (NumberFormatException e) { |
---|
356 | System.err.println("Illegal parameter: " + e.getLocalizedMessage()); |
---|
357 | System.err.println(USAGE); |
---|
358 | return -1; |
---|
359 | } |
---|
360 | |
---|
361 | if (readPr+writePr <0 || readPr+writePr>1) { |
---|
362 | System.err.println( |
---|
363 | "The sum of read probability and write probability must be [0, 1]: " + |
---|
364 | readPr + " "+writePr); |
---|
365 | return -1; |
---|
366 | } |
---|
367 | |
---|
368 | if (r==null) { |
---|
369 | r = new Random(System.currentTimeMillis()+hostHashCode); |
---|
370 | } |
---|
371 | |
---|
372 | return initFileDirTables(); |
---|
373 | } |
---|
374 | |
---|
375 | /** Create a table that contains all directories under root and |
---|
376 | * another table that contains all files under root. |
---|
377 | */ |
---|
378 | private int initFileDirTables() { |
---|
379 | try { |
---|
380 | initFileDirTables(root); |
---|
381 | } catch (IOException e) { |
---|
382 | System.err.println(e.getLocalizedMessage()); |
---|
383 | e.printStackTrace(); |
---|
384 | return -1; |
---|
385 | } |
---|
386 | if (dirs.isEmpty()) { |
---|
387 | System.err.println("The test space " + root + " is empty"); |
---|
388 | return -1; |
---|
389 | } |
---|
390 | if (files.isEmpty()) { |
---|
391 | System.err.println("The test space " + root + |
---|
392 | " does not have any file"); |
---|
393 | return -1; |
---|
394 | } |
---|
395 | return 0; |
---|
396 | } |
---|
397 | |
---|
398 | /** Create a table that contains all directories under the specified path and |
---|
399 | * another table that contains all files under the specified path and |
---|
400 | * whose name starts with "_file_". |
---|
401 | */ |
---|
402 | private void initFileDirTables(Path path) throws IOException { |
---|
403 | FileStatus[] stats = fs.listStatus(path); |
---|
404 | if (stats != null) { |
---|
405 | for (FileStatus stat : stats) { |
---|
406 | if (stat.isDir()) { |
---|
407 | dirs.add(stat.getPath().toString()); |
---|
408 | initFileDirTables(stat.getPath()); |
---|
409 | } else { |
---|
410 | Path filePath = stat.getPath(); |
---|
411 | if (filePath.getName().startsWith(StructureGenerator.FILE_NAME_PREFIX)) { |
---|
412 | files.add(filePath.toString()); |
---|
413 | } |
---|
414 | } |
---|
415 | } |
---|
416 | } |
---|
417 | } |
---|
418 | |
---|
419 | /** Returns when the current number of seconds from the epoch equals |
---|
420 | * the command line argument given by <code>-startTime</code>. |
---|
421 | * This allows multiple instances of this program, running on clock |
---|
422 | * synchronized nodes, to start at roughly the same time. |
---|
423 | */ |
---|
424 | private void barrier() { |
---|
425 | long sleepTime; |
---|
426 | while ((sleepTime = startTime - System.currentTimeMillis()) > 0) { |
---|
427 | try { |
---|
428 | Thread.sleep(sleepTime); |
---|
429 | } catch (InterruptedException ex) { |
---|
430 | } |
---|
431 | } |
---|
432 | } |
---|
433 | |
---|
434 | /** Create a file with a length of <code>fileSize</code>. |
---|
435 | * The file is filled with 'a'. |
---|
436 | */ |
---|
437 | private void genFile(Path file, long fileSize) throws IOException { |
---|
438 | long startTime = System.currentTimeMillis(); |
---|
439 | FSDataOutputStream out = fs.create(file, true, |
---|
440 | getConf().getInt("io.file.buffer.size", 4096), |
---|
441 | (short)getConf().getInt("dfs.replication", 3), |
---|
442 | fs.getDefaultBlockSize()); |
---|
443 | executionTime[CREATE] += (System.currentTimeMillis()-startTime); |
---|
444 | totalNumOfOps[CREATE]++; |
---|
445 | |
---|
446 | for (long i=0; i<fileSize; i++) { |
---|
447 | out.writeByte('a'); |
---|
448 | } |
---|
449 | startTime = System.currentTimeMillis(); |
---|
450 | out.close(); |
---|
451 | executionTime[WRITE_CLOSE] += (System.currentTimeMillis()-startTime); |
---|
452 | totalNumOfOps[WRITE_CLOSE]++; |
---|
453 | } |
---|
454 | |
---|
455 | /** Main program |
---|
456 | * |
---|
457 | * @param args command line arguments |
---|
458 | * @throws Exception |
---|
459 | */ |
---|
460 | public static void main(String[] args) throws Exception { |
---|
461 | int res = ToolRunner.run(new Configuration(), |
---|
462 | new LoadGenerator(), args); |
---|
463 | System.exit(res); |
---|
464 | } |
---|
465 | |
---|
466 | } |
---|