import java.io.IOException; import java.util.*; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.data.DataBag; import org.apache.pig.data.DefaultBagFactory; import org.apache.pig.data.DefaultTupleFactory; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.FrontendException; public class RealLabels extends EvalFunc { public static HashMap failureCause = new HashMap(); public static HashMap durationGroup = new HashMap(); public static void init() { // failure cause failureCause.put(new Double(-1), "not reported"); failureCause.put(new Double(0), "reported as undetermined"); failureCause.put(new Double(999), "infrastructure"); failureCause.put(new Double(1999), "hardware"); failureCause.put(new Double(2999), "IO"); failureCause.put(new Double(3999), "network"); failureCause.put(new Double(4999), "software"); failureCause.put(new Double(5999), "human error"); failureCause.put(new Double(6999), "user"); failureCause.put(new Double(7000), "end of measurement"); // group by duration durationGroup.put(new Double(1000), "short"); durationGroup.put(new Double(100000), "medium"); durationGroup.put(Double.MAX_VALUE, "long"); } @Override public DataBag exec(Tuple input) throws IOException { RealLabels.init(); if (input == null || input.size() == 0) return null; try{ Double duration = (Double)input.get(0); //System.out.println("duration="+duration); Double failure = new Double(0); if(duration != null && failure != null) { String f = new String(input.get(1).toString()); StringTokenizer st = new StringTokenizer(f); String token = st.nextToken(); if(token.compareTo("NULL") == 0) failure = -1.0; else failure = Double.parseDouble(token); //System.out.println("failure="+failure); String durationClass = "", faultClass = ""; // get duration class LinkedList ll1 = new LinkedList( RealLabels.durationGroup.keySet()); Collections.sort(ll1); Iterator iterator1 = ll1.iterator(); while (iterator1.hasNext()) { double mkey = (Double) iterator1.next(); if (duration <= mkey) { String mvalue = (String) RealLabels.durationGroup.get(mkey); durationClass = mvalue; break; } } // get fault class LinkedList ll2 = new LinkedList( RealLabels.failureCause.keySet()); Collections.sort(ll2); Iterator iterator2 = ll2.iterator(); while (iterator2.hasNext()) { double mkey = (Double) iterator2.next(); if (failure <= mkey) { String mvalue = (String) RealLabels.failureCause.get(mkey); faultClass = mvalue; break; } } if(faultClass.compareTo("") == 0) faultClass = "TYPING"; DataBag output = DefaultBagFactory.getInstance().newDefaultBag(); Tuple t = DefaultTupleFactory.getInstance().newTuple(1); String label = durationClass + " event - " + faultClass+ " fault"; //System.out.println("label="+label); t.set(0, label); output.add(t); return output; } return null; }catch(Exception e){ System.err.println("RealLabelsr: failed to process input; error - " + e.getMessage()); e.printStackTrace(); return null; } } }