package ro.pub.cs.pp.a51hadoop.table;

import ro.pub.cs.pp.a51hadoop.config.Config;
import ro.pub.cs.pp.a51hadoop.common.*;
import ro.pub.cs.pp.a51hadoop.algorithm.*;
import ro.pub.cs.pp.a51hadoop.algorithm.testing.*;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class TableGen {

    /*
     * Sample reduction function for the digit-based test problem: it
     * overwrites the last decimal digit of x with a value chosen by the
     * function index i, XORs with the table key k, and pads or truncates
     * the result back to the width of the input string.
     */
    public static class R {
        private int i;
        private int k;

        public R(int i, int k) {
            this.i = i;
            this.k = k;
        }

        public String apply(String strX) {
            try {
                int x = Integer.parseInt(strX);
                int last_digit;
                switch (i) {
                    case 0:  last_digit = i % 3;     break; // 0
                    case 1:  last_digit = i % 3 + 3; break; // 4
                    case 2:  last_digit = i % 3 + 6; break; // 8
                    default: last_digit = 9;         break;
                }
                x = x / 10 * 10 + last_digit;
                x = x ^ k;

                // Zero-pad (or truncate) so the result keeps the
                // input's width.
                String ret = "" + x;
                while (ret.length() < strX.length())
                    ret = "0" + ret;
                if (ret.length() > strX.length())
                    ret = ret.substring(0, strX.length());
                return ret;
            } catch (Exception e) {
                // Non-numeric input is returned unchanged.
                return strX;
            }
        }
    }

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, TextArrayWritable> {
        private HashReducer[] r;
        private Hashfn hashfn;

        public void create_testing_problem(int nr_reducers, int id) {
            hashfn = new DigitHashfn();
            r = new DigitReducerGenerator().generate(nr_reducers, id);
        }

        public void create_a51_problem(int nr_reducers, int id) {
            throw new RuntimeException("a51 not implemented");
        }

        public void configure(JobConf job) {
            int nr_reducers = Config.N_R;
            int k = job.getInt("mapred.mapper.table_id", 0);
            int use_a51 = job.getInt("mapred.mapper.use_a51", 0);

            if (use_a51 != 0)
                create_a51_problem(nr_reducers, k);
            else
                create_testing_problem(nr_reducers, k);
        }

        /*
         * Computes one full chain: alternately hash and reduce, once per
         * reduction function. The loop bound is r.length; the original
         * code iterated txt.length() times while indexing r[i], which
         * throws ArrayIndexOutOfBoundsException whenever the input is
         * longer than the reducer array.
         */
        public String applyAll(String txt) {
            for (int i = 0; i < r.length; i++)
                txt = r[i].apply(hashfn.hashfn(txt));
            return txt;
        }

        public void map(LongWritable key, Text value,
                OutputCollector<Text, TextArrayWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {
                // Emit one (chain start, [chain end]) pair per input word.
                String wordStr = tokenizer.nextToken();
                String hashStr = applyAll(wordStr);

                TextArrayWritable arr = new TextArrayWritable();
                arr.set(new Text[] { new Text(hashStr) });
                output.collect(new Text(wordStr), arr);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, TextArrayWritable, Text, TextArrayWritable> {
        public void reduce(Text key, Iterator<TextArrayWritable> values,
                OutputCollector<Text, TextArrayWritable> output,
                Reporter reporter) throws IOException {
            // Concatenate every chain endpoint seen for this start point.
            List<Text> list = new ArrayList<Text>();
            while (values.hasNext()) {
                Writable[] arr = values.next().get();
                for (int i = 0; i < arr.length; i++)
                    list.add((Text) arr[i]);
            }

            TextArrayWritable arr = new TextArrayWritable();
            arr.set(list.toArray(new Text[0]));
            output.collect(key, arr);
        }
    }

    public static void run(String input, String output, int k) throws Exception {
        JobConf conf = new JobConf(TableGen.class);
        String kstr = "" + k;

        conf.setJobName("tablegen");
        // Pass the table id to the mappers; the original never set this,
        // so Map.configure() always fell back to table 0.
        conf.setInt("mapred.mapper.table_id", k);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(TextArrayWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        // Each table is written to its own subdirectory, named by its id.
        FileOutputFormat.setOutputPath(conf, new Path(new Path(output), kstr));

        JobClient.runJob(conf);
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < Config.N_TABLES; i++) {
            run(args[0], args[1], i);
            System.out.println("LAG:XXXX:XXXX: i=" + i); // progress marker
        }
    }
}
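/*
 * Note: TextArrayWritable is imported from ro.pub.cs.pp.a51hadoop.common
 * and its source is not part of this file. Hadoop cannot deserialize a
 * plain ArrayWritable used as an intermediate value, because the framework
 * needs a no-argument constructor that already fixes the element type, so
 * the helper presumably follows the standard subclassing pattern sketched
 * below (an assumption, not the project's actual class):
 *
 *     public class TextArrayWritable extends ArrayWritable {
 *         public TextArrayWritable() {
 *             super(Text.class);
 *         }
 *
 *         public TextArrayWritable(Text[] values) {
 *             super(Text.class, values);
 *         }
 *     }
 *
 * Typical invocation of the driver (the jar name here is hypothetical):
 *
 *     hadoop jar a51hadoop.jar ro.pub.cs.pp.a51hadoop.table.TableGen \
 *         <input-path> <output-dir>
 *
 * main() then launches one "tablegen" job per table, writing table i
 * under <output-dir>/i.
 */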