source: proiecte/PDAD/trunk/nodeslocation/mapreduce/NodesLocation.java @ 154

Last change on this file since 154 was 154, checked in by (none), 14 years ago

PDAD project

File size: 6.9 KB
Line 
1/**
2 * Number of faults per geographical location.
3 *
4 * @author cristina
5 */
6
7package org.PP;
8
9import java.util.*;
10import java.util.concurrent.atomic.*;
11import java.io.*;
12
13import org.apache.hadoop.fs.Path;
14import org.apache.hadoop.filecache.DistributedCache;
15import org.apache.hadoop.conf.*;
16import org.apache.hadoop.io.*;
17import org.apache.hadoop.mapred.*;
18import org.apache.hadoop.util.*;
19
20public class NodesLocation {
21
22        public static class Map extends MapReduceBase implements
23                        Mapper<LongWritable, Text, Text, Text> {
24
25                static Text valOne = new Text("1");             
26
27                public void map(LongWritable key, Text value,
28                                OutputCollector<Text, Text> output, Reporter reporter)
29                                throws IOException {
30                       
31                        Text label = new Text();
32                        String line = value.toString();
33                        Text valLoc = new Text();
34                       
35
36                        // skip comments
37                        if (!line.startsWith("#")) {
38                       
39                                Integer nodeId = new Integer(0), platformId = new Integer(0);
40                                String nodeLocation = "";
41                                int count = 0;
42
43                                StringTokenizer tokenizer = new StringTokenizer(line);
44                                String token = "";
45                               
46                                if(tokenizer.countTokens() == 9) {
47                                        count = 0;
48                                        // event_trace file
49                                        while (tokenizer.hasMoreTokens()) {
50                                       
51                                                token = tokenizer.nextToken();
52                                                if (count == 2)
53                                                        nodeId = Integer.parseInt(token);
54                                                if (count == 3)
55                                                        platformId = Integer.parseInt(token);
56                                                count++;       
57                                        }
58                                        String skey = nodeId+";"+platformId;
59                                        label.set(skey);
60                                        output.collect(label, valOne);
61                                }
62                                else {
63                                        // nodes file
64                                        tokenizer = new StringTokenizer(line);
65                                        count = 0;
66                                        while (tokenizer.hasMoreTokens()) {
67                                                token = tokenizer.nextToken();
68                                                if(count == 0)
69                                                        nodeId = Integer.parseInt(token);
70                                                if(count == 1)
71                                                        platformId = Integer.parseInt(token);
72                                                if(count == 4) {
73                                                        nodeLocation = token;
74                                                        //System.out.println("locatia citita e "+token);
75                                                        break;
76                                                }
77                                                count++;
78                                        }
79                                        String skey = nodeId+";"+platformId;
80                                        label.set(skey);
81                                        valLoc.set(nodeLocation);
82                                        //System.out.println("generez locatia "+nodeLocation);
83                                        output.collect(label, valLoc);
84                                }
85
86                        }
87                }
88        }
89
90
91        public static class Combine extends MapReduceBase implements
92                        Reducer<Text, Text, Text, Text> {
93
94
95                public void reduce(Text key, Iterator<Text> values,
96                                OutputCollector<Text, Text> output, Reporter reporter)
97                                throws IOException {
98
99                        Long sum = new Long(0);
100                        Text val = new Text();
101                        Text location = new Text();
102
103                        System.out.println("in combiner cheia "+key);
104                        // sum them
105                        while (values.hasNext()) {
106                                String crt = values.next().toString();
107                                try {
108                                        sum += Long.parseLong(crt);
109                                }
110                                catch (Exception e) {
111                                        location.set(crt);
112                                        System.out.println("in combiner am gasit locatia " + crt);
113                                }
114                                /*
115                                if(crt.compareTo("1") == 0) {
116                                        System.out.println("in combiner am gasit 1");
117                                        sum ++;
118                                }
119                                else {
120                                        System.out.println("in combiner am gasit locatia " + crt);
121                                        location.set(crt);
122                                }
123                                */
124                        }
125
126                        //System.out.println("din combiner sum e "+sum);
127                        val.set(sum.toString());
128                        output.collect(key, val);
129                        //output.collect(key, val);
130                        //output.collect(key, location);
131                        if(location.toString().compareTo("") != 0)
132                                output.collect(key, location);
133                        //System.out.println("din combiner generez locatia "+location);
134
135
136                }
137        }
138
139
140        public static class Map2 extends MapReduceBase implements
141                        Mapper<LongWritable, Text, Text, LongWritable> {
142
143                static Text valOne = new Text("1");             
144
145                public void map(LongWritable key, Text value,
146                                OutputCollector<Text, LongWritable> output, Reporter reporter)
147                                throws IOException {
148                       
149                        Text label = new Text();
150                        LongWritable val = new LongWritable();
151                        String line = value.toString();
152                       
153                        Long lval = new Long(0);
154                        String location = "";
155                        StringTokenizer tokenizer = new StringTokenizer(line);
156                        if(tokenizer.hasMoreTokens()) {
157                                location = tokenizer.nextToken();
158                                if(tokenizer.hasMoreTokens()) {
159                                        lval = Long.parseLong(tokenizer.nextToken());
160                                        label.set(location);
161                                        val.set(lval); 
162                                        output.collect(label, val);
163                                }
164                        }
165                       
166                }
167        }
168
169        public static class Reduce extends MapReduceBase implements
170                        Reducer<Text, Text, Text, LongWritable> {
171
172               
173
174                public void reduce(Text key, Iterator<Text> values,
175                                OutputCollector<Text, LongWritable> output, Reporter reporter)
176                                throws IOException {
177
178                        Long sum = new Long(0);
179                        LongWritable s = new LongWritable(0);
180                        Text label = new Text();
181
182                        System.out.println("INCEP cheia "+key);
183
184                        // sum them
185                        while (values.hasNext()) {
186                                String crt = values.next().toString();
187                                try {
188                                       
189                                        sum += Long.parseLong(crt);
190                                        System.out.println("suma e "+sum);
191                                        System.out.println("in reducer adun "+crt);
192                                }
193                                catch (Exception e) {
194                                        System.out.println("in reducer locatia "+crt);
195                                        label.set(crt);
196                                }
197                        }
198                       
199                        System.out.println("in reducer suma totala "+sum);
200                        s.set(sum);
201                        //output.collect(label, s);
202                        output.collect(label, s);       
203                }
204        }
205
206        public static class Reduce2 extends MapReduceBase implements
207                        Reducer<Text, LongWritable, Text, LongWritable> {
208
209               
210                public void reduce(Text key, Iterator<LongWritable> values,
211                                OutputCollector<Text, LongWritable> output, Reporter reporter)
212                                throws IOException {
213
214                        Long sum = new Long(0);
215                        LongWritable s = new LongWritable(0);
216                       
217                        // sum them
218                        while (values.hasNext()) {
219                                sum += values.next().get();
220                        }
221                       
222                        //System.out.println("in reducer suma totala "+sum);
223                        s.set(sum);
224                        //output.collect(label, s);
225                        output.collect(key, s); 
226                }
227        }
228
229        public static void main(String[] args) throws Exception {
230               
231                JobConf conf = new JobConf(NodesLocation.class);
232                conf.setJobName("nodesLocation");
233
234                conf.setMapOutputKeyClass(Text.class); 
235                conf.setMapOutputValueClass(Text.class); 
236
237                conf.setOutputKeyClass(Text.class);
238                conf.setOutputValueClass(LongWritable.class);
239
240                conf.setMapperClass(Map.class);
241                conf.setCombinerClass(Combine.class);
242                conf.setReducerClass(Reduce.class);
243
244                conf.setInputFormat(TextInputFormat.class);
245                conf.setOutputFormat(TextOutputFormat.class);
246
247                FileInputFormat.setInputPaths(conf, new Path(args[0]), new Path(args[2]));
248                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
249
250                conf.setNumReduceTasks(5);
251
252                JobClient.runJob(conf);
253
254// -----
255                JobConf conf2 = new JobConf(NodesLocation.class);
256                conf2.setJobName("nodesLocation_ctd");
257
258                conf2.setMapOutputKeyClass(Text.class); 
259                conf2.setMapOutputValueClass(LongWritable.class);               
260
261                conf2.setOutputKeyClass(Text.class);
262                conf2.setOutputValueClass(LongWritable.class);
263
264                conf2.setMapperClass(Map2.class);
265                conf.setCombinerClass(Reduce2.class);
266                conf2.setReducerClass(Reduce2.class);
267
268                conf2.setInputFormat(TextInputFormat.class);
269                conf2.setOutputFormat(TextOutputFormat.class);
270
271                FileInputFormat.setInputPaths(conf2, new Path(args[1]));
272                FileOutputFormat.setOutputPath(conf2, new Path(args[3]));
273
274                conf2.setNumReduceTasks(1);
275                JobClient.runJob(conf2);
276
277        }
278}
Note: See TracBrowser for help on using the repository browser.