source: proiecte/HadoopJUnit/hadoop-0.20.1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 8.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package org.apache.hadoop.hdfs.server.namenode;
19
20import java.util.*;
21
22import org.apache.hadoop.hdfs.protocol.Block;
23
24/* Class for keeping track of under replication blocks
25 * Blocks have replication priority, with priority 0 indicating the highest
26 * Blocks have only one replicas has the highest
27 */
28class UnderReplicatedBlocks implements Iterable<Block> {
29  static final int LEVEL = 3;
30  private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
31     
32  /* constructor */
33  UnderReplicatedBlocks() {
34    for(int i=0; i<LEVEL; i++) {
35      priorityQueues.add(new TreeSet<Block>());
36    }
37  }
38
39  /**
40   * Empty the queues.
41   */
42  void clear() {
43    for(int i=0; i<LEVEL; i++) {
44      priorityQueues.get(i).clear();
45    }
46  }
47
48  /* Return the total number of under replication blocks */
49  synchronized int size() {
50    int size = 0;
51    for (int i=0; i<LEVEL; i++) {
52      size += priorityQueues.get(i).size();
53    }
54    return size;
55  }
56       
57  /* Check if a block is in the neededReplication queue */
58  synchronized boolean contains(Block block) {
59    for(TreeSet<Block> set:priorityQueues) {
60      if(set.contains(block)) { return true; }
61    }
62    return false;
63  }
64     
65  /* Return the priority of a block
66   * @param block a under replication block
67   * @param curReplicas current number of replicas of the block
68   * @param expectedReplicas expected number of replicas of the block
69   */
70  private int getPriority(Block block, 
71                          int curReplicas, 
72                          int decommissionedReplicas,
73                          int expectedReplicas) {
74    if (curReplicas<0 || curReplicas>=expectedReplicas) {
75      return LEVEL; // no need to replicate
76    } else if(curReplicas==0) {
77      // If there are zero non-decommissioned replica but there are
78      // some decommissioned replicas, then assign them highest priority
79      if (decommissionedReplicas > 0) {
80        return 0;
81      }
82      return 2; // keep these blocks in needed replication.
83    } else if(curReplicas==1) {
84      return 0; // highest priority
85    } else if(curReplicas*3<expectedReplicas) {
86      return 1;
87    } else {
88      return 2;
89    }
90  }
91     
92  /* add a block to a under replication queue according to its priority
93   * @param block a under replication block
94   * @param curReplicas current number of replicas of the block
95   * @param expectedReplicas expected number of replicas of the block
96   */
97  synchronized boolean add(
98                           Block block,
99                           int curReplicas, 
100                           int decomissionedReplicas,
101                           int expectedReplicas) {
102    if(curReplicas<0 || expectedReplicas <= curReplicas) {
103      return false;
104    }
105    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
106                               expectedReplicas);
107    if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
108      NameNode.stateChangeLog.debug(
109                                    "BLOCK* NameSystem.UnderReplicationBlock.add:"
110                                    + block
111                                    + " has only "+curReplicas
112                                    + " replicas and need " + expectedReplicas
113                                    + " replicas so is added to neededReplications"
114                                    + " at priority level " + priLevel);
115      return true;
116    }
117    return false;
118  }
119
120  /* remove a block from a under replication queue */
121  synchronized boolean remove(Block block, 
122                              int oldReplicas, 
123                              int decommissionedReplicas,
124                              int oldExpectedReplicas) {
125    int priLevel = getPriority(block, oldReplicas, 
126                               decommissionedReplicas,
127                               oldExpectedReplicas);
128    return remove(block, priLevel);
129  }
130     
131  /* remove a block from a under replication queue given a priority*/
132  boolean remove(Block block, int priLevel) {
133    if(priLevel >= 0 && priLevel < LEVEL
134        && priorityQueues.get(priLevel).remove(block)) {
135      NameNode.stateChangeLog.debug(
136                                    "BLOCK* NameSystem.UnderReplicationBlock.remove: "
137                                    + "Removing block " + block
138                                    + " from priority queue "+ priLevel);
139      return true;
140    } else {
141      for(int i=0; i<LEVEL; i++) {
142        if(i!=priLevel && priorityQueues.get(i).remove(block)) {
143          NameNode.stateChangeLog.debug(
144                                        "BLOCK* NameSystem.UnderReplicationBlock.remove: "
145                                        + "Removing block " + block
146                                        + " from priority queue "+ i);
147          return true;
148        }
149      }
150    }
151    return false;
152  }
153     
154  /* update the priority level of a block */
155  synchronized void update(Block block, int curReplicas, 
156                           int decommissionedReplicas,
157                           int curExpectedReplicas,
158                           int curReplicasDelta, int expectedReplicasDelta) {
159    int oldReplicas = curReplicas-curReplicasDelta;
160    int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
161    int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);
162    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);
163    NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
164                                  block +
165                                  " curReplicas " + curReplicas +
166                                  " curExpectedReplicas " + curExpectedReplicas +
167                                  " oldReplicas " + oldReplicas +
168                                  " oldExpectedReplicas  " + oldExpectedReplicas +
169                                  " curPri  " + curPri +
170                                  " oldPri  " + oldPri);
171    if(oldPri != LEVEL && oldPri != curPri) {
172      remove(block, oldPri);
173    }
174    if(curPri != LEVEL && priorityQueues.get(curPri).add(block)) {
175      NameNode.stateChangeLog.debug(
176                                    "BLOCK* NameSystem.UnderReplicationBlock.update:"
177                                    + block
178                                    + " has only "+curReplicas
179                                    + " replicas and need " + curExpectedReplicas
180                                    + " replicas so is added to neededReplications"
181                                    + " at priority level " + curPri);
182    }
183  }
184     
185  /* return an iterator of all the under replication blocks */
186  public synchronized BlockIterator iterator() {
187    return new BlockIterator();
188  }
189 
190    class BlockIterator implements Iterator<Block> {
191      private int level;
192      private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
193      BlockIterator() 
194      {
195        level=0;
196        for(int i=0; i<LEVEL; i++) {
197          iterators.add(priorityQueues.get(i).iterator());
198        }
199      }
200             
201      private void update() {
202        while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
203          level++;
204        }
205      }
206             
207      public Block next() {
208        update();
209        return iterators.get(level).next();
210      }
211             
212      public boolean hasNext() {
213        update();
214        return iterators.get(level).hasNext();
215      }
216             
217      public void remove() {
218        iterators.get(level).remove();
219      }
220     
221      public int getPriority() {
222        return level;
223    };
224  }
225}
Note: See TracBrowser for help on using the repository browser.