source: proiecte/PDAD/trunk/mediumevents/mapreduce/MediumEvents.java @ 154

Last change on this file since 154 was 154, checked in by (none), 14 years ago

PDAD project

File size: 4.6 KB
Line 
1/**
2 * Medium length for events
3 *
4 * @author cristina
5 */
6
7package org.PP;
8
9import java.io.IOException;
10import java.util.*;
11
12import org.apache.hadoop.fs.Path;
13import org.apache.hadoop.conf.*;
14import org.apache.hadoop.io.*;
15import org.apache.hadoop.mapred.*;
16import org.apache.hadoop.util.*;
17
18public class MediumEvents {
19
20        public static class Map extends MapReduceBase implements
21                        Mapper<LongWritable, Text, Text, DoubleWritable> {
22                private static Text label = new Text();
23                static long anotherLabel = 0; // groups data by generating a new lable
24                                                                                // each 1000 entries
25                static long labels = 0; // number of keys generated
26
27                public enum MapDuration {
28                        AHAM
29                };
30
31                public void map(LongWritable key, Text value,
32                                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
33                                throws IOException {
34
35                        Map.label.set("medium duration " + Map.labels);
36
37                        if (Map.anotherLabel == 1000) {
38                                Map.anotherLabel = 0;
39                                Map.labels++;
40
41                                // reporter.setStatus("map " + label.toString());
42                                reporter.incrCounter(MapDuration.AHAM, 1);
43                        }
44
45                        String line = value.toString();
46                        // skip comments
47                        if (!line.startsWith("#")) {
48                                // collons 7 and 8 are start, end time of job respectively
49                                StringTokenizer tokenizer = new StringTokenizer(line);
50                                int count = 0;
51                                Double start = new Double(0), end = new Double(0), duration = new Double(
52                                                0);
53                                anotherLabel++;
54                                while (tokenizer.hasMoreTokens()) {
55                                        count++;
56                                        String token = tokenizer.nextToken();
57                                        if (count == 7)
58                                                start = Double.parseDouble(token);
59                                        if (count == 8) {
60                                                end = Double.parseDouble(token);
61                                                // stop, we have all the information
62                                                break;
63                                        }
64                                }
65
66                                duration = end - start;
67                                output.collect(label, new DoubleWritable(duration));
68                        }
69                }
70        }
71
72        public static class Combiner extends MapReduceBase implements
73                        Reducer<Text, DoubleWritable, Text, DoubleWritable> {
74
75                public void reduce(Text key, Iterator<DoubleWritable> values,
76                                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
77                                throws IOException {
78                        Double sum = new Double(0);
79                        Long count = new Long(0);
80                        Double med = new Double(0);
81
82                        while (values.hasNext()) {
83                                count++;
84                                sum += values.next().get();
85                        }
86                        if (count > 0)
87                                med = sum / count;
88                        else
89                                med = new Double(0);
90                        output.collect(new Text("medium duration"), new DoubleWritable(med));
91
92                }
93        }
94
95        public static class Reduce extends MapReduceBase implements
96        Reducer<Text, DoubleWritable, Text, DoubleWritable> {
97
98        public void reduce(Text key, Iterator<DoubleWritable> values,
99                        OutputCollector<Text, DoubleWritable> output, Reporter reporter)
100                        throws IOException {
101                Double sum = new Double(0);
102                Long count = new Long(0);
103                Double med = new Double(0);
104       
105                while (values.hasNext()) {
106                        count++;
107                        sum += values.next().get();
108                }
109                if (count > 0)
110                        med = sum / count;
111                else
112                        med = new Double(0);
113                output.collect(key, new DoubleWritable(med));
114       
115        }
116}
117       
118        /*
119         * public static class Reduce extends MapReduceBase implements Reducer<Text,
120         * DoubleWritable, Text, DoubleWritable> {
121         *
122         * static Text label = new Text(); static long anotherLabel = 0; // groups
123         * data by generating a new lable each 1.000.000 entries static long labels
124         * = 0; // number of keys generated
125         *
126         * public enum RedDuration { AHAM };
127         *
128         *
129         * public void reduce(Text key, Iterator<DoubleWritable> values,
130         * OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws
131         * IOException { Double sum = new Double(0); Long count = new Long(0);
132         * Double med = new Double(0);
133         *
134         * Reduce.label.set("medium duration "+Reduce.labels);
135         *
136         * if(Reduce.anotherLabel == 100) { Reduce.anotherLabel = 0;
137         * Reduce.labels++;
138         *
139         * //reporter.setStatus("map " + label.toString());
140         * reporter.incrCounter(RedDuration.AHAM, 1); }
141         *
142         * Reduce.anotherLabel++;
143         *
144         *
145         * while (values.hasNext()) { count++; sum += values.next().get(); }
146         * if(count > 0) med = sum/count; else med = new Double(0);
147         * output.collect(Reduce.label, new DoubleWritable(med));
148         *
149         * } }
150         */
151
152        public static void main(String[] args) throws Exception {
153                JobConf conf = new JobConf(MediumEvents.class);
154                conf.setJobName("mediumEvents");
155
156                conf.setOutputKeyClass(Text.class);
157                conf.setOutputValueClass(DoubleWritable.class);
158
159                conf.setMapperClass(Map.class);
160                conf.setCombinerClass(Combiner.class);
161                conf.setReducerClass(Reduce.class);
162
163                conf.setInputFormat(TextInputFormat.class);
164                conf.setOutputFormat(TextOutputFormat.class);
165
166                FileInputFormat.setInputPaths(conf, new Path(args[0]));
167                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
168
169                conf.setNumReduceTasks(5);
170
171                JobClient.runJob(conf);
172        }
173}
Note: See TracBrowser for help on using the repository browser.