package ro.pub.cs.pp.pdad; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class EndReason { private static int[] domains = {-1, 0, 1, 1000, 2000, 3000, 4000, 5000, 6000, 7000}; private static String[] reasonName = {"not reported", "undetermined", "infrastructure", "hardware", "IO", "network", "software", "human error", "user"}; public static class Map extends Mapper { private static String null_reason = "NULL"; private final static IntWritable one = new IntWritable(1); private final static IntWritable zero = new IntWritable(0); private final static int fieldNo = 9; private int findDomainCode(int reasonCode) { for (int domainCode = 0; domainCode < domains.length - 1; domainCode++) { if (domains[domainCode] <= reasonCode && reasonCode < domains[domainCode + 1]) return domainCode; } return 0; } private int extractReasonDomainCode(String line) { String[] pieces = line.split("\\s+"); if (pieces.length < fieldNo + 1) { return 0; } String reason = pieces[fieldNo]; if (reason.equals(null_reason)) { return 0; } else { int reasonCode = Integer.parseInt(reason); return findDomainCode(reasonCode); } } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); try { int reasonDomain = extractReasonDomainCode(line); context.write(new IntWritable(reasonDomain), one); } catch (NumberFormatException ex) { context.write(zero, zero); } } } public static class Reduce extends Reducer { private Text reasonDomain = new Text(); public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable cnt : values) { sum += cnt.get(); } reasonDomain.set(reasonName[key.get()]); context.write(reasonDomain, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "EndReason"); job.setNumReduceTasks(5); job.setJarByClass(EndReason.class); /* set map output key/value classes */ job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); /* set application output key/value classes */ job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); /* don't set any combiner class */ FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }