source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 3.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.mapred;
19
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22
23/**
24 * Class responsible for modeling the resource consumption of running tasks.
25 *
26 * For now, we just do temp space for maps
27 *
28 * There is one ResourceEstimator per JobInProgress
29 *
30 */
31class ResourceEstimator {
32
33  //Log with JobInProgress
34  private static final Log LOG = LogFactory.getLog(
35      "org.apache.hadoop.mapred.ResourceEstimator");
36
37  private long completedMapsInputSize;
38  private long completedMapsOutputSize;
39
40  private int completedMapsUpdates;
41  final private JobInProgress job;
42  final private int threshholdToUse;
43
44  public ResourceEstimator(JobInProgress job) {
45    this.job = job;
46    threshholdToUse = job.desiredMaps()/ 10;
47  }
48
49  protected synchronized void updateWithCompletedTask(TaskStatus ts, 
50      TaskInProgress tip) {
51
52    //-1 indicates error, which we don't average in.
53    if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
54      completedMapsUpdates++;
55
56      completedMapsInputSize+=(tip.getMapInputSize()+1);
57      completedMapsOutputSize+=ts.getOutputSize();
58
59      LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
60          "completedMapsInputSize:"+completedMapsInputSize+"  " +
61        "completedMapsOutputSize:"+completedMapsOutputSize);
62    }
63  }
64
65  /**
66   * @return estimated length of this job's total map output
67   */
68  protected synchronized long getEstimatedTotalMapOutputSize()  {
69    if(completedMapsUpdates < threshholdToUse) {
70      return 0;
71    } else {
72      long inputSize = job.getInputLength() + job.desiredMaps(); 
73      //add desiredMaps() so that randomwriter case doesn't blow up
74      long estimate = Math.round((inputSize * 
75          completedMapsOutputSize * 2.0)/completedMapsInputSize);
76      LOG.debug("estimate total map output will be " + estimate);
77      return estimate;
78    }
79  }
80 
81  /**
82   * @return estimated length of this job's average map output
83   */
84  long getEstimatedMapOutputSize() {
85    long estimate = 0L;
86    if (job.desiredMaps() > 0) {
87      estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
88    }
89    return estimate;
90  }
91
92  /**
93   *
94   * @return estimated length of this job's average reduce input
95   */
96  long getEstimatedReduceInputSize() {
97    if(job.desiredReduces() == 0) {//no reduce output, so no size
98      return 0;
99    } else {
100      return getEstimatedTotalMapOutputSize() / job.desiredReduces();
101      //estimate that each reduce gets an equal share of total map output
102    }
103  }
104 
105
106}
Note: See TracBrowser for help on using the repository browser.