source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/fs/DistributedFSCheck.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 11.5 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.*;

import junit.framework.TestCase;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.Vector;

import org.apache.commons.logging.*;

import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.conf.*;

/**
 * Distributed checkup of the file system consistency.
 * <p>
 * Test file system consistency by reading each block of each file
 * of the specified file tree.
 * Report corrupted blocks and general file statistics.
 * <p>
 * Optionally displays statistics on read performance.
 *
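 * <p>
 * A typical invocation (a sketch; assumes the stock Hadoop launcher and the
 * flags parsed in {@link #main(String[])}; the path is only an example):
 * <pre>
 * hadoop org.apache.hadoop.fs.DistributedFSCheck -root /user -resFile check.log -stats
 * </pre>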
 */
public class DistributedFSCheck extends TestCase {
  // Constants
  private static final int TEST_TYPE_READ = 0;
  private static final int TEST_TYPE_CLEANUP = 2;
  private static final int DEFAULT_BUFFER_SIZE = 1000000;
  private static final String DEFAULT_RES_FILE_NAME = "DistributedFSCheck_results.log";
  private static final long MEGA = 0x100000;

  private static Configuration fsConfig = new Configuration();
  private static final Log LOG = FileInputFormat.LOG;
  private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
  private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");

  private FileSystem fs;
  private long nrFiles;

  DistributedFSCheck(Configuration conf) throws Exception {
    fsConfig = conf;
    this.fs = FileSystem.get(conf);
  }

  /**
   * Run distributed checkup for the entire file system.
   *
   * @throws Exception
   */
  public void testFSBlocks() throws Exception {
    testFSBlocks("/");
  }

  /**
   * Run distributed checkup for the specified directory.
   *
   * @param rootName root directory name
   * @throws Exception
   */
  public void testFSBlocks(String rootName) throws Exception {
    createInputFile(rootName);
    runDistributedFSCheck();
    cleanup();  // clean up after all to restore the system state
  }

  private void createInputFile(String rootName) throws IOException {
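    // Build the job input: a SequenceFile with one <file name, byte offset>
    // record per block of every file under rootName (see listSubtree below).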
    cleanup();  // clean up if previous run failed

    Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, fsConfig, inputFile,
                                UTF8.class, LongWritable.class, CompressionType.NONE);

    try {
      nrFiles = 0;
      listSubtree(new Path(rootName), writer);
    } finally {
      writer.close();
    }
    LOG.info("Created map input files.");
  }

  private void listSubtree(Path rootFile,
                           SequenceFile.Writer writer
                           ) throws IOException {
    if (!fs.isDirectory(rootFile)) {
      nrFiles++;
      // For a regular file generate <fName,offset> pairs
      long blockSize = fs.getDefaultBlockSize();
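      // Note: this is the filesystem-wide default block size, not the file's
      // own block size, so the offsets step in uniform block-sized chunks.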
      long fileLength = fs.getLength(rootFile);
      for(long offset = 0; offset < fileLength; offset += blockSize)
        writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
      return;
    }

    FileStatus children[] = fs.listStatus(rootFile);
    if (children == null)
      throw new IOException("Could not get listing for " + rootFile);
    for (int i = 0; i < children.length; i++)
      listSubtree(children[i].getPath(), writer);
  }

  /**
   * DistributedFSCheck mapper class.
   */
  public static class DistributedFSCheckMapper extends IOMapperBase {

    public DistributedFSCheckMapper() {
      super(fsConfig);
    }

    public Object doIO(Reporter reporter,
                       String name,
                       long offset
                       ) throws IOException {
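      // Reads one block of the named file. Returns a Long (bytes actually
      // read) on success, or a String "name@offset" identifying the block
      // when the file is missing or the read fails.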
      // open file
      FSDataInputStream in = null;
      try {
        in = fs.open(new Path(name));
      } catch(IOException e) {
        return name + "@(missing)";
      }
      in.seek(offset);
      long actualSize = 0;
      try {
        long blockSize = fs.getDefaultBlockSize();
        reporter.setStatus("reading " + name + "@" +
                           offset + "/" + blockSize);
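        // Pull the block in one buffer-sized read at a time; a short read
        // (curSize < bufferSize) means the end of the block or file.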
        for (int curSize = bufferSize;
             curSize == bufferSize && actualSize < blockSize;
             actualSize += curSize) {
          curSize = in.read(buffer, 0, bufferSize);
          if (curSize < 0)
            break;   // EOF: don't let read()'s -1 skew the byte count
        }
      } catch(IOException e) {
        LOG.info("Corrupted block detected in \"" + name + "\" at " + offset);
        return name + "@" + offset;
      } finally {
        in.close();
      }
      return new Long(actualSize);
    }

    void collectStats(OutputCollector<UTF8, UTF8> output,
                      String name,
                      long execTime,
                      Object corruptedBlock) throws IOException {
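      // Key prefixes drive AccumulatingReducer: "l:" values are summed as
      // longs, "f:" as floats, and "s:" values are concatenated (used here
      // to collect the corrupted-block list, separated by ';').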
      output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));

      if (corruptedBlock.getClass().getName().endsWith("String")) {
        output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
        return;
      }
      long totalSize = ((Long)corruptedBlock).longValue();
      float ioRateMbSec = (float)totalSize * 1000 / (execTime * 0x100000);
      LOG.info("Number of bytes processed = " + totalSize);
      LOG.info("Exec time = " + execTime);
      LOG.info("IO rate = " + ioRateMbSec);

      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
    }
  }

  private void runDistributedFSCheck() throws Exception {
    JobConf job = new JobConf(fs.getConf(), DistributedFSCheck.class);

    FileInputFormat.setInputPaths(job, MAP_INPUT_DIR);
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(DistributedFSCheckMapper.class);
    job.setReducerClass(AccumulatingReducer.class);

    FileOutputFormat.setOutputPath(job, READ_DIR);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(UTF8.class);
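    // A single reduce task gathers every mapper's statistics into one output
    // file (part-00000), which analyzeResult() parses afterwards.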
    job.setNumReduceTasks(1);
    JobClient.runJob(job);
  }

  public static void main(String[] args) throws Exception {
    int testType = TEST_TYPE_READ;
    int bufferSize = DEFAULT_BUFFER_SIZE;
    String resFileName = DEFAULT_RES_FILE_NAME;
    String rootName = "/";
    boolean viewStats = false;

    String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";

    if (args.length == 1 && args[0].startsWith("-h")) {
      System.err.println(usage);
      System.exit(-1);
    }
    for(int i = 0; i < args.length; i++) {       // parse command line
      if (args[i].equals("-root")) {
        rootName = args[++i];
      } else if (args[i].startsWith("-clean")) {
        testType = TEST_TYPE_CLEANUP;
      } else if (args[i].equals("-bufferSize")) {
        bufferSize = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-resFile")) {
        resFileName = args[++i];
      } else if (args[i].startsWith("-stat")) {
        viewStats = true;
      }
    }

    LOG.info("root = " + rootName);
    LOG.info("bufferSize = " + bufferSize);

    Configuration conf = new Configuration();
    conf.setInt("test.io.file.buffer.size", bufferSize);
    DistributedFSCheck test = new DistributedFSCheck(conf);

    if (testType == TEST_TYPE_CLEANUP) {
      test.cleanup();
      return;
    }
    test.createInputFile(rootName);
    long tStart = System.currentTimeMillis();
    test.runDistributedFSCheck();
    long execTime = System.currentTimeMillis() - tStart;

    test.analyzeResult(execTime, resFileName, viewStats);
    // test.cleanup();  // clean up after all to restore the system state
  }

  private void analyzeResult(long execTime,
                             String resFileName,
                             boolean viewStats
                             ) throws IOException {
    Path reduceFile = new Path(READ_DIR, "part-00000");
    DataInputStream in = new DataInputStream(fs.open(reduceFile));
    BufferedReader lines = new BufferedReader(new InputStreamReader(in));
    long blocks = 0;
    long size = 0;
    long time = 0;
    float rate = 0;
    StringTokenizer badBlocks = null;
    long nrBadBlocks = 0;
    String line;
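    // Reducer output is one "<key>\t<value>" line per aggregated key
    // (e.g. "l:blocks\t42"); match on the key suffix and read the value.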
    while((line = lines.readLine()) != null) {
      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
      String attr = tokens.nextToken();
      if (attr.endsWith("blocks"))
        blocks = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith("size"))
        size = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith("time"))
        time = Long.parseLong(tokens.nextToken());
      else if (attr.endsWith("rate"))
        rate = Float.parseFloat(tokens.nextToken());
      else if (attr.endsWith("badBlocks")) {
        badBlocks = new StringTokenizer(tokens.nextToken(), ";");
        nrBadBlocks = badBlocks.countTokens();
      }
    }

    Vector<String> resultLines = new Vector<String>();
    resultLines.add( "----- DistributedFSCheck ----- : ");
    resultLines.add( "               Date & time: " + new Date(System.currentTimeMillis()));
    resultLines.add( "    Total number of blocks: " + blocks);
    resultLines.add( "    Total number of  files: " + nrFiles);
    resultLines.add( "Number of corrupted blocks: " + nrBadBlocks);

    int nrBadFilesPos = resultLines.size();
    TreeSet<String> badFiles = new TreeSet<String>();
    long nrBadFiles = 0;
    if (nrBadBlocks > 0) {
      resultLines.add("");
      resultLines.add("----- Corrupted Blocks (file@offset) ----- : ");
      while(badBlocks.hasMoreTokens()) {
        String curBlock = badBlocks.nextToken();
        resultLines.add(curBlock);
        badFiles.add(curBlock.substring(0, curBlock.indexOf('@')));
      }
      nrBadFiles = badFiles.size();
    }

    resultLines.insertElementAt(" Number of corrupted files: " + nrBadFiles, nrBadFilesPos);

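    // Each map emitted its per-block rate scaled by 1000 and the reducer
    // summed those, so rate / 1000 / blocks is the mean per-block IO rate;
    // throughput is total bytes over cumulative map time.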
    if (viewStats) {
      resultLines.add("");
      resultLines.add("-----   Performance  ----- : ");
      resultLines.add("         Total MBytes read: " + size/MEGA);
      resultLines.add("         Throughput mb/sec: " + (float)size * 1000.0 / (time * MEGA));
      resultLines.add("    Average IO rate mb/sec: " + rate / 1000 / blocks);
      resultLines.add("        Test exec time sec: " + (float)execTime / 1000);
    }

    PrintStream res = new PrintStream(
        new FileOutputStream(new File(resFileName), true));
    for(int i = 0; i < resultLines.size(); i++) {
      String cur = resultLines.get(i);
      LOG.info(cur);
      res.println(cur);
    }
  }

  private void cleanup() throws IOException {
    LOG.info("Cleaning up test files");
    fs.delete(TEST_ROOT_DIR, true);
  }
}