source: proiecte/HadoopJUnit/hadoop-0.20.1/src/mapred/org/apache/hadoop/mapred/QueueManager.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the main files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 10.2 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.util.ArrayList;
22import java.util.HashMap;
23import java.util.Set;
24import java.util.TreeSet;
25
26import org.apache.commons.logging.Log;
27import org.apache.commons.logging.LogFactory;
28
29import org.apache.hadoop.conf.Configuration;
30import org.apache.hadoop.security.UserGroupInformation;
31import org.apache.hadoop.security.SecurityUtil.AccessControlList;
32
33/**
34 * Class that exposes information about queues maintained by the Hadoop
35 * Map/Reduce framework.
36 *
37 * The Map/Reduce framework can be configured with one or more queues,
38 * depending on the scheduler it is configured with. While some
39 * schedulers work only with one queue, some schedulers support multiple
40 * queues.
41 * 
42 * Queues can be configured with various properties. Some of these
43 * properties are common to all schedulers, and those are handled by this
44 * class. Schedulers might also associate several custom properties with
45 * queues. Where such a case exists, the queue name must be used to link
46 * the common properties with the scheduler specific ones. 
47 */
48class QueueManager {
49 
50  private static final Log LOG = LogFactory.getLog(QueueManager.class);
51 
52  // Prefix in configuration for queue related keys
53  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
54                                                        = "mapred.queue.";
55  // Configured queues
56  private Set<String> queueNames;
57  // Map of a queue and ACL property name with an ACL
58  private HashMap<String, AccessControlList> aclsMap;
59  // Map of a queue name to any generic object that represents
60  // scheduler information
61  private HashMap<String, Object> schedulerInfoObjects;
62  // Whether ACLs are enabled in the system or not.
63  private boolean aclsEnabled;
64 
65  /**
66   * Enum representing an operation that can be performed on a queue.
67   */
68  static enum QueueOperation {
69    SUBMIT_JOB ("acl-submit-job", false),
70    ADMINISTER_JOBS ("acl-administer-jobs", true);
71    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
72    //       users in UI
73    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
74    //       configuring queues.
75   
76    private final String aclName;
77    private final boolean jobOwnerAllowed;
78   
79    QueueOperation(String aclName, boolean jobOwnerAllowed) {
80      this.aclName = aclName;
81      this.jobOwnerAllowed = jobOwnerAllowed;
82    }
83
84    final String getAclName() {
85      return aclName;
86    }
87   
88    final boolean isJobOwnerAllowed() {
89      return jobOwnerAllowed;
90    }
91  }
92 
93  /**
94   * Construct a new QueueManager using configuration specified in the passed
95   * in {@link org.apache.hadoop.conf.Configuration} object.
96   *
97   * @param conf Configuration object where queue configuration is specified.
98   */
99  public QueueManager(Configuration conf) {
100    queueNames = new TreeSet<String>();
101    aclsMap = new HashMap<String, AccessControlList>();
102    schedulerInfoObjects = new HashMap<String, Object>();
103    initialize(conf);
104  }
105 
106  /**
107   * Return the set of queues configured in the system.
108   *
109   * The number of queues configured should be dependent on the Scheduler
110   * configured. Note that some schedulers work with only one queue, whereas
111   * others can support multiple queues.
112   * 
113   * @return Set of queue names.
114   */
115  public synchronized Set<String> getQueues() {
116    return queueNames;
117  }
118 
119  /**
120   * Return true if the given {@link QueueManager.QueueOperation} can be
121   * performed by the specified user on the given queue.
122   *
123   * An operation is allowed if all users are provided access for this
124   * operation, or if either the user or any of the groups specified is
125   * provided access.
126   *
127   * @param queueName Queue on which the operation needs to be performed.
128   * @param oper The operation to perform
129   * @param ugi The user and groups who wish to perform the operation.
130   *
131   * @return true if the operation is allowed, false otherwise.
132   */
133  public synchronized boolean hasAccess(String queueName, QueueOperation oper,
134                                UserGroupInformation ugi) {
135    return hasAccess(queueName, null, oper, ugi);
136  }
137 
138  /**
139   * Return true if the given {@link QueueManager.QueueOperation} can be
140   * performed by the specified user on the specified job in the given queue.
141   *
142   * An operation is allowed either if the owner of the job is the user
143   * performing the task, all users are provided access for this
144   * operation, or if either the user or any of the groups specified is
145   * provided access.
146   *
147   * If the {@link QueueManager.QueueOperation} is not job specific then the
148   * job parameter is ignored.
149   *
150   * @param queueName Queue on which the operation needs to be performed.
151   * @param job The {@link JobInProgress} on which the operation is being
152   *            performed.
153   * @param oper The operation to perform
154   * @param ugi The user and groups who wish to perform the operation.
155   *
156   * @return true if the operation is allowed, false otherwise.
157   */
158  public synchronized boolean hasAccess(String queueName, JobInProgress job, 
159                                QueueOperation oper, 
160                                UserGroupInformation ugi) {
161    if (!aclsEnabled) {
162      return true;
163    }
164   
165    if (LOG.isDebugEnabled()) {
166      LOG.debug("checking access for : " + toFullPropertyName(queueName, 
167                                            oper.getAclName()));     
168    }
169   
170    if (oper.isJobOwnerAllowed()) {
171      if (job.getJobConf().getUser().equals(ugi.getUserName())) {
172        return true;
173      }
174    }
175   
176    AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
177    if (acl == null) {
178      return false;
179    }
180   
181    // Check the ACL list
182    boolean allowed = acl.allAllowed();
183    if (!allowed) {
184      // Check the allowed users list
185      if (acl.getUsers().contains(ugi.getUserName())) {
186        allowed = true;
187      } else {
188        // Check the allowed groups list
189        Set<String> allowedGroups = acl.getGroups();
190        for (String group : ugi.getGroupNames()) {
191          if (allowedGroups.contains(group)) {
192            allowed = true;
193            break;
194          }
195        }
196      }
197    }
198   
199    return allowed;   
200  }
201 
202  /**
203   * Set a generic Object that represents scheduling information relevant
204   * to a queue.
205   *
206   * A string representation of this Object will be used by the framework
207   * to display in user facing applications like the JobTracker web UI and
208   * the hadoop CLI.
209   *
210   * @param queueName queue for which the scheduling information is to be set.
211   * @param queueInfo scheduling information for this queue.
212   */
213  public synchronized void setSchedulerInfo(String queueName, 
214                                              Object queueInfo) {
215    schedulerInfoObjects.put(queueName, queueInfo);
216  }
217 
218  /**
219   * Return the scheduler information configured for this queue.
220   *
221   * @param queueName queue for which the scheduling information is required.
222   * @return The scheduling information for this queue.
223   *
224   * @see #setSchedulerInfo(String, Object)
225   */
226  public synchronized Object getSchedulerInfo(String queueName) {
227    return schedulerInfoObjects.get(queueName);
228  }
229 
230  /**
231   * Refresh information configured for queues in the system by reading
232   * it from the passed in {@link org.apache.hadoop.conf.Configuration}.
233   *
234   * Previously stored information about queues is removed and new
235   * information populated from the configuration.
236   *
237   * @param conf New configuration for the queues.
238   */
239  public synchronized void refresh(Configuration conf) {
240    queueNames.clear();
241    aclsMap.clear();
242    schedulerInfoObjects.clear();
243    initialize(conf);
244  }
245 
246  private void initialize(Configuration conf) {
247    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
248    String[] queues = conf.getStrings("mapred.queue.names", 
249                                  new String[] {JobConf.DEFAULT_QUEUE_NAME});
250    addToSet(queueNames, queues);
251   
252    // for every queue, and every operation, get the ACL
253    // if any is specified and store in aclsMap.
254    for (String queue : queues) {
255      for (QueueOperation oper : QueueOperation.values()) {
256        String key = toFullPropertyName(queue, oper.getAclName());
257        String aclString = conf.get(key, "*");
258        aclsMap.put(key, new AccessControlList(aclString));
259      }
260    }
261  }
262 
263  private static final String toFullPropertyName(String queue, 
264      String property) {
265    return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
266  }
267 
268  private static final void addToSet(Set<String> set, String[] elems) {
269    for (String elem : elems) {
270      set.add(elem);
271    }
272  }
273 
274  synchronized JobQueueInfo[] getJobQueueInfos() {
275    ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
276    for(String queue : queueNames) {
277      Object schedulerInfo = schedulerInfoObjects.get(queue);
278      if(schedulerInfo != null) {
279        queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString()));
280      }else {
281        queueInfoList.add(new JobQueueInfo(queue,null));
282      }
283    }
284    return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList
285        .size()]);
286  }
287
288  JobQueueInfo getJobQueueInfo(String queue) {
289    Object schedulingInfo = schedulerInfoObjects.get(queue);
290    if(schedulingInfo!=null){
291      return new JobQueueInfo(queue,schedulingInfo.toString());
292    }else {
293      return new JobQueueInfo(queue,null);
294    }
295  }
296}
Note: See TracBrowser for help on using the repository browser.