1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapred.lib; |
---|
20 | |
---|
21 | import java.io.IOException; |
---|
22 | import java.util.ArrayList; |
---|
23 | import java.util.Arrays; |
---|
24 | |
---|
25 | import junit.framework.Test; |
---|
26 | import junit.framework.TestCase; |
---|
27 | import junit.framework.TestSuite; |
---|
28 | import junit.extensions.TestSetup; |
---|
29 | |
---|
30 | import org.apache.hadoop.fs.FileSystem; |
---|
31 | import org.apache.hadoop.fs.Path; |
---|
32 | import org.apache.hadoop.io.NullWritable; |
---|
33 | import org.apache.hadoop.io.RawComparator; |
---|
34 | import org.apache.hadoop.io.SequenceFile; |
---|
35 | import org.apache.hadoop.io.Text; |
---|
36 | import org.apache.hadoop.io.WritableComparable; |
---|
37 | import org.apache.hadoop.io.WritableComparator; |
---|
38 | import org.apache.hadoop.io.WritableUtils; |
---|
39 | import org.apache.hadoop.mapred.JobConf; |
---|
40 | |
---|
41 | public class TestTotalOrderPartitioner extends TestCase { |
---|
42 | |
---|
43 | private static final Text[] splitStrings = new Text[] { |
---|
44 | // -inf // 0 |
---|
45 | new Text("aabbb"), // 1 |
---|
46 | new Text("babbb"), // 2 |
---|
47 | new Text("daddd"), // 3 |
---|
48 | new Text("dddee"), // 4 |
---|
49 | new Text("ddhee"), // 5 |
---|
50 | new Text("dingo"), // 6 |
---|
51 | new Text("hijjj"), // 7 |
---|
52 | new Text("n"), // 8 |
---|
53 | new Text("yak"), // 9 |
---|
54 | }; |
---|
55 | |
---|
56 | static class Check<T> { |
---|
57 | T data; |
---|
58 | int part; |
---|
59 | Check(T data, int part) { |
---|
60 | this.data = data; |
---|
61 | this.part = part; |
---|
62 | } |
---|
63 | } |
---|
64 | |
---|
65 | private static final ArrayList<Check<Text>> testStrings = |
---|
66 | new ArrayList<Check<Text>>(); |
---|
67 | static { |
---|
68 | testStrings.add(new Check<Text>(new Text("aaaaa"), 0)); |
---|
69 | testStrings.add(new Check<Text>(new Text("aaabb"), 0)); |
---|
70 | testStrings.add(new Check<Text>(new Text("aabbb"), 1)); |
---|
71 | testStrings.add(new Check<Text>(new Text("aaaaa"), 0)); |
---|
72 | testStrings.add(new Check<Text>(new Text("babbb"), 2)); |
---|
73 | testStrings.add(new Check<Text>(new Text("baabb"), 1)); |
---|
74 | testStrings.add(new Check<Text>(new Text("yai"), 8)); |
---|
75 | testStrings.add(new Check<Text>(new Text("yak"), 9)); |
---|
76 | testStrings.add(new Check<Text>(new Text("z"), 9)); |
---|
77 | testStrings.add(new Check<Text>(new Text("ddngo"), 5)); |
---|
78 | testStrings.add(new Check<Text>(new Text("hi"), 6)); |
---|
79 | }; |
---|
80 | |
---|
81 | private static <T extends WritableComparable> Path writePartitionFile( |
---|
82 | String testname, JobConf conf, T[] splits) throws IOException { |
---|
83 | final FileSystem fs = FileSystem.getLocal(conf); |
---|
84 | final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") |
---|
85 | ).makeQualified(fs); |
---|
86 | Path p = new Path(testdir, testname + "/_partition.lst"); |
---|
87 | TotalOrderPartitioner.setPartitionFile(conf, p); |
---|
88 | conf.setNumReduceTasks(splits.length + 1); |
---|
89 | SequenceFile.Writer w = null; |
---|
90 | try { |
---|
91 | NullWritable nw = NullWritable.get(); |
---|
92 | w = SequenceFile.createWriter(fs, conf, p, |
---|
93 | splits[0].getClass(), NullWritable.class, |
---|
94 | SequenceFile.CompressionType.NONE); |
---|
95 | for (int i = 0; i < splits.length; ++i) { |
---|
96 | w.append(splits[i], NullWritable.get()); |
---|
97 | } |
---|
98 | } finally { |
---|
99 | if (null != w) |
---|
100 | w.close(); |
---|
101 | } |
---|
102 | return p; |
---|
103 | } |
---|
104 | |
---|
105 | public void testTotalOrderMemCmp() throws Exception { |
---|
106 | TotalOrderPartitioner<Text,NullWritable> partitioner = |
---|
107 | new TotalOrderPartitioner<Text,NullWritable>(); |
---|
108 | JobConf job = new JobConf(); |
---|
109 | Path p = TestTotalOrderPartitioner.<Text>writePartitionFile( |
---|
110 | "totalordermemcmp", job, splitStrings); |
---|
111 | job.setMapOutputKeyClass(Text.class); |
---|
112 | try { |
---|
113 | partitioner.configure(job); |
---|
114 | NullWritable nw = NullWritable.get(); |
---|
115 | for (Check<Text> chk : testStrings) { |
---|
116 | assertEquals(chk.data.toString(), chk.part, |
---|
117 | partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); |
---|
118 | } |
---|
119 | } finally { |
---|
120 | p.getFileSystem(job).delete(p); |
---|
121 | } |
---|
122 | } |
---|
123 | |
---|
124 | public void testTotalOrderBinarySearch() throws Exception { |
---|
125 | TotalOrderPartitioner<Text,NullWritable> partitioner = |
---|
126 | new TotalOrderPartitioner<Text,NullWritable>(); |
---|
127 | JobConf job = new JobConf(); |
---|
128 | Path p = TestTotalOrderPartitioner.<Text>writePartitionFile( |
---|
129 | "totalorderbinarysearch", job, splitStrings); |
---|
130 | job.setBoolean("total.order.partitioner.natural.order", false); |
---|
131 | job.setMapOutputKeyClass(Text.class); |
---|
132 | try { |
---|
133 | partitioner.configure(job); |
---|
134 | NullWritable nw = NullWritable.get(); |
---|
135 | for (Check<Text> chk : testStrings) { |
---|
136 | assertEquals(chk.data.toString(), chk.part, |
---|
137 | partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); |
---|
138 | } |
---|
139 | } finally { |
---|
140 | p.getFileSystem(job).delete(p); |
---|
141 | } |
---|
142 | } |
---|
143 | |
---|
144 | public static class ReverseStringComparator implements RawComparator<Text> { |
---|
145 | public int compare(Text a, Text b) { |
---|
146 | return -a.compareTo(b); |
---|
147 | } |
---|
148 | public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { |
---|
149 | int n1 = WritableUtils.decodeVIntSize(b1[s1]); |
---|
150 | int n2 = WritableUtils.decodeVIntSize(b2[s2]); |
---|
151 | return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1, |
---|
152 | b2, s2+n2, l2-n2); |
---|
153 | } |
---|
154 | } |
---|
155 | |
---|
156 | public void testTotalOrderCustomComparator() throws Exception { |
---|
157 | TotalOrderPartitioner<Text,NullWritable> partitioner = |
---|
158 | new TotalOrderPartitioner<Text,NullWritable>(); |
---|
159 | JobConf job = new JobConf(); |
---|
160 | Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length); |
---|
161 | Arrays.sort(revSplitStrings, new ReverseStringComparator()); |
---|
162 | Path p = TestTotalOrderPartitioner.<Text>writePartitionFile( |
---|
163 | "totalordercustomcomparator", job, revSplitStrings); |
---|
164 | job.setBoolean("total.order.partitioner.natural.order", false); |
---|
165 | job.setMapOutputKeyClass(Text.class); |
---|
166 | job.setOutputKeyComparatorClass(ReverseStringComparator.class); |
---|
167 | ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>(); |
---|
168 | revCheck.add(new Check<Text>(new Text("aaaaa"), 9)); |
---|
169 | revCheck.add(new Check<Text>(new Text("aaabb"), 9)); |
---|
170 | revCheck.add(new Check<Text>(new Text("aabbb"), 9)); |
---|
171 | revCheck.add(new Check<Text>(new Text("aaaaa"), 9)); |
---|
172 | revCheck.add(new Check<Text>(new Text("babbb"), 8)); |
---|
173 | revCheck.add(new Check<Text>(new Text("baabb"), 8)); |
---|
174 | revCheck.add(new Check<Text>(new Text("yai"), 1)); |
---|
175 | revCheck.add(new Check<Text>(new Text("yak"), 1)); |
---|
176 | revCheck.add(new Check<Text>(new Text("z"), 0)); |
---|
177 | revCheck.add(new Check<Text>(new Text("ddngo"), 4)); |
---|
178 | revCheck.add(new Check<Text>(new Text("hi"), 3)); |
---|
179 | try { |
---|
180 | partitioner.configure(job); |
---|
181 | NullWritable nw = NullWritable.get(); |
---|
182 | for (Check<Text> chk : revCheck) { |
---|
183 | assertEquals(chk.data.toString(), chk.part, |
---|
184 | partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); |
---|
185 | } |
---|
186 | } finally { |
---|
187 | p.getFileSystem(job).delete(p); |
---|
188 | } |
---|
189 | } |
---|
190 | |
---|
191 | } |
---|