source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 8.7 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred.lib;
20
21import org.apache.hadoop.util.ReflectionUtils;
22import org.apache.hadoop.mapred.MapRunnable;
23import org.apache.hadoop.mapred.JobConf;
24import org.apache.hadoop.mapred.Mapper;
25import org.apache.hadoop.mapred.RecordReader;
26import org.apache.hadoop.mapred.OutputCollector;
27import org.apache.hadoop.mapred.Reporter;
28import org.apache.hadoop.mapred.SkipBadRecords;
29import org.apache.commons.logging.Log;
30import org.apache.commons.logging.LogFactory;
31
32import java.io.IOException;
33import java.util.concurrent.*;
34
35/**
36 * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
37 * <p>
38 * It can be used instead of the default implementation,
39 * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
40 * bound in order to improve throughput.
41 * <p>
42 * Map implementations using this MapRunnable must be thread-safe.
43 * <p>
44 * The Map-Reduce job has to be configured to use this MapRunnable class (using
45 * the JobConf.setMapRunnerClass method) and
46 * the number of thread the thread-pool can use with the
47 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
48 * value is 10 threads.
49 * <p>
50 */
51public class MultithreadedMapRunner<K1, V1, K2, V2>
52    implements MapRunnable<K1, V1, K2, V2> {
53
54  private static final Log LOG =
55    LogFactory.getLog(MultithreadedMapRunner.class.getName());
56
57  private JobConf job;
58  private Mapper<K1, V1, K2, V2> mapper;
59  private ExecutorService executorService;
60  private volatile IOException ioException;
61  private volatile RuntimeException runtimeException;
62  private boolean incrProcCount;
63
64  @SuppressWarnings("unchecked")
65  public void configure(JobConf jobConf) {
66    int numberOfThreads =
67      jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
68    if (LOG.isDebugEnabled()) {
69      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
70                " to use " + numberOfThreads + " threads");
71    }
72
73    this.job = jobConf;
74    //increment processed counter only if skipping feature is enabled
75    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
76      SkipBadRecords.getAutoIncrMapperProcCount(job);
77    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
78        jobConf);
79
80    // Creating a threadpool of the configured size to execute the Mapper
81    // map method in parallel.
82    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
83                                             0L, TimeUnit.MILLISECONDS,
84                                             new BlockingArrayQueue
85                                               (numberOfThreads));
86  }
87
88  /**
89   * A blocking array queue that replaces offer and add, which throws on a full
90   * queue, to a put, which waits on a full queue.
91   */
92  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
93    public BlockingArrayQueue(int capacity) {
94      super(capacity);
95    }
96    public boolean offer(Runnable r) {
97      return add(r);
98    }
99    public boolean add(Runnable r) {
100      try {
101        put(r);
102      } catch (InterruptedException ie) {
103        Thread.currentThread().interrupt();
104      }
105      return true;
106    }
107  }
108
109  private void checkForExceptionsFromProcessingThreads()
110      throws IOException, RuntimeException {
111    // Checking if a Mapper.map within a Runnable has generated an
112    // IOException. If so we rethrow it to force an abort of the Map
113    // operation thus keeping the semantics of the default
114    // implementation.
115    if (ioException != null) {
116      throw ioException;
117    }
118
119    // Checking if a Mapper.map within a Runnable has generated a
120    // RuntimeException. If so we rethrow it to force an abort of the Map
121    // operation thus keeping the semantics of the default
122    // implementation.
123    if (runtimeException != null) {
124      throw runtimeException;
125    }
126  }
127
128  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
129                  Reporter reporter)
130    throws IOException {
131    try {
132      // allocate key & value instances these objects will not be reused
133      // because execution of Mapper.map is not serialized.
134      K1 key = input.createKey();
135      V1 value = input.createValue();
136
137      while (input.next(key, value)) {
138
139        executorService.execute(new MapperInvokeRunable(key, value, output,
140                                reporter));
141
142        checkForExceptionsFromProcessingThreads();
143
144        // Allocate new key & value instances as mapper is running in parallel
145        key = input.createKey();
146        value = input.createValue();
147      }
148
149      if (LOG.isDebugEnabled()) {
150        LOG.debug("Finished dispatching all Mappper.map calls, job "
151                  + job.getJobName());
152      }
153
154      // Graceful shutdown of the Threadpool, it will let all scheduled
155      // Runnables to end.
156      executorService.shutdown();
157
158      try {
159
160        // Now waiting for all Runnables to end.
161        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
162          if (LOG.isDebugEnabled()) {
163            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
164                      + job.getJobName());
165          }
166
167          // NOTE: while Mapper.map dispatching has concluded there are still
168          // map calls in progress and exceptions would be thrown.
169          checkForExceptionsFromProcessingThreads();
170
171        }
172
173        // NOTE: it could be that a map call has had an exception after the
174        // call for awaitTermination() returing true. And edge case but it
175        // could happen.
176        checkForExceptionsFromProcessingThreads();
177
178      } catch (IOException ioEx) {
179        // Forcing a shutdown of all thread of the threadpool and rethrowing
180        // the IOException
181        executorService.shutdownNow();
182        throw ioEx;
183      } catch (InterruptedException iEx) {
184        throw new RuntimeException(iEx);
185      }
186
187    } finally {
188      mapper.close();
189    }
190  }
191
192
193  /**
194   * Runnable to execute a single Mapper.map call from a forked thread.
195   */
196  private class MapperInvokeRunable implements Runnable {
197    private K1 key;
198    private V1 value;
199    private OutputCollector<K2, V2> output;
200    private Reporter reporter;
201
202    /**
203     * Collecting all required parameters to execute a Mapper.map call.
204     * <p>
205     *
206     * @param key
207     * @param value
208     * @param output
209     * @param reporter
210     */
211    public MapperInvokeRunable(K1 key, V1 value,
212                               OutputCollector<K2, V2> output,
213                               Reporter reporter) {
214      this.key = key;
215      this.value = value;
216      this.output = output;
217      this.reporter = reporter;
218    }
219
220    /**
221     * Executes a Mapper.map call with the given Mapper and parameters.
222     * <p>
223     * This method is called from the thread-pool thread.
224     *
225     */
226    public void run() {
227      try {
228        // map pair to output
229        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
230        if(incrProcCount) {
231          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
232              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
233        }
234      } catch (IOException ex) {
235        // If there is an IOException during the call it is set in an instance
236        // variable of the MultithreadedMapRunner from where it will be
237        // rethrown.
238        synchronized (MultithreadedMapRunner.this) {
239          if (MultithreadedMapRunner.this.ioException == null) {
240            MultithreadedMapRunner.this.ioException = ex;
241          }
242        }
243      } catch (RuntimeException ex) {
244        // If there is a RuntimeException during the call it is set in an
245        // instance variable of the MultithreadedMapRunner from where it will be
246        // rethrown.
247        synchronized (MultithreadedMapRunner.this) {
248          if (MultithreadedMapRunner.this.runtimeException == null) {
249            MultithreadedMapRunner.this.runtimeException = ex;
250          }
251        }
252      }
253    }
254  }
255
256}
Note: See TracBrowser for help on using the repository browser.