source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/fs/TestCopyFiles.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 32.9 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.fs;
20
21import java.io.ByteArrayOutputStream;
22import java.io.FileNotFoundException;
23import java.io.IOException;
24import java.io.PrintStream;
25import java.net.URI;
26import java.util.ArrayList;
27import java.util.List;
28import java.util.Random;
29import java.util.StringTokenizer;
30
31import junit.framework.TestCase;
32
33import org.apache.commons.logging.LogFactory;
34import org.apache.commons.logging.impl.Log4JLogger;
35import org.apache.hadoop.conf.Configuration;
36import org.apache.hadoop.fs.permission.FsPermission;
37import org.apache.hadoop.hdfs.MiniDFSCluster;
38import org.apache.hadoop.hdfs.server.datanode.DataNode;
39import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
40import org.apache.hadoop.mapred.MiniMRCluster;
41import org.apache.hadoop.security.UnixUserGroupInformation;
42import org.apache.hadoop.security.UserGroupInformation;
43import org.apache.hadoop.tools.DistCp;
44import org.apache.hadoop.util.ToolRunner;
45import org.apache.log4j.Level;
46
47
48/**
49 * A JUnit test for copying files recursively.
50 */
51public class TestCopyFiles extends TestCase {
52  {
53    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
54        ).getLogger().setLevel(Level.OFF);
55    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
56    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
57    ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
58  }
59 
60  static final URI LOCAL_FS = URI.create("file:///");
61 
62  private static final Random RAN = new Random();
63  private static final int NFILES = 20;
64  private static String TEST_ROOT_DIR =
65    new Path(System.getProperty("test.build.data","/tmp"))
66    .toString().replace(' ', '+');
67
68  /** class MyFile contains enough information to recreate the contents of
69   * a single file.
70   */
71  private static class MyFile {
72    private static Random gen = new Random();
73    private static final int MAX_LEVELS = 3;
74    private static final int MAX_SIZE = 8*1024;
75    private static String[] dirNames = {
76      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
77    };
78    private final String name;
79    private int size = 0;
80    private long seed = 0L;
81
82    MyFile() {
83      this(gen.nextInt(MAX_LEVELS));
84    }
85    MyFile(int nLevels) {
86      String xname = "";
87      if (nLevels != 0) {
88        int[] levels = new int[nLevels];
89        for (int idx = 0; idx < nLevels; idx++) {
90          levels[idx] = gen.nextInt(10);
91        }
92        StringBuffer sb = new StringBuffer();
93        for (int idx = 0; idx < nLevels; idx++) {
94          sb.append(dirNames[levels[idx]]);
95          sb.append("/");
96        }
97        xname = sb.toString();
98      }
99      long fidx = gen.nextLong() & Long.MAX_VALUE;
100      name = xname + Long.toString(fidx);
101      reset();
102    }
103    void reset() {
104      final int oldsize = size;
105      do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size);
106      final long oldseed = seed;
107      do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed);
108    }
109    String getName() { return name; }
110    int getSize() { return size; }
111    long getSeed() { return seed; }
112  }
113
114  private static MyFile[] createFiles(URI fsname, String topdir)
115    throws IOException {
116    return createFiles(FileSystem.get(fsname, new Configuration()), topdir);
117  }
118
119  /** create NFILES with random names and directory hierarchies
120   * with random (but reproducible) data in them.
121   */
122  private static MyFile[] createFiles(FileSystem fs, String topdir)
123    throws IOException {
124    Path root = new Path(topdir);
125    MyFile[] files = new MyFile[NFILES];
126    for (int i = 0; i < NFILES; i++) {
127      files[i] = createFile(root, fs);
128    }
129    return files;
130  }
131
132  static MyFile createFile(Path root, FileSystem fs, int levels)
133      throws IOException {
134    MyFile f = levels < 0 ? new MyFile() : new MyFile(levels);
135    Path p = new Path(root, f.getName());
136    FSDataOutputStream out = fs.create(p);
137    byte[] toWrite = new byte[f.getSize()];
138    new Random(f.getSeed()).nextBytes(toWrite);
139    out.write(toWrite);
140    out.close();
141    FileSystem.LOG.info("created: " + p + ", size=" + f.getSize());
142    return f;
143  }
144
145  static MyFile createFile(Path root, FileSystem fs) throws IOException {
146    return createFile(root, fs, -1);
147  }
148
149  private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files
150      ) throws IOException {
151    return checkFiles(fs, topdir, files, false);   
152  }
153
154  private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files,
155      boolean existingOnly) throws IOException {
156    Path root = new Path(topdir);
157   
158    for (int idx = 0; idx < files.length; idx++) {
159      Path fPath = new Path(root, files[idx].getName());
160      try {
161        fs.getFileStatus(fPath);
162        FSDataInputStream in = fs.open(fPath);
163        byte[] toRead = new byte[files[idx].getSize()];
164        byte[] toCompare = new byte[files[idx].getSize()];
165        Random rb = new Random(files[idx].getSeed());
166        rb.nextBytes(toCompare);
167        assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
168        in.close();
169        for (int i = 0; i < toRead.length; i++) {
170          if (toRead[i] != toCompare[i]) {
171            return false;
172          }
173        }
174        toRead = null;
175        toCompare = null;
176      }
177      catch(FileNotFoundException fnfe) {
178        if (!existingOnly) {
179          throw fnfe;
180        }
181      }
182    }
183   
184    return true;
185  }
186
187  private static void updateFiles(FileSystem fs, String topdir, MyFile[] files,
188        int nupdate) throws IOException {
189    assert nupdate <= NFILES;
190
191    Path root = new Path(topdir);
192
193    for (int idx = 0; idx < nupdate; ++idx) {
194      Path fPath = new Path(root, files[idx].getName());
195      // overwrite file
196      assertTrue(fPath.toString() + " does not exist", fs.exists(fPath));
197      FSDataOutputStream out = fs.create(fPath);
198      files[idx].reset();
199      byte[] toWrite = new byte[files[idx].getSize()];
200      Random rb = new Random(files[idx].getSeed());
201      rb.nextBytes(toWrite);
202      out.write(toWrite);
203      out.close();
204    }
205  }
206
207  private static FileStatus[] getFileStatus(FileSystem fs,
208      String topdir, MyFile[] files) throws IOException {
209    return getFileStatus(fs, topdir, files, false);
210  }
211  private static FileStatus[] getFileStatus(FileSystem fs,
212      String topdir, MyFile[] files, boolean existingOnly) throws IOException {
213    Path root = new Path(topdir);
214    List<FileStatus> statuses = new ArrayList<FileStatus>();
215    for (int idx = 0; idx < NFILES; ++idx) {
216      try {
217        statuses.add(fs.getFileStatus(new Path(root, files[idx].getName())));
218      } catch(FileNotFoundException fnfe) {
219        if (!existingOnly) {
220          throw fnfe;
221        }
222      }
223    }
224    return statuses.toArray(new FileStatus[statuses.size()]);
225  }
226
227  private static boolean checkUpdate(FileSystem fs, FileStatus[] old,
228      String topdir, MyFile[] upd, final int nupdate) throws IOException {
229    Path root = new Path(topdir);
230
231    // overwrote updated files
232    for (int idx = 0; idx < nupdate; ++idx) {
233      final FileStatus stat =
234        fs.getFileStatus(new Path(root, upd[idx].getName()));
235      if (stat.getModificationTime() <= old[idx].getModificationTime()) {
236        return false;
237      }
238    }
239    // did not overwrite files not updated
240    for (int idx = nupdate; idx < NFILES; ++idx) {
241      final FileStatus stat =
242        fs.getFileStatus(new Path(root, upd[idx].getName()));
243      if (stat.getModificationTime() != old[idx].getModificationTime()) {
244        return false;
245      }
246    }
247    return true;
248  }
249
250  /** delete directory and everything underneath it.*/
251  private static void deldir(FileSystem fs, String topdir) throws IOException {
252    fs.delete(new Path(topdir), true);
253  }
254 
255  /** copy files from local file system to local file system */
256  public void testCopyFromLocalToLocal() throws Exception {
257    Configuration conf = new Configuration();
258    FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
259    MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
260    ToolRunner.run(new DistCp(new Configuration()),
261                           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
262                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
263    assertTrue("Source and destination directories do not match.",
264               checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
265    deldir(localfs, TEST_ROOT_DIR+"/destdat");
266    deldir(localfs, TEST_ROOT_DIR+"/srcdat");
267  }
268 
269  /** copy files from dfs file system to dfs file system */
270  public void testCopyFromDfsToDfs() throws Exception {
271    String namenode = null;
272    MiniDFSCluster cluster = null;
273    try {
274      Configuration conf = new Configuration();
275      cluster = new MiniDFSCluster(conf, 2, true, null);
276      final FileSystem hdfs = cluster.getFileSystem();
277      namenode = FileSystem.getDefaultUri(conf).toString();
278      if (namenode.startsWith("hdfs://")) {
279        MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
280        ToolRunner.run(new DistCp(conf), new String[] {
281                                         "-log",
282                                         namenode+"/logs",
283                                         namenode+"/srcdat",
284                                         namenode+"/destdat"});
285        assertTrue("Source and destination directories do not match.",
286                   checkFiles(hdfs, "/destdat", files));
287        FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
288        assertTrue("Log directory does not exist.",
289                   fs.exists(new Path(namenode+"/logs")));
290        deldir(hdfs, "/destdat");
291        deldir(hdfs, "/srcdat");
292        deldir(hdfs, "/logs");
293      }
294    } finally {
295      if (cluster != null) { cluster.shutdown(); }
296    }
297  }
298 
299  /** copy files from local file system to dfs file system */
300  public void testCopyFromLocalToDfs() throws Exception {
301    MiniDFSCluster cluster = null;
302    try {
303      Configuration conf = new Configuration();
304      cluster = new MiniDFSCluster(conf, 1, true, null);
305      final FileSystem hdfs = cluster.getFileSystem();
306      final String namenode = hdfs.getUri().toString();
307      if (namenode.startsWith("hdfs://")) {
308        MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
309        ToolRunner.run(new DistCp(conf), new String[] {
310                                         "-log",
311                                         namenode+"/logs",
312                                         "file:///"+TEST_ROOT_DIR+"/srcdat",
313                                         namenode+"/destdat"});
314        assertTrue("Source and destination directories do not match.",
315                   checkFiles(cluster.getFileSystem(), "/destdat", files));
316        assertTrue("Log directory does not exist.",
317                    hdfs.exists(new Path(namenode+"/logs")));
318        deldir(hdfs, "/destdat");
319        deldir(hdfs, "/logs");
320        deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
321      }
322    } finally {
323      if (cluster != null) { cluster.shutdown(); }
324    }
325  }
326
327  /** copy files from dfs file system to local file system */
328  public void testCopyFromDfsToLocal() throws Exception {
329    MiniDFSCluster cluster = null;
330    try {
331      Configuration conf = new Configuration();
332      final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
333      cluster = new MiniDFSCluster(conf, 1, true, null);
334      final FileSystem hdfs = cluster.getFileSystem();
335      final String namenode = FileSystem.getDefaultUri(conf).toString();
336      if (namenode.startsWith("hdfs://")) {
337        MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
338        ToolRunner.run(new DistCp(conf), new String[] {
339                                         "-log",
340                                         "/logs",
341                                         namenode+"/srcdat",
342                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
343        assertTrue("Source and destination directories do not match.",
344                   checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
345        assertTrue("Log directory does not exist.",
346                    hdfs.exists(new Path("/logs")));
347        deldir(localfs, TEST_ROOT_DIR+"/destdat");
348        deldir(hdfs, "/logs");
349        deldir(hdfs, "/srcdat");
350      }
351    } finally {
352      if (cluster != null) { cluster.shutdown(); }
353    }
354  }
355
356  public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
357    MiniDFSCluster cluster = null;
358    try {
359      Configuration conf = new Configuration();
360      cluster = new MiniDFSCluster(conf, 2, true, null);
361      final FileSystem hdfs = cluster.getFileSystem();
362      final String namenode = hdfs.getUri().toString();
363      if (namenode.startsWith("hdfs://")) {
364        MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
365        ToolRunner.run(new DistCp(conf), new String[] {
366                                         "-p",
367                                         "-log",
368                                         namenode+"/logs",
369                                         namenode+"/srcdat",
370                                         namenode+"/destdat"});
371        assertTrue("Source and destination directories do not match.",
372                   checkFiles(hdfs, "/destdat", files));
373        FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
374        assertTrue("Log directory does not exist.",
375                    fs.exists(new Path(namenode+"/logs")));
376
377        FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files);
378        final int nupdate = NFILES>>2;
379        updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate);
380        deldir(hdfs, "/logs");
381
382        ToolRunner.run(new DistCp(conf), new String[] {
383                                         "-p",
384                                         "-update",
385                                         "-log",
386                                         namenode+"/logs",
387                                         namenode+"/srcdat",
388                                         namenode+"/destdat"});
389        assertTrue("Source and destination directories do not match.",
390                   checkFiles(hdfs, "/destdat", files));
391        assertTrue("Update failed to replicate all changes in src",
392                 checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate));
393
394        deldir(hdfs, "/logs");
395        ToolRunner.run(new DistCp(conf), new String[] {
396                                         "-p",
397                                         "-overwrite",
398                                         "-log",
399                                         namenode+"/logs",
400                                         namenode+"/srcdat",
401                                         namenode+"/destdat"});
402        assertTrue("Source and destination directories do not match.",
403                   checkFiles(hdfs, "/destdat", files));
404        assertTrue("-overwrite didn't.",
405                 checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES));
406
407        deldir(hdfs, "/destdat");
408        deldir(hdfs, "/srcdat");
409        deldir(hdfs, "/logs");
410      }
411    } finally {
412      if (cluster != null) { cluster.shutdown(); }
413    }
414  }
415
416  public void testCopyDuplication() throws Exception {
417    final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration());
418    try {   
419      MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat");
420      ToolRunner.run(new DistCp(new Configuration()),
421          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
422                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
423      assertTrue("Source and destination directories do not match.",
424                 checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files));
425 
426      assertEquals(DistCp.DuplicationException.ERROR_CODE,
427          ToolRunner.run(new DistCp(new Configuration()),
428          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
429                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
430                        "file:///"+TEST_ROOT_DIR+"/destdat",}));
431    }
432    finally {
433      deldir(localfs, TEST_ROOT_DIR+"/destdat");
434      deldir(localfs, TEST_ROOT_DIR+"/srcdat");
435      deldir(localfs, TEST_ROOT_DIR+"/src2");
436    }
437  }
438
439  public void testCopySingleFile() throws Exception {
440    FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration());
441    Path root = new Path(TEST_ROOT_DIR+"/srcdat");
442    try {   
443      MyFile[] files = {createFile(root, fs)};
444      //copy a dir with a single file
445      ToolRunner.run(new DistCp(new Configuration()),
446          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
447                        "file:///"+TEST_ROOT_DIR+"/destdat"});
448      assertTrue("Source and destination directories do not match.",
449                 checkFiles(fs, TEST_ROOT_DIR+"/destdat", files));
450     
451      //copy a single file
452      String fname = files[0].getName();
453      Path p = new Path(root, fname);
454      FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
455      ToolRunner.run(new DistCp(new Configuration()),
456          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
457                        "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
458      assertTrue("Source and destination directories do not match.",
459          checkFiles(fs, TEST_ROOT_DIR+"/dest2", files));     
460      //copy single file to existing dir
461      deldir(fs, TEST_ROOT_DIR+"/dest2");
462      fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2"));
463      MyFile[] files2 = {createFile(root, fs, 0)};
464      String sname = files2[0].getName();
465      ToolRunner.run(new DistCp(new Configuration()),
466          new String[] {"-update",
467                        "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
468                        "file:///"+TEST_ROOT_DIR+"/dest2/"});
469      assertTrue("Source and destination directories do not match.",
470          checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
471      updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1);
472      //copy single file to existing dir w/ dst name conflict
473      ToolRunner.run(new DistCp(new Configuration()),
474          new String[] {"-update",
475                        "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
476                        "file:///"+TEST_ROOT_DIR+"/dest2/"});
477      assertTrue("Source and destination directories do not match.",
478          checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
479    }
480    finally {
481      deldir(fs, TEST_ROOT_DIR+"/destdat");
482      deldir(fs, TEST_ROOT_DIR+"/dest2");
483      deldir(fs, TEST_ROOT_DIR+"/srcdat");
484    }
485  }
486
487  public void testPreserveOption() throws Exception {
488    Configuration conf = new Configuration();
489    MiniDFSCluster cluster = null;
490    try {
491      cluster = new MiniDFSCluster(conf, 2, true, null);
492      String nnUri = FileSystem.getDefaultUri(conf).toString();
493      FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
494
495      {//test preserving user
496        MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
497        FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files);
498        for(int i = 0; i < srcstat.length; i++) {
499          fs.setOwner(srcstat[i].getPath(), "u" + i, null);
500        }
501        ToolRunner.run(new DistCp(conf),
502            new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"});
503        assertTrue("Source and destination directories do not match.",
504                   checkFiles(fs, "/destdat", files));
505       
506        FileStatus[] dststat = getFileStatus(fs, "/destdat", files);
507        for(int i = 0; i < dststat.length; i++) {
508          assertEquals("i=" + i, "u" + i, dststat[i].getOwner());
509        }
510        deldir(fs, "/destdat");
511        deldir(fs, "/srcdat");
512      }
513
514      {//test preserving group
515        MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
516        FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files);
517        for(int i = 0; i < srcstat.length; i++) {
518          fs.setOwner(srcstat[i].getPath(), null, "g" + i);
519        }
520        ToolRunner.run(new DistCp(conf),
521            new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"});
522        assertTrue("Source and destination directories do not match.",
523                   checkFiles(fs, "/destdat", files));
524       
525        FileStatus[] dststat = getFileStatus(fs, "/destdat", files);
526        for(int i = 0; i < dststat.length; i++) {
527          assertEquals("i=" + i, "g" + i, dststat[i].getGroup());
528        }
529        deldir(fs, "/destdat");
530        deldir(fs, "/srcdat");
531      }
532
533      {//test preserving mode
534        MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
535        FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files);
536        FsPermission[] permissions = new FsPermission[srcstat.length];
537        for(int i = 0; i < srcstat.length; i++) {
538          permissions[i] = new FsPermission((short)(i & 0666));
539          fs.setPermission(srcstat[i].getPath(), permissions[i]);
540        }
541
542        ToolRunner.run(new DistCp(conf),
543            new String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"});
544        assertTrue("Source and destination directories do not match.",
545                   checkFiles(fs, "/destdat", files));
546 
547        FileStatus[] dststat = getFileStatus(fs, "/destdat", files);
548        for(int i = 0; i < dststat.length; i++) {
549          assertEquals("i=" + i, permissions[i], dststat[i].getPermission());
550        }
551        deldir(fs, "/destdat");
552        deldir(fs, "/srcdat");
553      }
554    } finally {
555      if (cluster != null) { cluster.shutdown(); }
556    }
557  }
558
559  public void testMapCount() throws Exception {
560    String namenode = null;
561    MiniDFSCluster dfs = null;
562    MiniMRCluster mr = null;
563    try {
564      Configuration conf = new Configuration();
565      dfs = new MiniDFSCluster(conf, 3, true, null);
566      FileSystem fs = dfs.getFileSystem();
567      final FsShell shell = new FsShell(conf);
568      namenode = fs.getUri().toString();
569      mr = new MiniMRCluster(3, namenode, 1);
570      MyFile[] files = createFiles(fs.getUri(), "/srcdat");
571      long totsize = 0;
572      for (MyFile f : files) {
573        totsize += f.getSize();
574      }
575      Configuration job = mr.createJobConf();
576      job.setLong("distcp.bytes.per.map", totsize / 3);
577      ToolRunner.run(new DistCp(job),
578          new String[] {"-m", "100",
579                        "-log",
580                        namenode+"/logs",
581                        namenode+"/srcdat",
582                        namenode+"/destdat"});
583      assertTrue("Source and destination directories do not match.",
584                 checkFiles(fs, "/destdat", files));
585
586      String logdir = namenode + "/logs";
587      System.out.println(execCmd(shell, "-lsr", logdir));
588      FileStatus[] logs = fs.listStatus(new Path(logdir));
589      // rare case where splits are exact, logs.length can be 4
590      assertTrue("Unexpected map count, logs.length=" + logs.length,
591          logs.length == 5 || logs.length == 4);
592
593      deldir(fs, "/destdat");
594      deldir(fs, "/logs");
595      ToolRunner.run(new DistCp(job),
596          new String[] {"-m", "1",
597                        "-log",
598                        namenode+"/logs",
599                        namenode+"/srcdat",
600                        namenode+"/destdat"});
601
602      System.out.println(execCmd(shell, "-lsr", logdir));
603      logs = fs.listStatus(new Path(namenode+"/logs"));
604      assertTrue("Unexpected map count, logs.length=" + logs.length,
605          logs.length == 2);
606    } finally {
607      if (dfs != null) { dfs.shutdown(); }
608      if (mr != null) { mr.shutdown(); }
609    }
610  }
611
612  public void testLimits() throws Exception {
613    Configuration conf = new Configuration();
614    MiniDFSCluster cluster = null;
615    try {
616      cluster = new MiniDFSCluster(conf, 2, true, null);
617      final String nnUri = FileSystem.getDefaultUri(conf).toString();
618      final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
619      final DistCp distcp = new DistCp(conf);
620      final FsShell shell = new FsShell(conf); 
621
622      final String srcrootdir =  "/src_root";
623      final Path srcrootpath = new Path(srcrootdir); 
624      final String dstrootdir =  "/dst_root";
625      final Path dstrootpath = new Path(dstrootdir); 
626
627      {//test -filelimit
628        MyFile[] files = createFiles(URI.create(nnUri), srcrootdir);
629        int filelimit = files.length / 2;
630        System.out.println("filelimit=" + filelimit);
631
632        ToolRunner.run(distcp,
633            new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir});
634        String results = execCmd(shell, "-lsr", dstrootdir);
635        results = removePrefix(results, dstrootdir);
636        System.out.println("results=" +  results);
637
638        FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true);
639        assertEquals(filelimit, dststat.length);
640        deldir(fs, dstrootdir);
641        deldir(fs, srcrootdir);
642      }
643
644      {//test -sizelimit
645        createFiles(URI.create(nnUri), srcrootdir);
646        long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2;
647        System.out.println("sizelimit=" + sizelimit);
648
649        ToolRunner.run(distcp,
650            new String[]{"-sizelimit", ""+sizelimit, nnUri+srcrootdir, nnUri+dstrootdir});
651       
652        ContentSummary summary = fs.getContentSummary(dstrootpath);
653        System.out.println("summary=" + summary);
654        assertTrue(summary.getLength() <= sizelimit);
655        deldir(fs, dstrootdir);
656        deldir(fs, srcrootdir);
657      }
658
659      {//test update
660        final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir);
661        final long totalsize = fs.getContentSummary(srcrootpath).getLength();
662        System.out.println("src.length=" + srcs.length);
663        System.out.println("totalsize =" + totalsize);
664        fs.mkdirs(dstrootpath);
665        final int parts = RAN.nextInt(NFILES/3 - 1) + 2;
666        final int filelimit = srcs.length/parts;
667        final long sizelimit = totalsize/parts;
668        System.out.println("filelimit=" + filelimit);
669        System.out.println("sizelimit=" + sizelimit);
670        System.out.println("parts    =" + parts);
671        final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit,
672            "-update", nnUri+srcrootdir, nnUri+dstrootdir};
673
674        int dstfilecount = 0;
675        long dstsize = 0;
676        for(int i = 0; i <= parts; i++) {
677          ToolRunner.run(distcp, args);
678       
679          FileStatus[] dststat = getFileStatus(fs, dstrootdir, srcs, true);
680          System.out.println(i + ") dststat.length=" + dststat.length);
681          assertTrue(dststat.length - dstfilecount <= filelimit);
682          ContentSummary summary = fs.getContentSummary(dstrootpath);
683          System.out.println(i + ") summary.getLength()=" + summary.getLength());
684          assertTrue(summary.getLength() - dstsize <= sizelimit);
685          assertTrue(checkFiles(fs, dstrootdir, srcs, true));
686          dstfilecount = dststat.length;
687          dstsize = summary.getLength();
688        }
689
690        deldir(fs, dstrootdir);
691        deldir(fs, srcrootdir);
692      }
693    } finally {
694      if (cluster != null) { cluster.shutdown(); }
695    }
696  }
697
698  static final long now = System.currentTimeMillis();
699
700  static UnixUserGroupInformation createUGI(String name, boolean issuper) {
701    String username = name + now;
702    String group = issuper? "supergroup": username;
703    return UnixUserGroupInformation.createImmutable(
704        new String[]{username, group});
705  }
706
707  static Path createHomeDirectory(FileSystem fs, UserGroupInformation ugi
708      ) throws IOException {
709    final Path home = new Path("/user/" + ugi.getUserName());
710    fs.mkdirs(home);
711    fs.setOwner(home, ugi.getUserName(), ugi.getGroupNames()[0]);
712    fs.setPermission(home, new FsPermission((short)0700));
713    return home;
714  }
715
716  public void testHftpAccessControl() throws Exception {
717    MiniDFSCluster cluster = null;
718    try {
719      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
720      final UnixUserGroupInformation USER_UGI = createUGI("user", false); 
721
722      //start cluster by DFS_UGI
723      final Configuration dfsConf = new Configuration();
724      UnixUserGroupInformation.saveToConf(dfsConf,
725          UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI);
726      cluster = new MiniDFSCluster(dfsConf, 2, true, null);
727      cluster.waitActive();
728
729      final String httpAdd = dfsConf.get("dfs.http.address");
730      final URI nnURI = FileSystem.getDefaultUri(dfsConf);
731      final String nnUri = nnURI.toString();
732      final Path home = createHomeDirectory(FileSystem.get(nnURI, dfsConf), USER_UGI);
733     
734      //now, login as USER_UGI
735      final Configuration userConf = new Configuration();
736      UnixUserGroupInformation.saveToConf(userConf,
737          UnixUserGroupInformation.UGI_PROPERTY_NAME, USER_UGI);
738      final FileSystem fs = FileSystem.get(nnURI, userConf);
739
740      final Path srcrootpath = new Path(home, "src_root"); 
741      final String srcrootdir =  srcrootpath.toString();
742      final Path dstrootpath = new Path(home, "dst_root"); 
743      final String dstrootdir =  dstrootpath.toString();
744      final DistCp distcp = new DistCp(userConf);
745
746      FileSystem.mkdirs(fs, srcrootpath, new FsPermission((short)0700));
747      final String[] args = {"hftp://"+httpAdd+srcrootdir, nnUri+dstrootdir};
748
749      { //copy with permission 000, should fail
750        fs.setPermission(srcrootpath, new FsPermission((short)0));
751        assertEquals(-3, ToolRunner.run(distcp, args));
752      }
753    } finally {
754      if (cluster != null) { cluster.shutdown(); }
755    }
756  }
757
758  /** test -delete */
759  public void testDelete() throws Exception {
760    final Configuration conf = new Configuration();
761    MiniDFSCluster cluster = null;
762    try {
763      cluster = new MiniDFSCluster(conf, 2, true, null);
764      final URI nnURI = FileSystem.getDefaultUri(conf);
765      final String nnUri = nnURI.toString();
766      final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
767
768      final DistCp distcp = new DistCp(conf);
769      final FsShell shell = new FsShell(conf); 
770
771      final String srcrootdir = "/src_root";
772      final String dstrootdir = "/dst_root";
773
774      {
775        //create source files
776        createFiles(nnURI, srcrootdir);
777        String srcresults = execCmd(shell, "-lsr", srcrootdir);
778        srcresults = removePrefix(srcresults, srcrootdir);
779        System.out.println("srcresults=" +  srcresults);
780
781        //create some files in dst
782        createFiles(nnURI, dstrootdir);
783        System.out.println("dstrootdir=" +  dstrootdir);
784        shell.run(new String[]{"-lsr", dstrootdir});
785
786        //run distcp
787        ToolRunner.run(distcp,
788            new String[]{"-delete", "-update", "-log", "/log",
789                         nnUri+srcrootdir, nnUri+dstrootdir});
790
791        //make sure src and dst contains the same files
792        String dstresults = execCmd(shell, "-lsr", dstrootdir);
793        dstresults = removePrefix(dstresults, dstrootdir);
794        System.out.println("first dstresults=" +  dstresults);
795        assertEquals(srcresults, dstresults);
796
797        //create additional file in dst
798        create(fs, new Path(dstrootdir, "foo"));
799        create(fs, new Path(dstrootdir, "foobar"));
800
801        //run distcp again
802        ToolRunner.run(distcp,
803            new String[]{"-delete", "-update", "-log", "/log2",
804                         nnUri+srcrootdir, nnUri+dstrootdir});
805       
806        //make sure src and dst contains the same files
807        dstresults = execCmd(shell, "-lsr", dstrootdir);
808        dstresults = removePrefix(dstresults, dstrootdir);
809        System.out.println("second dstresults=" +  dstresults);
810        assertEquals(srcresults, dstresults);
811
812        //cleanup
813        deldir(fs, dstrootdir);
814        deldir(fs, srcrootdir);
815      }
816    } finally {
817      if (cluster != null) { cluster.shutdown(); }
818    }
819  }
820
821  static void create(FileSystem fs, Path f) throws IOException {
822    FSDataOutputStream out = fs.create(f);
823    try {
824      byte[] b = new byte[1024 + RAN.nextInt(1024)];
825      RAN.nextBytes(b);
826      out.write(b);
827    } finally {
828      if (out != null) out.close();
829    }
830  }
831 
832  static String execCmd(FsShell shell, String... args) throws Exception {
833    ByteArrayOutputStream baout = new ByteArrayOutputStream();
834    PrintStream out = new PrintStream(baout, true);
835    PrintStream old = System.out;
836    System.setOut(out);
837    shell.run(args);
838    out.close();
839    System.setOut(old);
840    return baout.toString();
841  }
842 
843  private static String removePrefix(String lines, String prefix) {
844    final int prefixlen = prefix.length();
845    final StringTokenizer t = new StringTokenizer(lines, "\n");
846    final StringBuffer results = new StringBuffer(); 
847    for(; t.hasMoreTokens(); ) {
848      String s = t.nextToken();
849      results.append(s.substring(s.indexOf(prefix) + prefixlen) + "\n");
850    }
851    return results.toString();
852  }
853}
Note: See TracBrowser for help on using the repository browser.