source: proiecte/HadoopJUnit/hadoop-0.20.1/src/test/org/apache/hadoop/mapred/NotificationTestCase.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 7.8 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.ServletException;
import java.io.IOException;
import java.io.DataOutputStream;
import java.util.Date;

/**
 * Base class to test job end notification in local and cluster mode.
 *
 * Starts up Hadoop in local or cluster mode (by extending the
 * HadoopTestCase class) and starts a servlet engine that hosts
 * a servlet which receives the job-completion notifications.
 *
 * The notification servlet returns an HTTP 400 the first time it is called
 * and an HTTP 200 the second time, thus exercising the retry logic.
 *
 * In both cases the local file system is used (this is irrelevant for
 * the functionality under test).
 */
public abstract class NotificationTestCase extends HadoopTestCase {

  private static void stdPrintln(String s) {
    //System.out.println(s);
  }

  protected NotificationTestCase(int mode) throws IOException {
    super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
  }

  private int port;
  private String contextPath = "/notification";
  private Class servletClass = NotificationServlet.class;
  private String servletPath = "/mapred";
  private Server webServer;

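  // Starts an embedded Jetty server on an ephemeral port (new Server(0) lets
  // the OS pick a free port); the chosen port is read back from the connector
  // so the notification URL template can point at it.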
  private void startHttpServer() throws Exception {

    // Create the webServer
    if (webServer != null) {
      webServer.stop();
      webServer = null;
    }
    webServer = new Server(0);

    Context context = new Context(webServer, contextPath);

    // create servlet handler
    context.addServlet(new ServletHolder(new NotificationServlet()),
                       servletPath);

    // Start webServer
    webServer.start();
    port = webServer.getConnectors()[0].getLocalPort();

  }

  private void stopHttpServer() throws Exception {
    if (webServer != null) {
      webServer.stop();
      webServer.destroy();
      webServer = null;
    }
  }

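  // Servlet that receives the job end notifications. Every first delivery for
  // a job (even values of 'counter') is rejected with HTTP 400 to force a
  // retry; the retried delivery (odd values) is accepted with HTTP 200. At
  // counts 0, 2 and 4 the query string is asserted to carry the SUCCEEDED,
  // KILLED and FAILED job statuses respectively.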
  public static class NotificationServlet extends HttpServlet {
    public static int counter = 0;

    protected void doGet(HttpServletRequest req, HttpServletResponse res)
      throws ServletException, IOException {
      switch (counter) {
        case 0:
        {
          assertTrue(req.getQueryString().contains("SUCCEEDED"));
        }
        break;
        case 2:
        {
          assertTrue(req.getQueryString().contains("KILLED"));
        }
        break;
        case 4:
        {
          assertTrue(req.getQueryString().contains("FAILED"));
        }
        break;
      }
      if (counter % 2 == 0) {
        stdPrintln((new Date()).toString() +
                   "Receiving First notification for [" + req.getQueryString() +
                   "], returning error");
        res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
      }
      else {
        stdPrintln((new Date()).toString() +
                   "Receiving Second notification for [" + req.getQueryString() +
                   "], returning OK");
        res.setStatus(HttpServletResponse.SC_OK);
      }
      counter++;
    }
  }

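  // Template for the job end notification URL; the MapReduce framework
  // substitutes the $jobId and $jobStatus variables with the actual job id
  // and final job status before issuing the HTTP request.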
  private String getNotificationUrlTemplate() {
    return "http://localhost:" + port + contextPath + servletPath +
      "?jobId=$jobId&jobStatus=$jobStatus";
  }

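  // Configures the notification URL plus a short retry schedule (up to 3
  // attempts, 200 ms apart); the 2-second waits in testMR() leave ample time
  // for the rejected first delivery to be retried.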
  protected JobConf createJobConf() {
    JobConf conf = super.createJobConf();
    conf.setJobEndNotificationURI(getNotificationUrlTemplate());
    conf.setInt("job.end.retry.attempts", 3);
    conf.setInt("job.end.retry.interval", 200);
    return conf;
  }

  protected void setUp() throws Exception {
    super.setUp();
    startHttpServer();
  }

  protected void tearDown() throws Exception {
    stopHttpServer();
    super.tearDown();
  }

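  // Each job should produce exactly two servlet hits: the first notification
  // is answered with 400 and the retry with 200, so the counter is expected
  // to grow by 2 per job (2 after SUCCEEDED, 4 after KILLED, 6 after FAILED).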
  public void testMR() throws Exception {
    System.out.println(launchWordCount(this.createJobConf(),
                                       "a b c d e f g h", 1, 1));
    stdPrintln("Sleeping for 2 seconds to give time for retry");
    Thread.sleep(2000);
    assertEquals(2, NotificationServlet.counter);

    Path inDir = new Path("notificationjob/input");
    Path outDir = new Path("notificationjob/output");

    // Hack for local FS that does not have the concept of a 'mounting point'
    if (isLocalFS()) {
      String localPathRoot = System.getProperty("test.build.data", "/tmp")
        .toString().replace(' ', '+');
      inDir = new Path(localPathRoot, inDir);
      outDir = new Path(localPathRoot, outDir);
    }

    // run a job with KILLED status
    System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
                                                outDir).getID());
    stdPrintln("Sleeping for 2 seconds to give time for retry");
    Thread.sleep(2000);
    assertEquals(4, NotificationServlet.counter);

    // run a job with FAILED status
    System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
                                                outDir).getID());
    stdPrintln("Sleeping for 2 seconds to give time for retry");
    Thread.sleep(2000);
    assertEquals(6, NotificationServlet.counter);
  }

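  // Writes the given input text to a single file, runs the standard WordCount
  // example job with the supplied JobConf, and returns the job output as a
  // string.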
  private String launchWordCount(JobConf conf,
                                 String input,
                                 int numMaps,
                                 int numReduces) throws IOException {
    Path inDir = new Path("testing/wc/input");
    Path outDir = new Path("testing/wc/output");

    // Hack for local FS that does not have the concept of a 'mounting point'
    if (isLocalFS()) {
      String localPathRoot = System.getProperty("test.build.data", "/tmp")
        .toString().replace(' ', '+');
      inDir = new Path(localPathRoot, inDir);
      outDir = new Path(localPathRoot, outDir);
    }

    FileSystem fs = FileSystem.get(conf);
    fs.delete(outDir, true);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
    conf.setJobName("wordcount");
    conf.setInputFormat(TextInputFormat.class);

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCount.MapClass.class);
    conf.setCombinerClass(WordCount.Reduce.class);
    conf.setReducerClass(WordCount.Reduce.class);

    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(numMaps);
    conf.setNumReduceTasks(numReduces);
    JobClient.runJob(conf);
    return TestMiniMRWithDFS.readOutput(outDir, conf);
  }

}
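For reference, a concrete test only needs to choose the MapReduce mode through the protected constructor. A minimal sketch, assuming local mode (the subclass name below is illustrative; the Hadoop test tree ships its own local- and cluster-mode subclasses):

import java.io.IOException;

public class TestLocalMRNotification extends NotificationTestCase {
  public TestLocalMRNotification() throws IOException {
    super(HadoopTestCase.LOCAL_MR); // run MapReduce in local mode, local FS
  }
}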