1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.hdfs; |
---|
19 | |
---|
20 | import java.io.BufferedReader; |
---|
21 | import java.io.File; |
---|
22 | import java.io.FileReader; |
---|
23 | import java.io.IOException; |
---|
24 | import java.net.InetSocketAddress; |
---|
25 | |
---|
26 | import org.apache.commons.logging.impl.Log4JLogger; |
---|
27 | import org.apache.hadoop.conf.Configuration; |
---|
28 | import org.apache.hadoop.fs.BlockLocation; |
---|
29 | import org.apache.hadoop.fs.FSDataInputStream; |
---|
30 | import org.apache.hadoop.fs.FSDataOutputStream; |
---|
31 | import org.apache.hadoop.fs.FileStatus; |
---|
32 | import org.apache.hadoop.fs.FileSystem; |
---|
33 | import org.apache.hadoop.fs.Path; |
---|
34 | import org.apache.hadoop.hdfs.protocol.Block; |
---|
35 | import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
---|
36 | import org.apache.hadoop.hdfs.protocol.FSConstants; |
---|
37 | import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
---|
38 | import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
---|
39 | import org.apache.hadoop.hdfs.server.datanode.DataNode; |
---|
40 | import org.apache.hadoop.hdfs.server.datanode.FSDataset; |
---|
41 | import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; |
---|
42 | import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
---|
43 | import org.apache.hadoop.hdfs.server.namenode.LeaseManager; |
---|
44 | import org.apache.hadoop.io.IOUtils; |
---|
45 | import org.apache.log4j.Level; |
---|
46 | |
---|
47 | |
---|
48 | /** |
---|
49 | * This class tests that a file need not be closed before its |
---|
50 | * data can be read by another client. |
---|
51 | */ |
---|
52 | public class TestFileCreation extends junit.framework.TestCase { |
---|
/** Root directory, named after this test class, under which some tests create their files. */
static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";

// Instance initializer: each test instance switches the lease manager,
// namesystem and DFS client loggers to Level.ALL.
{
  ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
}

/** Seed passed to AppendTestUtil.randomBytes when generating and verifying file contents. */
static final long seed = 0xDEADBEEFL;

/** Block size, in bytes, requested by createFile. */
static final int blockSize = 8192;

/** Number of full blocks contained in the default test file. */
static final int numBlocks = 2;

/**
 * Size of the default test file: two times the block size plus one byte.
 * This means that when the entire file is written, the first two blocks
 * definitely get flushed to the datanodes.
 */
static final int fileSize = numBlocks * blockSize + 1;

/** When true, tests configure the cluster to use SimulatedFSDataset storage. */
boolean simulatedStorage = false;
71 | |
---|
72 | // creates a file but does not close it |
---|
73 | static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) |
---|
74 | throws IOException { |
---|
75 | System.out.println("createFile: Created " + name + " with " + repl + " replica."); |
---|
76 | FSDataOutputStream stm = fileSys.create(name, true, |
---|
77 | fileSys.getConf().getInt("io.file.buffer.size", 4096), |
---|
78 | (short)repl, (long)blockSize); |
---|
79 | return stm; |
---|
80 | } |
---|
81 | |
---|
82 | // |
---|
83 | // writes to file but does not close it |
---|
84 | // |
---|
85 | static void writeFile(FSDataOutputStream stm) throws IOException { |
---|
86 | writeFile(stm, fileSize); |
---|
87 | } |
---|
88 | |
---|
89 | // |
---|
90 | // writes specified bytes to file. |
---|
91 | // |
---|
92 | static void writeFile(FSDataOutputStream stm, int size) throws IOException { |
---|
93 | byte[] buffer = AppendTestUtil.randomBytes(seed, size); |
---|
94 | stm.write(buffer, 0, size); |
---|
95 | } |
---|
96 | |
---|
97 | // |
---|
98 | // verify that the data written to the full blocks are sane |
---|
99 | // |
---|
100 | private void checkFile(FileSystem fileSys, Path name, int repl) |
---|
101 | throws IOException { |
---|
102 | boolean done = false; |
---|
103 | |
---|
104 | // wait till all full blocks are confirmed by the datanodes. |
---|
105 | while (!done) { |
---|
106 | try { |
---|
107 | Thread.sleep(1000); |
---|
108 | } catch (InterruptedException e) {} |
---|
109 | done = true; |
---|
110 | BlockLocation[] locations = fileSys.getFileBlockLocations( |
---|
111 | fileSys.getFileStatus(name), 0, fileSize); |
---|
112 | if (locations.length < numBlocks) { |
---|
113 | done = false; |
---|
114 | continue; |
---|
115 | } |
---|
116 | for (int idx = 0; idx < locations.length; idx++) { |
---|
117 | if (locations[idx].getHosts().length < repl) { |
---|
118 | done = false; |
---|
119 | break; |
---|
120 | } |
---|
121 | } |
---|
122 | } |
---|
123 | FSDataInputStream stm = fileSys.open(name); |
---|
124 | final byte[] expected; |
---|
125 | if (simulatedStorage) { |
---|
126 | expected = new byte[numBlocks * blockSize]; |
---|
127 | for (int i= 0; i < expected.length; i++) { |
---|
128 | expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE; |
---|
129 | } |
---|
130 | } else { |
---|
131 | expected = AppendTestUtil.randomBytes(seed, numBlocks*blockSize); |
---|
132 | } |
---|
133 | // do a sanity check. Read the file |
---|
134 | byte[] actual = new byte[numBlocks * blockSize]; |
---|
135 | stm.readFully(0, actual); |
---|
136 | stm.close(); |
---|
137 | checkData(actual, 0, expected, "Read 1"); |
---|
138 | } |
---|
139 | |
---|
140 | static private void checkData(byte[] actual, int from, byte[] expected, String message) { |
---|
141 | for (int idx = 0; idx < actual.length; idx++) { |
---|
142 | assertEquals(message+" byte "+(from+idx)+" differs. expected "+ |
---|
143 | expected[from+idx]+" actual "+actual[idx], |
---|
144 | expected[from+idx], actual[idx]); |
---|
145 | actual[idx] = 0; |
---|
146 | } |
---|
147 | } |
---|
148 | |
---|
149 | static void checkFullFile(FileSystem fs, Path name) throws IOException { |
---|
150 | FileStatus stat = fs.getFileStatus(name); |
---|
151 | BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, |
---|
152 | fileSize); |
---|
153 | for (int idx = 0; idx < locations.length; idx++) { |
---|
154 | String[] hosts = locations[idx].getNames(); |
---|
155 | for (int i = 0; i < hosts.length; i++) { |
---|
156 | System.out.print( hosts[i] + " "); |
---|
157 | } |
---|
158 | System.out.println(" off " + locations[idx].getOffset() + |
---|
159 | " len " + locations[idx].getLength()); |
---|
160 | } |
---|
161 | |
---|
162 | byte[] expected = AppendTestUtil.randomBytes(seed, fileSize); |
---|
163 | FSDataInputStream stm = fs.open(name); |
---|
164 | byte[] actual = new byte[fileSize]; |
---|
165 | stm.readFully(0, actual); |
---|
166 | checkData(actual, 0, expected, "Read 2"); |
---|
167 | stm.close(); |
---|
168 | } |
---|
169 | |
---|
170 | /** |
---|
171 | * Test that file data becomes available before file is closed. |
---|
172 | */ |
---|
173 | public void testFileCreation() throws IOException { |
---|
174 | Configuration conf = new Configuration(); |
---|
175 | if (simulatedStorage) { |
---|
176 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
177 | } |
---|
178 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
179 | FileSystem fs = cluster.getFileSystem(); |
---|
180 | try { |
---|
181 | |
---|
182 | // |
---|
183 | // check that / exists |
---|
184 | // |
---|
185 | Path path = new Path("/"); |
---|
186 | System.out.println("Path : \"" + path.toString() + "\""); |
---|
187 | System.out.println(fs.getFileStatus(path).isDir()); |
---|
188 | assertTrue("/ should be a directory", |
---|
189 | fs.getFileStatus(path).isDir() == true); |
---|
190 | |
---|
191 | // |
---|
192 | // Create a directory inside /, then try to overwrite it |
---|
193 | // |
---|
194 | Path dir1 = new Path("/test_dir"); |
---|
195 | fs.mkdirs(dir1); |
---|
196 | System.out.println("createFile: Creating " + dir1.getName() + |
---|
197 | " for overwrite of existing directory."); |
---|
198 | try { |
---|
199 | fs.create(dir1, true); // Create path, overwrite=true |
---|
200 | fs.close(); |
---|
201 | assertTrue("Did not prevent directory from being overwritten.", false); |
---|
202 | } catch (IOException ie) { |
---|
203 | if (!ie.getMessage().contains("already exists as a directory.")) |
---|
204 | throw ie; |
---|
205 | } |
---|
206 | |
---|
207 | // create a new file in home directory. Do not close it. |
---|
208 | // |
---|
209 | Path file1 = new Path("filestatus.dat"); |
---|
210 | FSDataOutputStream stm = createFile(fs, file1, 1); |
---|
211 | |
---|
212 | // verify that file exists in FS namespace |
---|
213 | assertTrue(file1 + " should be a file", |
---|
214 | fs.getFileStatus(file1).isDir() == false); |
---|
215 | System.out.println("Path : \"" + file1 + "\""); |
---|
216 | |
---|
217 | // write to file |
---|
218 | writeFile(stm); |
---|
219 | |
---|
220 | // Make sure a client can read it before it is closed. |
---|
221 | checkFile(fs, file1, 1); |
---|
222 | |
---|
223 | // verify that file size has changed |
---|
224 | long len = fs.getFileStatus(file1).getLen(); |
---|
225 | assertTrue(file1 + " should be of size " + (numBlocks * blockSize) + |
---|
226 | " but found to be of size " + len, |
---|
227 | len == numBlocks * blockSize); |
---|
228 | |
---|
229 | stm.close(); |
---|
230 | |
---|
231 | // verify that file size has changed to the full size |
---|
232 | len = fs.getFileStatus(file1).getLen(); |
---|
233 | assertTrue(file1 + " should be of size " + fileSize + |
---|
234 | " but found to be of size " + len, |
---|
235 | len == fileSize); |
---|
236 | |
---|
237 | |
---|
238 | // Check storage usage |
---|
239 | // can't check capacities for real storage since the OS file system may be changing under us. |
---|
240 | if (simulatedStorage) { |
---|
241 | DataNode dn = cluster.getDataNodes().get(0); |
---|
242 | assertEquals(fileSize, dn.getFSDataset().getDfsUsed()); |
---|
243 | assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining()); |
---|
244 | } |
---|
245 | } finally { |
---|
246 | cluster.shutdown(); |
---|
247 | } |
---|
248 | } |
---|
249 | |
---|
250 | /** |
---|
251 | * Test deleteOnExit |
---|
252 | */ |
---|
253 | public void testDeleteOnExit() throws IOException { |
---|
254 | Configuration conf = new Configuration(); |
---|
255 | if (simulatedStorage) { |
---|
256 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
257 | } |
---|
258 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
259 | FileSystem fs = cluster.getFileSystem(); |
---|
260 | FileSystem localfs = FileSystem.getLocal(conf); |
---|
261 | |
---|
262 | try { |
---|
263 | |
---|
264 | // Creates files in HDFS and local file system. |
---|
265 | // |
---|
266 | Path file1 = new Path("filestatus.dat"); |
---|
267 | Path file2 = new Path("filestatus2.dat"); |
---|
268 | Path file3 = new Path("filestatus3.dat"); |
---|
269 | FSDataOutputStream stm1 = createFile(fs, file1, 1); |
---|
270 | FSDataOutputStream stm2 = createFile(fs, file2, 1); |
---|
271 | FSDataOutputStream stm3 = createFile(localfs, file3, 1); |
---|
272 | System.out.println("DeleteOnExit: Created files."); |
---|
273 | |
---|
274 | // write to files and close. Purposely, do not close file2. |
---|
275 | writeFile(stm1); |
---|
276 | writeFile(stm3); |
---|
277 | stm1.close(); |
---|
278 | stm2.close(); |
---|
279 | stm3.close(); |
---|
280 | |
---|
281 | // set delete on exit flag on files. |
---|
282 | fs.deleteOnExit(file1); |
---|
283 | fs.deleteOnExit(file2); |
---|
284 | localfs.deleteOnExit(file3); |
---|
285 | |
---|
286 | // close the file system. This should make the above files |
---|
287 | // disappear. |
---|
288 | fs.close(); |
---|
289 | localfs.close(); |
---|
290 | fs = null; |
---|
291 | localfs = null; |
---|
292 | |
---|
293 | // reopen file system and verify that file does not exist. |
---|
294 | fs = cluster.getFileSystem(); |
---|
295 | localfs = FileSystem.getLocal(conf); |
---|
296 | |
---|
297 | assertTrue(file1 + " still exists inspite of deletOnExit set.", |
---|
298 | !fs.exists(file1)); |
---|
299 | assertTrue(file2 + " still exists inspite of deletOnExit set.", |
---|
300 | !fs.exists(file2)); |
---|
301 | assertTrue(file3 + " still exists inspite of deletOnExit set.", |
---|
302 | !localfs.exists(file3)); |
---|
303 | System.out.println("DeleteOnExit successful."); |
---|
304 | |
---|
305 | } finally { |
---|
306 | IOUtils.closeStream(fs); |
---|
307 | IOUtils.closeStream(localfs); |
---|
308 | cluster.shutdown(); |
---|
309 | } |
---|
310 | } |
---|
311 | |
---|
312 | /** |
---|
313 | * Test that file data does not become corrupted even in the face of errors. |
---|
314 | */ |
---|
315 | public void testFileCreationError1() throws IOException { |
---|
316 | Configuration conf = new Configuration(); |
---|
317 | conf.setInt("heartbeat.recheck.interval", 1000); |
---|
318 | conf.setInt("dfs.heartbeat.interval", 1); |
---|
319 | if (simulatedStorage) { |
---|
320 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
321 | } |
---|
322 | // create cluster |
---|
323 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
324 | FileSystem fs = cluster.getFileSystem(); |
---|
325 | cluster.waitActive(); |
---|
326 | InetSocketAddress addr = new InetSocketAddress("localhost", |
---|
327 | cluster.getNameNodePort()); |
---|
328 | DFSClient client = new DFSClient(addr, conf); |
---|
329 | |
---|
330 | try { |
---|
331 | |
---|
332 | // create a new file. |
---|
333 | // |
---|
334 | Path file1 = new Path("/filestatus.dat"); |
---|
335 | FSDataOutputStream stm = createFile(fs, file1, 1); |
---|
336 | |
---|
337 | // verify that file exists in FS namespace |
---|
338 | assertTrue(file1 + " should be a file", |
---|
339 | fs.getFileStatus(file1).isDir() == false); |
---|
340 | System.out.println("Path : \"" + file1 + "\""); |
---|
341 | |
---|
342 | // kill the datanode |
---|
343 | cluster.shutdownDataNodes(); |
---|
344 | |
---|
345 | // wait for the datanode to be declared dead |
---|
346 | while (true) { |
---|
347 | DatanodeInfo[] info = client.datanodeReport( |
---|
348 | FSConstants.DatanodeReportType.LIVE); |
---|
349 | if (info.length == 0) { |
---|
350 | break; |
---|
351 | } |
---|
352 | System.out.println("testFileCreationError1: waiting for datanode " + |
---|
353 | " to die."); |
---|
354 | try { |
---|
355 | Thread.sleep(1000); |
---|
356 | } catch (InterruptedException e) { |
---|
357 | } |
---|
358 | } |
---|
359 | |
---|
360 | // write 1 byte to file. |
---|
361 | // This should fail because all datanodes are dead. |
---|
362 | byte[] buffer = AppendTestUtil.randomBytes(seed, 1); |
---|
363 | try { |
---|
364 | stm.write(buffer); |
---|
365 | stm.close(); |
---|
366 | } catch (Exception e) { |
---|
367 | System.out.println("Encountered expected exception"); |
---|
368 | } |
---|
369 | |
---|
370 | // verify that no blocks are associated with this file |
---|
371 | // bad block allocations were cleaned up earlier. |
---|
372 | LocatedBlocks locations = client.namenode.getBlockLocations( |
---|
373 | file1.toString(), 0, Long.MAX_VALUE); |
---|
374 | System.out.println("locations = " + locations.locatedBlockCount()); |
---|
375 | assertTrue("Error blocks were not cleaned up", |
---|
376 | locations.locatedBlockCount() == 0); |
---|
377 | } finally { |
---|
378 | cluster.shutdown(); |
---|
379 | client.close(); |
---|
380 | } |
---|
381 | } |
---|
382 | |
---|
383 | /** |
---|
384 | * Test that the filesystem removes the last block from a file if its |
---|
385 | * lease expires. |
---|
386 | */ |
---|
387 | public void testFileCreationError2() throws IOException { |
---|
388 | long leasePeriod = 1000; |
---|
389 | System.out.println("testFileCreationError2 start"); |
---|
390 | Configuration conf = new Configuration(); |
---|
391 | conf.setInt("heartbeat.recheck.interval", 1000); |
---|
392 | conf.setInt("dfs.heartbeat.interval", 1); |
---|
393 | if (simulatedStorage) { |
---|
394 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
395 | } |
---|
396 | // create cluster |
---|
397 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
398 | DistributedFileSystem dfs = null; |
---|
399 | try { |
---|
400 | cluster.waitActive(); |
---|
401 | dfs = (DistributedFileSystem)cluster.getFileSystem(); |
---|
402 | DFSClient client = dfs.dfs; |
---|
403 | |
---|
404 | // create a new file. |
---|
405 | // |
---|
406 | Path file1 = new Path("/filestatus.dat"); |
---|
407 | createFile(dfs, file1, 1); |
---|
408 | System.out.println("testFileCreationError2: " |
---|
409 | + "Created file filestatus.dat with one replicas."); |
---|
410 | |
---|
411 | LocatedBlocks locations = client.namenode.getBlockLocations( |
---|
412 | file1.toString(), 0, Long.MAX_VALUE); |
---|
413 | System.out.println("testFileCreationError2: " |
---|
414 | + "The file has " + locations.locatedBlockCount() + " blocks."); |
---|
415 | |
---|
416 | // add another block to the file |
---|
417 | LocatedBlock location = client.namenode.addBlock(file1.toString(), |
---|
418 | client.clientName); |
---|
419 | System.out.println("testFileCreationError2: " |
---|
420 | + "Added block " + location.getBlock()); |
---|
421 | |
---|
422 | locations = client.namenode.getBlockLocations(file1.toString(), |
---|
423 | 0, Long.MAX_VALUE); |
---|
424 | int count = locations.locatedBlockCount(); |
---|
425 | System.out.println("testFileCreationError2: " |
---|
426 | + "The file now has " + count + " blocks."); |
---|
427 | |
---|
428 | // set the soft and hard limit to be 1 second so that the |
---|
429 | // namenode triggers lease recovery |
---|
430 | cluster.setLeasePeriod(leasePeriod, leasePeriod); |
---|
431 | |
---|
432 | // wait for the lease to expire |
---|
433 | try { |
---|
434 | Thread.sleep(5 * leasePeriod); |
---|
435 | } catch (InterruptedException e) { |
---|
436 | } |
---|
437 | |
---|
438 | // verify that the last block was synchronized. |
---|
439 | locations = client.namenode.getBlockLocations(file1.toString(), |
---|
440 | 0, Long.MAX_VALUE); |
---|
441 | System.out.println("testFileCreationError2: " |
---|
442 | + "locations = " + locations.locatedBlockCount()); |
---|
443 | assertEquals(0, locations.locatedBlockCount()); |
---|
444 | System.out.println("testFileCreationError2 successful"); |
---|
445 | } finally { |
---|
446 | IOUtils.closeStream(dfs); |
---|
447 | cluster.shutdown(); |
---|
448 | } |
---|
449 | } |
---|
450 | |
---|
451 | /** |
---|
452 | * Test that file leases are persisted across namenode restarts. |
---|
453 | * This test is currently not triggered because more HDFS work is |
---|
454 | * is needed to handle persistent leases. |
---|
455 | */ |
---|
456 | public void xxxtestFileCreationNamenodeRestart() throws IOException { |
---|
457 | Configuration conf = new Configuration(); |
---|
458 | final int MAX_IDLE_TIME = 2000; // 2s |
---|
459 | conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME); |
---|
460 | conf.setInt("heartbeat.recheck.interval", 1000); |
---|
461 | conf.setInt("dfs.heartbeat.interval", 1); |
---|
462 | if (simulatedStorage) { |
---|
463 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
464 | } |
---|
465 | |
---|
466 | // create cluster |
---|
467 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
468 | FileSystem fs = null; |
---|
469 | try { |
---|
470 | cluster.waitActive(); |
---|
471 | fs = cluster.getFileSystem(); |
---|
472 | final int nnport = cluster.getNameNodePort(); |
---|
473 | |
---|
474 | // create a new file. |
---|
475 | Path file1 = new Path("/filestatus.dat"); |
---|
476 | FSDataOutputStream stm = createFile(fs, file1, 1); |
---|
477 | System.out.println("testFileCreationNamenodeRestart: " |
---|
478 | + "Created file " + file1); |
---|
479 | |
---|
480 | // write two full blocks. |
---|
481 | writeFile(stm, numBlocks * blockSize); |
---|
482 | stm.sync(); |
---|
483 | |
---|
484 | // rename file wile keeping it open. |
---|
485 | Path fileRenamed = new Path("/filestatusRenamed.dat"); |
---|
486 | fs.rename(file1, fileRenamed); |
---|
487 | System.out.println("testFileCreationNamenodeRestart: " |
---|
488 | + "Renamed file " + file1 + " to " + |
---|
489 | fileRenamed); |
---|
490 | file1 = fileRenamed; |
---|
491 | |
---|
492 | // create another new file. |
---|
493 | // |
---|
494 | Path file2 = new Path("/filestatus2.dat"); |
---|
495 | FSDataOutputStream stm2 = createFile(fs, file2, 1); |
---|
496 | System.out.println("testFileCreationNamenodeRestart: " |
---|
497 | + "Created file " + file2); |
---|
498 | |
---|
499 | // create yet another new file with full path name. |
---|
500 | // rename it while open |
---|
501 | // |
---|
502 | Path file3 = new Path("/user/home/fullpath.dat"); |
---|
503 | FSDataOutputStream stm3 = createFile(fs, file3, 1); |
---|
504 | System.out.println("testFileCreationNamenodeRestart: " |
---|
505 | + "Created file " + file3); |
---|
506 | Path file4 = new Path("/user/home/fullpath4.dat"); |
---|
507 | FSDataOutputStream stm4 = createFile(fs, file4, 1); |
---|
508 | System.out.println("testFileCreationNamenodeRestart: " |
---|
509 | + "Created file " + file4); |
---|
510 | |
---|
511 | fs.mkdirs(new Path("/bin")); |
---|
512 | fs.rename(new Path("/user/home"), new Path("/bin")); |
---|
513 | Path file3new = new Path("/bin/home/fullpath.dat"); |
---|
514 | System.out.println("testFileCreationNamenodeRestart: " |
---|
515 | + "Renamed file " + file3 + " to " + |
---|
516 | file3new); |
---|
517 | Path file4new = new Path("/bin/home/fullpath4.dat"); |
---|
518 | System.out.println("testFileCreationNamenodeRestart: " |
---|
519 | + "Renamed file " + file4 + " to " + |
---|
520 | file4new); |
---|
521 | |
---|
522 | // restart cluster with the same namenode port as before. |
---|
523 | // This ensures that leases are persisted in fsimage. |
---|
524 | cluster.shutdown(); |
---|
525 | try { |
---|
526 | Thread.sleep(2*MAX_IDLE_TIME); |
---|
527 | } catch (InterruptedException e) { |
---|
528 | } |
---|
529 | cluster = new MiniDFSCluster(nnport, conf, 1, false, true, |
---|
530 | null, null, null); |
---|
531 | cluster.waitActive(); |
---|
532 | |
---|
533 | // restart cluster yet again. This triggers the code to read in |
---|
534 | // persistent leases from fsimage. |
---|
535 | cluster.shutdown(); |
---|
536 | try { |
---|
537 | Thread.sleep(5000); |
---|
538 | } catch (InterruptedException e) { |
---|
539 | } |
---|
540 | cluster = new MiniDFSCluster(nnport, conf, 1, false, true, |
---|
541 | null, null, null); |
---|
542 | cluster.waitActive(); |
---|
543 | fs = cluster.getFileSystem(); |
---|
544 | |
---|
545 | // instruct the dfsclient to use a new filename when it requests |
---|
546 | // new blocks for files that were renamed. |
---|
547 | DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream) |
---|
548 | (stm.getWrappedStream()); |
---|
549 | dfstream.setTestFilename(file1.toString()); |
---|
550 | dfstream = (DFSClient.DFSOutputStream) (stm3.getWrappedStream()); |
---|
551 | dfstream.setTestFilename(file3new.toString()); |
---|
552 | dfstream = (DFSClient.DFSOutputStream) (stm4.getWrappedStream()); |
---|
553 | dfstream.setTestFilename(file4new.toString()); |
---|
554 | |
---|
555 | // write 1 byte to file. This should succeed because the |
---|
556 | // namenode should have persisted leases. |
---|
557 | byte[] buffer = AppendTestUtil.randomBytes(seed, 1); |
---|
558 | stm.write(buffer); |
---|
559 | stm.close(); |
---|
560 | stm2.write(buffer); |
---|
561 | stm2.close(); |
---|
562 | stm3.close(); |
---|
563 | stm4.close(); |
---|
564 | |
---|
565 | // verify that new block is associated with this file |
---|
566 | DFSClient client = ((DistributedFileSystem)fs).dfs; |
---|
567 | LocatedBlocks locations = client.namenode.getBlockLocations( |
---|
568 | file1.toString(), 0, Long.MAX_VALUE); |
---|
569 | System.out.println("locations = " + locations.locatedBlockCount()); |
---|
570 | assertTrue("Error blocks were not cleaned up for file " + file1, |
---|
571 | locations.locatedBlockCount() == 3); |
---|
572 | |
---|
573 | // verify filestatus2.dat |
---|
574 | locations = client.namenode.getBlockLocations( |
---|
575 | file2.toString(), 0, Long.MAX_VALUE); |
---|
576 | System.out.println("locations = " + locations.locatedBlockCount()); |
---|
577 | assertTrue("Error blocks were not cleaned up for file " + file2, |
---|
578 | locations.locatedBlockCount() == 1); |
---|
579 | } finally { |
---|
580 | IOUtils.closeStream(fs); |
---|
581 | cluster.shutdown(); |
---|
582 | } |
---|
583 | } |
---|
584 | |
---|
585 | /** |
---|
586 | * Test that all open files are closed when client dies abnormally. |
---|
587 | */ |
---|
588 | public void testDFSClientDeath() throws IOException { |
---|
589 | Configuration conf = new Configuration(); |
---|
590 | System.out.println("Testing adbornal client death."); |
---|
591 | if (simulatedStorage) { |
---|
592 | conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
---|
593 | } |
---|
594 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
595 | FileSystem fs = cluster.getFileSystem(); |
---|
596 | DistributedFileSystem dfs = (DistributedFileSystem) fs; |
---|
597 | DFSClient dfsclient = dfs.dfs; |
---|
598 | try { |
---|
599 | |
---|
600 | // create a new file in home directory. Do not close it. |
---|
601 | // |
---|
602 | Path file1 = new Path("/clienttest.dat"); |
---|
603 | FSDataOutputStream stm = createFile(fs, file1, 1); |
---|
604 | System.out.println("Created file clienttest.dat"); |
---|
605 | |
---|
606 | // write to file |
---|
607 | writeFile(stm); |
---|
608 | |
---|
609 | // close the dfsclient before closing the output stream. |
---|
610 | // This should close all existing file. |
---|
611 | dfsclient.close(); |
---|
612 | |
---|
613 | // reopen file system and verify that file exists. |
---|
614 | assertTrue(file1 + " does not exist.", |
---|
615 | AppendTestUtil.createHdfsWithDifferentUsername(conf).exists(file1)); |
---|
616 | } finally { |
---|
617 | cluster.shutdown(); |
---|
618 | } |
---|
619 | } |
---|
620 | |
---|
621 | /** |
---|
622 | * Test that file data becomes available before file is closed. |
---|
623 | */ |
---|
624 | public void testFileCreationSimulated() throws IOException { |
---|
625 | simulatedStorage = true; |
---|
626 | testFileCreation(); |
---|
627 | simulatedStorage = false; |
---|
628 | } |
---|
629 | |
---|
630 | /** |
---|
631 | * Test creating two files at the same time. |
---|
632 | */ |
---|
633 | public void testConcurrentFileCreation() throws IOException { |
---|
634 | Configuration conf = new Configuration(); |
---|
635 | MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
---|
636 | |
---|
637 | try { |
---|
638 | FileSystem fs = cluster.getFileSystem(); |
---|
639 | |
---|
640 | Path[] p = {new Path("/foo"), new Path("/bar")}; |
---|
641 | |
---|
642 | //write 2 files at the same time |
---|
643 | FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])}; |
---|
644 | int i = 0; |
---|
645 | for(; i < 100; i++) { |
---|
646 | out[0].write(i); |
---|
647 | out[1].write(i); |
---|
648 | } |
---|
649 | out[0].close(); |
---|
650 | for(; i < 200; i++) {out[1].write(i);} |
---|
651 | out[1].close(); |
---|
652 | |
---|
653 | //verify |
---|
654 | FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])}; |
---|
655 | for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());} |
---|
656 | for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());} |
---|
657 | } finally { |
---|
658 | if (cluster != null) {cluster.shutdown();} |
---|
659 | } |
---|
660 | } |
---|
661 | |
---|
662 | /** |
---|
663 | * Create a file, write something, fsync but not close. |
---|
664 | * Then change lease period and wait for lease recovery. |
---|
665 | * Finally, read the block directly from each Datanode and verify the content. |
---|
666 | */ |
---|
667 | public void testLeaseExpireHardLimit() throws Exception { |
---|
668 | System.out.println("testLeaseExpireHardLimit start"); |
---|
669 | final long leasePeriod = 1000; |
---|
670 | final int DATANODE_NUM = 3; |
---|
671 | |
---|
672 | Configuration conf = new Configuration(); |
---|
673 | conf.setInt("heartbeat.recheck.interval", 1000); |
---|
674 | conf.setInt("dfs.heartbeat.interval", 1); |
---|
675 | |
---|
676 | // create cluster |
---|
677 | MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null); |
---|
678 | DistributedFileSystem dfs = null; |
---|
679 | try { |
---|
680 | cluster.waitActive(); |
---|
681 | dfs = (DistributedFileSystem)cluster.getFileSystem(); |
---|
682 | |
---|
683 | // create a new file. |
---|
684 | final String f = DIR + "foo"; |
---|
685 | final Path fpath = new Path(f); |
---|
686 | FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM); |
---|
687 | out.write("something".getBytes()); |
---|
688 | out.sync(); |
---|
689 | |
---|
690 | // set the soft and hard limit to be 1 second so that the |
---|
691 | // namenode triggers lease recovery |
---|
692 | cluster.setLeasePeriod(leasePeriod, leasePeriod); |
---|
693 | // wait for the lease to expire |
---|
694 | try {Thread.sleep(5 * leasePeriod);} catch (InterruptedException e) {} |
---|
695 | |
---|
696 | LocatedBlocks locations = dfs.dfs.namenode.getBlockLocations( |
---|
697 | f, 0, Long.MAX_VALUE); |
---|
698 | assertEquals(1, locations.locatedBlockCount()); |
---|
699 | LocatedBlock locatedblock = locations.getLocatedBlocks().get(0); |
---|
700 | int successcount = 0; |
---|
701 | for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) { |
---|
702 | DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort); |
---|
703 | FSDataset dataset = (FSDataset)datanode.data; |
---|
704 | Block b = dataset.getStoredBlock(locatedblock.getBlock().getBlockId()); |
---|
705 | File blockfile = dataset.findBlockFile(b.getBlockId()); |
---|
706 | System.out.println("blockfile=" + blockfile); |
---|
707 | if (blockfile != null) { |
---|
708 | BufferedReader in = new BufferedReader(new FileReader(blockfile)); |
---|
709 | assertEquals("something", in.readLine()); |
---|
710 | in.close(); |
---|
711 | successcount++; |
---|
712 | } |
---|
713 | } |
---|
714 | System.out.println("successcount=" + successcount); |
---|
715 | assertTrue(successcount > 0); |
---|
716 | } finally { |
---|
717 | IOUtils.closeStream(dfs); |
---|
718 | cluster.shutdown(); |
---|
719 | } |
---|
720 | |
---|
721 | System.out.println("testLeaseExpireHardLimit successful"); |
---|
722 | } |
---|
723 | |
---|
724 | // test closing file system before all file handles are closed. |
---|
725 | public void testFsClose() throws Exception { |
---|
726 | System.out.println("test file system close start"); |
---|
727 | final int DATANODE_NUM = 3; |
---|
728 | |
---|
729 | Configuration conf = new Configuration(); |
---|
730 | |
---|
731 | // create cluster |
---|
732 | MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null); |
---|
733 | DistributedFileSystem dfs = null; |
---|
734 | try { |
---|
735 | cluster.waitActive(); |
---|
736 | dfs = (DistributedFileSystem)cluster.getFileSystem(); |
---|
737 | |
---|
738 | // create a new file. |
---|
739 | final String f = DIR + "foofs"; |
---|
740 | final Path fpath = new Path(f); |
---|
741 | FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM); |
---|
742 | out.write("something".getBytes()); |
---|
743 | |
---|
744 | // close file system without closing file |
---|
745 | dfs.close(); |
---|
746 | } finally { |
---|
747 | System.out.println("testFsClose successful"); |
---|
748 | } |
---|
749 | } |
---|
750 | } |
---|