source: proiecte/HadoopA51/src/java/ro/pub/cs/pp/a51hadoop/table/TableGen.java @ 166

Last change on this file since 166 was 166, checked in by (none), 14 years ago

HadoopA51: imported git repository from github

File size: 4.2 KB
Line 
1package ro.pub.cs.pp.a51hadoop.table;
2
3import ro.pub.cs.pp.a51hadoop.config.Config;
4import ro.pub.cs.pp.a51hadoop.common.*;
5import ro.pub.cs.pp.a51hadoop.algorithm.*;
6import ro.pub.cs.pp.a51hadoop.algorithm.testing.*;
7
8import java.io.IOException;
9import java.util.*;
10
11import org.apache.hadoop.fs.Path;
12import org.apache.hadoop.conf.*;
13import org.apache.hadoop.io.*;
14import org.apache.hadoop.mapred.*;
15import org.apache.hadoop.util.*;
16
17public class TableGen
18{
19
20        public static class R
21        {
22                private int i;
23                private int k;
24                public R(int i, int k)
25                {
26                        this.i = i;
27                        this.k = k;
28                }
29
30                public String apply(String strX)
31                {
32                        try {
33                                int x = Integer.parseInt(strX);
34                                int last_digit = 0;
35                                switch (i)
36                                {
37                                case 0:
38                                        last_digit = i % 3;
39                                        break;
40                                case 1:
41                                        last_digit = i % 3 + 3;
42                                        break;
43                                case 2:
44                                        last_digit = i % 3 + 6;
45                                        break;
46                                default:
47                                        last_digit = 9;
48                                        break;
49                                }
50                                x = x / 10 * 10 + last_digit;
51
52                                x = x ^ k;
53
54                                String ret = "" + x;
55                                while(ret.length() < strX.length())
56                                        ret = "0" + ret;
57                                if (ret.length() > strX.length())
58                                        ret = ret.substring(0, strX.length());
59                                return ret;
60                        } catch (Exception e) {
61                                return strX;
62                        }
63                }
64        }
65
66        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, ArrayWritable>
67        {
68                private HashReducer[] r;
69                private Hashfn hashfn;
70
71                public void create_testing_problem(int nr_reducers, int id)
72                {
73                        hashfn = new DigitHashfn();
74                        r = new DigitReducerGenerator().generate(nr_reducers, id);
75                }
76
77                public void create_a51_problem(int nr_reducers, int id)
78                {
79                        throw new RuntimeException("a51 not implemented");
80                }
81
82                public void configure(JobConf job)
83                {
84                        int nr_reducers = Config.N_R;
85                        int k = job.getInt("mapred.mapper.table_id", 0);
86                        int use_a51 = job.getInt("mapred.mapper.use_a51", 0);
87                        if (use_a51 != 0)
88                                create_a51_problem(nr_reducers, k);
89                        else
90                                create_testing_problem(nr_reducers, k);
91                }
92
93                public String applyAll(String txt)
94                {
95                        for (int i = 0; i < txt.length(); i++)
96                        {
97                                txt = r[i].apply(hashfn.hashfn(txt));
98                        }
99                        return txt;
100                }
101
102                public void map(LongWritable key, Text value, OutputCollector<Text, ArrayWritable> output, Reporter reporter) throws IOException
103                {
104
105                        String line = value.toString();
106                        StringTokenizer tokenizer = new StringTokenizer(line);
107                        while (tokenizer.hasMoreTokens())
108                        {
109                                String wordStr = tokenizer.nextToken();
110                                String hashStr = applyAll(wordStr);
111                                Text word = new Text();
112                                Text hash = new Text();
113                                List<Text> list = new ArrayList<Text>();
114
115                                word.set(wordStr);
116                                hash.set(hashStr);
117                                list.add(hash);
118                                ArrayWritable arr = new TextArrayWritable();
119                                arr.set(list.toArray(new Text[0]));
120
121                                output.collect(word, arr);
122                        }
123                }
124        }
125
126        public static class Reduce extends MapReduceBase implements Reducer<Text, ArrayWritable, Text, ArrayWritable>
127        {
128
129                public void reduce(Text key, Iterator<ArrayWritable> values, OutputCollector<Text, ArrayWritable> output, Reporter reporter) throws IOException
130                {
131                        List<Text> list = new ArrayList<Text>();
132                        while (values.hasNext())
133                        {
134                                //Text[] arr = (Text[]) ;
135                                Writable[] arr = values.next().get();
136                                for (int i = 0; i < arr.length; i++)
137                                {
138                                        Text t = (Text) arr[i];
139                                        list.add(t);
140                                }
141                        }
142                        ArrayWritable arr = new TextArrayWritable();
143                        arr.set(list.toArray(new Text[0]));
144                        output.collect(key, arr);
145                }
146        }
147
148        public static void run(String input, String output, int k) throws Exception
149        {
150                JobConf conf = new JobConf(TableGen.class);
151                String kstr = "" + k;
152                conf.setJobName("tablegen");
153                conf.setOutputKeyClass(Text.class);
154                conf.setOutputValueClass(TextArrayWritable.class);
155
156                conf.setMapperClass(Map.class);
157                conf.setCombinerClass(Reduce.class);
158                conf.setReducerClass(Reduce.class);
159
160                conf.setInputFormat(TextInputFormat.class);
161                conf.setOutputFormat(TextOutputFormat.class);
162
163                FileInputFormat.setInputPaths(conf, new Path(input));
164                FileOutputFormat.setOutputPath(conf, new Path(new Path(output), kstr));
165
166                JobClient.runJob(conf);
167        }
168
169        public static void main(String[] args) throws Exception
170        {
171                for(int i = 0; i < Config.N_TABLES; i++)
172                {
173                        run(args[0], args[1], i);
174                        System.out.println("LAG:XXXX:XXXX: i=" + i);
175                }
176        }
177}
Note: See TracBrowser for help on using the repository browser.