/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
---|
| 18 | |
---|
| 19 | package org.apache.hadoop.mapred.lib; |
---|
| 20 | |
---|
| 21 | import java.io.IOException; |
---|
| 22 | import java.util.ArrayList; |
---|
| 23 | import java.util.Arrays; |
---|
| 24 | |
---|
| 25 | import junit.framework.Test; |
---|
| 26 | import junit.framework.TestCase; |
---|
| 27 | import junit.framework.TestSuite; |
---|
| 28 | import junit.extensions.TestSetup; |
---|
| 29 | |
---|
| 30 | import org.apache.hadoop.fs.FileSystem; |
---|
| 31 | import org.apache.hadoop.fs.Path; |
---|
| 32 | import org.apache.hadoop.io.NullWritable; |
---|
| 33 | import org.apache.hadoop.io.RawComparator; |
---|
| 34 | import org.apache.hadoop.io.SequenceFile; |
---|
| 35 | import org.apache.hadoop.io.Text; |
---|
| 36 | import org.apache.hadoop.io.WritableComparable; |
---|
| 37 | import org.apache.hadoop.io.WritableComparator; |
---|
| 38 | import org.apache.hadoop.io.WritableUtils; |
---|
| 39 | import org.apache.hadoop.mapred.JobConf; |
---|
| 40 | |
---|
| 41 | public class TestTotalOrderPartitioner extends TestCase { |
---|
| 42 | |
---|
| 43 | private static final Text[] splitStrings = new Text[] { |
---|
| 44 | // -inf // 0 |
---|
| 45 | new Text("aabbb"), // 1 |
---|
| 46 | new Text("babbb"), // 2 |
---|
| 47 | new Text("daddd"), // 3 |
---|
| 48 | new Text("dddee"), // 4 |
---|
| 49 | new Text("ddhee"), // 5 |
---|
| 50 | new Text("dingo"), // 6 |
---|
| 51 | new Text("hijjj"), // 7 |
---|
| 52 | new Text("n"), // 8 |
---|
| 53 | new Text("yak"), // 9 |
---|
| 54 | }; |
---|
| 55 | |
---|
| 56 | static class Check<T> { |
---|
| 57 | T data; |
---|
| 58 | int part; |
---|
| 59 | Check(T data, int part) { |
---|
| 60 | this.data = data; |
---|
| 61 | this.part = part; |
---|
| 62 | } |
---|
| 63 | } |
---|
| 64 | |
---|
| 65 | private static final ArrayList<Check<Text>> testStrings = |
---|
| 66 | new ArrayList<Check<Text>>(); |
---|
| 67 | static { |
---|
| 68 | testStrings.add(new Check<Text>(new Text("aaaaa"), 0)); |
---|
| 69 | testStrings.add(new Check<Text>(new Text("aaabb"), 0)); |
---|
| 70 | testStrings.add(new Check<Text>(new Text("aabbb"), 1)); |
---|
| 71 | testStrings.add(new Check<Text>(new Text("aaaaa"), 0)); |
---|
| 72 | testStrings.add(new Check<Text>(new Text("babbb"), 2)); |
---|
| 73 | testStrings.add(new Check<Text>(new Text("baabb"), 1)); |
---|
| 74 | testStrings.add(new Check<Text>(new Text("yai"), 8)); |
---|
| 75 | testStrings.add(new Check<Text>(new Text("yak"), 9)); |
---|
| 76 | testStrings.add(new Check<Text>(new Text("z"), 9)); |
---|
| 77 | testStrings.add(new Check<Text>(new Text("ddngo"), 5)); |
---|
| 78 | testStrings.add(new Check<Text>(new Text("hi"), 6)); |
---|
| 79 | }; |
---|
| 80 | |
---|
| 81 | private static <T extends WritableComparable> Path writePartitionFile( |
---|
| 82 | String testname, JobConf conf, T[] splits) throws IOException { |
---|
| 83 | final FileSystem fs = FileSystem.getLocal(conf); |
---|
| 84 | final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") |
---|
| 85 | ).makeQualified(fs); |
---|
| 86 | Path p = new Path(testdir, testname + "/_partition.lst"); |
---|
| 87 | TotalOrderPartitioner.setPartitionFile(conf, p); |
---|
| 88 | conf.setNumReduceTasks(splits.length + 1); |
---|
| 89 | SequenceFile.Writer w = null; |
---|
| 90 | try { |
---|
| 91 | NullWritable nw = NullWritable.get(); |
---|
| 92 | w = SequenceFile.createWriter(fs, conf, p, |
---|
| 93 | splits[0].getClass(), NullWritable.class, |
---|
| 94 | SequenceFile.CompressionType.NONE); |
---|
| 95 | for (int i = 0; i < splits.length; ++i) { |
---|
| 96 | w.append(splits[i], NullWritable.get()); |
---|
| 97 | } |
---|
| 98 | } finally { |
---|
| 99 | if (null != w) |
---|
| 100 | w.close(); |
---|
| 101 | } |
---|
| 102 | return p; |
---|
| 103 | } |
---|
| 104 | |
---|
| 105 | public void testTotalOrderMemCmp() throws Exception { |
---|
| 106 | TotalOrderPartitioner<Text,NullWritable> partitioner = |
---|
| 107 | new TotalOrderPartitioner<Text,NullWritable>(); |
---|
| 108 | JobConf job = new JobConf(); |
---|
| 109 | Path p = TestTotalOrderPartitioner.<Text>writePartitionFile( |
---|
| 110 | "totalordermemcmp", job, splitStrings); |
---|
| 111 | job.setMapOutputKeyClass(Text.class); |
---|
| 112 | try { |
---|
| 113 | partitioner.configure(job); |
---|
| 114 | NullWritable nw = NullWritable.get(); |
---|
| 115 | for (Check<Text> chk : testStrings) { |
---|
| 116 | assertEquals(chk.data.toString(), chk.part, |
---|
| 117 | partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); |
---|
| 118 | } |
---|
| 119 | } finally { |
---|
| 120 | p.getFileSystem(job).delete(p); |
---|
| 121 | } |
---|
| 122 | } |
---|
| 123 | |
---|
| 124 | public void testTotalOrderBinarySearch() throws Exception { |
---|
| 125 | TotalOrderPartitioner<Text,NullWritable> partitioner = |
---|
| 126 | new TotalOrderPartitioner<Text,NullWritable>(); |
---|
| 127 | JobConf job = new JobConf(); |
---|
| 128 | Path p = TestTotalOrderPartitioner.<Text>writePartitionFile( |
---|
| 129 | "totalorderbinarysearch", job, splitStrings); |
---|
| 130 | job.setBoolean("total.order.partitioner.natural.order", false); |
---|
| 131 | job.setMapOutputKeyClass(Text.class); |
---|
| 132 | try { |
---|
| 133 | partitioner.configure(job); |
---|
| 134 | NullWritable nw = NullWritable.get(); |
---|
| 135 | for (Check<Text> chk : testStrings) { |
---|
| 136 | assertEquals(chk.data.toString(), chk.part, |
---|
| 137 | partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); |
---|
| 138 | } |
---|
| 139 | } finally { |
---|
| 140 | p.getFileSystem(job).delete(p); |
---|
| 141 | } |
---|
| 142 | } |
---|
| 143 | |
---|
| 144 | public static class ReverseStringComparator implements RawComparator<Text> { |
---|
| 145 | public int compare(Text a, Text b) { |
---|
| 146 | return -a.compareTo(b); |
---|
| 147 | } |
---|
| 148 | public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { |
---|
| 149 | int n1 = WritableUtils.decodeVIntSize(b1[s1]); |
---|
| 150 | int n2 = WritableUtils.decodeVIntSize(b2[s2]); |
---|
| 151 | return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1, |
---|
| 152 | b2, s2+n2, l2-n2); |
---|
| 153 | } |
---|
| 154 | } |
---|
| 155 | |
---|
| 156 | public void testTotalOrderCustomComparator() throws Exception { |
---|
| 157 | TotalOrderPartitioner<Text,NullWritable> partitioner = |
---|
| 158 | new TotalOrderPartitioner<Text,NullWritable>(); |
---|
| 159 | JobConf job = new JobConf(); |
---|
| 160 | Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length); |
---|
| 161 | Arrays.sort(revSplitStrings, new ReverseStringComparator()); |
---|
| 162 | Path p = TestTotalOrderPartitioner.<Text>writePartitionFile( |
---|
| 163 | "totalordercustomcomparator", job, revSplitStrings); |
---|
| 164 | job.setBoolean("total.order.partitioner.natural.order", false); |
---|
| 165 | job.setMapOutputKeyClass(Text.class); |
---|
| 166 | job.setOutputKeyComparatorClass(ReverseStringComparator.class); |
---|
| 167 | ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>(); |
---|
| 168 | revCheck.add(new Check<Text>(new Text("aaaaa"), 9)); |
---|
| 169 | revCheck.add(new Check<Text>(new Text("aaabb"), 9)); |
---|
| 170 | revCheck.add(new Check<Text>(new Text("aabbb"), 9)); |
---|
| 171 | revCheck.add(new Check<Text>(new Text("aaaaa"), 9)); |
---|
| 172 | revCheck.add(new Check<Text>(new Text("babbb"), 8)); |
---|
| 173 | revCheck.add(new Check<Text>(new Text("baabb"), 8)); |
---|
| 174 | revCheck.add(new Check<Text>(new Text("yai"), 1)); |
---|
| 175 | revCheck.add(new Check<Text>(new Text("yak"), 1)); |
---|
| 176 | revCheck.add(new Check<Text>(new Text("z"), 0)); |
---|
| 177 | revCheck.add(new Check<Text>(new Text("ddngo"), 4)); |
---|
| 178 | revCheck.add(new Check<Text>(new Text("hi"), 3)); |
---|
| 179 | try { |
---|
| 180 | partitioner.configure(job); |
---|
| 181 | NullWritable nw = NullWritable.get(); |
---|
| 182 | for (Check<Text> chk : revCheck) { |
---|
| 183 | assertEquals(chk.data.toString(), chk.part, |
---|
| 184 | partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); |
---|
| 185 | } |
---|
| 186 | } finally { |
---|
| 187 | p.getFileSystem(job).delete(p); |
---|
| 188 | } |
---|
| 189 | } |
---|
| 190 | |
---|
| 191 | } |
---|