source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 23.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred.lib;
19
20import java.io.IOException;
21import java.io.DataOutputStream;
22import java.util.BitSet;
23import java.util.HashMap;
24import java.util.HashSet;
25import java.util.Random;
26
27import junit.framework.TestCase;
28
29import org.apache.commons.logging.Log;
30import org.apache.commons.logging.LogFactory;
31import org.apache.hadoop.fs.FSDataOutputStream;
32import org.apache.hadoop.fs.FileSystem;
33import org.apache.hadoop.fs.FileStatus;
34import org.apache.hadoop.fs.Path;
35import org.apache.hadoop.io.Text;
36import org.apache.hadoop.hdfs.MiniDFSCluster;
37import org.apache.hadoop.fs.BlockLocation;
38import org.apache.hadoop.io.BytesWritable;
39import org.apache.hadoop.hdfs.DFSTestUtil;
40import org.apache.hadoop.hdfs.DistributedFileSystem;
41import org.apache.hadoop.conf.Configuration;
42import org.apache.hadoop.io.SequenceFile;
43import org.apache.hadoop.io.SequenceFile.CompressionType;
44import org.apache.hadoop.fs.PathFilter;
45import org.apache.hadoop.mapred.InputSplit;
46import org.apache.hadoop.mapred.JobConf;
47import org.apache.hadoop.mapred.Reporter;
48import org.apache.hadoop.mapred.RecordReader;
49import org.apache.hadoop.mapred.MiniMRCluster;
50
51public class TestCombineFileInputFormat extends TestCase{
52
53  private static final String rack1[] = new String[] {
54    "/r1"
55  };
56  private static final String hosts1[] = new String[] {
57    "host1.rack1.com"
58  };
59  private static final String rack2[] = new String[] {
60    "/r2"
61  };
62  private static final String hosts2[] = new String[] {
63    "host2.rack2.com"
64  };
65  private static final String rack3[] = new String[] {
66    "/r3"
67  };
68  private static final String hosts3[] = new String[] {
69    "host3.rack3.com"
70  };
71  final Path inDir = new Path("/racktesting");
72  final Path outputPath = new Path("/output");
73  final Path dir1 = new Path(inDir, "/dir1");
74  final Path dir2 = new Path(inDir, "/dir2");
75  final Path dir3 = new Path(inDir, "/dir3");
76  final Path dir4 = new Path(inDir, "/dir4");
77
78  static final int BLOCKSIZE = 1024;
79  static final byte[] databuf = new byte[BLOCKSIZE];
80
81  private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
82 
83  /** Dummy class to extend CombineFileInputFormat*/
84  private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
85    @Override
86    public RecordReader<Text,Text> getRecordReader(InputSplit split, JobConf job
87        , Reporter reporter) throws IOException {
88      return null;
89    }
90  }
91
92  public void testSplitPlacement() throws IOException {
93    String namenode = null;
94    MiniDFSCluster dfs = null;
95    MiniMRCluster mr = null;
96    FileSystem fileSys = null;
97    String testName = "TestSplitPlacement";
98    try {
99      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
100       * 1) file1, just after starting the datanode on r1, with
101       *    a repl factor of 1, and,
102       * 2) file2, just after starting the datanode on r2, with
103       *    a repl factor of 2, and,
104       * 3) file3 after starting the all three datanodes, with a repl
105       *    factor of 3.
106       * At the end, file1 will be present on only datanode1, file2 will be
107       * present on datanode 1 and datanode2 and
108       * file3 will be present on all datanodes.
109       */
110      JobConf conf = new JobConf();
111      conf.setBoolean("dfs.replication.considerLoad", false);
112      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
113      dfs.waitActive();
114
115      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
116                 (dfs.getFileSystem()).getUri().getPort();
117
118      fileSys = dfs.getFileSystem();
119      if (!fileSys.mkdirs(inDir)) {
120        throw new IOException("Mkdirs failed to create " + inDir.toString());
121      }
122      Path file1 = new Path(dir1 + "/file1");
123      writeFile(conf, file1, (short)1, 1);
124      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
125      dfs.waitActive();
126
127      // create file on two datanodes.
128      Path file2 = new Path(dir2 + "/file2");
129      writeFile(conf, file2, (short)2, 2);
130
131      // split it using a CombinedFile input format
132      DummyInputFormat inFormat = new DummyInputFormat();
133      inFormat.setInputPaths(conf, dir1 + "," + dir2);
134      inFormat.setMinSplitSizeRack(BLOCKSIZE);
135      InputSplit[] splits = inFormat.getSplits(conf, 1);
136      System.out.println("Made splits(Test1): " + splits.length);
137
138      // make sure that each split has different locations
139      CombineFileSplit fileSplit = null;
140      for (int i = 0; i < splits.length; ++i) {
141        fileSplit = (CombineFileSplit) splits[i];
142        System.out.println("File split(Test1): " + fileSplit);
143      }
144      assertEquals(splits.length, 2);
145      fileSplit = (CombineFileSplit) splits[0];
146      assertEquals(fileSplit.getNumPaths(), 2);
147      assertEquals(fileSplit.getLocations().length, 1);
148      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
149      assertEquals(fileSplit.getOffset(0), 0);
150      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
151      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
152      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
153      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
154      assertEquals(fileSplit.getLocations()[0], "/r2");
155      fileSplit = (CombineFileSplit) splits[1];
156      assertEquals(fileSplit.getNumPaths(), 1);
157      assertEquals(fileSplit.getLocations().length, 1);
158      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
159      assertEquals(fileSplit.getOffset(0), 0);
160      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
161      assertEquals(fileSplit.getLocations()[0], "/r1");
162
163      // create another file on 3 datanodes and 3 racks.
164      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
165      dfs.waitActive();
166      Path file3 = new Path(dir3 + "/file3");
167      writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
168      inFormat = new DummyInputFormat();
169      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3);
170      inFormat.setMinSplitSizeRack(BLOCKSIZE);
171      splits = inFormat.getSplits(conf, 1);
172      for (int i = 0; i < splits.length; ++i) {
173        fileSplit = (CombineFileSplit) splits[i];
174        System.out.println("File split(Test2): " + fileSplit);
175      }
176      assertEquals(splits.length, 3);
177      fileSplit = (CombineFileSplit) splits[0];
178      assertEquals(fileSplit.getNumPaths(), 3);
179      assertEquals(fileSplit.getLocations().length, 1);
180      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
181      assertEquals(fileSplit.getOffset(0), 0);
182      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
183      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
184      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
185      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
186      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
187      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
188      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
189      assertEquals(fileSplit.getLocations()[0], "/r3");
190      fileSplit = (CombineFileSplit) splits[1];
191      assertEquals(fileSplit.getNumPaths(), 2);
192      assertEquals(fileSplit.getLocations().length, 1);
193      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
194      assertEquals(fileSplit.getOffset(0), 0);
195      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
196      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
197      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
198      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
199      assertEquals(fileSplit.getLocations()[0], "/r2");
200      fileSplit = (CombineFileSplit) splits[2];
201      assertEquals(fileSplit.getNumPaths(), 1);
202      assertEquals(fileSplit.getLocations().length, 1);
203      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
204      assertEquals(fileSplit.getOffset(0), 0);
205      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
206      assertEquals(fileSplit.getLocations()[0], "/r1");
207
208      // create file4 on all three racks
209      Path file4 = new Path(dir4 + "/file4");
210      writeFile(conf, file4, (short)3, 3);
211      inFormat = new DummyInputFormat();
212      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
213      inFormat.setMinSplitSizeRack(BLOCKSIZE);
214      splits = inFormat.getSplits(conf, 1);
215      for (int i = 0; i < splits.length; ++i) {
216        fileSplit = (CombineFileSplit) splits[i];
217        System.out.println("File split(Test3): " + fileSplit);
218      }
219      assertEquals(splits.length, 3);
220      fileSplit = (CombineFileSplit) splits[0];
221      assertEquals(fileSplit.getNumPaths(), 6);
222      assertEquals(fileSplit.getLocations().length, 1);
223      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
224      assertEquals(fileSplit.getOffset(0), 0);
225      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
226      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
227      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
228      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
229      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
230      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
231      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
232      assertEquals(fileSplit.getLocations()[0], "/r3");
233      fileSplit = (CombineFileSplit) splits[1];
234      assertEquals(fileSplit.getNumPaths(), 2);
235      assertEquals(fileSplit.getLocations().length, 1);
236      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
237      assertEquals(fileSplit.getOffset(0), 0);
238      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
239      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
240      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
241      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
242      assertEquals(fileSplit.getLocations()[0], "/r2");
243      fileSplit = (CombineFileSplit) splits[2];
244      assertEquals(fileSplit.getNumPaths(), 1);
245      assertEquals(fileSplit.getLocations().length, 1);
246      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
247      assertEquals(fileSplit.getOffset(0), 0);
248      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
249      assertEquals(fileSplit.getLocations()[0], "/r1");
250
251      // maximum split size is 2 blocks
252      inFormat = new DummyInputFormat();
253      inFormat.setMinSplitSizeNode(BLOCKSIZE);
254      inFormat.setMaxSplitSize(2*BLOCKSIZE);
255      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
256      splits = inFormat.getSplits(conf, 1);
257      for (int i = 0; i < splits.length; ++i) {
258        fileSplit = (CombineFileSplit) splits[i];
259        System.out.println("File split(Test4): " + fileSplit);
260      }
261      assertEquals(splits.length, 5);
262      fileSplit = (CombineFileSplit) splits[0];
263      assertEquals(fileSplit.getNumPaths(), 2);
264      assertEquals(fileSplit.getLocations().length, 1);
265      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
266      assertEquals(fileSplit.getOffset(0), 0);
267      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
268      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
269      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
270      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
271      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
272      fileSplit = (CombineFileSplit) splits[1];
273      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
274      assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE);
275      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
276      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
277      assertEquals(fileSplit.getOffset(1), 0);
278      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
279      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
280      fileSplit = (CombineFileSplit) splits[2];
281      assertEquals(fileSplit.getNumPaths(), 2);
282      assertEquals(fileSplit.getLocations().length, 1);
283      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
284      assertEquals(fileSplit.getOffset(0), BLOCKSIZE);
285      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
286      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
287      assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE);
288      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
289      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
290
291      // maximum split size is 3 blocks
292      inFormat = new DummyInputFormat();
293      inFormat.setMinSplitSizeNode(BLOCKSIZE);
294      inFormat.setMaxSplitSize(3*BLOCKSIZE);
295      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
296      splits = inFormat.getSplits(conf, 1);
297      for (int i = 0; i < splits.length; ++i) {
298        fileSplit = (CombineFileSplit) splits[i];
299        System.out.println("File split(Test5): " + fileSplit);
300      }
301      assertEquals(splits.length, 4);
302      fileSplit = (CombineFileSplit) splits[0];
303      assertEquals(fileSplit.getNumPaths(), 3);
304      assertEquals(fileSplit.getLocations().length, 1);
305      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
306      assertEquals(fileSplit.getOffset(0), 0);
307      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
308      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
309      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
310      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
311      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
312      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
313      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
314      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
315      fileSplit = (CombineFileSplit) splits[1];
316      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
317      assertEquals(fileSplit.getOffset(0), 0);
318      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
319      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
320      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
321      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
322      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
323      assertEquals(fileSplit.getOffset(2),  2 * BLOCKSIZE);
324      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
325      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
326      fileSplit = (CombineFileSplit) splits[2];
327      assertEquals(fileSplit.getNumPaths(), 2);
328      assertEquals(fileSplit.getLocations().length, 1);
329      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
330      assertEquals(fileSplit.getOffset(0), 0);
331      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
332      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
333      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
334      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
335      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
336      fileSplit = (CombineFileSplit) splits[3];
337      assertEquals(fileSplit.getNumPaths(), 1);
338      assertEquals(fileSplit.getLocations().length, 1);
339      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
340      assertEquals(fileSplit.getOffset(0), 0);
341      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
342      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
343
344      // maximum split size is 4 blocks
345      inFormat = new DummyInputFormat();
346      inFormat.setMaxSplitSize(4*BLOCKSIZE);
347      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
348      splits = inFormat.getSplits(conf, 1);
349      for (int i = 0; i < splits.length; ++i) {
350        fileSplit = (CombineFileSplit) splits[i];
351        System.out.println("File split(Test6): " + fileSplit);
352      }
353      assertEquals(splits.length, 3);
354      fileSplit = (CombineFileSplit) splits[0];
355      assertEquals(fileSplit.getNumPaths(), 4);
356      assertEquals(fileSplit.getLocations().length, 1);
357      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
358      assertEquals(fileSplit.getOffset(0), 0);
359      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
360      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
361      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
362      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
363      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
364      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
365      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
366      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
367      fileSplit = (CombineFileSplit) splits[1];
368      assertEquals(fileSplit.getNumPaths(), 4);
369      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
370      assertEquals(fileSplit.getOffset(0), 0);
371      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
372      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
373      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
374      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
375      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
376      assertEquals(fileSplit.getOffset(2), BLOCKSIZE);
377      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
378      assertEquals(fileSplit.getPath(3).getName(), file4.getName());
379      assertEquals(fileSplit.getOffset(3),  2 * BLOCKSIZE);
380      assertEquals(fileSplit.getLength(3), BLOCKSIZE);
381      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
382      fileSplit = (CombineFileSplit) splits[2];
383      assertEquals(fileSplit.getNumPaths(), 1);
384      assertEquals(fileSplit.getLocations().length, 1);
385      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
386      assertEquals(fileSplit.getOffset(0), 0);
387      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
388      assertEquals(fileSplit.getLocations()[0], "/r1");
389
390      // maximum split size is 7 blocks and min is 3 blocks
391      inFormat = new DummyInputFormat();
392      inFormat.setMaxSplitSize(7*BLOCKSIZE);
393      inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
394      inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
395      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
396      splits = inFormat.getSplits(conf, 1);
397      for (int i = 0; i < splits.length; ++i) {
398        fileSplit = (CombineFileSplit) splits[i];
399        System.out.println("File split(Test7): " + fileSplit);
400      }
401      assertEquals(splits.length, 2);
402      fileSplit = (CombineFileSplit) splits[0];
403      assertEquals(fileSplit.getNumPaths(), 6);
404      assertEquals(fileSplit.getLocations().length, 1);
405      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
406      fileSplit = (CombineFileSplit) splits[1];
407      assertEquals(fileSplit.getNumPaths(), 3);
408      assertEquals(fileSplit.getLocations().length, 1);
409      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
410
411      // Rack 1 has file1, file2 and file3 and file4
412      // Rack 2 has file2 and file3 and file4
413      // Rack 3 has file3 and file4
414      file1 = new Path(conf.getWorkingDirectory(), file1);
415      file2 = new Path(conf.getWorkingDirectory(), file2);
416      file3 = new Path(conf.getWorkingDirectory(), file3);
417      file4 = new Path(conf.getWorkingDirectory(), file4);
418
419      // setup a filter so that only file1 and file2 can be combined
420      inFormat = new DummyInputFormat();
421      inFormat.addInputPath(conf, inDir);
422      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
423      inFormat.createPool(conf, new TestFilter(dir1), 
424                          new TestFilter(dir2));
425      splits = inFormat.getSplits(conf, 1);
426      for (int i = 0; i < splits.length; ++i) {
427        fileSplit = (CombineFileSplit) splits[i];
428        System.out.println("File split(TestPool1): " + fileSplit);
429      }
430      assertEquals(splits.length, 3);
431      fileSplit = (CombineFileSplit) splits[0];
432      assertEquals(fileSplit.getNumPaths(), 2);
433      assertEquals(fileSplit.getLocations().length, 1);
434      assertEquals(fileSplit.getLocations()[0], "/r2");
435      fileSplit = (CombineFileSplit) splits[1];
436      assertEquals(fileSplit.getNumPaths(), 1);
437      assertEquals(fileSplit.getLocations().length, 1);
438      assertEquals(fileSplit.getLocations()[0], "/r1");
439      fileSplit = (CombineFileSplit) splits[2];
440      assertEquals(fileSplit.getNumPaths(), 6);
441      assertEquals(fileSplit.getLocations().length, 1);
442      assertEquals(fileSplit.getLocations()[0], "/r3");
443    } finally {
444      if (dfs != null) {
445        dfs.shutdown();
446      }
447    }
448  }
449
450  static void writeFile(Configuration conf, Path name,
451      short replication, int numBlocks) throws IOException {
452    FileSystem fileSys = FileSystem.get(conf);
453
454    FSDataOutputStream stm = fileSys.create(name, true,
455                                            conf.getInt("io.file.buffer.size", 4096),
456                                            replication, (long)BLOCKSIZE);
457    for (int i = 0; i < numBlocks; i++) {
458      stm.write(databuf);
459    }
460    stm.close();
461    DFSTestUtil.waitReplication(fileSys, name, replication);
462  }
463
464  static class TestFilter implements PathFilter {
465    private Path p;
466
467    // store a path prefix in this TestFilter
468    public TestFilter(Path p) {
469      this.p = p;
470    }
471
472    // returns true if the specified path matches the prefix stored
473    // in this TestFilter.
474    public boolean accept(Path path) {
475      if (path.toString().indexOf(p.toString()) == 0) {
476        return true;
477      }
478      return false;
479    }
480
481    public String toString() {
482      return "PathFilter:" + p;
483    }
484  }
485
486  /*
487   * Prints out the input splits for the specified files
488   */
489  private void splitRealFiles(String[] args) throws IOException {
490    JobConf conf = new JobConf();
491    FileSystem fs = FileSystem.get(conf);
492    if (!(fs instanceof DistributedFileSystem)) {
493      throw new IOException("Wrong file system: " + fs.getClass().getName());
494    }
495    int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
496
497    DummyInputFormat inFormat = new DummyInputFormat();
498    for (int i = 0; i < args.length; i++) {
499      inFormat.addInputPaths(conf, args[i]);
500    }
501    inFormat.setMinSplitSizeRack(blockSize);
502    inFormat.setMaxSplitSize(10 * blockSize);
503
504    InputSplit[] splits = inFormat.getSplits(conf, 1);
505    System.out.println("Total number of splits " + splits.length);
506    for (int i = 0; i < splits.length; ++i) {
507      CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
508      System.out.println("Split[" + i + "] " + fileSplit);
509    }
510  }
511
512  public static void main(String[] args) throws Exception{
513
514    // if there are some parameters specified, then use those paths
515    if (args.length != 0) {
516      TestCombineFileInputFormat test = new TestCombineFileInputFormat();
517      test.splitRealFiles(args);
518    } else {
519      TestCombineFileInputFormat test = new TestCombineFileInputFormat();
520      test.testSplitPlacement();
521    }
522  }
523}
Note: See TracBrowser for help on using the repository browser.