/**
 * Computes the mean (average) duration of events from a whitespace-separated
 * trace file. Columns 7 and 8 of each non-comment line are interpreted as the
 * start and end time of a job; the job output is a single key
 * ("medium duration") mapped to the mean of all (end - start) values.
 *
 * @author cristina
 */
package org.PP;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MediumEvents_straight {

    /**
     * Mapper: parses one trace line and emits the event duration.
     * Input: (byte offset, line). Output: ("medium duration", end - start).
     * Lines starting with '#' are comments and are skipped, as are lines
     * that do not contain at least 8 columns (no duration can be computed).
     */
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, DoubleWritable> {

        public void map(LongWritable key, Text value,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            // Skip comment lines.
            if (line.startsWith("#")) {
                return;
            }
            // Columns 7 and 8 are the start and end time of the job, respectively.
            StringTokenizer tokenizer = new StringTokenizer(line);
            int count = 0;
            double start = 0.0;
            double end = 0.0;
            boolean complete = false;
            while (tokenizer.hasMoreTokens()) {
                count++;
                String token = tokenizer.nextToken();
                if (count == 7) {
                    start = Double.parseDouble(token);
                }
                if (count == 8) {
                    end = Double.parseDouble(token);
                    // Stop early: we have all the information we need.
                    complete = true;
                    break;
                }
            }
            // Emit only if both start and end were actually present;
            // otherwise a malformed short line would yield a bogus duration.
            if (complete) {
                output.collect(new Text("medium duration"),
                        new DoubleWritable(end - start));
            }
        }
    }

    /**
     * Reducer: averages all durations collected under a key.
     * Emits (key, mean duration); 0 if the key had no values.
     */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        public void reduce(Text key, Iterator<DoubleWritable> values,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            double sum = 0.0;
            long count = 0;
            while (values.hasNext()) {
                count++;
                sum += values.next().get();
            }
            double mean = (count > 0) ? sum / count : 0.0;
            output.collect(key, new DoubleWritable(mean));
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MediumEvents_straight.class);
        conf.setJobName("MediumEvents_straight");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Map.class);
        // No combiner: averaging is not associative, so running the reducer
        // as a combiner would only approximate the true mean.
        // conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setNumReduceTasks(5);

        JobClient.runJob(conf);
    }
}