source: proiecte/PDAD/trunk/endreasonstat/src/ro/pub/cs/pp/pdad/EndReasonStat.java @ 154

Last change on this file since 154 was 154, checked in by (none), 14 years ago

PDAD project

File size: 5.6 KB
Line 
1package ro.pub.cs.pp.pdad;
2
3import java.io.DataInput;
4import java.io.DataOutput;
5import java.io.IOException;
6import java.util.HashMap;
7
8import org.apache.hadoop.fs.Path;
9import org.apache.hadoop.conf.Configuration;
10import org.apache.hadoop.io.Text;
11import org.apache.hadoop.io.IntWritable;
12import org.apache.hadoop.io.WritableComparable;
13import org.apache.hadoop.io.WritableComparator;
14import org.apache.hadoop.mapreduce.Job; 
15import org.apache.hadoop.mapreduce.Mapper;
16import org.apache.hadoop.mapreduce.Reducer;
17import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
19
20public class EndReasonStat {
21
22        private static int[] domains = {-1, 0, 1, 1000, 2000, 3000, 4000, 5000, 6000, 7000};
23        private static String[] reasonName = {"not reported", "undetermined", "infrastructure", "hardware", "IO", "network", "software", "human error", "user"}; 
24       
25        public static class IntPair implements WritableComparable<IntPair> {
26                private int first = 0;
27                private int second = 0;
28
29                /**
30                 * Set the left and right values.
31                 */
32                public void set(int left, int right) {
33                        first = left;
34                        second = right;
35                }
36
37                public int getFirst() {
38                        return first;
39                }
40
41                public int getSecond() {
42                        return second;
43                }
44
45                /**
46                 * Read the two integers. Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE,
47                 * MAX_VALUE-> -1
48                 */
49                @Override
50                public void readFields(DataInput in) throws IOException {
51                        first = in.readInt() + Integer.MIN_VALUE;
52                        second = in.readInt() + Integer.MIN_VALUE;
53                }
54
55                @Override
56                public void write(DataOutput out) throws IOException {
57                        out.writeInt(first - Integer.MIN_VALUE);
58                        out.writeInt(second - Integer.MIN_VALUE);
59                }
60
61                @Override
62                public int hashCode() {
63                        return first * 157 + second;
64                }
65
66                @Override
67                public boolean equals(Object right) {
68                        if (right instanceof IntPair) {
69                                IntPair r = (IntPair) right;
70                                return r.first == first && r.second == second;
71                        } else {
72                                return false;
73                        }
74                }
75
76                /** A Comparator that compares serialized IntPair. */
77                public static class Comparator extends WritableComparator {
78                        public Comparator() {
79                                super(IntPair.class);
80                        }
81
82                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
83                                        int l2) {
84                                return compareBytes(b1, s1, l1, b2, s2, l2);
85                        }
86                }
87
88                static { // register this comparator
89                        WritableComparator.define(IntPair.class, new Comparator());
90                }
91
92                @Override
93                public int compareTo(IntPair o) {
94                        if (first != o.first) {
95                                return first < o.first ? -1 : 1;
96                        } else if (second != o.second) {
97                                return second < o.second ? -1 : 1;
98                        } else {
99                                return 0;
100                        }
101                }
102        }
103
104          public static class Map extends Mapper<Object, Text, IntWritable, IntPair> {
105               
106                private static String null_reason = "NULL"; 
107               
108                private final static IntPair onePair = new IntPair(); 
109                private final static IntWritable zero = new IntWritable(0); 
110                private final static int fieldNo = 9; 
111               
112                private int findDomainCode(int reasonCode) {
113                        for (int domainCode = 0; domainCode < domains.length - 1; domainCode++) {
114                                if (domains[domainCode] <= reasonCode && reasonCode < domains[domainCode + 1])
115                                        return domainCode; 
116                        }
117                        return 0; 
118                }
119               
120                private int extractReasonCode(String line) {
121                        String[] pieces = line.split("\\s+");
122                        if (pieces.length < fieldNo + 1) {
123                                return 0; 
124                        }
125                        String reason = pieces[fieldNo];
126                       
127                        if (reason.equals(null_reason)) {
128                                return 0;                               
129                        } else {
130                                int reasonCode = Integer.parseInt(reason);
131                                return reasonCode; 
132                        }
133                }
134               
135                public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
136                        String line = value.toString(); 
137                        try { 
138                                int reasonCode = extractReasonCode(line);
139                                int reasonDomainCode = findDomainCode(reasonCode); 
140                                onePair.set(reasonCode, 1); 
141                                context.write(new IntWritable(reasonDomainCode), onePair);
142                        } catch (NumberFormatException ex) {
143                                onePair.set(0, 0);
144                                context.write(zero, onePair); 
145                        }
146                       
147                }
148        }
149       
150       
151        public static class Reduce extends Reducer <IntWritable, IntPair, Text, IntWritable> {
152               
153                private Text reason = new Text(); 
154               
155                public void reduce(IntWritable key, Iterable<IntPair> values, Context context) throws IOException, InterruptedException {
156                        /* write the domain name */
157                        reason.set(reasonName[key.get()]); 
158                        context.write(reason, null);
159                       
160                        HashMap<Integer, Integer> reasonSum = new HashMap<Integer, Integer>();
161                        for (IntPair pair : values) {
162                                int reasonCode = pair.getFirst();
163                                if (reasonSum.containsKey(reasonCode)) {
164                                        int newSum = reasonSum.get(reasonCode) + pair.getSecond(); 
165                                        reasonSum.put(reasonCode, newSum); 
166                                } else {
167                                        reasonSum.put(reasonCode, pair.getSecond()); 
168                                }
169                        }
170                       
171                        for (int reasonCode : reasonSum.keySet()) {
172                                reason.set(new String("\t" +reasonCode));
173                                context.write(reason, new IntWritable(reasonSum.get(reasonCode))); 
174                        }
175                }
176        }
177       
178        public static void main(String[] args) throws Exception {
179                Configuration conf = new Configuration(); 
180                Job job = new Job(conf, "EndReasonStat"); 
181                job.setNumReduceTasks(5);
182                job.setJarByClass(EndReasonStat.class);
183               
184                /* set map output key/value classes */
185                job.setMapOutputKeyClass(IntWritable.class); 
186                job.setMapOutputValueClass(IntPair.class); 
187               
188                /* set application output key/value classes */
189                job.setOutputKeyClass(Text.class); 
190                job.setOutputValueClass(IntWritable.class);
191               
192                job.setMapperClass(Map.class);
193                job.setReducerClass(Reduce.class);             
194                /* don't set any combiner class */
195               
196                FileInputFormat.addInputPath(job, new Path(args[0])); 
197                FileOutputFormat.setOutputPath(job, new Path(args[1])); 
198               
199                System.exit(job.waitForCompletion(true) ? 0 : 1);
200        }
201       
202}
203
Note: See TracBrowser for help on using the repository browser.