source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/fs/TestFileSystem.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 19.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs;
20
21import java.io.DataInputStream;
22import java.io.IOException;
23import java.io.OutputStream;
24import java.util.Arrays;
25import java.util.Random;
26import java.util.List;
27import java.util.ArrayList;
28import java.util.Set;
29import java.util.HashSet;
30import java.util.Map;
31import java.util.HashMap;
32import java.net.InetSocketAddress;
33import java.net.URI;
34
35import junit.framework.TestCase;
36
37import org.apache.commons.logging.Log;
38import org.apache.hadoop.conf.Configuration;
39import org.apache.hadoop.conf.Configured;
40import org.apache.hadoop.hdfs.MiniDFSCluster;
41import org.apache.hadoop.hdfs.server.namenode.NameNode;
42import org.apache.hadoop.fs.shell.CommandFormat;
43import org.apache.hadoop.io.LongWritable;
44import org.apache.hadoop.io.SequenceFile;
45import org.apache.hadoop.io.UTF8;
46import org.apache.hadoop.io.WritableComparable;
47import org.apache.hadoop.io.SequenceFile.CompressionType;
48import org.apache.hadoop.mapred.FileInputFormat;
49import org.apache.hadoop.mapred.FileOutputFormat;
50import org.apache.hadoop.mapred.JobClient;
51import org.apache.hadoop.mapred.JobConf;
52import org.apache.hadoop.mapred.Mapper;
53import org.apache.hadoop.mapred.OutputCollector;
54import org.apache.hadoop.mapred.Reporter;
55import org.apache.hadoop.mapred.SequenceFileInputFormat;
56import org.apache.hadoop.mapred.lib.LongSumReducer;
57import org.apache.hadoop.security.UnixUserGroupInformation;
58
59public class TestFileSystem extends TestCase {
60  private static final Log LOG = FileSystem.LOG;
61
62  private static Configuration conf = new Configuration();
63  private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
64
65  private static final long MEGA = 1024 * 1024;
66  private static final int SEEKS_PER_FILE = 4;
67
68  private static String ROOT = System.getProperty("test.build.data","fs_test");
69  private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
70  private static Path WRITE_DIR = new Path(ROOT, "fs_write");
71  private static Path READ_DIR = new Path(ROOT, "fs_read");
72  private static Path DATA_DIR = new Path(ROOT, "fs_data");
73
74  public void testFs() throws Exception {
75    testFs(10 * MEGA, 100, 0);
76  }
77
78  public static void testFs(long megaBytes, int numFiles, long seed)
79    throws Exception {
80
81    FileSystem fs = FileSystem.get(conf);
82
83    if (seed == 0)
84      seed = new Random().nextLong();
85
86    LOG.info("seed = "+seed);
87
88    createControlFile(fs, megaBytes, numFiles, seed);
89    writeTest(fs, false);
90    readTest(fs, false);
91    seekTest(fs, false);
92    fs.delete(CONTROL_DIR, true);
93    fs.delete(DATA_DIR, true);
94    fs.delete(WRITE_DIR, true);
95    fs.delete(READ_DIR, true);
96  }
97
98  public static void testCommandFormat() throws Exception {
99    // This should go to TestFsShell.java when it is added.
100    CommandFormat cf;
101    cf= new CommandFormat("copyToLocal", 2,2,"crc","ignoreCrc");
102    assertEquals(cf.parse(new String[] {"-get","file", "-"}, 1).get(1), "-");
103    assertEquals(cf.parse(new String[] {"-get","file","-ignoreCrc","/foo"}, 1).get(1),"/foo");
104    cf = new CommandFormat("tail", 1, 1, "f");
105    assertEquals(cf.parse(new String[] {"-tail","fileName"}, 1).get(0),"fileName");
106    assertEquals(cf.parse(new String[] {"-tail","-f","fileName"}, 1).get(0),"fileName");
107    cf = new CommandFormat("setrep", 2, 2, "R", "w");
108    assertEquals(cf.parse(new String[] {"-setrep","-R","2","/foo/bar"}, 1).get(1), "/foo/bar");
109    cf = new CommandFormat("put", 2, 10000);
110    assertEquals(cf.parse(new String[] {"-put", "-", "dest"}, 1).get(1), "dest"); 
111  }
112
113  public static void createControlFile(FileSystem fs,
114                                       long megaBytes, int numFiles,
115                                       long seed) throws Exception {
116
117    LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files");
118
119    Path controlFile = new Path(CONTROL_DIR, "files");
120    fs.delete(controlFile, true);
121    Random random = new Random(seed);
122
123    SequenceFile.Writer writer =
124      SequenceFile.createWriter(fs, conf, controlFile, 
125                                UTF8.class, LongWritable.class, CompressionType.NONE);
126
127    long totalSize = 0;
128    long maxSize = ((megaBytes / numFiles) * 2) + 1;
129    try {
130      while (totalSize < megaBytes) {
131        UTF8 name = new UTF8(Long.toString(random.nextLong()));
132
133        long size = random.nextLong();
134        if (size < 0)
135          size = -size;
136        size = size % maxSize;
137
138        //LOG.info(" adding: name="+name+" size="+size);
139
140        writer.append(name, new LongWritable(size));
141
142        totalSize += size;
143      }
144    } finally {
145      writer.close();
146    }
147    LOG.info("created control file for: "+totalSize+" bytes");
148  }
149
150  public static class WriteMapper extends Configured
151      implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
152   
153    private Random random = new Random();
154    private byte[] buffer = new byte[BUFFER_SIZE];
155    private FileSystem fs;
156    private boolean fastCheck;
157
158    // a random suffix per task
159    private String suffix = "-"+random.nextLong();
160   
161    {
162      try {
163        fs = FileSystem.get(conf);
164      } catch (IOException e) {
165        throw new RuntimeException(e);
166      }
167    }
168
169    public WriteMapper() { super(null); }
170   
171    public WriteMapper(Configuration conf) { super(conf); }
172
173    public void configure(JobConf job) {
174      setConf(job);
175      fastCheck = job.getBoolean("fs.test.fastCheck", false);
176    }
177
178    public void map(UTF8 key, LongWritable value,
179                    OutputCollector<UTF8, LongWritable> collector,
180                    Reporter reporter)
181      throws IOException {
182     
183      String name = key.toString();
184      long size = value.get();
185      long seed = Long.parseLong(name);
186
187      random.setSeed(seed);
188      reporter.setStatus("creating " + name);
189
190      // write to temp file initially to permit parallel execution
191      Path tempFile = new Path(DATA_DIR, name+suffix);
192      OutputStream out = fs.create(tempFile);
193
194      long written = 0;
195      try {
196        while (written < size) {
197          if (fastCheck) {
198            Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
199          } else {
200            random.nextBytes(buffer);
201          }
202          long remains = size - written;
203          int length = (remains<=buffer.length) ? (int)remains : buffer.length;
204          out.write(buffer, 0, length);
205          written += length;
206          reporter.setStatus("writing "+name+"@"+written+"/"+size);
207        }
208      } finally {
209        out.close();
210      }
211      // rename to final location
212      fs.rename(tempFile, new Path(DATA_DIR, name));
213
214      collector.collect(new UTF8("bytes"), new LongWritable(written));
215
216      reporter.setStatus("wrote " + name);
217    }
218   
219    public void close() {
220    }
221   
222  }
223
224  public static void writeTest(FileSystem fs, boolean fastCheck)
225    throws Exception {
226
227    fs.delete(DATA_DIR, true);
228    fs.delete(WRITE_DIR, true);
229   
230    JobConf job = new JobConf(conf, TestFileSystem.class);
231    job.setBoolean("fs.test.fastCheck", fastCheck);
232
233    FileInputFormat.setInputPaths(job, CONTROL_DIR);
234    job.setInputFormat(SequenceFileInputFormat.class);
235
236    job.setMapperClass(WriteMapper.class);
237    job.setReducerClass(LongSumReducer.class);
238
239    FileOutputFormat.setOutputPath(job, WRITE_DIR);
240    job.setOutputKeyClass(UTF8.class);
241    job.setOutputValueClass(LongWritable.class);
242    job.setNumReduceTasks(1);
243    JobClient.runJob(job);
244  }
245
246  public static class ReadMapper extends Configured
247      implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
248   
249    private Random random = new Random();
250    private byte[] buffer = new byte[BUFFER_SIZE];
251    private byte[] check  = new byte[BUFFER_SIZE];
252    private FileSystem fs;
253    private boolean fastCheck;
254
255    {
256      try {
257        fs = FileSystem.get(conf);
258      } catch (IOException e) {
259        throw new RuntimeException(e);
260      }
261    }
262
263    public ReadMapper() { super(null); }
264   
265    public ReadMapper(Configuration conf) { super(conf); }
266
267    public void configure(JobConf job) {
268      setConf(job);
269      fastCheck = job.getBoolean("fs.test.fastCheck", false);
270    }
271
272    public void map(UTF8 key, LongWritable value,
273                    OutputCollector<UTF8, LongWritable> collector,
274                    Reporter reporter)
275      throws IOException {
276     
277      String name = key.toString();
278      long size = value.get();
279      long seed = Long.parseLong(name);
280
281      random.setSeed(seed);
282      reporter.setStatus("opening " + name);
283
284      DataInputStream in =
285        new DataInputStream(fs.open(new Path(DATA_DIR, name)));
286
287      long read = 0;
288      try {
289        while (read < size) {
290          long remains = size - read;
291          int n = (remains<=buffer.length) ? (int)remains : buffer.length;
292          in.readFully(buffer, 0, n);
293          read += n;
294          if (fastCheck) {
295            Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE));
296          } else {
297            random.nextBytes(check);
298          }
299          if (n != buffer.length) {
300            Arrays.fill(buffer, n, buffer.length, (byte)0);
301            Arrays.fill(check, n, check.length, (byte)0);
302          }
303          assertTrue(Arrays.equals(buffer, check));
304
305          reporter.setStatus("reading "+name+"@"+read+"/"+size);
306
307        }
308      } finally {
309        in.close();
310      }
311
312      collector.collect(new UTF8("bytes"), new LongWritable(read));
313
314      reporter.setStatus("read " + name);
315    }
316   
317    public void close() {
318    }
319   
320  }
321
322  public static void readTest(FileSystem fs, boolean fastCheck)
323    throws Exception {
324
325    fs.delete(READ_DIR, true);
326
327    JobConf job = new JobConf(conf, TestFileSystem.class);
328    job.setBoolean("fs.test.fastCheck", fastCheck);
329
330
331    FileInputFormat.setInputPaths(job, CONTROL_DIR);
332    job.setInputFormat(SequenceFileInputFormat.class);
333
334    job.setMapperClass(ReadMapper.class);
335    job.setReducerClass(LongSumReducer.class);
336
337    FileOutputFormat.setOutputPath(job, READ_DIR);
338    job.setOutputKeyClass(UTF8.class);
339    job.setOutputValueClass(LongWritable.class);
340    job.setNumReduceTasks(1);
341    JobClient.runJob(job);
342  }
343
344
345  public static class SeekMapper<K> extends Configured
346    implements Mapper<WritableComparable, LongWritable, K, LongWritable> {
347   
348    private Random random = new Random();
349    private byte[] check  = new byte[BUFFER_SIZE];
350    private FileSystem fs;
351    private boolean fastCheck;
352
353    {
354      try {
355        fs = FileSystem.get(conf);
356      } catch (IOException e) {
357        throw new RuntimeException(e);
358      }
359    }
360
361    public SeekMapper() { super(null); }
362   
363    public SeekMapper(Configuration conf) { super(conf); }
364
365    public void configure(JobConf job) {
366      setConf(job);
367      fastCheck = job.getBoolean("fs.test.fastCheck", false);
368    }
369
370    public void map(WritableComparable key, LongWritable value,
371                    OutputCollector<K, LongWritable> collector,
372                    Reporter reporter)
373      throws IOException {
374      String name = key.toString();
375      long size = value.get();
376      long seed = Long.parseLong(name);
377
378      if (size == 0) return;
379
380      reporter.setStatus("opening " + name);
381
382      FSDataInputStream in = fs.open(new Path(DATA_DIR, name));
383       
384      try {
385        for (int i = 0; i < SEEKS_PER_FILE; i++) {
386          // generate a random position
387          long position = Math.abs(random.nextLong()) % size;
388         
389          // seek file to that position
390          reporter.setStatus("seeking " + name);
391          in.seek(position);
392          byte b = in.readByte();
393         
394          // check that byte matches
395          byte checkByte = 0;
396          // advance random state to that position
397          random.setSeed(seed);
398          for (int p = 0; p <= position; p+= check.length) {
399            reporter.setStatus("generating data for " + name);
400            if (fastCheck) {
401              checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
402            } else {
403              random.nextBytes(check);
404              checkByte = check[(int)(position % check.length)];
405            }
406          }
407          assertEquals(b, checkByte);
408        }
409      } finally {
410        in.close();
411      }
412    }
413   
414    public void close() {
415    }
416   
417  }
418
419  public static void seekTest(FileSystem fs, boolean fastCheck)
420    throws Exception {
421
422    fs.delete(READ_DIR, true);
423
424    JobConf job = new JobConf(conf, TestFileSystem.class);
425    job.setBoolean("fs.test.fastCheck", fastCheck);
426
427    FileInputFormat.setInputPaths(job,CONTROL_DIR);
428    job.setInputFormat(SequenceFileInputFormat.class);
429
430    job.setMapperClass(SeekMapper.class);
431    job.setReducerClass(LongSumReducer.class);
432
433    FileOutputFormat.setOutputPath(job, READ_DIR);
434    job.setOutputKeyClass(UTF8.class);
435    job.setOutputValueClass(LongWritable.class);
436    job.setNumReduceTasks(1);
437    JobClient.runJob(job);
438  }
439
440
441  public static void main(String[] args) throws Exception {
442    int megaBytes = 10;
443    int files = 100;
444    boolean noRead = false;
445    boolean noWrite = false;
446    boolean noSeek = false;
447    boolean fastCheck = false;
448    long seed = new Random().nextLong();
449
450    String usage = "Usage: TestFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]";
451   
452    if (args.length == 0) {
453      System.err.println(usage);
454      System.exit(-1);
455    }
456    for (int i = 0; i < args.length; i++) {       // parse command line
457      if (args[i].equals("-files")) {
458        files = Integer.parseInt(args[++i]);
459      } else if (args[i].equals("-megaBytes")) {
460        megaBytes = Integer.parseInt(args[++i]);
461      } else if (args[i].equals("-noread")) {
462        noRead = true;
463      } else if (args[i].equals("-nowrite")) {
464        noWrite = true;
465      } else if (args[i].equals("-noseek")) {
466        noSeek = true;
467      } else if (args[i].equals("-fastcheck")) {
468        fastCheck = true;
469      }
470    }
471
472    LOG.info("seed = "+seed);
473    LOG.info("files = " + files);
474    LOG.info("megaBytes = " + megaBytes);
475 
476    FileSystem fs = FileSystem.get(conf);
477
478    if (!noWrite) {
479      createControlFile(fs, megaBytes*MEGA, files, seed);
480      writeTest(fs, fastCheck);
481    }
482    if (!noRead) {
483      readTest(fs, fastCheck);
484    }
485    if (!noSeek) {
486      seekTest(fs, fastCheck);
487    }
488  }
489
490  static Configuration createConf4Testing(String username) throws Exception {
491    Configuration conf = new Configuration();
492    UnixUserGroupInformation.saveToConf(conf,
493        UnixUserGroupInformation.UGI_PROPERTY_NAME,
494        new UnixUserGroupInformation(username, new String[]{"group"}));
495    return conf;   
496  }
497
498  public void testFsCache() throws Exception {
499    {
500      long now = System.currentTimeMillis();
501      Configuration[] conf = {new Configuration(),
502          createConf4Testing("foo" + now), createConf4Testing("bar" + now)};
503      FileSystem[] fs = new FileSystem[conf.length];
504 
505      for(int i = 0; i < conf.length; i++) {
506        fs[i] = FileSystem.get(conf[i]);
507        assertEquals(fs[i], FileSystem.get(conf[i]));
508        for(int j = 0; j < i; j++) {
509          assertFalse(fs[j] == fs[i]);
510        }
511      }
512      FileSystem.closeAll();
513    }
514   
515    {
516      try {
517        runTestCache(NameNode.DEFAULT_PORT);
518      } catch(java.net.BindException be) {
519        LOG.warn("Cannot test NameNode.DEFAULT_PORT (="
520            + NameNode.DEFAULT_PORT + ")", be);
521      }
522
523      runTestCache(0);
524    }
525  }
526 
527  static void runTestCache(int port) throws Exception {
528    Configuration conf = new Configuration();
529    MiniDFSCluster cluster = null;
530    try {
531      cluster = new MiniDFSCluster(port, conf, 2, true, true, null, null);
532      URI uri = cluster.getFileSystem().getUri();
533      LOG.info("uri=" + uri);
534
535      {
536        FileSystem fs = FileSystem.get(uri, new Configuration());
537        checkPath(cluster, fs);
538        for(int i = 0; i < 100; i++) {
539          assertTrue(fs == FileSystem.get(uri, new Configuration()));
540        }
541      }
542     
543      if (port == NameNode.DEFAULT_PORT) {
544        //test explicit default port
545        URI uri2 = new URI(uri.getScheme(), uri.getUserInfo(),
546            uri.getHost(), NameNode.DEFAULT_PORT, uri.getPath(),
547            uri.getQuery(), uri.getFragment()); 
548        LOG.info("uri2=" + uri2);
549        FileSystem fs = FileSystem.get(uri2, conf);
550        checkPath(cluster, fs);
551        for(int i = 0; i < 100; i++) {
552          assertTrue(fs == FileSystem.get(uri2, new Configuration()));
553        }
554      }
555    } finally {
556      if (cluster != null) cluster.shutdown(); 
557    }
558  }
559 
560  static void checkPath(MiniDFSCluster cluster, FileSystem fileSys) throws IOException {
561    InetSocketAddress add = cluster.getNameNode().getNameNodeAddress();
562    // Test upper/lower case
563    fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
564  }
565
566  public void testFsClose() throws Exception {
567    {
568      Configuration conf = new Configuration();
569      new Path("file:///").getFileSystem(conf);
570      UnixUserGroupInformation.login(conf, true);
571      FileSystem.closeAll();
572    }
573
574    {
575      Configuration conf = new Configuration();
576      new Path("hftp://localhost:12345/").getFileSystem(conf);
577      UnixUserGroupInformation.login(conf, true);
578      FileSystem.closeAll();
579    }
580
581    {
582      Configuration conf = new Configuration();
583      FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
584      UnixUserGroupInformation.login(fs.getConf(), true);
585      FileSystem.closeAll();
586    }
587  }
588
589
590  public void testCacheKeysAreCaseInsensitive()
591    throws Exception
592  {
593    Configuration conf = new Configuration();
594   
595    // check basic equality
596    FileSystem.Cache.Key lowercaseCachekey1 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf);
597    FileSystem.Cache.Key lowercaseCachekey2 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf);
598    assertEquals( lowercaseCachekey1, lowercaseCachekey2 );
599
600    // check insensitive equality   
601    FileSystem.Cache.Key uppercaseCachekey = new FileSystem.Cache.Key(new URI("HFTP://Localhost:12345/"), conf);
602    assertEquals( lowercaseCachekey2, uppercaseCachekey );
603
604    // check behaviour with collections
605    List<FileSystem.Cache.Key> list = new ArrayList<FileSystem.Cache.Key>();
606    list.add(uppercaseCachekey);
607    assertTrue(list.contains(uppercaseCachekey));
608    assertTrue(list.contains(lowercaseCachekey2));
609
610    Set<FileSystem.Cache.Key> set = new HashSet<FileSystem.Cache.Key>();
611    set.add(uppercaseCachekey);
612    assertTrue(set.contains(uppercaseCachekey));
613    assertTrue(set.contains(lowercaseCachekey2));
614
615    Map<FileSystem.Cache.Key, String> map = new HashMap<FileSystem.Cache.Key, String>();
616    map.put(uppercaseCachekey, "");
617    assertTrue(map.containsKey(uppercaseCachekey));
618    assertTrue(map.containsKey(lowercaseCachekey2));   
619
620  }
621}
Note: See TracBrowser for help on using the repository browser.