/**
 * Hadoop MapReduce job that counts failure events grouped by failure cause
 * and by event duration (short, medium, long), emitting only the frequent
 * combinations (count &gt; 1000).
 *
 * <p>Input lines are whitespace-separated records; columns 7, 8 and 9 hold
 * the job start time, end time and failure code respectively. Lines starting
 * with {@code #} are comments and are skipped.
 *
 * @author cristina
 */
package org.PP;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class FailureCause {

    /**
     * Mapper: parses one event record and emits
     * {@code "<durationClass> event - <faultClass> fault" -> 1}.
     *
     * <p>Classification rule: a value v is labelled by the smallest table key
     * k with {@code v <= k} (the keys are range upper bounds).
     */
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {

        // Public classification tables, kept for backward compatibility.
        public static HashMap<Integer, String> failureCause = new HashMap<Integer, String>();
        public static HashMap<Integer, String> durationGroup = new HashMap<Integer, String>();

        // Pre-sorted views of the tables so each record does not need to
        // copy and sort the key sets (the original did this per record).
        private static final TreeMap<Integer, String> faultRanges = new TreeMap<Integer, String>();
        private static final TreeMap<Integer, String> durationRanges = new TreeMap<Integer, String>();

        static {
            // Populate the tables exactly once per JVM instead of once per
            // input record, as the original code did.
            init();
        }

        /** Populates the classification tables. Idempotent. */
        public static void init() {
            // failure cause: upper bound of the code range -> label
            failureCause.put(-1, "not reported");
            failureCause.put(0, "reported as undetermined");
            failureCause.put(999, "infrastructure");
            failureCause.put(1999, "hardware");
            failureCause.put(2999, "IO");
            failureCause.put(3999, "network");
            failureCause.put(4999, "software");
            failureCause.put(5999, "human error");
            failureCause.put(6999, "user");
            failureCause.put(7000, "end of measurement");

            // group by duration: upper bound of the duration range -> label
            durationGroup.put(1000, "short");
            durationGroup.put(100000, "medium");
            durationGroup.put(Integer.MAX_VALUE, "long");

            faultRanges.putAll(failureCause);
            durationRanges.putAll(durationGroup);
        }

        /**
         * Returns the label of the smallest range bound {@code >= v},
         * or {@code fallback} when v exceeds every bound.
         */
        private static String classify(TreeMap<Integer, String> ranges,
                double v, String fallback) {
            for (java.util.Map.Entry<Integer, String> e : ranges.entrySet()) {
                if (v <= e.getKey()) {
                    return e.getValue();
                }
            }
            return fallback;
        }

        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            // skip comment lines
            if (line.startsWith("#")) {
                return;
            }

            double start = 0, end = 0;
            int fault = 0;
            int count = 0;
            // columns 7, 8 and 9 are start time, end time of the job and
            // failure code, respectively
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                count++;
                String token = tokenizer.nextToken();
                if (count == 7) {
                    start = Double.parseDouble(token);
                } else if (count == 8) {
                    end = Double.parseDouble(token);
                } else if (count == 9) {
                    // "NULL" means the failure cause was never reported
                    fault = token.equals("NULL") ? -1 : Integer.parseInt(token);
                }
            }

            // Duration has no fallback label (original left it empty);
            // fault codes above every bound are flagged as "TYPING".
            String durationClass = classify(durationRanges, end - start, "");
            String faultClass = classify(faultRanges, fault, "TYPING");

            Text label = new Text(durationClass + " event - " + faultClass + " fault");
            output.collect(label, new LongWritable(1));
        }
    }

    /** Combiner: pre-aggregates the per-label counts on the map side. */
    public static class Combine extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            long sum = 0; // primitive accumulator: avoids per-step autoboxing
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }

    /**
     * Reducer: sums the counts per label and keeps only frequent
     * combinations (total &gt; 1000).
     */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            long sum = 0; // primitive accumulator: avoids per-step autoboxing
            while (values.hasNext()) {
                sum += values.next().get();
            }
            // frequency threshold: only emit causes seen more than 1000 times
            if (sum > 1000) {
                output.collect(key, new LongWritable(sum));
            }
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] input path, args[1] output path
     */
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FailureCause.class);
        conf.setJobName("failureCause");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combine.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setNumReduceTasks(5);

        JobClient.runJob(conf);
    }
}