1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | |
---|
19 | package org.apache.hadoop.mapred; |
---|
20 | |
---|
21 | import java.util.ArrayList; |
---|
22 | import java.util.HashMap; |
---|
23 | import java.util.Set; |
---|
24 | import java.util.TreeSet; |
---|
25 | |
---|
26 | import org.apache.commons.logging.Log; |
---|
27 | import org.apache.commons.logging.LogFactory; |
---|
28 | |
---|
29 | import org.apache.hadoop.conf.Configuration; |
---|
30 | import org.apache.hadoop.security.UserGroupInformation; |
---|
31 | import org.apache.hadoop.security.SecurityUtil.AccessControlList; |
---|
32 | |
---|
33 | /** |
---|
34 | * Class that exposes information about queues maintained by the Hadoop |
---|
35 | * Map/Reduce framework. |
---|
36 | * |
---|
37 | * The Map/Reduce framework can be configured with one or more queues, |
---|
38 | * depending on the scheduler it is configured with. While some |
---|
39 | * schedulers work only with one queue, some schedulers support multiple |
---|
40 | * queues. |
---|
41 | * |
---|
42 | * Queues can be configured with various properties. Some of these |
---|
43 | * properties are common to all schedulers, and those are handled by this |
---|
44 | * class. Schedulers might also associate several custom properties with |
---|
45 | * queues. Where such a case exists, the queue name must be used to link |
---|
46 | * the common properties with the scheduler specific ones. |
---|
47 | */ |
---|
48 | class QueueManager { |
---|
49 | |
---|
50 | private static final Log LOG = LogFactory.getLog(QueueManager.class); |
---|
51 | |
---|
52 | // Prefix in configuration for queue related keys |
---|
53 | private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX |
---|
54 | = "mapred.queue."; |
---|
55 | // Configured queues |
---|
56 | private Set<String> queueNames; |
---|
57 | // Map of a queue and ACL property name with an ACL |
---|
58 | private HashMap<String, AccessControlList> aclsMap; |
---|
59 | // Map of a queue name to any generic object that represents |
---|
60 | // scheduler information |
---|
61 | private HashMap<String, Object> schedulerInfoObjects; |
---|
62 | // Whether ACLs are enabled in the system or not. |
---|
63 | private boolean aclsEnabled; |
---|
64 | |
---|
65 | /** |
---|
66 | * Enum representing an operation that can be performed on a queue. |
---|
67 | */ |
---|
68 | static enum QueueOperation { |
---|
69 | SUBMIT_JOB ("acl-submit-job", false), |
---|
70 | ADMINISTER_JOBS ("acl-administer-jobs", true); |
---|
71 | // TODO: Add ACL for LIST_JOBS when we have ability to authenticate |
---|
72 | // users in UI |
---|
73 | // TODO: Add ACL for CHANGE_ACL when we have an admin tool for |
---|
74 | // configuring queues. |
---|
75 | |
---|
76 | private final String aclName; |
---|
77 | private final boolean jobOwnerAllowed; |
---|
78 | |
---|
79 | QueueOperation(String aclName, boolean jobOwnerAllowed) { |
---|
80 | this.aclName = aclName; |
---|
81 | this.jobOwnerAllowed = jobOwnerAllowed; |
---|
82 | } |
---|
83 | |
---|
84 | final String getAclName() { |
---|
85 | return aclName; |
---|
86 | } |
---|
87 | |
---|
88 | final boolean isJobOwnerAllowed() { |
---|
89 | return jobOwnerAllowed; |
---|
90 | } |
---|
91 | } |
---|
92 | |
---|
93 | /** |
---|
94 | * Construct a new QueueManager using configuration specified in the passed |
---|
95 | * in {@link org.apache.hadoop.conf.Configuration} object. |
---|
96 | * |
---|
97 | * @param conf Configuration object where queue configuration is specified. |
---|
98 | */ |
---|
99 | public QueueManager(Configuration conf) { |
---|
100 | queueNames = new TreeSet<String>(); |
---|
101 | aclsMap = new HashMap<String, AccessControlList>(); |
---|
102 | schedulerInfoObjects = new HashMap<String, Object>(); |
---|
103 | initialize(conf); |
---|
104 | } |
---|
105 | |
---|
106 | /** |
---|
107 | * Return the set of queues configured in the system. |
---|
108 | * |
---|
109 | * The number of queues configured should be dependent on the Scheduler |
---|
110 | * configured. Note that some schedulers work with only one queue, whereas |
---|
111 | * others can support multiple queues. |
---|
112 | * |
---|
113 | * @return Set of queue names. |
---|
114 | */ |
---|
115 | public synchronized Set<String> getQueues() { |
---|
116 | return queueNames; |
---|
117 | } |
---|
118 | |
---|
119 | /** |
---|
120 | * Return true if the given {@link QueueManager.QueueOperation} can be |
---|
121 | * performed by the specified user on the given queue. |
---|
122 | * |
---|
123 | * An operation is allowed if all users are provided access for this |
---|
124 | * operation, or if either the user or any of the groups specified is |
---|
125 | * provided access. |
---|
126 | * |
---|
127 | * @param queueName Queue on which the operation needs to be performed. |
---|
128 | * @param oper The operation to perform |
---|
129 | * @param ugi The user and groups who wish to perform the operation. |
---|
130 | * |
---|
131 | * @return true if the operation is allowed, false otherwise. |
---|
132 | */ |
---|
133 | public synchronized boolean hasAccess(String queueName, QueueOperation oper, |
---|
134 | UserGroupInformation ugi) { |
---|
135 | return hasAccess(queueName, null, oper, ugi); |
---|
136 | } |
---|
137 | |
---|
138 | /** |
---|
139 | * Return true if the given {@link QueueManager.QueueOperation} can be |
---|
140 | * performed by the specified user on the specified job in the given queue. |
---|
141 | * |
---|
142 | * An operation is allowed either if the owner of the job is the user |
---|
143 | * performing the task, all users are provided access for this |
---|
144 | * operation, or if either the user or any of the groups specified is |
---|
145 | * provided access. |
---|
146 | * |
---|
147 | * If the {@link QueueManager.QueueOperation} is not job specific then the |
---|
148 | * job parameter is ignored. |
---|
149 | * |
---|
150 | * @param queueName Queue on which the operation needs to be performed. |
---|
151 | * @param job The {@link JobInProgress} on which the operation is being |
---|
152 | * performed. |
---|
153 | * @param oper The operation to perform |
---|
154 | * @param ugi The user and groups who wish to perform the operation. |
---|
155 | * |
---|
156 | * @return true if the operation is allowed, false otherwise. |
---|
157 | */ |
---|
158 | public synchronized boolean hasAccess(String queueName, JobInProgress job, |
---|
159 | QueueOperation oper, |
---|
160 | UserGroupInformation ugi) { |
---|
161 | if (!aclsEnabled) { |
---|
162 | return true; |
---|
163 | } |
---|
164 | |
---|
165 | if (LOG.isDebugEnabled()) { |
---|
166 | LOG.debug("checking access for : " + toFullPropertyName(queueName, |
---|
167 | oper.getAclName())); |
---|
168 | } |
---|
169 | |
---|
170 | if (oper.isJobOwnerAllowed()) { |
---|
171 | if (job.getJobConf().getUser().equals(ugi.getUserName())) { |
---|
172 | return true; |
---|
173 | } |
---|
174 | } |
---|
175 | |
---|
176 | AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName())); |
---|
177 | if (acl == null) { |
---|
178 | return false; |
---|
179 | } |
---|
180 | |
---|
181 | // Check the ACL list |
---|
182 | boolean allowed = acl.allAllowed(); |
---|
183 | if (!allowed) { |
---|
184 | // Check the allowed users list |
---|
185 | if (acl.getUsers().contains(ugi.getUserName())) { |
---|
186 | allowed = true; |
---|
187 | } else { |
---|
188 | // Check the allowed groups list |
---|
189 | Set<String> allowedGroups = acl.getGroups(); |
---|
190 | for (String group : ugi.getGroupNames()) { |
---|
191 | if (allowedGroups.contains(group)) { |
---|
192 | allowed = true; |
---|
193 | break; |
---|
194 | } |
---|
195 | } |
---|
196 | } |
---|
197 | } |
---|
198 | |
---|
199 | return allowed; |
---|
200 | } |
---|
201 | |
---|
202 | /** |
---|
203 | * Set a generic Object that represents scheduling information relevant |
---|
204 | * to a queue. |
---|
205 | * |
---|
206 | * A string representation of this Object will be used by the framework |
---|
207 | * to display in user facing applications like the JobTracker web UI and |
---|
208 | * the hadoop CLI. |
---|
209 | * |
---|
210 | * @param queueName queue for which the scheduling information is to be set. |
---|
211 | * @param queueInfo scheduling information for this queue. |
---|
212 | */ |
---|
213 | public synchronized void setSchedulerInfo(String queueName, |
---|
214 | Object queueInfo) { |
---|
215 | schedulerInfoObjects.put(queueName, queueInfo); |
---|
216 | } |
---|
217 | |
---|
218 | /** |
---|
219 | * Return the scheduler information configured for this queue. |
---|
220 | * |
---|
221 | * @param queueName queue for which the scheduling information is required. |
---|
222 | * @return The scheduling information for this queue. |
---|
223 | * |
---|
224 | * @see #setSchedulerInfo(String, Object) |
---|
225 | */ |
---|
226 | public synchronized Object getSchedulerInfo(String queueName) { |
---|
227 | return schedulerInfoObjects.get(queueName); |
---|
228 | } |
---|
229 | |
---|
230 | /** |
---|
231 | * Refresh information configured for queues in the system by reading |
---|
232 | * it from the passed in {@link org.apache.hadoop.conf.Configuration}. |
---|
233 | * |
---|
234 | * Previously stored information about queues is removed and new |
---|
235 | * information populated from the configuration. |
---|
236 | * |
---|
237 | * @param conf New configuration for the queues. |
---|
238 | */ |
---|
239 | public synchronized void refresh(Configuration conf) { |
---|
240 | queueNames.clear(); |
---|
241 | aclsMap.clear(); |
---|
242 | schedulerInfoObjects.clear(); |
---|
243 | initialize(conf); |
---|
244 | } |
---|
245 | |
---|
246 | private void initialize(Configuration conf) { |
---|
247 | aclsEnabled = conf.getBoolean("mapred.acls.enabled", false); |
---|
248 | String[] queues = conf.getStrings("mapred.queue.names", |
---|
249 | new String[] {JobConf.DEFAULT_QUEUE_NAME}); |
---|
250 | addToSet(queueNames, queues); |
---|
251 | |
---|
252 | // for every queue, and every operation, get the ACL |
---|
253 | // if any is specified and store in aclsMap. |
---|
254 | for (String queue : queues) { |
---|
255 | for (QueueOperation oper : QueueOperation.values()) { |
---|
256 | String key = toFullPropertyName(queue, oper.getAclName()); |
---|
257 | String aclString = conf.get(key, "*"); |
---|
258 | aclsMap.put(key, new AccessControlList(aclString)); |
---|
259 | } |
---|
260 | } |
---|
261 | } |
---|
262 | |
---|
263 | private static final String toFullPropertyName(String queue, |
---|
264 | String property) { |
---|
265 | return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property; |
---|
266 | } |
---|
267 | |
---|
268 | private static final void addToSet(Set<String> set, String[] elems) { |
---|
269 | for (String elem : elems) { |
---|
270 | set.add(elem); |
---|
271 | } |
---|
272 | } |
---|
273 | |
---|
274 | synchronized JobQueueInfo[] getJobQueueInfos() { |
---|
275 | ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>(); |
---|
276 | for(String queue : queueNames) { |
---|
277 | Object schedulerInfo = schedulerInfoObjects.get(queue); |
---|
278 | if(schedulerInfo != null) { |
---|
279 | queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString())); |
---|
280 | }else { |
---|
281 | queueInfoList.add(new JobQueueInfo(queue,null)); |
---|
282 | } |
---|
283 | } |
---|
284 | return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList |
---|
285 | .size()]); |
---|
286 | } |
---|
287 | |
---|
288 | JobQueueInfo getJobQueueInfo(String queue) { |
---|
289 | Object schedulingInfo = schedulerInfoObjects.get(queue); |
---|
290 | if(schedulingInfo!=null){ |
---|
291 | return new JobQueueInfo(queue,schedulingInfo.toString()); |
---|
292 | }else { |
---|
293 | return new JobQueueInfo(queue,null); |
---|
294 | } |
---|
295 | } |
---|
296 | } |
---|