source: proiecte/PDAD/trunk/mediumevents_s/mapreduce/MediumEvents_straight.java @ 154

Last change on this file since 154 was 154, checked in by (none), 14 years ago

PDAD project

File size: 2.5 KB
Line 
1/**
2 * Medium length for events
3 *
4 * @author cristina
5 */
6
7package org.PP;
8
9import java.io.IOException;
10import java.util.*;
11
12import org.apache.hadoop.fs.Path;
13import org.apache.hadoop.conf.*;
14import org.apache.hadoop.io.*;
15import org.apache.hadoop.mapred.*;
16import org.apache.hadoop.util.*;
17
18public class MediumEvents_straight {
19
20        public static class Map extends MapReduceBase implements
21                        Mapper<LongWritable, Text, Text, DoubleWritable> {
22               
23                public void map(LongWritable key, Text value,
24                                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
25                                throws IOException {
26
27                        String line = value.toString();
28                        // skip comments
29                        if (!line.startsWith("#")) {
30                                // collons 7 and 8 are start, end time of job respectively
31                                StringTokenizer tokenizer = new StringTokenizer(line);
32                                int count = 0;
33                                Double start = new Double(0), end = new Double(0), duration = new Double(
34                                                0);
35                                while (tokenizer.hasMoreTokens()) {
36                                        count++;
37                                        String token = tokenizer.nextToken();
38                                        if (count == 7)
39                                                start = Double.parseDouble(token);
40                                        if (count == 8) {
41                                                end = Double.parseDouble(token);
42                                                // stop, we have all the information
43                                                break;
44                                        }
45                                }
46
47                                duration = end - start;
48                                output.collect(new Text("medium duration"), new DoubleWritable(duration));
49                        }
50                }
51        }
52
53        public static class Reduce extends MapReduceBase implements
54        Reducer<Text, DoubleWritable, Text, DoubleWritable> {
55
56        public void reduce(Text key, Iterator<DoubleWritable> values,
57                        OutputCollector<Text, DoubleWritable> output, Reporter reporter)
58                        throws IOException {
59                Double sum = new Double(0);
60                Long count = new Long(0);
61                Double med = new Double(0);
62       
63                while (values.hasNext()) {
64                        count++;
65                        sum += values.next().get();
66                }
67                if (count > 0)
68                        med = sum / count;
69                else
70                        med = new Double(0);
71                output.collect(key, new DoubleWritable(med));
72       
73        }
74}
75       
76       
77
78        public static void main(String[] args) throws Exception {
79                JobConf conf = new JobConf(MediumEvents_straight.class);
80                conf.setJobName("MediumEvents_straight");
81
82                conf.setOutputKeyClass(Text.class);
83                conf.setOutputValueClass(DoubleWritable.class);
84
85                conf.setMapperClass(Map.class);
86                //conf.setCombinerClass(Reduce.class); !! there shouldn't be any reducer as it approximates!!
87                conf.setReducerClass(Reduce.class);
88
89                conf.setInputFormat(TextInputFormat.class);
90                conf.setOutputFormat(TextOutputFormat.class);
91
92                FileInputFormat.setInputPaths(conf, new Path(args[0]));
93                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
94
95                conf.setNumReduceTasks(5);
96
97                JobClient.runJob(conf);
98        }
99}
Note: See TracBrowser for help on using the repository browser.