source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/TestIndexCache.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 5.6 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import java.io.DataOutputStream;
21import java.io.FileNotFoundException;
22import java.io.IOException;
23import java.util.Random;
24import java.util.zip.CRC32;
25import java.util.zip.CheckedOutputStream;
26
27import org.apache.hadoop.fs.ChecksumException;
28import org.apache.hadoop.fs.FileStatus;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.fs.FSDataOutputStream;
32
33import junit.framework.TestCase;
34
35public class TestIndexCache extends TestCase {
36
37  public void testLRCPolicy() throws Exception {
38    Random r = new Random();
39    long seed = r.nextLong();
40    r.setSeed(seed);
41    System.out.println("seed: " + seed);
42    JobConf conf = new JobConf();
43    FileSystem fs = FileSystem.getLocal(conf).getRaw();
44    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
45        "cache").makeQualified(fs);
46    fs.delete(p, true);
47    conf.setInt("mapred.tasktracker.indexcache.mb", 1);
48    final int partsPerMap = 1000;
49    final int bytesPerFile = partsPerMap * 24;
50    IndexCache cache = new IndexCache(conf);
51
52    // fill cache
53    int totalsize = bytesPerFile;
54    for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) {
55      Path f = new Path(p, Integer.toString(totalsize, 36));
56      writeFile(fs, f, totalsize, partsPerMap);
57      IndexRecord rec = cache.getIndexInformation(
58          Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f);
59      checkRecord(rec, totalsize);
60    }
61
62    // delete files, ensure cache retains all elem
63    for (FileStatus stat : fs.listStatus(p)) {
64      fs.delete(stat.getPath(),true);
65    }
66    for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
67      Path f = new Path(p, Integer.toString(i, 36));
68      IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
69          r.nextInt(partsPerMap), f);
70      checkRecord(rec, i);
71    }
72
73    // push oldest (bytesPerFile) out of cache
74    Path f = new Path(p, Integer.toString(totalsize, 36));
75    writeFile(fs, f, totalsize, partsPerMap);
76    cache.getIndexInformation(Integer.toString(totalsize, 36),
77        r.nextInt(partsPerMap), f);
78    fs.delete(f, false);
79
80    // oldest fails to read, or error
81    boolean fnf = false;
82    try {
83      cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
84          r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)));
85    } catch (IOException e) {
86      if (e.getCause() == null ||
87          !(e.getCause()  instanceof FileNotFoundException)) {
88        throw e;
89      }
90      else {
91        fnf = true;
92      }
93    }
94    if (!fnf)
95      fail("Failed to push out last entry");
96    // should find all the other entries
97    for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
98      IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
99          r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)));
100      checkRecord(rec, i);
101    }
102    IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
103        r.nextInt(partsPerMap), f);
104    checkRecord(rec, totalsize);
105  }
106
107  public void testBadIndex() throws Exception {
108    final int parts = 30;
109    JobConf conf = new JobConf();
110    FileSystem fs = FileSystem.getLocal(conf).getRaw();
111    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
112        "cache").makeQualified(fs);
113    fs.delete(p, true);
114    conf.setInt("mapred.tasktracker.indexcache.mb", 1);
115    IndexCache cache = new IndexCache(conf);
116
117    Path f = new Path(p, "badindex");
118    FSDataOutputStream out = fs.create(f, false);
119    CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
120    DataOutputStream dout = new DataOutputStream(iout);
121    for (int i = 0; i < parts; ++i) {
122      for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
123        if (0 == (i % 3)) {
124          dout.writeLong(i);
125        } else {
126          out.writeLong(i);
127        }
128      }
129    }
130    out.writeLong(iout.getChecksum().getValue());
131    dout.close();
132    try {
133      cache.getIndexInformation("badindex", 7, f);
134      fail("Did not detect bad checksum");
135    } catch (IOException e) {
136      if (!(e.getCause() instanceof ChecksumException)) {
137        throw e;
138      }
139    }
140  }
141
142  private static void checkRecord(IndexRecord rec, long fill) {
143    assertEquals(fill, rec.startOffset);
144    assertEquals(fill, rec.rawLength);
145    assertEquals(fill, rec.partLength);
146  }
147
148  private static void writeFile(FileSystem fs, Path f, long fill, int parts)
149      throws IOException {
150    FSDataOutputStream out = fs.create(f, false);
151    CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
152    DataOutputStream dout = new DataOutputStream(iout);
153    for (int i = 0; i < parts; ++i) {
154      for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
155        dout.writeLong(fill);
156      }
157    }
158    out.writeLong(iout.getChecksum().getValue());
159    dout.close();
160  }
161}
Note: See TracBrowser for help on using the repository browser.