package ro.pub.cs.pp.a51hadoop.table;

import ro.pub.cs.pp.a51hadoop.config.Config;
import ro.pub.cs.pp.a51hadoop.common.*;
import ro.pub.cs.pp.a51hadoop.algorithm.*;
import ro.pub.cs.pp.a51hadoop.algorithm.testing.*;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

/**
 * Searches the generated rainbow tables for the hash ("needle") given in
 * the job configuration.  Each input line is expected to hold a chain
 * endpoint followed by the chain start points, as produced by TableGen.
 */
public class Search
{
	public static class Map extends MapReduceBase
			implements Mapper<LongWritable, Text, Text, TextArrayWritable>
	{
		private HashReducer[] r;
		private Hashfn hashfn;
		private String needle;
		private ArrayList<String> needleGenerators;

		/* Small digit-based hash/reduce family used for testing. */
		public void create_testing_problem(int nr_reducers, int id)
		{
			hashfn = new DigitHashfn();
			r = new DigitReducerGenerator().generate(nr_reducers, id);
		}

		public void create_a51_problem(int nr_reducers, int id)
		{
			throw new RuntimeException("a51 not implemented");
		}

		public void configure(JobConf job)
		{
			int nr_reducers = Config.N_R;
			int k = job.getInt("mapred.mapper.table_id", 0);
			int use_a51 = job.getInt("mapred.mapper.use_a51", 0);

			this.needle = job.get("mapred.mapper.needle");

			/*
			 * The hash and reduce functions must be created before
			 * applyLast() is used below; it dereferences both.
			 */
			if (use_a51 != 0)
				create_a51_problem(nr_reducers, k);
			else
				create_testing_problem(nr_reducers, k);

			/*
			 * Precompute, for every column i a chain could hold the
			 * needle in, the endpoint that chain would finish with.
			 */
			this.needleGenerators = new ArrayList<String>();
			for (int i = 0; i < Config.N_R; i++)
			{
				String gen = applyLast(needle, i);
				this.needleGenerators.add(gen);
			}
		}

		/* Walks a full chain: N_R rounds of hash followed by reduce. */
		public String applyAll(String txt)
		{
			for (int i = 0; i < Config.N_R; i++)
				txt = r[i].apply(hashfn.hashfn(txt));
			return txt;
		}

		/*
		 * Continues a chain from a hash assumed to sit n positions
		 * before the end, reducing it down to the chain endpoint.
		 */
		public String applyLast(String hashstr, int n)
		{
			for (int i = Config.N_R - n; i < Config.N_R - 1; i++)
				hashstr = hashfn.hashfn(r[i].apply(hashstr));
			return r[r.length - 1].apply(hashstr);
		}

		/* Replays the first n reduce/hash rounds from a chain start. */
		public String applyFirst(String txt, int n)
		{
			for (int i = 0; i < n; i++)
				txt = hashfn.hashfn(r[i].apply(txt));
			return txt;
		}

		public void map(LongWritable key, Text value,
				OutputCollector<Text, TextArrayWritable> output,
				Reporter reporter) throws IOException
		{
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);

			if (!tokenizer.hasMoreTokens())
				return;

			/* First token is the chain endpoint; the rest are starts. */
			String end = tokenizer.nextToken();

			List<Text> list = new ArrayList<Text>();
			TextArrayWritable arr = new TextArrayWritable();
			Text word = new Text();
			word.set(needle);

			while (tokenizer.hasMoreTokens())
			{
				String start = tokenizer.nextToken();
				for (int i = 0; i < needleGenerators.size(); i++)
				{
					if (!start.equals(needleGenerators.get(i)))
						continue;

					/*
					 * Candidate match: replay the chain and
					 * keep it only if it really reaches the
					 * needle (filters false alarms).
					 */
					String hashStr = applyFirst(start, Config.N_R - i);
					if (!hashStr.equals(needle))
						continue;

					Text t = new Text();
					t.set(start);
					list.add(t);
				}
			}

			arr.set(list.toArray(new Text[0]));
			output.collect(word, arr);
		}
	}

	public static class Reduce extends MapReduceBase
			implements Reducer<Text, TextArrayWritable, Text, TextArrayWritable>
	{
		/* Concatenates all candidate start points found for the needle. */
		public void reduce(Text key, Iterator<TextArrayWritable> values,
				OutputCollector<Text, TextArrayWritable> output,
				Reporter reporter) throws IOException
		{
			List<Text> list = new ArrayList<Text>();

			while (values.hasNext())
			{
				Writable[] arr = values.next().get();
				for (int i = 0; i < arr.length; i++)
					list.add((Text) arr[i]);
			}

			TextArrayWritable out = new TextArrayWritable();
			out.set(list.toArray(new Text[0]));
			output.collect(key, out);
		}
	}
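
	/*
	 * For reference, the algorithm-package types used above expose the
	 * following operations, as inferred from the call sites in this file
	 * (a sketch only; the actual declarations in
	 * ro.pub.cs.pp.a51hadoop.algorithm may differ):
	 *
	 *   public interface Hashfn      { String hashfn(String txt); }
	 *   public interface HashReducer { String apply(String hashstr); }
	 *
	 * DigitReducerGenerator.generate(n, id) is assumed to return an array
	 * of n HashReducer instances parameterized by the table id, so that
	 * each table uses a distinct reduce-function family.
	 */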
	public static void run(String input, String output, int k) throws Exception
	{
		JobConf conf = new JobConf(Search.class);
		String kstr = "" + k;

		conf.setJobName("search");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(TextArrayWritable.class);

		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		/* Tell the mapper which table's reduce family to rebuild. */
		conf.setInt("mapred.mapper.table_id", k);

		/*
		 * Note: "mapred.mapper.needle" (and "mapred.mapper.use_a51")
		 * are read in Map.configure() but are not set here; they are
		 * expected to arrive through the job configuration supplied by
		 * the launching environment.
		 */

		FileInputFormat.setInputPaths(conf, new Path(input));
		FileOutputFormat.setOutputPath(conf, new Path(new Path(output), kstr));

		JobClient.runJob(conf);
	}

	public static void main(String[] args) throws Exception
	{
		/* Search each table in turn; table i writes under args[1]/i. */
		for (int i = 0; i < Config.N_TABLES; i++)
		{
			run(args[0], args[1], i);
			System.out.println("LAG:XXXX:XXXX: i=" + i);
		}
	}
}
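
/*
 * TextArrayWritable comes from ro.pub.cs.pp.a51hadoop.common.  A minimal
 * sketch of how such a wrapper is commonly written (an assumption -- the
 * project's real class may differ): ArrayWritable cannot deserialize without
 * knowing its element type, so the subclass pins it to Text, and it must keep
 * a public no-arg constructor for Hadoop's reflection-based instantiation.
 *
 *   public class TextArrayWritable extends ArrayWritable
 *   {
 *       public TextArrayWritable()
 *       {
 *           super(Text.class);
 *       }
 *   }
 */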