source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/fs/TestDFSIO.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs;
20
21import java.io.*;
22
23import junit.framework.TestCase;
24import java.util.Date;
25import java.util.StringTokenizer;
26
27import org.apache.commons.logging.*;
28
29import org.apache.hadoop.mapred.*;
30import org.apache.hadoop.util.StringUtils;
31import org.apache.hadoop.io.*;
32import org.apache.hadoop.io.SequenceFile.CompressionType;
33import org.apache.hadoop.conf.*;
34
35/**
36 * Distributed i/o benchmark.
37 * <p>
38 * This test writes into or reads from a specified number of files.
39 * File size is specified as a parameter to the test.
40 * Each file is accessed in a separate map task.
41 * <p>
42 * The reducer collects the following statistics:
43 * <ul>
44 * <li>number of tasks completed</li>
45 * <li>number of bytes written/read</li>
46 * <li>execution time</li>
47 * <li>io rate</li>
48 * <li>io rate squared</li>
49 * </ul>
50 *   
51 * Finally, the following information is appended to a local file
52 * <ul>
53 * <li>read or write test</li>
54 * <li>date and time the test finished</li>   
55 * <li>number of files</li>
56 * <li>total number of bytes processed</li>
57 * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
58 * <li>average i/o rate in mb/sec per file</li>
59 * <li>standard deviation of i/o rate </li>
60 * </ul>
61 */
62public class TestDFSIO extends TestCase {
63  // Constants
64  private static final int TEST_TYPE_READ = 0;
65  private static final int TEST_TYPE_WRITE = 1;
66  private static final int TEST_TYPE_CLEANUP = 2;
67  private static final int DEFAULT_BUFFER_SIZE = 1000000;
68  private static final String BASE_FILE_NAME = "test_io_";
69  private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
70 
71  private static final Log LOG = FileInputFormat.LOG;
72  private static Configuration fsConfig = new Configuration();
73  private static final long MEGA = 0x100000;
74  private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
75  private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
76  private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
77  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
78  private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
79
80  /**
81   * Run the test with default parameters.
82   *
83   * @throws Exception
84   */
85  public void testIOs() throws Exception {
86    testIOs(10, 10);
87  }
88
89  /**
90   * Run the test with the specified parameters.
91   *
92   * @param fileSize file size
93   * @param nrFiles number of files
94   * @throws IOException
95   */
96  public static void testIOs(int fileSize, int nrFiles)
97    throws IOException {
98
99    FileSystem fs = FileSystem.get(fsConfig);
100
101    createControlFile(fs, fileSize, nrFiles);
102    writeTest(fs);
103    readTest(fs);
104    cleanup(fs);
105  }
106
107  private static void createControlFile(
108                                        FileSystem fs,
109                                        int fileSize, // in MB
110                                        int nrFiles
111                                        ) throws IOException {
112    LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
113
114    fs.delete(CONTROL_DIR, true);
115
116    for(int i=0; i < nrFiles; i++) {
117      String name = getFileName(i);
118      Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
119      SequenceFile.Writer writer = null;
120      try {
121        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
122                                           UTF8.class, LongWritable.class,
123                                           CompressionType.NONE);
124        writer.append(new UTF8(name), new LongWritable(fileSize));
125      } catch(Exception e) {
126        throw new IOException(e.getLocalizedMessage());
127      } finally {
128        if (writer != null)
129          writer.close();
130        writer = null;
131      }
132    }
133    LOG.info("created control files for: "+nrFiles+" files");
134  }
135
136  private static String getFileName(int fIdx) {
137    return BASE_FILE_NAME + Integer.toString(fIdx);
138  }
139 
140  /**
141   * Write/Read mapper base class.
142   * <p>
143   * Collects the following statistics per task:
144   * <ul>
145   * <li>number of tasks completed</li>
146   * <li>number of bytes written/read</li>
147   * <li>execution time</li>
148   * <li>i/o rate</li>
149   * <li>i/o rate squared</li>
150   * </ul>
151   */
152  private abstract static class IOStatMapper extends IOMapperBase {
153    IOStatMapper() { 
154      super(fsConfig);
155    }
156   
157    void collectStats(OutputCollector<UTF8, UTF8> output, 
158                      String name,
159                      long execTime, 
160                      Object objSize) throws IOException {
161      long totalSize = ((Long)objSize).longValue();
162      float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
163      LOG.info("Number of bytes processed = " + totalSize);
164      LOG.info("Exec time = " + execTime);
165      LOG.info("IO rate = " + ioRateMbSec);
166     
167      output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
168      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
169      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
170      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
171      output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
172    }
173  }
174
175  /**
176   * Write mapper class.
177   */
178  public static class WriteMapper extends IOStatMapper {
179
180    public WriteMapper() { 
181      super(); 
182      for(int i=0; i < bufferSize; i++)
183        buffer[i] = (byte)('0' + i % 50);
184    }
185
186    public Object doIO(Reporter reporter, 
187                       String name, 
188                       long totalSize
189                       ) throws IOException {
190      // create file
191      totalSize *= MEGA;
192      OutputStream out;
193      out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
194     
195      try {
196        // write to the file
197        long nrRemaining;
198        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
199          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
200          out.write(buffer, 0, curSize);
201          reporter.setStatus("writing " + name + "@" + 
202                             (totalSize - nrRemaining) + "/" + totalSize
203                             + " ::host = " + hostName);
204        }
205      } finally {
206        out.close();
207      }
208      return new Long(totalSize);
209    }
210  }
211
212  private static void writeTest(FileSystem fs)
213    throws IOException {
214
215    fs.delete(DATA_DIR, true);
216    fs.delete(WRITE_DIR, true);
217   
218    runIOTest(WriteMapper.class, WRITE_DIR);
219  }
220 
221  private static void runIOTest( Class<? extends Mapper> mapperClass, 
222                                 Path outputDir
223                                 ) throws IOException {
224    JobConf job = new JobConf(fsConfig, TestDFSIO.class);
225
226    FileInputFormat.setInputPaths(job, CONTROL_DIR);
227    job.setInputFormat(SequenceFileInputFormat.class);
228
229    job.setMapperClass(mapperClass);
230    job.setReducerClass(AccumulatingReducer.class);
231
232    FileOutputFormat.setOutputPath(job, outputDir);
233    job.setOutputKeyClass(UTF8.class);
234    job.setOutputValueClass(UTF8.class);
235    job.setNumReduceTasks(1);
236    JobClient.runJob(job);
237  }
238
239  /**
240   * Read mapper class.
241   */
242  public static class ReadMapper extends IOStatMapper {
243
244    public ReadMapper() { 
245      super(); 
246    }
247
248    public Object doIO(Reporter reporter, 
249                       String name, 
250                       long totalSize
251                       ) throws IOException {
252      totalSize *= MEGA;
253      // open file
254      DataInputStream in = fs.open(new Path(DATA_DIR, name));
255      try {
256        long actualSize = 0;
257        for(int curSize = bufferSize; curSize == bufferSize;) {
258          curSize = in.read(buffer, 0, bufferSize);
259          actualSize += curSize;
260          reporter.setStatus("reading " + name + "@" + 
261                             actualSize + "/" + totalSize
262                             + " ::host = " + hostName);
263        }
264      } finally {
265        in.close();
266      }
267      return new Long(totalSize);
268    }
269  }
270
271  private static void readTest(FileSystem fs) throws IOException {
272    fs.delete(READ_DIR, true);
273    runIOTest(ReadMapper.class, READ_DIR);
274  }
275
276  private static void sequentialTest(
277                                     FileSystem fs, 
278                                     int testType, 
279                                     int fileSize, 
280                                     int nrFiles
281                                     ) throws Exception {
282    IOStatMapper ioer = null;
283    if (testType == TEST_TYPE_READ)
284      ioer = new ReadMapper();
285    else if (testType == TEST_TYPE_WRITE)
286      ioer = new WriteMapper();
287    else
288      return;
289    for(int i=0; i < nrFiles; i++)
290      ioer.doIO(Reporter.NULL,
291                BASE_FILE_NAME+Integer.toString(i), 
292                MEGA*fileSize);
293  }
294
295  public static void main(String[] args) {
296    int testType = TEST_TYPE_READ;
297    int bufferSize = DEFAULT_BUFFER_SIZE;
298    int fileSize = 1;
299    int nrFiles = 1;
300    String resFileName = DEFAULT_RES_FILE_NAME;
301    boolean isSequential = false;
302
303    String version="TestFDSIO.0.0.4";
304    String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
305   
306    System.out.println(version);
307    if (args.length == 0) {
308      System.err.println(usage);
309      System.exit(-1);
310    }
311    for (int i = 0; i < args.length; i++) {       // parse command line
312      if (args[i].startsWith("-read")) {
313        testType = TEST_TYPE_READ;
314      } else if (args[i].equals("-write")) {
315        testType = TEST_TYPE_WRITE;
316      } else if (args[i].equals("-clean")) {
317        testType = TEST_TYPE_CLEANUP;
318      } else if (args[i].startsWith("-seq")) {
319        isSequential = true;
320      } else if (args[i].equals("-nrFiles")) {
321        nrFiles = Integer.parseInt(args[++i]);
322      } else if (args[i].equals("-fileSize")) {
323        fileSize = Integer.parseInt(args[++i]);
324      } else if (args[i].equals("-bufferSize")) {
325        bufferSize = Integer.parseInt(args[++i]);
326      } else if (args[i].equals("-resFile")) {
327        resFileName = args[++i];
328      }
329    }
330
331    LOG.info("nrFiles = " + nrFiles);
332    LOG.info("fileSize (MB) = " + fileSize);
333    LOG.info("bufferSize = " + bufferSize);
334 
335    try {
336      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
337      FileSystem fs = FileSystem.get(fsConfig);
338
339      if (isSequential) {
340        long tStart = System.currentTimeMillis();
341        sequentialTest(fs, testType, fileSize, nrFiles);
342        long execTime = System.currentTimeMillis() - tStart;
343        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
344        LOG.info(resultLine);
345        return;
346      }
347      if (testType == TEST_TYPE_CLEANUP) {
348        cleanup(fs);
349        return;
350      }
351      createControlFile(fs, fileSize, nrFiles);
352      long tStart = System.currentTimeMillis();
353      if (testType == TEST_TYPE_WRITE)
354        writeTest(fs);
355      if (testType == TEST_TYPE_READ)
356        readTest(fs);
357      long execTime = System.currentTimeMillis() - tStart;
358   
359      analyzeResult(fs, testType, execTime, resFileName);
360    } catch(Exception e) {
361      System.err.print(StringUtils.stringifyException(e));
362      System.exit(-1);
363    }
364  }
365 
366  private static void analyzeResult( FileSystem fs, 
367                                     int testType,
368                                     long execTime,
369                                     String resFileName
370                                     ) throws IOException {
371    Path reduceFile;
372    if (testType == TEST_TYPE_WRITE)
373      reduceFile = new Path(WRITE_DIR, "part-00000");
374    else
375      reduceFile = new Path(READ_DIR, "part-00000");
376    DataInputStream in;
377    in = new DataInputStream(fs.open(reduceFile));
378 
379    BufferedReader lines;
380    lines = new BufferedReader(new InputStreamReader(in));
381    long tasks = 0;
382    long size = 0;
383    long time = 0;
384    float rate = 0;
385    float sqrate = 0;
386    String line;
387    while((line = lines.readLine()) != null) {
388      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
389      String attr = tokens.nextToken(); 
390      if (attr.endsWith(":tasks"))
391        tasks = Long.parseLong(tokens.nextToken());
392      else if (attr.endsWith(":size"))
393        size = Long.parseLong(tokens.nextToken());
394      else if (attr.endsWith(":time"))
395        time = Long.parseLong(tokens.nextToken());
396      else if (attr.endsWith(":rate"))
397        rate = Float.parseFloat(tokens.nextToken());
398      else if (attr.endsWith(":sqrate"))
399        sqrate = Float.parseFloat(tokens.nextToken());
400    }
401   
402    double med = rate / 1000 / tasks;
403    double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
404    String resultLines[] = {
405      "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
406                                    (testType == TEST_TYPE_READ) ? "read" : 
407                                    "unknown"),
408      "           Date & time: " + new Date(System.currentTimeMillis()),
409      "       Number of files: " + tasks,
410      "Total MBytes processed: " + size/MEGA,
411      "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
412      "Average IO rate mb/sec: " + med,
413      " IO rate std deviation: " + stdDev,
414      "    Test exec time sec: " + (float)execTime / 1000,
415      "" };
416
417    PrintStream res = new PrintStream(
418                                      new FileOutputStream(
419                                                           new File(resFileName), true)); 
420    for(int i = 0; i < resultLines.length; i++) {
421      LOG.info(resultLines[i]);
422      res.println(resultLines[i]);
423    }
424  }
425
426  private static void cleanup(FileSystem fs) throws IOException {
427    LOG.info("Cleaning up test files");
428    fs.delete(new Path(TEST_ROOT_DIR), true);
429  }
430}
Note: See TracBrowser for help on using the repository browser.