source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 26.7 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;

/**
 * This class tests that a file need not be closed before its
 * data can be read by another client.
 */
public class TestFileCreation extends junit.framework.TestCase {
  static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";

  {
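    // Instance initializer: raise lease-manager, namesystem, and DFS client
    // logging to Level.ALL so that failures in these tests are easier to trace.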
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

  static final long seed = 0xDEADBEEFL;
  static final int blockSize = 8192;
  static final int numBlocks = 2;
  static final int fileSize = numBlocks * blockSize + 1;
  boolean simulatedStorage = false;

  // The test file is 2 times the blocksize plus one. This means that when the
  // entire file is written, the first two blocks definitely get flushed to
  // the datanodes.
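  // With the constants above, fileSize = 2 * 8192 + 1 = 16385 bytes: two full
  // 8 KB blocks plus a single trailing byte in a third, partially filled block.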

  // creates a file but does not close it
  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
    System.out.println("createFile: Created " + name + " with " + repl + " replica(s).");
    FSDataOutputStream stm = fileSys.create(name, true,
                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                            (short)repl, (long)blockSize);
    return stm;
  }

  //
  // writes to file but does not close it
  //
  static void writeFile(FSDataOutputStream stm) throws IOException {
    writeFile(stm, fileSize);
  }

  //
  // writes specified bytes to file.
  //
  static void writeFile(FSDataOutputStream stm, int size) throws IOException {
    byte[] buffer = AppendTestUtil.randomBytes(seed, size);
    stm.write(buffer, 0, size);
  }
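  // Note: AppendTestUtil.randomBytes(seed, size) is deterministic for a fixed
  // seed, which is what lets checkFile and checkFullFile regenerate the exact
  // byte sequence that was written and compare it against what is read back.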

  //
  // verify that the data written to the full blocks are sane
  //
  private void checkFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
    boolean done = false;

    // wait till all full blocks are confirmed by the datanodes.
    while (!done) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {}
      done = true;
      BlockLocation[] locations = fileSys.getFileBlockLocations(
          fileSys.getFileStatus(name), 0, fileSize);
      if (locations.length < numBlocks) {
        done = false;
        continue;
      }
      for (int idx = 0; idx < locations.length; idx++) {
        if (locations[idx].getHosts().length < repl) {
          done = false;
          break;
        }
      }
    }
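    // Read back and verify only the full blocks (numBlocks * blockSize bytes).
    // The file's final odd byte is deliberately excluded: the file is still
    // open, so that byte may not have been flushed to the datanodes yet.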
    FSDataInputStream stm = fileSys.open(name);
    final byte[] expected;
    if (simulatedStorage) {
      expected = new byte[numBlocks * blockSize];
      for (int i = 0; i < expected.length; i++) {
        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
      }
    } else {
      expected = AppendTestUtil.randomBytes(seed, numBlocks*blockSize);
    }
    // do a sanity check. Read the file
    byte[] actual = new byte[numBlocks * blockSize];
    stm.readFully(0, actual);
    stm.close();
    checkData(actual, 0, expected, "Read 1");
  }

  static private void checkData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
                   expected[from+idx]+" actual "+actual[idx],
                   expected[from+idx], actual[idx]);
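      // clear each byte after it has been verified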
      actual[idx] = 0;
    }
  }

  static void checkFullFile(FileSystem fs, Path name) throws IOException {
    FileStatus stat = fs.getFileStatus(name);
    BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
                                                         fileSize);
    for (int idx = 0; idx < locations.length; idx++) {
      String[] hosts = locations[idx].getNames();
      for (int i = 0; i < hosts.length; i++) {
        System.out.print(hosts[i] + " ");
      }
      System.out.println(" off " + locations[idx].getOffset() +
                         " len " + locations[idx].getLength());
    }

    byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
    FSDataInputStream stm = fs.open(name);
    byte[] actual = new byte[fileSize];
    stm.readFully(0, actual);
    checkData(actual, 0, expected, "Read 2");
    stm.close();
  }

  /**
   * Test that file data becomes available before file is closed.
   */
  public void testFileCreation() throws IOException {
    Configuration conf = new Configuration();
    if (simulatedStorage) {
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    }
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    FileSystem fs = cluster.getFileSystem();
    try {

      //
      // check that / exists
      //
      Path path = new Path("/");
      System.out.println("Path : \"" + path.toString() + "\"");
      System.out.println(fs.getFileStatus(path).isDir());
      assertTrue("/ should be a directory",
                 fs.getFileStatus(path).isDir());

      //
      // Create a directory inside /, then try to overwrite it
      //
      Path dir1 = new Path("/test_dir");
      fs.mkdirs(dir1);
      System.out.println("createFile: Creating " + dir1.getName() +
        " for overwrite of existing directory.");
      try {
        fs.create(dir1, true); // Create path, overwrite=true
        fs.close();
        fail("Did not prevent directory from being overwritten.");
      } catch (IOException ie) {
        if (!ie.getMessage().contains("already exists as a directory."))
          throw ie;
      }

      // create a new file in home directory. Do not close it.
      //
      Path file1 = new Path("filestatus.dat");
      FSDataOutputStream stm = createFile(fs, file1, 1);

      // verify that file exists in FS namespace
      assertTrue(file1 + " should be a file",
                 !fs.getFileStatus(file1).isDir());
      System.out.println("Path : \"" + file1 + "\"");

      // write to file
      writeFile(stm);

      // Make sure a client can read it before it is closed.
      checkFile(fs, file1, 1);

      // verify that file size has changed
      long len = fs.getFileStatus(file1).getLen();
      assertTrue(file1 + " should be of size " + (numBlocks * blockSize) +
                 " but found to be of size " + len,
                 len == numBlocks * blockSize);

      stm.close();

      // verify that file size has changed to the full size
      len = fs.getFileStatus(file1).getLen();
      assertTrue(file1 + " should be of size " + fileSize +
                 " but found to be of size " + len,
                 len == fileSize);

      // Check storage usage
      // can't check capacities for real storage since the OS file system may be changing under us.
      if (simulatedStorage) {
        DataNode dn = cluster.getDataNodes().get(0);
        assertEquals(fileSize, dn.getFSDataset().getDfsUsed());
        assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining());
      }
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test deleteOnExit
   */
  public void testDeleteOnExit() throws IOException {
    Configuration conf = new Configuration();
    if (simulatedStorage) {
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    }
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    FileSystem fs = cluster.getFileSystem();
    FileSystem localfs = FileSystem.getLocal(conf);

    try {

      // Creates files in HDFS and local file system.
      //
      Path file1 = new Path("filestatus.dat");
      Path file2 = new Path("filestatus2.dat");
      Path file3 = new Path("filestatus3.dat");
      FSDataOutputStream stm1 = createFile(fs, file1, 1);
      FSDataOutputStream stm2 = createFile(fs, file2, 1);
      FSDataOutputStream stm3 = createFile(localfs, file3, 1);
      System.out.println("DeleteOnExit: Created files.");

      // write to file1 and file3 and close all three streams.
      // Purposely, do not write to file2.
      writeFile(stm1);
      writeFile(stm3);
      stm1.close();
      stm2.close();
      stm3.close();

      // set delete on exit flag on files.
      fs.deleteOnExit(file1);
      fs.deleteOnExit(file2);
      localfs.deleteOnExit(file3);

      // close the file system. This should make the above files
      // disappear.
      fs.close();
      localfs.close();
      fs = null;
      localfs = null;

      // reopen file system and verify that file does not exist.
      fs = cluster.getFileSystem();
      localfs = FileSystem.getLocal(conf);

      assertTrue(file1 + " still exists in spite of deleteOnExit being set.",
                 !fs.exists(file1));
      assertTrue(file2 + " still exists in spite of deleteOnExit being set.",
                 !fs.exists(file2));
      assertTrue(file3 + " still exists in spite of deleteOnExit being set.",
                 !localfs.exists(file3));
      System.out.println("DeleteOnExit successful.");

    } finally {
      IOUtils.closeStream(fs);
      IOUtils.closeStream(localfs);
      cluster.shutdown();
    }
  }

  /**
   * Test that file data does not become corrupted even in the face of errors.
   */
  public void testFileCreationError1() throws IOException {
    Configuration conf = new Configuration();
    conf.setInt("heartbeat.recheck.interval", 1000);
    conf.setInt("dfs.heartbeat.interval", 1);
    if (simulatedStorage) {
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    }
    // create cluster
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    FileSystem fs = cluster.getFileSystem();
    cluster.waitActive();
    InetSocketAddress addr = new InetSocketAddress("localhost",
                                                   cluster.getNameNodePort());
    DFSClient client = new DFSClient(addr, conf);

    try {

      // create a new file.
      //
      Path file1 = new Path("/filestatus.dat");
      FSDataOutputStream stm = createFile(fs, file1, 1);

      // verify that file exists in FS namespace
      assertTrue(file1 + " should be a file",
                 !fs.getFileStatus(file1).isDir());
      System.out.println("Path : \"" + file1 + "\"");

      // kill the datanode
      cluster.shutdownDataNodes();

      // wait for the datanode to be declared dead
      while (true) {
        DatanodeInfo[] info = client.datanodeReport(
            FSConstants.DatanodeReportType.LIVE);
        if (info.length == 0) {
          break;
        }
        System.out.println("testFileCreationError1: waiting for datanode" +
                           " to die.");
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
      }

      // write 1 byte to file.
      // This should fail because all datanodes are dead.
      byte[] buffer = AppendTestUtil.randomBytes(seed, 1);
      try {
        stm.write(buffer);
        stm.close();
      } catch (Exception e) {
        System.out.println("Encountered expected exception");
      }

      // verify that no blocks are associated with this file
      // bad block allocations were cleaned up earlier.
      LocatedBlocks locations = client.namenode.getBlockLocations(
                                  file1.toString(), 0, Long.MAX_VALUE);
      System.out.println("locations = " + locations.locatedBlockCount());
      assertTrue("Error blocks were not cleaned up",
                 locations.locatedBlockCount() == 0);
    } finally {
      cluster.shutdown();
      client.close();
    }
  }

  /**
   * Test that the filesystem removes the last block from a file if its
   * lease expires.
   */
  public void testFileCreationError2() throws IOException {
    long leasePeriod = 1000;
    System.out.println("testFileCreationError2 start");
    Configuration conf = new Configuration();
    conf.setInt("heartbeat.recheck.interval", 1000);
    conf.setInt("dfs.heartbeat.interval", 1);
    if (simulatedStorage) {
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    }
    // create cluster
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    DistributedFileSystem dfs = null;
    try {
      cluster.waitActive();
      dfs = (DistributedFileSystem)cluster.getFileSystem();
      DFSClient client = dfs.dfs;

      // create a new file.
      //
      Path file1 = new Path("/filestatus.dat");
      createFile(dfs, file1, 1);
      System.out.println("testFileCreationError2: "
                         + "Created file filestatus.dat with one replica.");

      LocatedBlocks locations = client.namenode.getBlockLocations(
                                  file1.toString(), 0, Long.MAX_VALUE);
      System.out.println("testFileCreationError2: "
          + "The file has " + locations.locatedBlockCount() + " blocks.");

      // add another block to the file
      LocatedBlock location = client.namenode.addBlock(file1.toString(),
          client.clientName);
      System.out.println("testFileCreationError2: "
          + "Added block " + location.getBlock());

      locations = client.namenode.getBlockLocations(file1.toString(),
                                                    0, Long.MAX_VALUE);
      int count = locations.locatedBlockCount();
      System.out.println("testFileCreationError2: "
          + "The file now has " + count + " blocks.");

      // set the soft and hard limit to be 1 second so that the
      // namenode triggers lease recovery
      cluster.setLeasePeriod(leasePeriod, leasePeriod);

      // wait for the lease to expire
      try {
        Thread.sleep(5 * leasePeriod);
      } catch (InterruptedException e) {
      }

      // verify that the last block was synchronized.
      locations = client.namenode.getBlockLocations(file1.toString(),
                                                    0, Long.MAX_VALUE);
      System.out.println("testFileCreationError2: "
          + "locations = " + locations.locatedBlockCount());
      assertEquals(0, locations.locatedBlockCount());
      System.out.println("testFileCreationError2 successful");
    } finally {
      IOUtils.closeStream(dfs);
      cluster.shutdown();
    }
  }

  /**
   * Test that file leases are persisted across namenode restarts.
   * This test is currently not triggered because more HDFS work is
   * needed to handle persistent leases.
   */
  public void xxxtestFileCreationNamenodeRestart() throws IOException {
    Configuration conf = new Configuration();
    final int MAX_IDLE_TIME = 2000; // 2s
    conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
    conf.setInt("heartbeat.recheck.interval", 1000);
    conf.setInt("dfs.heartbeat.interval", 1);
    if (simulatedStorage) {
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    }

    // create cluster
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    FileSystem fs = null;
    try {
      cluster.waitActive();
      fs = cluster.getFileSystem();
      final int nnport = cluster.getNameNodePort();

      // create a new file.
      Path file1 = new Path("/filestatus.dat");
      FSDataOutputStream stm = createFile(fs, file1, 1);
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Created file " + file1);

      // write two full blocks.
      writeFile(stm, numBlocks * blockSize);
      stm.sync();

      // rename the file while keeping it open.
      Path fileRenamed = new Path("/filestatusRenamed.dat");
      fs.rename(file1, fileRenamed);
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Renamed file " + file1 + " to " +
                         fileRenamed);
      file1 = fileRenamed;

      // create another new file.
      //
      Path file2 = new Path("/filestatus2.dat");
      FSDataOutputStream stm2 = createFile(fs, file2, 1);
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Created file " + file2);

      // create yet another new file with full path name.
      // rename it while open
      //
      Path file3 = new Path("/user/home/fullpath.dat");
      FSDataOutputStream stm3 = createFile(fs, file3, 1);
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Created file " + file3);
      Path file4 = new Path("/user/home/fullpath4.dat");
      FSDataOutputStream stm4 = createFile(fs, file4, 1);
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Created file " + file4);

      fs.mkdirs(new Path("/bin"));
      fs.rename(new Path("/user/home"), new Path("/bin"));
      Path file3new = new Path("/bin/home/fullpath.dat");
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Renamed file " + file3 + " to " +
                         file3new);
      Path file4new = new Path("/bin/home/fullpath4.dat");
      System.out.println("testFileCreationNamenodeRestart: "
                         + "Renamed file " + file4 + " to " +
                         file4new);

      // restart cluster with the same namenode port as before.
      // This ensures that leases are persisted in fsimage.
      cluster.shutdown();
      try {
        Thread.sleep(2*MAX_IDLE_TIME);
      } catch (InterruptedException e) {
      }
      cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
                                   null, null, null);
      cluster.waitActive();

      // restart cluster yet again. This triggers the code to read in
      // persistent leases from fsimage.
      cluster.shutdown();
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
      }
      cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
                                   null, null, null);
      cluster.waitActive();
      fs = cluster.getFileSystem();

      // instruct the dfsclient to use a new filename when it requests
      // new blocks for files that were renamed.
      DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
                                                 (stm.getWrappedStream());
      dfstream.setTestFilename(file1.toString());
      dfstream = (DFSClient.DFSOutputStream) (stm3.getWrappedStream());
      dfstream.setTestFilename(file3new.toString());
      dfstream = (DFSClient.DFSOutputStream) (stm4.getWrappedStream());
      dfstream.setTestFilename(file4new.toString());

      // write 1 byte to file.  This should succeed because the
      // namenode should have persisted leases.
      byte[] buffer = AppendTestUtil.randomBytes(seed, 1);
      stm.write(buffer);
      stm.close();
      stm2.write(buffer);
      stm2.close();
      stm3.close();
      stm4.close();

      // verify that new block is associated with this file
      DFSClient client = ((DistributedFileSystem)fs).dfs;
      LocatedBlocks locations = client.namenode.getBlockLocations(
                                  file1.toString(), 0, Long.MAX_VALUE);
      System.out.println("locations = " + locations.locatedBlockCount());
      assertTrue("Error blocks were not cleaned up for file " + file1,
                 locations.locatedBlockCount() == 3);

      // verify filestatus2.dat
      locations = client.namenode.getBlockLocations(
                                  file2.toString(), 0, Long.MAX_VALUE);
      System.out.println("locations = " + locations.locatedBlockCount());
      assertTrue("Error blocks were not cleaned up for file " + file2,
                 locations.locatedBlockCount() == 1);
    } finally {
      IOUtils.closeStream(fs);
      cluster.shutdown();
    }
  }

  /**
   * Test that all open files are closed when client dies abnormally.
   */
  public void testDFSClientDeath() throws IOException {
    Configuration conf = new Configuration();
    System.out.println("Testing abnormal client death.");
    if (simulatedStorage) {
      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
    }
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    FileSystem fs = cluster.getFileSystem();
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsclient = dfs.dfs;
    try {

      // create a new file in home directory. Do not close it.
      //
      Path file1 = new Path("/clienttest.dat");
      FSDataOutputStream stm = createFile(fs, file1, 1);
      System.out.println("Created file clienttest.dat");

      // write to file
      writeFile(stm);

      // close the dfsclient before closing the output stream.
      // This should close all open files.
      dfsclient.close();

      // reopen file system and verify that file exists.
      assertTrue(file1 + " does not exist.",
          AppendTestUtil.createHdfsWithDifferentUsername(conf).exists(file1));
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test that file data becomes available before the file is closed,
   * using simulated storage.
   */
  public void testFileCreationSimulated() throws IOException {
    simulatedStorage = true;
    testFileCreation();
    simulatedStorage = false;
  }

  /**
   * Test creating two files at the same time.
   */
  public void testConcurrentFileCreation() throws IOException {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);

    try {
      FileSystem fs = cluster.getFileSystem();

      Path[] p = {new Path("/foo"), new Path("/bar")};

      //write 2 files at the same time
      FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
      int i = 0;
      for(; i < 100; i++) {
        out[0].write(i);
        out[1].write(i);
      }
      out[0].close();
      for(; i < 200; i++) {out[1].write(i);}
      out[1].close();

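      // Each write(i) stores only the low-order byte of i; since i stays
      // below 256 here, read() returns the same value back and can be
      // compared directly against i.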
      //verify
      FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};
      for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
      for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
    } finally {
      if (cluster != null) {cluster.shutdown();}
    }
  }

  /**
   * Create a file, write something, fsync but not close.
   * Then change lease period and wait for lease recovery.
   * Finally, read the block directly from each Datanode and verify the content.
   */
  public void testLeaseExpireHardLimit() throws Exception {
    System.out.println("testLeaseExpireHardLimit start");
    final long leasePeriod = 1000;
    final int DATANODE_NUM = 3;

    Configuration conf = new Configuration();
    conf.setInt("heartbeat.recheck.interval", 1000);
    conf.setInt("dfs.heartbeat.interval", 1);

    // create cluster
    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
    DistributedFileSystem dfs = null;
    try {
      cluster.waitActive();
      dfs = (DistributedFileSystem)cluster.getFileSystem();

      // create a new file.
      final String f = DIR + "foo";
      final Path fpath = new Path(f);
      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
      out.write("something".getBytes());
      out.sync();

      // set the soft and hard limit to be 1 second so that the
      // namenode triggers lease recovery
      cluster.setLeasePeriod(leasePeriod, leasePeriod);
      // wait for the lease to expire
      try {Thread.sleep(5 * leasePeriod);} catch (InterruptedException e) {}

      LocatedBlocks locations = dfs.dfs.namenode.getBlockLocations(
          f, 0, Long.MAX_VALUE);
      assertEquals(1, locations.locatedBlockCount());
      LocatedBlock locatedblock = locations.getLocatedBlocks().get(0);
      int successcount = 0;
      for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) {
        DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort);
        FSDataset dataset = (FSDataset)datanode.data;
        Block b = dataset.getStoredBlock(locatedblock.getBlock().getBlockId());
        File blockfile = dataset.findBlockFile(b.getBlockId());
        System.out.println("blockfile=" + blockfile);
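        // Count only the datanodes that actually have the block file on
        // disk; the assertion below only requires that at least one replica
        // was finalized with the expected contents after lease recovery.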
        if (blockfile != null) {
          BufferedReader in = new BufferedReader(new FileReader(blockfile));
          assertEquals("something", in.readLine());
          in.close();
          successcount++;
        }
      }
      System.out.println("successcount=" + successcount);
      assertTrue(successcount > 0);
    } finally {
      IOUtils.closeStream(dfs);
      cluster.shutdown();
    }

    System.out.println("testLeaseExpireHardLimit successful");
  }

  // test closing file system before all file handles are closed.
  public void testFsClose() throws Exception {
    System.out.println("test file system close start");
    final int DATANODE_NUM = 3;

    Configuration conf = new Configuration();

    // create cluster
    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
    DistributedFileSystem dfs = null;
    try {
      cluster.waitActive();
      dfs = (DistributedFileSystem)cluster.getFileSystem();

      // create a new file.
      final String f = DIR + "foofs";
      final Path fpath = new Path(f);
      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
      out.write("something".getBytes());

      // close file system without closing file
      dfs.close();
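      // The test passes as long as dfs.close() returns without throwing,
      // even though a stream on fpath is still open when it is called.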
    } finally {
      System.out.println("testFsClose successful");
    }
  }
}