[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | package org.apache.hadoop.hdfs; |
---|
| 19 | |
---|
| 20 | import java.io.IOException; |
---|
| 21 | import java.util.ArrayList; |
---|
| 22 | import java.util.Arrays; |
---|
| 23 | |
---|
| 24 | import junit.framework.TestCase; |
---|
| 25 | |
---|
| 26 | import org.apache.hadoop.conf.Configuration; |
---|
| 27 | import org.apache.hadoop.fs.FSDataInputStream; |
---|
| 28 | import org.apache.hadoop.fs.FSDataOutputStream; |
---|
| 29 | import org.apache.hadoop.fs.FileSystem; |
---|
| 30 | import org.apache.hadoop.fs.Path; |
---|
| 31 | import org.apache.hadoop.fs.permission.FsPermission; |
---|
| 32 | import org.apache.hadoop.hdfs.server.datanode.DataNode; |
---|
| 33 | import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; |
---|
| 34 | import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
---|
| 35 | import org.apache.hadoop.hdfs.server.namenode.LeaseManager; |
---|
| 36 | import org.apache.hadoop.hdfs.server.namenode.NameNode; |
---|
| 37 | import org.apache.hadoop.io.IOUtils; |
---|
| 38 | import org.apache.hadoop.security.AccessControlException; |
---|
| 39 | import org.apache.hadoop.security.UnixUserGroupInformation; |
---|
| 40 | import org.apache.hadoop.security.UserGroupInformation; |
---|
| 41 | |
---|
| 42 | import org.apache.commons.logging.impl.Log4JLogger; |
---|
| 43 | import org.apache.log4j.Level; |
---|
| 44 | |
---|
| 45 | /** |
---|
| 46 | * This class tests the building blocks that are needed to |
---|
| 47 | * support HDFS appends. |
---|
| 48 | */ |
---|
| 49 | public class TestFileAppend2 extends TestCase { |
---|
| 50 | |
---|
| 51 | { |
---|
| 52 | ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); |
---|
| 53 | ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); |
---|
| 54 | ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); |
---|
| 55 | ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); |
---|
| 56 | ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); |
---|
| 57 | } |
---|
| 58 | |
---|
| 59 | static final int blockSize = 1024; |
---|
| 60 | static final int numBlocks = 5; |
---|
| 61 | static final int fileSize = numBlocks * blockSize + 1; |
---|
| 62 | boolean simulatedStorage = false; |
---|
| 63 | |
---|
| 64 | private byte[] fileContents = null; |
---|
| 65 | |
---|
| 66 | int numDatanodes = 5; |
---|
| 67 | int numberOfFiles = 50; |
---|
| 68 | int numThreads = 10; |
---|
| 69 | int numAppendsPerThread = 20; |
---|
| 70 | /*** |
---|
| 71 | int numberOfFiles = 1; |
---|
| 72 | int numThreads = 1; |
---|
| 73 | int numAppendsPerThread = 2000; |
---|
| 74 | ****/ |
---|
| 75 | Workload[] workload = null; |
---|
| 76 | ArrayList<Path> testFiles = new ArrayList<Path>(); |
---|
| 77 | volatile static boolean globalStatus = true; |
---|
| 78 | |
---|
| 79 | // |
---|
| 80 | // create a buffer that contains the entire test file data. |
---|
| 81 | // |
---|
| 82 | private void initBuffer(int size) { |
---|
| 83 | long seed = AppendTestUtil.nextLong(); |
---|
| 84 | fileContents = AppendTestUtil.randomBytes(seed, size); |
---|
| 85 | } |
---|
| 86 | |
---|
| 87 | /* |
---|
| 88 | * creates a file but does not close it |
---|
| 89 | */ |
---|
| 90 | private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) |
---|
| 91 | throws IOException { |
---|
| 92 | FSDataOutputStream stm = fileSys.create(name, true, |
---|
| 93 | fileSys.getConf().getInt("io.file.buffer.size", 4096), |
---|
| 94 | (short)repl, (long)blockSize); |
---|
| 95 | return stm; |
---|
| 96 | } |
---|
| 97 | |
---|
| 98 | private void checkFile(FileSystem fs, Path name, int len) throws IOException { |
---|
| 99 | FSDataInputStream stm = fs.open(name); |
---|
| 100 | byte[] actual = new byte[len]; |
---|
| 101 | stm.readFully(0, actual); |
---|
| 102 | checkData(actual, 0, fileContents, "Read 2"); |
---|
| 103 | stm.close(); |
---|
| 104 | } |
---|
| 105 | |
---|
| 106 | private void checkFullFile(FileSystem fs, Path name) throws IOException { |
---|
| 107 | checkFile(fs, name, fileSize); |
---|
| 108 | } |
---|
| 109 | |
---|
| 110 | private void checkData(byte[] actual, int from, byte[] expected, String message) { |
---|
| 111 | for (int idx = 0; idx < actual.length; idx++) { |
---|
| 112 | assertEquals(message+" byte "+(from+idx)+" differs. expected "+ |
---|
| 113 | expected[from+idx]+" actual "+actual[idx], |
---|
| 114 | expected[from+idx], actual[idx]); |
---|
| 115 | actual[idx] = 0; |
---|
| 116 | } |
---|
| 117 | } |
---|
| 118 | |
---|
| 119 | |
---|
| 120 | /** |
---|
| 121 | * Creates one file, writes a few bytes to it and then closed it. |
---|
| 122 | * Reopens the same file for appending, write all blocks and then close. |
---|
| 123 | * Verify that all data exists in file. |
---|
| 124 | */ |
---|
| 125 | public void testSimpleAppend() throws IOException { |
---|
| 126 | Configuration conf = new Configuration(); |
---|
| 127 | if (simulatedStorage) { |
---|
| 128 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
| 129 | } |
---|
| 130 | conf.setInt("dfs.datanode.handler.count", 50); |
---|
| 131 | conf.setBoolean("dfs.support.append", true); |
---|
| 132 | initBuffer(fileSize); |
---|
| 133 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
| 134 | FileSystem fs = cluster.getFileSystem(); |
---|
| 135 | try { |
---|
| 136 | { // test appending to a file. |
---|
| 137 | |
---|
| 138 | // create a new file. |
---|
| 139 | Path file1 = new Path("/simpleAppend.dat"); |
---|
| 140 | FSDataOutputStream stm = createFile(fs, file1, 1); |
---|
| 141 | System.out.println("Created file simpleAppend.dat"); |
---|
| 142 | |
---|
| 143 | // write to file |
---|
| 144 | int mid = 186; // io.bytes.per.checksum bytes |
---|
| 145 | System.out.println("Writing " + mid + " bytes to file " + file1); |
---|
| 146 | stm.write(fileContents, 0, mid); |
---|
| 147 | stm.close(); |
---|
| 148 | System.out.println("Wrote and Closed first part of file."); |
---|
| 149 | |
---|
| 150 | // write to file |
---|
| 151 | int mid2 = 607; // io.bytes.per.checksum bytes |
---|
| 152 | System.out.println("Writing " + mid + " bytes to file " + file1); |
---|
| 153 | stm = fs.append(file1); |
---|
| 154 | stm.write(fileContents, mid, mid2-mid); |
---|
| 155 | stm.close(); |
---|
| 156 | System.out.println("Wrote and Closed second part of file."); |
---|
| 157 | |
---|
| 158 | // write the remainder of the file |
---|
| 159 | stm = fs.append(file1); |
---|
| 160 | |
---|
| 161 | // ensure getPos is set to reflect existing size of the file |
---|
| 162 | assertTrue(stm.getPos() > 0); |
---|
| 163 | |
---|
| 164 | System.out.println("Writing " + (fileSize - mid2) + " bytes to file " + file1); |
---|
| 165 | stm.write(fileContents, mid2, fileSize - mid2); |
---|
| 166 | System.out.println("Written second part of file"); |
---|
| 167 | stm.close(); |
---|
| 168 | System.out.println("Wrote and Closed second part of file."); |
---|
| 169 | |
---|
| 170 | // verify that entire file is good |
---|
| 171 | checkFullFile(fs, file1); |
---|
| 172 | } |
---|
| 173 | |
---|
| 174 | { // test appending to an non-existing file. |
---|
| 175 | FSDataOutputStream out = null; |
---|
| 176 | try { |
---|
| 177 | out = fs.append(new Path("/non-existing.dat")); |
---|
| 178 | fail("Expected to have FileNotFoundException"); |
---|
| 179 | } |
---|
| 180 | catch(java.io.FileNotFoundException fnfe) { |
---|
| 181 | System.out.println("Good: got " + fnfe); |
---|
| 182 | fnfe.printStackTrace(System.out); |
---|
| 183 | } |
---|
| 184 | finally { |
---|
| 185 | IOUtils.closeStream(out); |
---|
| 186 | } |
---|
| 187 | } |
---|
| 188 | |
---|
| 189 | { // test append permission. |
---|
| 190 | |
---|
| 191 | //set root to all writable |
---|
| 192 | Path root = new Path("/"); |
---|
| 193 | fs.setPermission(root, new FsPermission((short)0777)); |
---|
| 194 | fs.close(); |
---|
| 195 | |
---|
| 196 | // login as a different user |
---|
| 197 | final UserGroupInformation superuser = UserGroupInformation.getCurrentUGI(); |
---|
| 198 | String username = "testappenduser"; |
---|
| 199 | String group = "testappendgroup"; |
---|
| 200 | assertFalse(superuser.getUserName().equals(username)); |
---|
| 201 | assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group)); |
---|
| 202 | UnixUserGroupInformation appenduser = UnixUserGroupInformation.createImmutable( |
---|
| 203 | new String[]{username, group}); |
---|
| 204 | UnixUserGroupInformation.saveToConf(conf, |
---|
| 205 | UnixUserGroupInformation.UGI_PROPERTY_NAME, appenduser); |
---|
| 206 | fs = FileSystem.get(conf); |
---|
| 207 | |
---|
| 208 | // create a file |
---|
| 209 | Path dir = new Path(root, getClass().getSimpleName()); |
---|
| 210 | Path foo = new Path(dir, "foo.dat"); |
---|
| 211 | FSDataOutputStream out = null; |
---|
| 212 | int offset = 0; |
---|
| 213 | try { |
---|
| 214 | out = fs.create(foo); |
---|
| 215 | int len = 10 + AppendTestUtil.nextInt(100); |
---|
| 216 | out.write(fileContents, offset, len); |
---|
| 217 | offset += len; |
---|
| 218 | } |
---|
| 219 | finally { |
---|
| 220 | IOUtils.closeStream(out); |
---|
| 221 | } |
---|
| 222 | |
---|
| 223 | // change dir and foo to minimal permissions. |
---|
| 224 | fs.setPermission(dir, new FsPermission((short)0100)); |
---|
| 225 | fs.setPermission(foo, new FsPermission((short)0200)); |
---|
| 226 | |
---|
| 227 | // try append, should success |
---|
| 228 | out = null; |
---|
| 229 | try { |
---|
| 230 | out = fs.append(foo); |
---|
| 231 | int len = 10 + AppendTestUtil.nextInt(100); |
---|
| 232 | out.write(fileContents, offset, len); |
---|
| 233 | offset += len; |
---|
| 234 | } |
---|
| 235 | finally { |
---|
| 236 | IOUtils.closeStream(out); |
---|
| 237 | } |
---|
| 238 | |
---|
| 239 | // change dir and foo to all but no write on foo. |
---|
| 240 | fs.setPermission(foo, new FsPermission((short)0577)); |
---|
| 241 | fs.setPermission(dir, new FsPermission((short)0777)); |
---|
| 242 | |
---|
| 243 | // try append, should fail |
---|
| 244 | out = null; |
---|
| 245 | try { |
---|
| 246 | out = fs.append(foo); |
---|
| 247 | fail("Expected to have AccessControlException"); |
---|
| 248 | } |
---|
| 249 | catch(AccessControlException ace) { |
---|
| 250 | System.out.println("Good: got " + ace); |
---|
| 251 | ace.printStackTrace(System.out); |
---|
| 252 | } |
---|
| 253 | finally { |
---|
| 254 | IOUtils.closeStream(out); |
---|
| 255 | } |
---|
| 256 | } |
---|
| 257 | } catch (IOException e) { |
---|
| 258 | System.out.println("Exception :" + e); |
---|
| 259 | throw e; |
---|
| 260 | } catch (Throwable e) { |
---|
| 261 | System.out.println("Throwable :" + e); |
---|
| 262 | e.printStackTrace(); |
---|
| 263 | throw new IOException("Throwable : " + e); |
---|
| 264 | } finally { |
---|
| 265 | fs.close(); |
---|
| 266 | cluster.shutdown(); |
---|
| 267 | } |
---|
| 268 | } |
---|
| 269 | |
---|
| 270 | // |
---|
| 271 | // an object that does a bunch of appends to files |
---|
| 272 | // |
---|
| 273 | class Workload extends Thread { |
---|
| 274 | private int id; |
---|
| 275 | private MiniDFSCluster cluster; |
---|
| 276 | |
---|
| 277 | Workload(MiniDFSCluster cluster, int threadIndex) { |
---|
| 278 | id = threadIndex; |
---|
| 279 | this.cluster = cluster; |
---|
| 280 | } |
---|
| 281 | |
---|
| 282 | // create a bunch of files. Write to them and then verify. |
---|
| 283 | public void run() { |
---|
| 284 | System.out.println("Workload " + id + " starting... "); |
---|
| 285 | for (int i = 0; i < numAppendsPerThread; i++) { |
---|
| 286 | |
---|
| 287 | // pick a file at random and remove it from pool |
---|
| 288 | Path testfile = null; |
---|
| 289 | synchronized (testFiles) { |
---|
| 290 | if (testFiles.size() == 0) { |
---|
| 291 | System.out.println("Completed write to almost all files."); |
---|
| 292 | return; |
---|
| 293 | } |
---|
| 294 | int index = AppendTestUtil.nextInt(testFiles.size()); |
---|
| 295 | testfile = testFiles.remove(index); |
---|
| 296 | } |
---|
| 297 | |
---|
| 298 | long len = 0; |
---|
| 299 | int sizeToAppend = 0; |
---|
| 300 | try { |
---|
| 301 | FileSystem fs = cluster.getFileSystem(); |
---|
| 302 | |
---|
| 303 | // add a random number of bytes to file |
---|
| 304 | len = fs.getFileStatus(testfile).getLen(); |
---|
| 305 | |
---|
| 306 | // if file is already full, then pick another file |
---|
| 307 | if (len >= fileSize) { |
---|
| 308 | System.out.println("File " + testfile + " is full."); |
---|
| 309 | continue; |
---|
| 310 | } |
---|
| 311 | |
---|
| 312 | // do small size appends so that we can trigger multiple |
---|
| 313 | // appends to the same file. |
---|
| 314 | // |
---|
| 315 | int left = (int)(fileSize - len)/3; |
---|
| 316 | if (left <= 0) { |
---|
| 317 | left = 1; |
---|
| 318 | } |
---|
| 319 | sizeToAppend = AppendTestUtil.nextInt(left); |
---|
| 320 | |
---|
| 321 | System.out.println("Workload thread " + id + |
---|
| 322 | " appending " + sizeToAppend + " bytes " + |
---|
| 323 | " to file " + testfile + |
---|
| 324 | " of size " + len); |
---|
| 325 | FSDataOutputStream stm = fs.append(testfile); |
---|
| 326 | stm.write(fileContents, (int)len, sizeToAppend); |
---|
| 327 | stm.close(); |
---|
| 328 | |
---|
| 329 | // wait for the file size to be reflected in the namenode metadata |
---|
| 330 | while (fs.getFileStatus(testfile).getLen() != (len + sizeToAppend)) { |
---|
| 331 | try { |
---|
| 332 | System.out.println("Workload thread " + id + |
---|
| 333 | " file " + testfile + |
---|
| 334 | " size " + fs.getFileStatus(testfile).getLen() + |
---|
| 335 | " expected size " + (len + sizeToAppend) + |
---|
| 336 | " waiting for namenode metadata update."); |
---|
| 337 | Thread.sleep(5000); |
---|
| 338 | } catch (InterruptedException e) { |
---|
| 339 | } |
---|
| 340 | } |
---|
| 341 | |
---|
| 342 | assertTrue("File " + testfile + " size is " + |
---|
| 343 | fs.getFileStatus(testfile).getLen() + |
---|
| 344 | " but expected " + (len + sizeToAppend), |
---|
| 345 | fs.getFileStatus(testfile).getLen() == (len + sizeToAppend)); |
---|
| 346 | |
---|
| 347 | checkFile(fs, testfile, (int)(len + sizeToAppend)); |
---|
| 348 | } catch (Throwable e) { |
---|
| 349 | globalStatus = false; |
---|
| 350 | if (e != null && e.toString() != null) { |
---|
| 351 | System.out.println("Workload exception " + id + |
---|
| 352 | " testfile " + testfile + |
---|
| 353 | " " + e); |
---|
| 354 | e.printStackTrace(); |
---|
| 355 | } |
---|
| 356 | assertTrue("Workload exception " + id + " testfile " + testfile + |
---|
| 357 | " expected size " + (len + sizeToAppend), |
---|
| 358 | false); |
---|
| 359 | } |
---|
| 360 | |
---|
| 361 | // Add testfile back to the pool of files. |
---|
| 362 | synchronized (testFiles) { |
---|
| 363 | testFiles.add(testfile); |
---|
| 364 | } |
---|
| 365 | } |
---|
| 366 | } |
---|
| 367 | } |
---|
| 368 | |
---|
| 369 | /** |
---|
| 370 | * Test that appends to files at random offsets. |
---|
| 371 | */ |
---|
| 372 | public void testComplexAppend() throws IOException { |
---|
| 373 | initBuffer(fileSize); |
---|
| 374 | Configuration conf = new Configuration(); |
---|
| 375 | conf.setInt("heartbeat.recheck.interval", 2000); |
---|
| 376 | conf.setInt("dfs.heartbeat.interval", 2); |
---|
| 377 | conf.setInt("dfs.replication.pending.timeout.sec", 2); |
---|
| 378 | conf.setInt("dfs.socket.timeout", 30000); |
---|
| 379 | conf.setInt("dfs.datanode.socket.write.timeout", 30000); |
---|
| 380 | conf.setInt("dfs.datanode.handler.count", 50); |
---|
| 381 | conf.setBoolean("dfs.support.append", true); |
---|
| 382 | |
---|
| 383 | MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, |
---|
| 384 | true, null); |
---|
| 385 | cluster.waitActive(); |
---|
| 386 | FileSystem fs = cluster.getFileSystem(); |
---|
| 387 | |
---|
| 388 | try { |
---|
| 389 | // create a bunch of test files with random replication factors. |
---|
| 390 | // Insert them into a linked list. |
---|
| 391 | // |
---|
| 392 | for (int i = 0; i < numberOfFiles; i++) { |
---|
| 393 | short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1); |
---|
| 394 | Path testFile = new Path("/" + i + ".dat"); |
---|
| 395 | FSDataOutputStream stm = createFile(fs, testFile, replication); |
---|
| 396 | stm.close(); |
---|
| 397 | testFiles.add(testFile); |
---|
| 398 | } |
---|
| 399 | |
---|
| 400 | // Create threads and make them run workload concurrently. |
---|
| 401 | workload = new Workload[numThreads]; |
---|
| 402 | for (int i = 0; i < numThreads; i++) { |
---|
| 403 | workload[i] = new Workload(cluster, i); |
---|
| 404 | workload[i].start(); |
---|
| 405 | } |
---|
| 406 | |
---|
| 407 | // wait for all transactions to get over |
---|
| 408 | for (int i = 0; i < numThreads; i++) { |
---|
| 409 | try { |
---|
| 410 | System.out.println("Waiting for thread " + i + " to complete..."); |
---|
| 411 | workload[i].join(); |
---|
| 412 | System.out.println("Waiting for thread " + i + " complete."); |
---|
| 413 | } catch (InterruptedException e) { |
---|
| 414 | i--; // retry |
---|
| 415 | } |
---|
| 416 | } |
---|
| 417 | } finally { |
---|
| 418 | fs.close(); |
---|
| 419 | cluster.shutdown(); |
---|
| 420 | } |
---|
| 421 | |
---|
| 422 | // If any of the worker thread failed in their job, indicate that |
---|
| 423 | // this test failed. |
---|
| 424 | // |
---|
| 425 | assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus); |
---|
| 426 | } |
---|
| 427 | } |
---|