source: proiecte/HadoopA51/src/java/ro/pub/cs/pp/a51hadoop/search/Search.java @ 166

Last change on this file since 166 was 166, checked in by (none), 14 years ago

HadoopA51: imported git repository from github

File size: 4.3 KB
Line 
1package ro.pub.cs.pp.a51hadoop.table;
2
3import ro.pub.cs.pp.a51hadoop.config.Config;
4import ro.pub.cs.pp.a51hadoop.common.*;
5import ro.pub.cs.pp.a51hadoop.algorithm.*;
6import ro.pub.cs.pp.a51hadoop.algorithm.testing.*;
7
8import java.io.IOException;
9import java.util.*;
10
11import org.apache.hadoop.fs.Path;
12import org.apache.hadoop.conf.*;
13import org.apache.hadoop.io.*;
14import org.apache.hadoop.mapred.*;
15import org.apache.hadoop.util.*;
16
17public class Search
18{
19
20        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, ArrayWritable>
21        {
22                private HashReducer[] r;
23                private Hashfn hashfn;
24                private String needle;
25                private ArrayList<String> needleGenerators;
26
27                public void create_testing_problem(int nr_reducers, int id)
28                {
29                        hashfn = new DigitHashfn();
30                        r = new DigitReducerGenerator().generate(nr_reducers, id);
31                }
32
33                public void create_a51_problem(int nr_reducers, int id)
34                {
35                        throw new RuntimeException("a51 not implemented");
36                }
37
38                public void configure(JobConf job)
39                {
40                        int nr_reducers = Config.N_R;
41                        int k = job.getInt("mapred.mapper.table_id", 0);
42                        int use_a51 = job.getInt("mapred.mapper.use_a51", 0);
43                        this.needle = job.get("mapred.mapper.needle");
44                        this.needleGenerators = new ArrayList<String>();
45
46                        for(int i = 0; i < Config.N_R; i++)
47                        {
48                                String gen = applyLast(needle, i);
49                                this.needleGenerators.add(gen);
50                        }
51
52
53                        if (use_a51 != 0)
54                                create_a51_problem(nr_reducers, k);
55                        else
56                                create_testing_problem(nr_reducers, k);
57                }
58
59                public String applyAll(String txt)
60                {
61                        for (int i = 0; i < Config.N_R; i++)
62                        {
63                                txt = r[i].apply(hashfn.hashfn(txt));
64                        }
65                        return txt;
66                }
67
68                public String applyLast(String hashstr, int n)
69                {
70                        for (int i = Config.N_R - n; i < Config.N_R - 1; i++)
71                        {
72                                hashstr = hashfn.hashfn(r[i].apply(hashstr));
73                        }
74                        return r[r.length - 1].apply(hashstr);
75                }
76
77
78                public String applyFirst(String txt, int n)
79                {
80                        for (int i = 0; i < n; i++)
81                        {
82                                txt = hashfn.hashfn(r[i].apply(txt));
83                        }
84                        return txt;
85                }
86
87
88                public void map(LongWritable key, Text value, OutputCollector<Text, ArrayWritable> output, Reporter reporter) throws IOException
89                {
90
91                        String line = value.toString();
92                        StringTokenizer tokenizer = new StringTokenizer(line);
93                        if (!tokenizer.hasMoreTokens())
94                                return;
95                        String end = tokenizer.nextToken();
96                        List<Text> list = new ArrayList<Text>();
97                        ArrayWritable arr = new TextArrayWritable();
98                        Text word = new Text();
99                        word.set(needle);
100                        while (tokenizer.hasMoreTokens())
101                        {
102                                String start = tokenizer.nextToken();
103                                for (int i = 0; i < needleGenerators.size(); i++)
104                                {
105                                        if (start.equals(needleGenerators.get(i)))
106                                        {
107                                                String hashStr = applyFirst(start, Config.N_R - i);
108                                                if (!hashStr.equals(needle))
109                                                        continue;
110                                                Text t = new Text();
111                                                t.set(start);
112                                                list.add(t);
113                                        }
114                                }
115                        }
116
117                        arr.set(list.toArray(new Text[0]));
118                        output.collect(word, arr);
119                }
120        }
121
122        public static class Reduce extends MapReduceBase implements Reducer<Text, ArrayWritable, Text, ArrayWritable>
123        {
124
125                public void reduce(Text key, Iterator<ArrayWritable> values, OutputCollector<Text, ArrayWritable> output, Reporter reporter) throws IOException
126                {
127                        List<Text> list = new ArrayList<Text>();
128                        while (values.hasNext())
129                        {
130                                //Text[] arr = (Text[]) ;
131                                Writable[] arr = values.next().get();
132                                for (int i = 0; i < arr.length; i++)
133                                {
134                                        Text t = (Text) arr[i];
135                                        list.add(t);
136                                }
137                        }
138                        ArrayWritable arr = new TextArrayWritable();
139                        arr.set(list.toArray(new Text[0]));
140                        output.collect(key, arr);
141                }
142        }
143
144        public static void run(String input, String output, int k) throws Exception
145        {
146                JobConf conf = new JobConf(TableGen.class);
147                String kstr = "" + k;
148                conf.setJobName("tablegen");
149                conf.setOutputKeyClass(Text.class);
150                conf.setOutputValueClass(TextArrayWritable.class);
151
152                conf.setMapperClass(Map.class);
153                conf.setCombinerClass(Reduce.class);
154                conf.setReducerClass(Reduce.class);
155
156                conf.setInputFormat(TextInputFormat.class);
157                conf.setOutputFormat(TextOutputFormat.class);
158
159                FileInputFormat.setInputPaths(conf, new Path(input));
160                FileOutputFormat.setOutputPath(conf, new Path(new Path(output), kstr));
161
162                JobClient.runJob(conf);
163        }
164
165        public static void main(String[] args) throws Exception
166        {
167                for(int i = 0; i < Config.N_TABLES; i++)
168                {
169                        run(args[0], args[1], i);
170                        System.out.println("LAG:XXXX:XXXX: i=" + i);
171                }
172        }
173}
Note: See TracBrowser for help on using the repository browser.