source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit project

  • Property svn:executable set to *
File size: 8.6 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples.terasort;

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

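// Example invocation (illustrative only; the jar name depends on the build,
// and the paths and row count below are hypothetical):
//
//   bin/hadoop jar hadoop-0.20.1-examples.jar teragen 1000000 /tera/in
//   bin/hadoop jar hadoop-0.20.1-examples.jar terasort /tera/in /tera/out
//
// TeraGen (not part of this file) writes the input records that TeraSort
// consumes; TeraSort reads args[0] as the input directory and args[1] as
// the output directory, as run() below shows.
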
/**
 * Generates the sampled split points, launches the job, and waits for it to
 * finish.
 * <p>
 * To run the program:
 * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
 */
public class TeraSort extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(TeraSort.class);

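  // How the partitioner works, in outline: configure() loads the sampled
  // split points that run() ships with the job, then builds a 256-ary trie
  // two levels deep (maxDepth = 2 below). getPartition() walks the trie one
  // key byte per level; leaves fall back to linear compares against the
  // split points, so the per-record cost is at most two array lookups plus
  // a short scan over nearby split points.
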
  /**
   * A partitioner that splits text keys into roughly equal partitions
   * in a global sorted order.
   */
  static class TotalOrderPartitioner implements Partitioner<Text,Text>{
    private TrieNode trie;
    private Text[] splitPoints;

    /**
     * A generic trie node
     */
    static abstract class TrieNode {
      private int level;
      TrieNode(int level) {
        this.level = level;
      }
      abstract int findPartition(Text key);
      abstract void print(PrintStream strm) throws IOException;
      int getLevel() {
        return level;
      }
    }

    /**
     * An inner trie node that contains 256 children based on the next
     * character.
     */
    static class InnerTrieNode extends TrieNode {
      private TrieNode[] child = new TrieNode[256];

      InnerTrieNode(int level) {
        super(level);
      }
      int findPartition(Text key) {
        int level = getLevel();
        if (key.getLength() <= level) {
          return child[0].findPartition(key);
        }
        // Mask to treat the key byte as unsigned; a raw (negative) byte
        // would index outside the 256-entry child array.
        return child[key.getBytes()[level] & 0xff].findPartition(key);
      }
      void setChild(int idx, TrieNode child) {
        this.child[idx] = child;
      }
      void print(PrintStream strm) throws IOException {
        // visit every child slot, including the catch-all child[255]
        for(int ch=0; ch < 256; ++ch) {
          for(int i = 0; i < 2*getLevel(); ++i) {
            strm.print(' ');
          }
          strm.print(ch);
          strm.println(" ->");
          if (child[ch] != null) {
            child[ch].print(strm);
          }
        }
      }
    }

    /**
     * A leaf trie node that does string compares to figure out where the given
     * key belongs between lower..upper.
     */
    static class LeafTrieNode extends TrieNode {
      int lower;
      int upper;
      Text[] splitPoints;
      LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
        super(level);
        this.splitPoints = splitPoints;
        this.lower = lower;
        this.upper = upper;
      }
      int findPartition(Text key) {
        for(int i=lower; i<upper; ++i) {
          if (splitPoints[i].compareTo(key) >= 0) {
            return i;
          }
        }
        return upper;
      }
      void print(PrintStream strm) throws IOException {
        for(int i = 0; i < 2*getLevel(); ++i) {
          strm.print(' ');
        }
        strm.print(lower);
        strm.print(", ");
        strm.println(upper);
      }
    }
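
    // Worked example of the leaf lookup above (values are illustrative):
    // with splitPoints = ["b", "d"] and (lower, upper) = (0, 2),
    //   key "a" -> 0  (first split >= "a" is "b", at index 0)
    //   key "c" -> 1  (first split >= "c" is "d", at index 1)
    //   key "z" -> 2  (no split >= "z", so the scan falls through to upper)
    // Records thus land in splitPoints.length + 1 buckets in total key order.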


    /**
     * Read the cut points from the given sequence file.
     * @param fs the file system
     * @param p the path to read
     * @param job the job config
     * @return the strings to split the partitions on
     * @throws IOException
     */
    private static Text[] readPartitions(FileSystem fs, Path p,
                                         JobConf job) throws IOException {
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
      List<Text> parts = new ArrayList<Text>();
      Text key = new Text();
      NullWritable value = NullWritable.get();
      while (reader.next(key, value)) {
        parts.add(key);
        // the reader fills the Text we pass in, so allocate a fresh one
        // for the next record rather than reusing the instance just added
        key = new Text();
      }
      reader.close();
      return parts.toArray(new Text[parts.size()]);
    }

    /**
     * Given a sorted set of cut points, build a trie that will find the correct
     * partition quickly.
     * @param splits the list of cut points
     * @param lower the lower bound of partitions 0..numPartitions-1
     * @param upper the upper bound of partitions 0..numPartitions-1
     * @param prefix the prefix that we have already checked against
     * @param maxDepth the maximum depth we will build a trie for
     * @return the trie node that will divide the splits correctly
     */
    private static TrieNode buildTrie(Text[] splits, int lower, int upper,
                                      Text prefix, int maxDepth) {
      int depth = prefix.getLength();
      if (depth >= maxDepth || lower == upper) {
        return new LeafTrieNode(depth, splits, lower, upper);
      }
      InnerTrieNode result = new InnerTrieNode(depth);
      Text trial = new Text(prefix);
      // append an extra byte on to the prefix
      trial.append(new byte[1], 0, 1);
      int currentBound = lower;
      for(int ch = 0; ch < 255; ++ch) {
        trial.getBytes()[depth] = (byte) (ch + 1);
        lower = currentBound;
        while (currentBound < upper) {
          if (splits[currentBound].compareTo(trial) >= 0) {
            break;
          }
          currentBound += 1;
        }
        trial.getBytes()[depth] = (byte) ch;
        result.child[ch] = buildTrie(splits, lower, currentBound, trial,
                                     maxDepth);
      }
      // pick up the rest
      trial.getBytes()[depth] = 127;
      result.child[255] = buildTrie(splits, currentBound, upper, trial,
                                    maxDepth);
      return result;
    }
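
    // Sketch of one recursion level, for intuition: at depth d the loop
    // finds, for each byte value ch, the contiguous range of split points
    // whose byte d (under the current prefix) equals ch, by advancing
    // currentBound while splits compare below the boundary prefix+(ch+1).
    // Each child then owns that slice of splits, and recursion stops at
    // maxDepth or when a slice is empty.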

    public void configure(JobConf job) {
      try {
        FileSystem fs = FileSystem.getLocal(job);
        Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
        splitPoints = readPartitions(fs, partFile, job);
        trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
      } catch (IOException ie) {
        throw new IllegalArgumentException("can't read partitions file", ie);
      }
    }
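
    // Note on the local read above: configure() can open
    // PARTITION_FILENAME from the task's working directory because run()
    // adds the partition file to the DistributedCache with a fragment URI
    // naming PARTITION_FILENAME and enables symlink creation, so the
    // framework links the cached file into each task's cwd.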

    public TotalOrderPartitioner() {
    }

    public int getPartition(Text key, Text value, int numPartitions) {
      return trie.findPartition(key);
    }

  }

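  // Outline of run() below: qualify the input path, sample the input to
  // produce the partition file (TeraInputFormat.writePartitionFile), ship
  // that file to every task via the DistributedCache symlink, wire up the
  // Tera* input/output formats and the TotalOrderPartitioner, then submit
  // the job and block until it finishes. Setting dfs.replication to 1 and
  // requesting a final sync are typical benchmark settings: they cut
  // replication overhead while still forcing output to disk before
  // success is reported.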
  public int run(String[] args) throws Exception {
    LOG.info("starting");
    JobConf job = (JobConf) getConf();
    Path inputDir = new Path(args[0]);
    inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
    Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
    URI partitionUri = new URI(partitionFile.toString() +
                               "#" + TeraInputFormat.PARTITION_FILENAME);
    TeraInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormat(TeraInputFormat.class);
    job.setOutputFormat(TeraOutputFormat.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TeraInputFormat.writePartitionFile(job, partitionFile);
    DistributedCache.addCacheFile(partitionUri, job);
    DistributedCache.createSymlink(job);
    job.setInt("dfs.replication", 1);
    TeraOutputFormat.setFinalSync(job, true);
    JobClient.runJob(job);
    LOG.info("done");
    return 0;
  }

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new JobConf(), new TeraSort(), args);
    System.exit(res);
  }

}