source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/hdfs/TestFileAppend2.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs;
19
20import java.io.IOException;
21import java.util.ArrayList;
22import java.util.Arrays;
23
24import junit.framework.TestCase;
25
26import org.apache.hadoop.conf.Configuration;
27import org.apache.hadoop.fs.FSDataInputStream;
28import org.apache.hadoop.fs.FSDataOutputStream;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.fs.permission.FsPermission;
32import org.apache.hadoop.hdfs.server.datanode.DataNode;
33import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
34import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
35import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
36import org.apache.hadoop.hdfs.server.namenode.NameNode;
37import org.apache.hadoop.io.IOUtils;
38import org.apache.hadoop.security.AccessControlException;
39import org.apache.hadoop.security.UnixUserGroupInformation;
40import org.apache.hadoop.security.UserGroupInformation;
41
42import org.apache.commons.logging.impl.Log4JLogger;
43import org.apache.log4j.Level;
44
45/**
46 * This class tests the building blocks that are needed to
47 * support HDFS appends.
48 */
49public class TestFileAppend2 extends TestCase {
50
51  {
52    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
53    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
54    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
55    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
56    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
57  }
58
59  static final int blockSize = 1024;
60  static final int numBlocks = 5;
61  static final int fileSize = numBlocks * blockSize + 1;
62  boolean simulatedStorage = false;
63
64  private byte[] fileContents = null;
65
66  int numDatanodes = 5;
67  int numberOfFiles = 50;
68  int numThreads = 10;
69  int numAppendsPerThread = 20;
70/***
71  int numberOfFiles = 1;
72  int numThreads = 1;
73  int numAppendsPerThread = 2000;
74****/
75  Workload[] workload = null;
76  ArrayList<Path> testFiles = new ArrayList<Path>();
77  volatile static boolean globalStatus = true;
78
79  //
80  // create a buffer that contains the entire test file data.
81  //
82  private void initBuffer(int size) {
83    long seed = AppendTestUtil.nextLong();
84    fileContents = AppendTestUtil.randomBytes(seed, size);
85  }
86
87  /*
88   * creates a file but does not close it
89   */ 
90  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
91    throws IOException {
92    FSDataOutputStream stm = fileSys.create(name, true,
93                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
94                                            (short)repl, (long)blockSize);
95    return stm;
96  }
97
98  private void checkFile(FileSystem fs, Path name, int len) throws IOException {
99    FSDataInputStream stm = fs.open(name);
100    byte[] actual = new byte[len];
101    stm.readFully(0, actual);
102    checkData(actual, 0, fileContents, "Read 2");
103    stm.close();
104  }
105
106  private void checkFullFile(FileSystem fs, Path name) throws IOException {
107    checkFile(fs, name, fileSize);
108  }
109
110  private void checkData(byte[] actual, int from, byte[] expected, String message) {
111    for (int idx = 0; idx < actual.length; idx++) {
112      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
113                   expected[from+idx]+" actual "+actual[idx],
114                   expected[from+idx], actual[idx]);
115      actual[idx] = 0;
116    }
117  }
118
119
120  /**
121   * Creates one file, writes a few bytes to it and then closed it.
122   * Reopens the same file for appending, write all blocks and then close.
123   * Verify that all data exists in file.
124   */ 
125  public void testSimpleAppend() throws IOException {
126    Configuration conf = new Configuration();
127    if (simulatedStorage) {
128      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
129    }
130    conf.setInt("dfs.datanode.handler.count", 50);
131    conf.setBoolean("dfs.support.append", true);
132    initBuffer(fileSize);
133    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
134    FileSystem fs = cluster.getFileSystem();
135    try {
136      { // test appending to a file.
137
138        // create a new file.
139        Path file1 = new Path("/simpleAppend.dat");
140        FSDataOutputStream stm = createFile(fs, file1, 1);
141        System.out.println("Created file simpleAppend.dat");
142 
143        // write to file
144        int mid = 186;   // io.bytes.per.checksum bytes
145        System.out.println("Writing " + mid + " bytes to file " + file1);
146        stm.write(fileContents, 0, mid);
147        stm.close();
148        System.out.println("Wrote and Closed first part of file.");
149 
150        // write to file
151        int mid2 = 607;   // io.bytes.per.checksum bytes
152        System.out.println("Writing " + mid + " bytes to file " + file1);
153        stm = fs.append(file1);
154        stm.write(fileContents, mid, mid2-mid);
155        stm.close();
156        System.out.println("Wrote and Closed second part of file.");
157 
158        // write the remainder of the file
159        stm = fs.append(file1);
160
161        // ensure getPos is set to reflect existing size of the file
162        assertTrue(stm.getPos() > 0);
163
164        System.out.println("Writing " + (fileSize - mid2) + " bytes to file " + file1);
165        stm.write(fileContents, mid2, fileSize - mid2);
166        System.out.println("Written second part of file");
167        stm.close();
168        System.out.println("Wrote and Closed second part of file.");
169 
170        // verify that entire file is good
171        checkFullFile(fs, file1);
172      }
173
174      { // test appending to an non-existing file.
175        FSDataOutputStream out = null;
176        try {
177          out = fs.append(new Path("/non-existing.dat"));
178          fail("Expected to have FileNotFoundException");
179        }
180        catch(java.io.FileNotFoundException fnfe) {
181          System.out.println("Good: got " + fnfe);
182          fnfe.printStackTrace(System.out);
183        }
184        finally {
185          IOUtils.closeStream(out);
186        }
187      }
188
189      { // test append permission.
190
191        //set root to all writable
192        Path root = new Path("/");
193        fs.setPermission(root, new FsPermission((short)0777));
194        fs.close();
195
196        // login as a different user
197        final UserGroupInformation superuser = UserGroupInformation.getCurrentUGI();
198        String username = "testappenduser";
199        String group = "testappendgroup";
200        assertFalse(superuser.getUserName().equals(username));
201        assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
202        UnixUserGroupInformation appenduser = UnixUserGroupInformation.createImmutable(
203            new String[]{username, group});
204        UnixUserGroupInformation.saveToConf(conf,
205            UnixUserGroupInformation.UGI_PROPERTY_NAME, appenduser);
206        fs = FileSystem.get(conf);
207
208        // create a file
209        Path dir = new Path(root, getClass().getSimpleName());
210        Path foo = new Path(dir, "foo.dat");
211        FSDataOutputStream out = null;
212        int offset = 0;
213        try {
214          out = fs.create(foo);
215          int len = 10 + AppendTestUtil.nextInt(100);
216          out.write(fileContents, offset, len);
217          offset += len;
218        }
219        finally {
220          IOUtils.closeStream(out);
221        }
222
223        // change dir and foo to minimal permissions.
224        fs.setPermission(dir, new FsPermission((short)0100));
225        fs.setPermission(foo, new FsPermission((short)0200));
226
227        // try append, should success
228        out = null;
229        try {
230          out = fs.append(foo);
231          int len = 10 + AppendTestUtil.nextInt(100);
232          out.write(fileContents, offset, len);
233          offset += len;
234        }
235        finally {
236          IOUtils.closeStream(out);
237        }
238
239        // change dir and foo to all but no write on foo.
240        fs.setPermission(foo, new FsPermission((short)0577));
241        fs.setPermission(dir, new FsPermission((short)0777));
242
243        // try append, should fail
244        out = null;
245        try {
246          out = fs.append(foo);
247          fail("Expected to have AccessControlException");
248        }
249        catch(AccessControlException ace) {
250          System.out.println("Good: got " + ace);
251          ace.printStackTrace(System.out);
252        }
253        finally {
254          IOUtils.closeStream(out);
255        }
256      }
257    } catch (IOException e) {
258      System.out.println("Exception :" + e);
259      throw e; 
260    } catch (Throwable e) {
261      System.out.println("Throwable :" + e);
262      e.printStackTrace();
263      throw new IOException("Throwable : " + e);
264    } finally {
265      fs.close();
266      cluster.shutdown();
267    }
268  }
269
270  //
271  // an object that does a bunch of appends to files
272  //
273  class Workload extends Thread {
274    private int id;
275    private MiniDFSCluster cluster;
276
277    Workload(MiniDFSCluster cluster, int threadIndex) {
278      id = threadIndex;
279      this.cluster = cluster;
280    }
281
282    // create a bunch of files. Write to them and then verify.
283    public void run() {
284      System.out.println("Workload " + id + " starting... ");
285      for (int i = 0; i < numAppendsPerThread; i++) {
286   
287        // pick a file at random and remove it from pool
288        Path testfile = null;
289        synchronized (testFiles) {
290          if (testFiles.size() == 0) {
291            System.out.println("Completed write to almost all files.");
292            return; 
293          }
294          int index = AppendTestUtil.nextInt(testFiles.size());
295          testfile = testFiles.remove(index);
296        }
297
298        long len = 0;
299        int sizeToAppend = 0;
300        try {
301          FileSystem fs = cluster.getFileSystem();
302
303          // add a random number of bytes to file
304          len = fs.getFileStatus(testfile).getLen();
305
306          // if file is already full, then pick another file
307          if (len >= fileSize) {
308            System.out.println("File " + testfile + " is full.");
309            continue;
310          }
311 
312          // do small size appends so that we can trigger multiple
313          // appends to the same file.
314          //
315          int left = (int)(fileSize - len)/3;
316          if (left <= 0) {
317            left = 1;
318          }
319          sizeToAppend = AppendTestUtil.nextInt(left);
320
321          System.out.println("Workload thread " + id +
322                             " appending " + sizeToAppend + " bytes " +
323                             " to file " + testfile +
324                             " of size " + len);
325          FSDataOutputStream stm = fs.append(testfile);
326          stm.write(fileContents, (int)len, sizeToAppend);
327          stm.close();
328
329          // wait for the file size to be reflected in the namenode metadata
330          while (fs.getFileStatus(testfile).getLen() != (len + sizeToAppend)) {
331            try {
332              System.out.println("Workload thread " + id +
333                                 " file " + testfile  +
334                                 " size " + fs.getFileStatus(testfile).getLen() +
335                                 " expected size " + (len + sizeToAppend) +
336                                 " waiting for namenode metadata update.");
337              Thread.sleep(5000);
338            } catch (InterruptedException e) { 
339            }
340          }
341
342          assertTrue("File " + testfile + " size is " + 
343                     fs.getFileStatus(testfile).getLen() +
344                     " but expected " + (len + sizeToAppend),
345                    fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
346
347          checkFile(fs, testfile, (int)(len + sizeToAppend));
348        } catch (Throwable e) {
349          globalStatus = false;
350          if (e != null && e.toString() != null) {
351            System.out.println("Workload exception " + id + 
352                               " testfile " + testfile +
353                               " " + e);
354            e.printStackTrace();
355          }
356          assertTrue("Workload exception " + id + " testfile " + testfile +
357                     " expected size " + (len + sizeToAppend),
358                     false);
359        }
360
361        // Add testfile back to the pool of files.
362        synchronized (testFiles) {
363          testFiles.add(testfile);
364        }
365      }
366    }
367  }
368
369  /**
370   * Test that appends to files at random offsets.
371   */
372  public void testComplexAppend() throws IOException {
373    initBuffer(fileSize);
374    Configuration conf = new Configuration();
375    conf.setInt("heartbeat.recheck.interval", 2000);
376    conf.setInt("dfs.heartbeat.interval", 2);
377    conf.setInt("dfs.replication.pending.timeout.sec", 2);
378    conf.setInt("dfs.socket.timeout", 30000);
379    conf.setInt("dfs.datanode.socket.write.timeout", 30000);
380    conf.setInt("dfs.datanode.handler.count", 50);
381    conf.setBoolean("dfs.support.append", true);
382
383    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
384                                                true, null);
385    cluster.waitActive();
386    FileSystem fs = cluster.getFileSystem();
387
388    try {
389      // create a bunch of test files with random replication factors.
390      // Insert them into a linked list.
391      //
392      for (int i = 0; i < numberOfFiles; i++) {
393        short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1);
394        Path testFile = new Path("/" + i + ".dat");
395        FSDataOutputStream stm = createFile(fs, testFile, replication);
396        stm.close();
397        testFiles.add(testFile);
398      }
399
400      // Create threads and make them run workload concurrently.
401      workload = new Workload[numThreads];
402      for (int i = 0; i < numThreads; i++) {
403        workload[i] = new Workload(cluster, i);
404        workload[i].start();
405      }
406
407      // wait for all transactions to get over
408      for (int i = 0; i < numThreads; i++) {
409        try {
410          System.out.println("Waiting for thread " + i + " to complete...");
411          workload[i].join();
412          System.out.println("Waiting for thread " + i + " complete.");
413        } catch (InterruptedException e) {
414          i--;      // retry
415        }
416      }
417    } finally {
418      fs.close();
419      cluster.shutdown();
420    }
421
422    // If any of the worker thread failed in their job, indicate that
423    // this test failed.
424    //
425    assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
426  }
427}
Note: See TracBrowser for help on using the repository browser.