/**
 * Mean duration of events.
 *
 * @author cristina
 */
package org.PP;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MediumEvents {

    /**
     * Emits a (label, duration) pair for every non-comment input line. A new
     * label is generated every 1000 entries so the data is spread over
     * several reduce keys.
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, DoubleWritable> {

        private static Text label = new Text();
        // groups data by generating a new label every 1000 entries
        static long anotherLabel = 0;
        // number of keys generated
        static long labels = 0;

        public enum MapDuration {
            AHAM
        };

        public void map(LongWritable key, Text value,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {

            Map.label.set("medium duration " + Map.labels);

            if (Map.anotherLabel == 1000) {
                Map.anotherLabel = 0;
                Map.labels++;

                // reporter.setStatus("map " + label.toString());
                reporter.incrCounter(MapDuration.AHAM, 1);
            }

            String line = value.toString();

            // skip comments
            if (!line.startsWith("#")) {
                // columns 7 and 8 are the start and end time of the job
                StringTokenizer tokenizer = new StringTokenizer(line);
                int count = 0;
                double start = 0, end = 0, duration = 0;

                anotherLabel++;

                while (tokenizer.hasMoreTokens()) {
                    count++;
                    String token = tokenizer.nextToken();
                    if (count == 7)
                        start = Double.parseDouble(token);
                    if (count == 8) {
                        end = Double.parseDouble(token);
                        // stop, we have all the information
                        break;
                    }
                }

                duration = end - start;
                output.collect(label, new DoubleWritable(duration));
            }
        }
    }

    /**
     * Pre-averages the durations produced on the map side. Because the
     * reducer averages these partial averages again, the final value is a
     * mean of partial means rather than the exact overall mean.
     */
    public static class Combiner extends MapReduceBase implements
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        public void reduce(Text key, Iterator<DoubleWritable> values,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {

            double sum = 0;
            long count = 0;
            double med = 0;

            while (values.hasNext()) {
                count++;
                sum += values.next().get();
            }
            if (count > 0)
                med = sum / count;

            // keep the incoming key so the combined values still reach the
            // reducer under the label the mapper assigned them
            output.collect(key, new DoubleWritable(med));
        }
    }

    /**
     * Averages the durations (or partial averages) received for each label.
     */
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        public void reduce(Text key, Iterator<DoubleWritable> values,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {

            double sum = 0;
            long count = 0;
            double med = 0;

            while (values.hasNext()) {
                count++;
                sum += values.next().get();
            }
            if (count > 0)
                med = sum / count;

            output.collect(key, new DoubleWritable(med));
        }
    }

    /*
     * Alternative reducer that, like the mapper, regroups its output under a
     * new label every 100 entries.
     *
     * public static class Reduce extends MapReduceBase implements
     *         Reducer<Text, DoubleWritable, Text, DoubleWritable> {
     *
     *     static Text label = new Text();
     *     // groups data by generating a new label
     *     static long anotherLabel = 0;
     *     // number of keys generated
     *     static long labels = 0;
     *
     *     public enum RedDuration { AHAM };
     *
     *     public void reduce(Text key, Iterator<DoubleWritable> values,
     *             OutputCollector<Text, DoubleWritable> output, Reporter reporter)
     *             throws IOException {
     *         double sum = 0;
     *         long count = 0;
     *         double med = 0;
     *
     *         Reduce.label.set("medium duration " + Reduce.labels);
     *
     *         if (Reduce.anotherLabel == 100) {
     *             Reduce.anotherLabel = 0;
     *             Reduce.labels++;
     *
     *             // reporter.setStatus("map " + label.toString());
     *             reporter.incrCounter(RedDuration.AHAM, 1);
     *         }
     *
     *         Reduce.anotherLabel++;
     *
     *         while (values.hasNext()) {
     *             count++;
     *             sum += values.next().get();
     *         }
     *         if (count > 0)
     *             med = sum / count;
     *
     *         output.collect(Reduce.label, new DoubleWritable(med));
     *     }
     * }
     */

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MediumEvents.class);
        conf.setJobName("mediumEvents");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combiner.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setNumReduceTasks(5);

        JobClient.runJob(conf);
    }
}
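
// One possible way to run the job, shown only as a sketch: the jar name and
// the HDFS paths below are illustrative assumptions, not part of the original
// project. args[0] is the input directory of whitespace-separated trace lines
// and args[1] is an output directory that must not exist yet; with
// setNumReduceTasks(5) the per-label averages end up in five part files.
//
//   hadoop jar mediumevents.jar org.PP.MediumEvents /user/cristina/input /user/cristina/output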