source: proiecte/HadoopJUnit/hadoop-0.20.1/src/examples/org/apache/hadoop/examples/DBCountPageView.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 12.8 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.examples;
20
21import java.io.DataInput;
22import java.io.DataOutput;
23import java.io.IOException;
24import java.sql.Connection;
25import java.sql.DriverManager;
26import java.sql.PreparedStatement;
27import java.sql.ResultSet;
28import java.sql.SQLException;
29import java.sql.Statement;
30import java.util.Iterator;
31import java.util.Random;
32
33import org.apache.commons.logging.Log;
34import org.apache.commons.logging.LogFactory;
35import org.apache.hadoop.conf.Configured;
36import org.apache.hadoop.io.LongWritable;
37import org.apache.hadoop.io.NullWritable;
38import org.apache.hadoop.io.Text;
39import org.apache.hadoop.io.Writable;
40import org.apache.hadoop.mapred.JobClient;
41import org.apache.hadoop.mapred.JobConf;
42import org.apache.hadoop.mapred.MapReduceBase;
43import org.apache.hadoop.mapred.Mapper;
44import org.apache.hadoop.mapred.OutputCollector;
45import org.apache.hadoop.mapred.Reducer;
46import org.apache.hadoop.mapred.Reporter;
47import org.apache.hadoop.mapred.lib.LongSumReducer;
48import org.apache.hadoop.mapred.lib.db.DBConfiguration;
49import org.apache.hadoop.mapred.lib.db.DBInputFormat;
50import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
51import org.apache.hadoop.mapred.lib.db.DBWritable;
52import org.apache.hadoop.util.StringUtils;
53import org.apache.hadoop.util.Tool;
54import org.apache.hadoop.util.ToolRunner;
55import org.hsqldb.Server;
56
57/**
58 * This is a demonstrative program, which uses DBInputFormat for reading
59 * the input data from a database, and DBOutputFormat for writing the data
60 * to the database.
61 * <br>
62 * The Program first creates the necessary tables, populates the input table
63 * and runs the mapred job.
64 * <br>
65 * The input data is a mini access log, with a <code>&lt;url,referrer,time&gt;
66 * </code> schema. The output is the number of pageviews of each url in the log,
67 * having the schema <code>&lt;url,pageview&gt;</code>.
68 *
69 * When called with no arguments the program starts a local HSQLDB server, and
70 * uses this database for storing/retrieving the data.
71 */
72public class DBCountPageView extends Configured implements Tool {
73
74  private static final Log LOG = LogFactory.getLog(DBCountPageView.class);
75 
76  private Connection connection;
77  private boolean initialized = false;
78
79  private static final String[] AccessFieldNames = {"url", "referrer", "time"};
80  private static final String[] PageviewFieldNames = {"url", "pageview"};
81 
82  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
83  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
84 
85  private Server server;
86 
87  private void startHsqldbServer() {
88    server = new Server();
89    server.setDatabasePath(0, 
90        System.getProperty("test.build.data",".") + "/URLAccess");
91    server.setDatabaseName(0, "URLAccess");
92    server.start();
93  }
94 
95  private void createConnection(String driverClassName
96      , String url) throws Exception {
97   
98    Class.forName(driverClassName);
99    connection = DriverManager.getConnection(url);
100    connection.setAutoCommit(false);
101  }
102
103  private void shutdown() {
104    try {
105      connection.commit();
106      connection.close();
107    }catch (Throwable ex) {
108      LOG.warn("Exception occurred while closing connection :"
109          + StringUtils.stringifyException(ex));
110    } finally {
111      try {
112        if(server != null) {
113          server.shutdown();
114        }
115      }catch (Throwable ex) {
116        LOG.warn("Exception occurred while shutting down HSQLDB :"
117            + StringUtils.stringifyException(ex));
118      }
119    }
120  }
121
122  private void initialize(String driverClassName, String url) 
123    throws Exception {
124    if(!this.initialized) {
125      if(driverClassName.equals(DRIVER_CLASS)) {
126        startHsqldbServer();
127      }
128      createConnection(driverClassName, url);
129      dropTables();
130      createTables();
131      populateAccess();
132      this.initialized = true; 
133    }
134  }
135 
136  private void dropTables() {
137    String dropAccess = "DROP TABLE Access";
138    String dropPageview = "DROP TABLE Pageview";
139   
140    try {
141      Statement st = connection.createStatement();
142      st.executeUpdate(dropAccess);
143      st.executeUpdate(dropPageview);
144      connection.commit();
145      st.close();
146    }catch (SQLException ex) {
147      //ignore
148    }
149  }
150 
151  private void createTables() throws SQLException {
152
153    String createAccess = 
154      "CREATE TABLE " +
155      "Access(url      VARCHAR(100) NOT NULL," +
156            " referrer VARCHAR(100)," +
157            " time     BIGINT NOT NULL, " +
158            " PRIMARY KEY (url, time))";
159
160    String createPageview = 
161      "CREATE TABLE " +
162      "Pageview(url      VARCHAR(100) NOT NULL," +
163              " pageview     BIGINT NOT NULL, " +
164               " PRIMARY KEY (url))";
165   
166    Statement st = connection.createStatement();
167    try {
168      st.executeUpdate(createAccess);
169      st.executeUpdate(createPageview);
170      connection.commit();
171    } finally {
172      st.close();
173    }
174  }
175
176  /**
177   * Populates the Access table with generated records.
178   */
179  private void populateAccess() throws SQLException {
180
181    PreparedStatement statement = null ;
182    try {
183      statement = connection.prepareStatement(
184          "INSERT INTO Access(url, referrer, time)" +
185          " VALUES (?, ?, ?)");
186
187      Random random = new Random();
188
189      int time = random.nextInt(50) + 50;
190
191      final int PROBABILITY_PRECISION = 100; //  1 / 100
192      final int NEW_PAGE_PROBABILITY  = 15;  //  15 / 100
193
194
195      //Pages in the site :
196      String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
197      //linkMatrix[i] is the array of pages(indexes) that page_i links to. 
198      int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
199          {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
200
201      //a mini model of user browsing a la pagerank
202      int currentPage = random.nextInt(pages.length); 
203      String referrer = null;
204
205      for(int i=0; i<time; i++) {
206
207        statement.setString(1, pages[currentPage]);
208        statement.setString(2, referrer);
209        statement.setLong(3, i);
210        statement.execute();
211
212        int action = random.nextInt(PROBABILITY_PRECISION);
213
214        //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
215        if(action < NEW_PAGE_PROBABILITY) { 
216          currentPage = random.nextInt(pages.length); // a random page
217          referrer = null;
218        }
219        else {
220          referrer = pages[currentPage];
221          action = random.nextInt(linkMatrix[currentPage].length);
222          currentPage = linkMatrix[currentPage][action];
223        }
224      }
225     
226      connection.commit();
227     
228    }catch (SQLException ex) {
229      connection.rollback();
230      throw ex;
231    } finally {
232      if(statement != null) {
233        statement.close();
234      }
235    }
236  }
237 
238  /**Verifies the results are correct */
239  private boolean verify() throws SQLException {
240    //check total num pageview
241    String countAccessQuery = "SELECT COUNT(*) FROM Access";
242    String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
243    Statement st = null;
244    ResultSet rs = null;
245    try {
246      st = connection.createStatement();
247      rs = st.executeQuery(countAccessQuery);
248      rs.next();
249      long totalPageview = rs.getLong(1);
250
251      rs = st.executeQuery(sumPageviewQuery);
252      rs.next();
253      long sumPageview = rs.getLong(1);
254
255      LOG.info("totalPageview=" + totalPageview);
256      LOG.info("sumPageview=" + sumPageview);
257
258      return totalPageview == sumPageview && totalPageview != 0;
259    }finally {
260      if(st != null)
261        st.close();
262      if(rs != null)
263        rs.close();
264    }
265  }
266 
267  /** Holds a &lt;url, referrer, time &gt; tuple */
268  static class AccessRecord implements Writable, DBWritable {
269    String url;
270    String referrer;
271    long time;
272   
273    @Override
274    public void readFields(DataInput in) throws IOException {
275      this.url = Text.readString(in);
276      this.referrer = Text.readString(in);
277      this.time = in.readLong();
278    }
279   
280    @Override
281    public void write(DataOutput out) throws IOException {
282      Text.writeString(out, url);
283      Text.writeString(out, referrer);
284      out.writeLong(time);
285    }
286   
287    @Override
288    public void readFields(ResultSet resultSet) throws SQLException {
289      this.url = resultSet.getString(1);
290      this.referrer = resultSet.getString(2);
291      this.time = resultSet.getLong(3);
292    }
293    @Override
294    public void write(PreparedStatement statement) throws SQLException {
295      statement.setString(1, url);
296      statement.setString(2, referrer);
297      statement.setLong(3, time);
298    }
299  }
300  /** Holds a &lt;url, pageview &gt; tuple */
301  static class PageviewRecord implements Writable, DBWritable {
302    String url;
303    long pageview;
304   
305    public PageviewRecord(String url, long pageview) {
306      this.url = url;
307      this.pageview = pageview;
308    }
309   
310    @Override
311    public void readFields(DataInput in) throws IOException {
312      this.url = Text.readString(in);
313      this.pageview = in.readLong();
314    }
315    @Override
316    public void write(DataOutput out) throws IOException {
317      Text.writeString(out, url);
318      out.writeLong(pageview);
319    }
320    @Override
321    public void readFields(ResultSet resultSet) throws SQLException {
322      this.url = resultSet.getString(1);
323      this.pageview = resultSet.getLong(2);
324    }
325    @Override
326    public void write(PreparedStatement statement) throws SQLException {
327      statement.setString(1, url);
328      statement.setLong(2, pageview);
329    }
330    @Override
331    public String toString() {
332      return url + " " + pageview;
333    }
334  }
335 
336  /**
337   * Mapper extracts URLs from the AccessRecord (tuples from db),
338   * and emits a &lt;url,1&gt; pair for each access record.
339   */
340  static class PageviewMapper extends MapReduceBase
341    implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
342   
343    LongWritable ONE = new LongWritable(1L);
344    @Override
345    public void map(LongWritable key, AccessRecord value,
346        OutputCollector<Text, LongWritable> output, Reporter reporter)
347        throws IOException {
348     
349      Text oKey = new Text(value.url);
350      output.collect(oKey, ONE);
351    }
352  }
353 
354  /**
355   * Reducer sums up the pageviews and emits a PageviewRecord,
356   * which will correspond to one tuple in the db.
357   */
358  static class PageviewReducer extends MapReduceBase
359    implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
360   
361    NullWritable n = NullWritable.get();
362    @Override
363    public void reduce(Text key, Iterator<LongWritable> values,
364        OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
365        throws IOException {
366     
367      long sum = 0L;
368      while(values.hasNext()) {
369        sum += values.next().get();
370      }
371      output.collect(new PageviewRecord(key.toString(), sum), n);
372    }
373  }
374 
375  @Override
376  //Usage DBCountPageView [driverClass dburl]
377  public int run(String[] args) throws Exception {
378   
379    String driverClassName = DRIVER_CLASS;
380    String url = DB_URL;
381   
382    if(args.length > 1) {
383      driverClassName = args[0];
384      url = args[1];
385    }
386   
387    initialize(driverClassName, url);
388
389    JobConf job = new JobConf(getConf(), DBCountPageView.class);
390       
391    job.setJobName("Count Pageviews of URLs");
392
393    job.setMapperClass(PageviewMapper.class);
394    job.setCombinerClass(LongSumReducer.class);
395    job.setReducerClass(PageviewReducer.class);
396
397    DBConfiguration.configureDB(job, driverClassName, url);
398   
399    DBInputFormat.setInput(job, AccessRecord.class, "Access"
400        , null, "url", AccessFieldNames);
401
402    DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
403   
404    job.setMapOutputKeyClass(Text.class);
405    job.setMapOutputValueClass(LongWritable.class);
406
407    job.setOutputKeyClass(PageviewRecord.class);
408    job.setOutputValueClass(NullWritable.class);
409
410    try {
411      JobClient.runJob(job);
412     
413      boolean correct = verify();
414      if(!correct) {
415        throw new RuntimeException("Evaluation was not correct!");
416      }
417    } finally {
418      shutdown();   
419    }
420    return 0;
421  }
422
423  public static void main(String[] args) throws Exception {
424    int ret = ToolRunner.run(new DBCountPageView(), args);
425    System.exit(ret);
426  }
427
428}
Note: See TracBrowser for help on using the repository browser.