package ro.pub.cs.pp.pdad; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ComponentType { public static class Map extends Mapper { private final static IntWritable one = new IntWritable(1); private final static IntWritable zero = new IntWritable(0); private int extractComponentType(String line) { String[] pieces = line.split("\\s+"); return Integer.parseInt(pieces[6]); } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); try { int componentType = extractComponentType(line); context.write(new IntWritable(componentType), one); } catch (NumberFormatException ex) { context.write(zero, zero); } } } public static class Reduce extends Reducer { public static String[] compTypeMap = {"host", "CPU", "IO", "network", "memory", "software"}; private Text compType = new Text(); public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable cnt : values) { sum += cnt.get(); } compType.set(compTypeMap[key.get()]); context.write(compType, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "ComponentType"); job.setNumReduceTasks(5); job.setJarByClass(ComponentType.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); /* don't set any combiner class */ job.setReducerClass(Reduce.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }