import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.ArrayList;
import java.util.Base64;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.junit.runner.Description;
import org.junit.runner.JUnitCore;
import org.junit.runner.Result;
import org.junit.runner.Runner;
import org.junit.runner.manipulation.Filter;
import org.junit.runner.manipulation.NoTestsRemainException;
import org.junit.runner.notification.Failure;
import org.junit.runner.notification.RunNotifier;
import org.junit.runners.JUnit4;

public class HadoopRunner extends Runner {

    /* TODO(mgruber): Place the host/port pair into the XML config,
     * under junit.classloader. */
    private static final int HADOOP_LOADER_PORT = 9002;
    private static final String HADOOP_LOADER_HOST = "imac.local";
    private static Thread resourceServer;

    public static class JUnitMapper extends Mapper<Object, Text, Text, Text> {

        Configuration conf;
        private static ClassLoader loader;

        public class JUnitClassLoader extends ClassLoader {

            FileSystem hdfs;
            String host;
            int port;

            public JUnitClassLoader(String host, int port) {
                try {
                    this.host = host;
                    this.port = port;
                    String location = this.getClass().getProtectionDomain()
                            .getCodeSource()
                            .getLocation()
                            .getFile();
                    hdfs = new Path(location).getFileSystem(conf);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            /** Connect to the master, and request the class file. */
            @Override
            public Class<?> findClass(String name) {
                try {
                    /* Create a new connection. */
                    Socket socket = new Socket(host, port);
                    PrintWriter writer =
                            new PrintWriter(socket.getOutputStream());
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(socket.getInputStream()));
                    /* Send the class name. */
                    writer.println(name);
                    writer.flush();
                    /* Get the location of the class. */
                    String resourceFile = reader.readLine();
                    if (resourceFile.endsWith(".jar")) {
                        throw new Exception("Not implemented");
                    } else {
                        /* Open the class file from HDFS. */
                        Path resourcePath = new Path(resourceFile);
                        DataInputStream input = hdfs.open(resourcePath);
                        long length = resourcePath.getFileSystem(conf)
                                .getFileStatus(resourcePath).getLen();
                        byte[] b = new byte[(int) length];
                        input.readFully(b);
                        socket.close();
                        return defineClass(name, b, 0, b.length);
                    }
                } catch (Exception e) {
                    /* Nothing to do; the class could not be resolved. */
                    e.printStackTrace();
                }
                return null;
            }
        }
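        /* The class-loading protocol is one request/response round trip
         * per class, over a plain TCP socket:
         *
         *   mapper -> master:  fully qualified class name (one line)
         *   master -> mapper:  HDFS path of the corresponding .class file
         *                      or jar (one line)
         *
         * The mapper then reads the class bytes directly from HDFS and
         * defines the class locally; jar responses are not handled yet. */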
        @Override
        protected void setup(Context context) {
            conf = context.getConfiguration();
            loader = new JUnitClassLoader(HADOOP_LOADER_HOST,
                    HADOOP_LOADER_PORT);
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            if (tokenizer.countTokens() != 2)
                throw new IOException();
            /* The first token is the org.junit.runner.Description of the
             * test class, the second one that of the test method. */
            String className = tokenizer.nextToken();
            final String methodDescription = tokenizer.nextToken();
            try {
                Class<?> testClass = loader.loadClass(className);
                /* Create a local JUnit runner for this class. */
                JUnit4 runner = new JUnit4(testClass);
                runner.filter(new Filter() {
                    @Override
                    public boolean shouldRun(Description description) {
                        return methodDescription.equals(
                                description.toString());
                    }

                    @Override
                    public String describe() {
                        return null;
                    }
                });
                /* Run with the local runner. */
                Result result = new JUnitCore().run(runner);
                /* Since we filtered down to a single test, we can assert
                 * the following: */
                assert result.getFailures().size() < 2;
                /* Collect the failures in a list; an empty list means no
                 * failures occurred. We do this because Result is not
                 * serializable. */
                ArrayList<Throwable> outcome = new ArrayList<Throwable>();
                for (Failure failure : result.getFailures()) {
                    outcome.add(failure.getException());
                }
                /* Serialize the list into a byte array. */
                ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
                ObjectOutputStream objectStream =
                        new ObjectOutputStream(byteStream);
                objectStream.writeObject(outcome);
                /* Get the backing byte array. */
                byte[] bytes = byteStream.toByteArray();
                context.write(new Text(methodDescription),
                        new Text(Base64.getEncoder().encodeToString(bytes)));
            } catch (NoTestsRemainException e) {
                System.out.println(e.toString());
            } catch (Exception e) {
                /* TODO: Output something in the mapper. */
                e.printStackTrace();
                throw new IOException();
            }
        }
    }

    public static class IdentityReducer
            extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println(key);
            /* TODO: Make better use of the aggregated failure
             * descriptions. */
            for (Text value : values) {
                context.write(key, value);
                System.out.println(value);
            }
        }
    }

    Class<?> testClass;
    private final Runner delegate;

    private static final String HADOOP_RUNNER = "hadoop_runner";
    private static final String TMP_FILE_NAME = "junit_tests";
    private static final String OUTPUT_DIR_NAME = "junit_results";
    private static final String OUTPUT_FILE_PREFIX = "part-r-";
    private static final String JAR_FILE_NAME = "junit-runner-0.1.jar";

    @Override
    public Description getDescription() {
        return delegate.getDescription();
    }
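    /**
     * Configures and runs the MapReduce job: {@link JUnitMapper} executes
     * the tests, {@link IdentityReducer} passes the results through.
     * Each input line in {@code in} is a tab-separated pair of class and
     * method descriptions; each output line in {@code out} pairs the
     * method description with the Base64-encoded, serialized list of
     * failures (empty if the test passed).
     */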
    public void doMapReduce(Path in, Path out) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, HADOOP_RUNNER);
        /* Add the JUnit jar to the distributed cache, for the mappers.
         * NOTE: The file has already been copied to HDFS; the Path refers
         * to an HDFS location. */
        DistributedCache.addFileToClassPath(
                new Path(JUnit4.class.getProtectionDomain()
                        .getCodeSource()
                        .getLocation()
                        .getFile()),
                job.getConfiguration());
        ((JobConf) job.getConfiguration()).setJar(JAR_FILE_NAME);
        job.setMapperClass(JUnitMapper.class);
        job.setReducerClass(IdentityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }

    private Description lookupDescription(String desc) {
        for (Description d : delegate.getDescription().getChildren()) {
            if (desc.equals(d.toString()))
                return d;
        }
        /* Unreachable. */
        return null;
    }

    @Override
    public void run(RunNotifier notifier) {
        try {
            File in = File.createTempFile(TMP_FILE_NAME, null);
            PrintWriter writer = new PrintWriter(new FileWriter(in));
            for (Description d : delegate.getDescription().getChildren()) {
                notifier.fireTestStarted(d);
                /* Write the descriptions of the test class and the test
                 * method, separated by a tab.
                 * TODO: Replace this separator by a constant. */
                writer.println(delegate.getDescription() + "\t" + d);
            }
            writer.close();
            /* The output is a folder. */
            File out = File.createTempFile(OUTPUT_DIR_NAME, "");
            out.delete();
            FileSystem hdfs = new Path(in.getName())
                    .getFileSystem(new Configuration());
            /* TODO(mgruber): Copy on DFS. */
            hdfs.copyFromLocalFile(new Path(in.getCanonicalPath()),
                    new Path(in.getName()));
            /* TODO(mgruber): Refactor. Copy the JUnit jar to the lib
             * folder. */
            URL jUnitJarLocation = JUnit4.class.getProtectionDomain()
                    .getCodeSource()
                    .getLocation();
            hdfs.copyFromLocalFile(new Path(jUnitJarLocation.getPath()),
                    new Path(jUnitJarLocation.getFile()));
            /* Run the job. */
            doMapReduce(new Path(in.getName()), new Path(out.getName()));
            hdfs.copyToLocalFile(new Path(out.getName()),
                    new Path(out.getCanonicalPath()));
            /* Get the results, and notify the JUnit master. */
            for (File f : out.listFiles()) {
                if (!f.getName().startsWith(OUTPUT_FILE_PREFIX))
                    continue;
                BufferedReader reader =
                        new BufferedReader(new FileReader(f));
                while (true) {
                    String result = reader.readLine();
                    if (result == null) /* EOF */
                        break;
                    /* Each line of output is a pair (method description,
                     * Base64-encoded list of failures). */
                    StringTokenizer tokenizer =
                            new StringTokenizer(result, "\t");
                    assert tokenizer.countTokens() == 2;
                    String methodDescription = tokenizer.nextToken();
                    String outcomeDescription = tokenizer.nextToken();
                    ArrayList<Throwable> outcome = null;
                    try {
                        /* Decode the serialized failure list. */
                        byte[] bytes =
                                Base64.getDecoder().decode(outcomeDescription);
                        ByteArrayInputStream byteStream =
                                new ByteArrayInputStream(bytes);
                        ObjectInputStream objectStream =
                                new ObjectInputStream(byteStream);
                        outcome = (ArrayList<Throwable>)
                                objectStream.readObject();
                    } catch (Exception e) {
                        /* TODO: Handle the exception. */
                        e.printStackTrace();
                        assert false;
                    }
                    Description d = lookupDescription(methodDescription);
                    assert d != null;
                    if (outcome.size() > 0) {
                        notifier.fireTestFailure(
                                new Failure(d, outcome.get(0)));
                    }
                    notifier.fireTestFinished(d);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
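    /**
     * Creates the runner and, once per process, starts the resource
     * server that the mappers' class loaders talk to: for every requested
     * class it copies the containing class file or jar from the local
     * file system to the same path on HDFS, and replies with that path.
     */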
    public HadoopRunner(Class<?> testClass) throws Exception {
        this.testClass = testClass;
        this.delegate = new JUnit4(testClass);
        Thread server = new Thread() {
            @Override
            public void run() {
                try {
                    ServerSocket server =
                            new ServerSocket(HADOOP_LOADER_PORT);
                    while (true) {
                        Socket client = server.accept();
                        BufferedReader reader = new BufferedReader(
                                new InputStreamReader(
                                        client.getInputStream()));
                        String className = reader.readLine();
                        PrintWriter writer =
                                new PrintWriter(client.getOutputStream());
                        /* Find the requested class, and place it in an
                         * HDFS location. */
                        try {
                            /* Copy to HDFS. */
                            Class<?> req = Class.forName(className);
                            String location = req.getProtectionDomain()
                                    .getCodeSource()
                                    .getLocation()
                                    .getFile();
                            File classFile = new File(location);
                            if (classFile.isDirectory()) {
                                /* Append the class file name; package
                                 * components map to subdirectories. */
                                location +=
                                        className.replace('.', '/') + ".class";
                            } else {
                                /* The class came from a jar. */
                                assert location.endsWith(".jar");
                            }
                            FileSystem hdfs = new Path(location)
                                    .getFileSystem(new Configuration());
                            /* TODO: Clear this location at job start, and
                             * don't overwrite during subsequent calls. */
                            hdfs.copyFromLocalFile(false, true,
                                    new Path(location), new Path(location));
                            writer.println(location);
                            writer.flush();
                        } catch (Exception e) {
                            /* Nothing to do; the class was not found. */
                            e.printStackTrace();
                        }
                        client.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        /* We only need one instance of the resource server per process,
         * so guard the static field with the class lock. */
        synchronized (HadoopRunner.class) {
            if (resourceServer == null) {
                resourceServer = server;
                server.start();
            }
        }
    }
}
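/* Usage sketch (illustrative; the test class name is hypothetical):
 * annotate a test class with @RunWith so that JUnit delegates its
 * execution to this runner, which fans the test methods out to the
 * cluster.
 *
 *   import org.junit.Test;
 *   import org.junit.runner.RunWith;
 *
 *   @RunWith(HadoopRunner.class)
 *   public class ExampleDistributedTest {
 *       @Test
 *       public void testSomething() {
 *           // runs inside a JUnitMapper task
 *       }
 *   }
 */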