source: proiecte/HadoopJUnit/src/HadoopRunner.java @ 142

Last change on this file since 142 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 15.4 KB
Line 
1import java.io.BufferedReader;
2import java.io.ByteArrayInputStream;
3import java.io.ByteArrayOutputStream;
4import java.io.DataInputStream;
5import java.io.File;
6import java.io.File;
7import java.io.FileReader;
8import java.io.FileWriter;
9import java.io.IOException;
10import java.io.InputStreamReader;
11import java.io.ObjectInputStream;
12import java.io.ObjectOutputStream;
13import java.io.PrintWriter;
14import java.lang.Thread;
15import java.net.ServerSocket;
16import java.net.Socket;
17import java.net.URL;
18import java.net.URLClassLoader;
19import java.net.URLDecoder;
20import java.util.ArrayList;
21import java.util.Enumeration;
22import java.util.StringTokenizer;
23import org.apache.hadoop.conf.Configuration;
24import org.apache.hadoop.filecache.DistributedCache;
25import org.apache.hadoop.fs.FileSystem;
26import org.apache.hadoop.fs.Path;
27import org.apache.hadoop.io.Text;
28import org.apache.hadoop.mapred.JobConf;
29import org.apache.hadoop.mapreduce.Job;
30import org.apache.hadoop.mapreduce.Mapper;
31import org.apache.hadoop.mapreduce.Reducer;
32import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
33import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
34import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
35import org.junit.runner.Description;
36import org.junit.runner.JUnitCore;
37import org.junit.runner.Request;
38import org.junit.runner.Result;
39import org.junit.runner.Runner;
40import org.junit.runner.manipulation.Filter;
41import org.junit.runner.manipulation.NoTestsRemainException;
42import org.junit.runner.notification.Failure;
43import org.junit.runner.notification.RunNotifier;
44import org.junit.runners.JUnit4;
45
46
47public class HadoopRunner extends Runner {
48
49    /* TODO(mgruber): Place host port pair into the xml config,
50     * under junit.classloader.
51     */
52    private static final int HADOOP_LOADER_PORT = 9002;
53    private static final String HADOOP_LOADER_HOST = "imac.local";
54    private static Thread resourceServer;
55
56    public static class JUnitMapper
57        extends Mapper<Object, Text, Text, Text>
58    {
59        Configuration conf;
60        private Text word = new Text();
61        private static ClassLoader loader;
62
63        public class JUnitClassLoader extends ClassLoader {
64            FileSystem hdfs;
65
66            String host;
67            int port;
68
69            public JUnitClassLoader(String host, int port) {
70                try {
71                    this.host = host;
72                    this.port = port;
73                    String location = this.getClass().getProtectionDomain().
74                        getCodeSource().
75                        getLocation().
76                        getFile();
77                    hdfs = new Path(location).getFileSystem(conf);
78                }
79                catch (Exception e) {
80                    e.printStackTrace();
81                }
82            }
83            /*! Connect to master, and request class file. */
84            public Class findClass(String name) {
85                try {
86                    /* Create a new connection. */
87                    Socket socket = new Socket(host, port);
88                    PrintWriter writer = new PrintWriter(socket.
89                                                         getOutputStream());
90                    BufferedReader reader =
91                        new BufferedReader(new InputStreamReader(socket.
92                                                             getInputStream()));
93                    /* Send the class name. */
94                    writer.println(name);
95                    writer.flush();
96                    /* Get the location of the class. */
97                    String resourceFile = reader.readLine();
98                    if (resourceFile.endsWith(".jar")) {
99                        throw new Exception("Not implemented");
100                    }
101                    else {
102                        /* Open the class file from hdfs. */
103                        Path resourcePath = new Path(resourceFile);
104                        DataInputStream input = hdfs.open(resourcePath);
105                        long length = resourcePath.getFileSystem(conf).
106                            getFileStatus(resourcePath).getLen();
107                        byte b[] = new byte[(int) length];
108                        input.readFully(b);
109                        socket.close();
110                        return defineClass(name, b, 0, b.length);
111                    }
112                }
113                catch (Exception e) {
114                    /* Nothing to do. */
115                    e.printStackTrace();
116                }
117                return null;
118            }
119        };
120
121        @Override
122        protected void setup(Mapper.Context context) {
123            conf = context.getConfiguration();
124            loader = new JUnitClassLoader(HADOOP_LOADER_HOST,
125                                          HADOOP_LOADER_PORT);
126        }
127
128        public void map(Object key, Text value, Context context)
129            throws IOException, InterruptedException {
130                StringTokenizer tokenizer = 
131                    new StringTokenizer(value.toString());
132                if (tokenizer.countTokens() != 2)
133                    throw new IOException();
134                /* The first token is a org.junit.runner.Description for the
135                 * test class, and the second for the test method.
136                 */
137                String className = tokenizer.nextToken();
138                final String methodDescription = tokenizer.nextToken();
139                try {
140                    Class testClass = loader.loadClass(className);
141                    /* Create a local JUnit runner for this class. */
142                    JUnit4 runner = new JUnit4(testClass);
143                    runner.filter(
144                        new Filter() { 
145                            @Override
146                            public boolean shouldRun(Description description) {
147                                return methodDescription.equals(description.
148                                                                toString());
149                            }
150                            @Override
151                            public String describe() {
152                                return null;
153                            }
154                        }
155                    );
156                    /* Run with local runner. */
157                    Result result = new JUnitCore().run(runner);
158
159                    /* Since we filter out a single test, we can assert the
160                     * following:
161                     */
162                    assert result.getFailures().size() < 2;
163                    /* Collect the result in a list. If the list is empty,
164                     * no failures occured. We do this, since Result is not
165                     * serializable.
166                     */
167                    ArrayList<Throwable> outcome = new ArrayList<Throwable>();
168                    ByteArrayOutputStream byteStream =
169                        new ByteArrayOutputStream();
170                    ObjectOutputStream objectStream = 
171                        new ObjectOutputStream(byteStream);
172
173                    for (Failure failure: result.getFailures()) {
174                        Throwable exception = failure.getException();
175                        /* Serialize the exception into a byte array. */
176
177                        outcome.add(exception);
178                    }
179                    objectStream.writeObject(outcome);
180                    /* Get the backing byte array. */
181                    byte[] bytes = byteStream.toByteArray();
182                    context.write(new Text(methodDescription),
183                                  new Text(Base64.byteArrayToBase64(bytes)));
184                }
185                catch (NoTestsRemainException e) {
186                    System.out.println(e.toString());
187                }
188                catch (Exception e) {
189                    /* TODO: Output something in the mapper. */
190                    e.printStackTrace();
191                    throw new IOException();
192                }
193        }
194    }
195
196    public static class IdentityReducer 
197        extends Reducer<Text, Text, Text, Text> 
198    {
199        public void reduce(Text key, Iterable<Text> values, Context context)
200            throws IOException, InterruptedException {
201            System.out.println(key);
202            /* TODO: Make better use of the aggregated failure descriptions. */
203            for (Text value: values) {
204                context.write(key, value);
205                System.out.println(value);
206            }
207        }
208    }
209
210    Class testClass;
211    private final Runner delegate;
212    private final String HADOOP_RUNNER = "hadoop_runner";
213    private final String TMP_FILE_NAME = "junit_tests";
214    private final String OUTPUT_DIR_NAME = "junit_results";
215    private final String OUTPUT_FILE_PREFIX = "part-r-";
216    private final String JAR_FILE_NAME = "junit-runner-0.1.jar";
217
218
219
220    public Description getDescription() {
221        return delegate.getDescription();
222    }
223
224    public void doMapReduce(Path in, Path out) throws Exception {
225        Configuration conf = new Configuration();
226        /* Add the JUnit jar to the distributed cache, for the mappers. */
227        Job job = new Job(conf, HADOOP_RUNNER);
228        /* NOTE: File has been already copied to hdfs, the Path refers to
229         * a hdfs location.
230         */
231        DistributedCache.addFileToClassPath(new Path(JUnit4.class.
232                                                     getProtectionDomain().
233                                                     getCodeSource().
234                                                     getLocation().
235                                                     getFile()),
236                                            job.getConfiguration());
237        ((JobConf) (job.getConfiguration())).setJar(JAR_FILE_NAME);
238        job.setMapperClass(JUnitMapper.class);
239        job.setReducerClass(IdentityReducer.class);
240        job.setOutputKeyClass(Text.class);
241        job.setOutputValueClass(Text.class);
242        FileInputFormat.addInputPath(job, in);
243        TextOutputFormat.setOutputPath(job, out);
244        job.waitForCompletion(true);
245    }
246
247    private Description lookupDescription(String desc) {
248        for (Description d: delegate.getDescription().getChildren()) {
249            if (desc.equals(d.toString()))
250                return d;
251        }
252        /* Unreachable */
253        return null;
254    }
255
256    public void run(RunNotifier notifier) {
257        try {
258            File in = File.createTempFile(TMP_FILE_NAME, null);
259            PrintWriter writer = new PrintWriter(new FileWriter(in));
260            for (Description d: delegate.getDescription().getChildren()) {
261                notifier.fireTestStarted(d);
262                /* Write the description of the test class and the method. */
263                writer.println(delegate.getDescription() + "\t" + d);
264                /* TODO: Replace this separator by a constant. */
265            }
266            writer.close();
267
268            /* Output is a folder. */
269            File out = File.createTempFile(OUTPUT_DIR_NAME, "");
270            out.delete();
271
272            FileSystem hdfs =
273                new Path(in.getName()).getFileSystem(new Configuration());
274
275            /* TODO(mgruber): Copy on DFS. */
276            hdfs.copyFromLocalFile(new Path(in.getCanonicalPath()),
277                                   new Path(in.getName()));
278
279            /* TODO(mgruber): Refactor. Copy the junit jar to the lib folder. */
280            URL jUnitJarLocation = JUnit4.class.getProtectionDomain().
281                getCodeSource().
282                getLocation();
283            hdfs.copyFromLocalFile(new Path(jUnitJarLocation.getPath()),
284                                   new Path(jUnitJarLocation.getFile()));
285            /* Do the stuff. */
286            doMapReduce(new Path(in.getName()), new Path(out.getName()));
287
288            hdfs.copyToLocalFile(new Path(out.getName()),
289                                 new Path(out.getCanonicalPath()));
290           
291            /* Get the results, and notify the JUnit master. */
292            for (File f: out.listFiles()) {
293                if (!f.getName().startsWith(OUTPUT_FILE_PREFIX))
294                    continue;
295
296                BufferedReader reader = new BufferedReader(new FileReader(f));
297                while (true) {
298                    String result = reader.readLine();
299                    if (result == null)
300                        /* EOF */
301                        break;
302                    /* Each line of output is a pair (Class, List<Failure>). */
303                    StringTokenizer tokenizer = new StringTokenizer(result,
304                                                                    "\t");
305                    assert tokenizer.countTokens() == 2;
306                    String methodDescription = tokenizer.nextToken();
307                    String outcomeDescription = tokenizer.nextToken();
308                        result.substring(result.indexOf(' ') + 1);
309                    ArrayList<Throwable> outcome = null;
310                    try {
311                        /* Decode the failure description, and create a
312                         * Failure object.
313                         */
314                        byte[] bytes =
315                            Base64.base64ToByteArray(outcomeDescription);
316
317                        ByteArrayInputStream byteStream =
318                            new ByteArrayInputStream(bytes);
319                        ObjectInputStream objectStream =
320                            new ObjectInputStream(byteStream);
321                        outcome =
322                            (ArrayList<Throwable>) objectStream.readObject();
323                    }
324                    catch (Exception e) {
325                        /* TODO: Handle the exception. */
326                        e.printStackTrace();
327                        assert false;
328                    }
329
330                    Description d = lookupDescription(methodDescription);
331                    assert d != null;
332                    if (outcome.size() > 0) {
333                        notifier.fireTestFailure(new Failure(d,
334                                                             outcome.get(0)));
335                    }
336                    notifier.fireTestFinished(d);
337                }
338            }
339
340        }
341        catch (Exception e) {
342            e.printStackTrace();
343        }
344    }
345
346    public HadoopRunner(Class testClass) throws Exception {
347        this.testClass = testClass;
348        this.delegate = new JUnit4(testClass);
349
350        Thread server = new Thread() {
351            @Override
352            public void run() {
353                try {
354                    ServerSocket server = new ServerSocket(HADOOP_LOADER_PORT);
355                    while (true) {
356                        Socket client = server.accept();
357                        BufferedReader reader =
358                            new BufferedReader(new InputStreamReader(client.getInputStream()));
359                        String className = reader.readLine();
360                        PrintWriter writer = new PrintWriter(client.
361                                                             getOutputStream());
362                        /* Find the requested class, and place it in a hdfs
363                         * location.
364                         */
365                        try {
366                            /* Copy to hdfs. */
367                            Class req = Class.forName(className);
368                            String location = req.getProtectionDomain().
369                                getCodeSource().
370                                getLocation().
371                                getFile();
372                            File classFile = new File(location);
373                            if (classFile.isDirectory()) {
374                                /* Name the class. */
375                                location += className + ".class";
376                            }
377                            else {
378                                /* Class came from a jar. */
379                                assert location.endsWith(".jar");
380                            }
381
382                            FileSystem hdfs =
383                                new Path(location).getFileSystem(new Configuration());
384                            /* TODO: Clear this location at job start, and
385                             * don't overwrite during subsequent calls.
386                             */
387                            hdfs.copyFromLocalFile(false,
388                                                   true,
389                                                   new Path(location),
390                                                   new Path(location));
391                            writer.println(location);
392                            writer.flush();
393                        }
394                        catch (Exception e) {
395                            /* Nothing to do, class was not found. */
396                            e.printStackTrace();
397                        }
398                        client.close();
399                    }
400                }
401                catch (Exception e) {
402                    e.printStackTrace();
403                }
404            }
405        };
406
407        /* We only need one instance of the resource server per process. */
408        synchronized (this) {
409            if (resourceServer == null) {
410                resourceServer = server;
411                server.start();
412            }
413        }
414    }
415}
Note: See TracBrowser for help on using the repository browser.