[120] | 1 | /** |
---|
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
| 3 | * or more contributor license agreements. See the NOTICE file |
---|
| 4 | * distributed with this work for additional information |
---|
| 5 | * regarding copyright ownership. The ASF licenses this file |
---|
| 6 | * to you under the Apache License, Version 2.0 (the |
---|
| 7 | * "License"); you may not use this file except in compliance |
---|
| 8 | * with the License. You may obtain a copy of the License at |
---|
| 9 | * |
---|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 11 | * |
---|
| 12 | * Unless required by applicable law or agreed to in writing, software |
---|
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
| 15 | * See the License for the specific language governing permissions and |
---|
| 16 | * limitations under the License. |
---|
| 17 | */ |
---|
| 18 | package org.apache.hadoop.util; |
---|
| 19 | |
---|
| 20 | import java.io.BufferedReader; |
---|
| 21 | import java.io.File; |
---|
| 22 | import java.io.IOException; |
---|
| 23 | import java.io.InputStreamReader; |
---|
| 24 | import java.util.Map; |
---|
| 25 | |
---|
| 26 | import org.apache.commons.logging.Log; |
---|
| 27 | import org.apache.commons.logging.LogFactory; |
---|
| 28 | import org.apache.hadoop.conf.Configuration; |
---|
| 29 | |
---|
| 30 | /** |
---|
| 31 | * A base class for running a Unix command. |
---|
| 32 | * |
---|
| 33 | * <code>Shell</code> can be used to run unix commands like <code>du</code> or |
---|
| 34 | * <code>df</code>. It also offers facilities to gate commands by |
---|
| 35 | * time-intervals. |
---|
| 36 | */ |
---|
| 37 | abstract public class Shell { |
---|
| 38 | |
---|
| 39 | public static final Log LOG = LogFactory.getLog(Shell.class); |
---|
| 40 | |
---|
| 41 | /** a Unix command to get the current user's name */ |
---|
| 42 | public final static String USER_NAME_COMMAND = "whoami"; |
---|
| 43 | /** a Unix command to get the current user's groups list */ |
---|
| 44 | public static String[] getGROUPS_COMMAND() { |
---|
| 45 | return new String[]{"bash", "-c", "groups"}; |
---|
| 46 | } |
---|
| 47 | /** a Unix command to set permission */ |
---|
| 48 | public static final String SET_PERMISSION_COMMAND = "chmod"; |
---|
| 49 | /** a Unix command to set owner */ |
---|
| 50 | public static final String SET_OWNER_COMMAND = "chown"; |
---|
| 51 | public static final String SET_GROUP_COMMAND = "chgrp"; |
---|
| 52 | /** Return a Unix command to get permission information. */ |
---|
| 53 | public static String[] getGET_PERMISSION_COMMAND() { |
---|
| 54 | //force /bin/ls, except on windows. |
---|
| 55 | return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"}; |
---|
| 56 | } |
---|
| 57 | |
---|
| 58 | /** |
---|
| 59 | * Get the Unix command for setting the maximum virtual memory available |
---|
| 60 | * to a given child process. This is only relevant when we are forking a |
---|
| 61 | * process from within the {@link org.apache.hadoop.mapred.Mapper} or the |
---|
| 62 | * {@link org.apache.hadoop.mapred.Reducer} implementations |
---|
| 63 | * e.g. <a href="{@docRoot}/org/apache/hadoop/mapred/pipes/package-summary.html">Hadoop Pipes</a> |
---|
| 64 | * or <a href="{@docRoot}/org/apache/hadoop/streaming/package-summary.html">Hadoop Streaming</a>. |
---|
| 65 | * |
---|
| 66 | * It also checks to ensure that we are running on a *nix platform else |
---|
| 67 | * (e.g. in Cygwin/Windows) it returns <code>null</code>. |
---|
| 68 | * @param conf configuration |
---|
| 69 | * @return a <code>String[]</code> with the ulimit command arguments or |
---|
| 70 | * <code>null</code> if we are running on a non *nix platform or |
---|
| 71 | * if the limit is unspecified. |
---|
| 72 | */ |
---|
| 73 | public static String[] getUlimitMemoryCommand(Configuration conf) { |
---|
| 74 | // ulimit isn't supported on Windows |
---|
| 75 | if (WINDOWS) { |
---|
| 76 | return null; |
---|
| 77 | } |
---|
| 78 | |
---|
| 79 | // get the memory limit from the configuration |
---|
| 80 | String ulimit = conf.get("mapred.child.ulimit"); |
---|
| 81 | if (ulimit == null) { |
---|
| 82 | return null; |
---|
| 83 | } |
---|
| 84 | |
---|
| 85 | // Parse it to ensure it is legal/sane |
---|
| 86 | int memoryLimit = Integer.valueOf(ulimit); |
---|
| 87 | |
---|
| 88 | return new String[] {"ulimit", "-v", String.valueOf(memoryLimit)}; |
---|
| 89 | } |
---|
| 90 | |
---|
| 91 | /** Set to true on Windows platforms */ |
---|
| 92 | public static final boolean WINDOWS /* borrowed from Path.WINDOWS */ |
---|
| 93 | = System.getProperty("os.name").startsWith("Windows"); |
---|
| 94 | |
---|
| 95 | private long interval; // refresh interval in msec |
---|
| 96 | private long lastTime; // last time the command was performed |
---|
| 97 | private Map<String, String> environment; // env for the command execution |
---|
| 98 | private File dir; |
---|
| 99 | private Process process; // sub process used to execute the command |
---|
| 100 | private int exitCode; |
---|
| 101 | |
---|
| 102 | public Shell() { |
---|
| 103 | this(0L); |
---|
| 104 | } |
---|
| 105 | |
---|
| 106 | /** |
---|
| 107 | * @param interval the minimum duration to wait before re-executing the |
---|
| 108 | * command. |
---|
| 109 | */ |
---|
| 110 | public Shell( long interval ) { |
---|
| 111 | this.interval = interval; |
---|
| 112 | this.lastTime = (interval<0) ? 0 : -interval; |
---|
| 113 | } |
---|
| 114 | |
---|
| 115 | /** set the environment for the command |
---|
| 116 | * @param env Mapping of environment variables |
---|
| 117 | */ |
---|
| 118 | protected void setEnvironment(Map<String, String> env) { |
---|
| 119 | this.environment = env; |
---|
| 120 | } |
---|
| 121 | |
---|
| 122 | /** set the working directory |
---|
| 123 | * @param dir The directory where the command would be executed |
---|
| 124 | */ |
---|
| 125 | protected void setWorkingDirectory(File dir) { |
---|
| 126 | this.dir = dir; |
---|
| 127 | } |
---|
| 128 | |
---|
| 129 | /** check to see if a command needs to be executed and execute if needed */ |
---|
| 130 | protected void run() throws IOException { |
---|
| 131 | if (lastTime + interval > System.currentTimeMillis()) |
---|
| 132 | return; |
---|
| 133 | exitCode = 0; // reset for next run |
---|
| 134 | runCommand(); |
---|
| 135 | } |
---|
| 136 | |
---|
| 137 | /** Run a command */ |
---|
| 138 | private void runCommand() throws IOException { |
---|
| 139 | ProcessBuilder builder = new ProcessBuilder(getExecString()); |
---|
| 140 | boolean completed = false; |
---|
| 141 | |
---|
| 142 | if (environment != null) { |
---|
| 143 | builder.environment().putAll(this.environment); |
---|
| 144 | } |
---|
| 145 | if (dir != null) { |
---|
| 146 | builder.directory(this.dir); |
---|
| 147 | } |
---|
| 148 | |
---|
| 149 | process = builder.start(); |
---|
| 150 | final BufferedReader errReader = |
---|
| 151 | new BufferedReader(new InputStreamReader(process |
---|
| 152 | .getErrorStream())); |
---|
| 153 | BufferedReader inReader = |
---|
| 154 | new BufferedReader(new InputStreamReader(process |
---|
| 155 | .getInputStream())); |
---|
| 156 | final StringBuffer errMsg = new StringBuffer(); |
---|
| 157 | |
---|
| 158 | // read error and input streams as this would free up the buffers |
---|
| 159 | // free the error stream buffer |
---|
| 160 | Thread errThread = new Thread() { |
---|
| 161 | @Override |
---|
| 162 | public void run() { |
---|
| 163 | try { |
---|
| 164 | String line = errReader.readLine(); |
---|
| 165 | while((line != null) && !isInterrupted()) { |
---|
| 166 | errMsg.append(line); |
---|
| 167 | errMsg.append(System.getProperty("line.separator")); |
---|
| 168 | line = errReader.readLine(); |
---|
| 169 | } |
---|
| 170 | } catch(IOException ioe) { |
---|
| 171 | LOG.warn("Error reading the error stream", ioe); |
---|
| 172 | } |
---|
| 173 | } |
---|
| 174 | }; |
---|
| 175 | try { |
---|
| 176 | errThread.start(); |
---|
| 177 | } catch (IllegalStateException ise) { } |
---|
| 178 | try { |
---|
| 179 | parseExecResult(inReader); // parse the output |
---|
| 180 | // clear the input stream buffer |
---|
| 181 | String line = inReader.readLine(); |
---|
| 182 | while(line != null) { |
---|
| 183 | line = inReader.readLine(); |
---|
| 184 | } |
---|
| 185 | // wait for the process to finish and check the exit code |
---|
| 186 | exitCode = process.waitFor(); |
---|
| 187 | try { |
---|
| 188 | // make sure that the error thread exits |
---|
| 189 | errThread.join(); |
---|
| 190 | } catch (InterruptedException ie) { |
---|
| 191 | LOG.warn("Interrupted while reading the error stream", ie); |
---|
| 192 | } |
---|
| 193 | completed = true; |
---|
| 194 | if (exitCode != 0) { |
---|
| 195 | throw new ExitCodeException(exitCode, errMsg.toString()); |
---|
| 196 | } |
---|
| 197 | } catch (InterruptedException ie) { |
---|
| 198 | throw new IOException(ie.toString()); |
---|
| 199 | } finally { |
---|
| 200 | // close the input stream |
---|
| 201 | try { |
---|
| 202 | inReader.close(); |
---|
| 203 | } catch (IOException ioe) { |
---|
| 204 | LOG.warn("Error while closing the input stream", ioe); |
---|
| 205 | } |
---|
| 206 | if (!completed) { |
---|
| 207 | errThread.interrupt(); |
---|
| 208 | } |
---|
| 209 | try { |
---|
| 210 | errReader.close(); |
---|
| 211 | } catch (IOException ioe) { |
---|
| 212 | LOG.warn("Error while closing the error stream", ioe); |
---|
| 213 | } |
---|
| 214 | process.destroy(); |
---|
| 215 | lastTime = System.currentTimeMillis(); |
---|
| 216 | } |
---|
| 217 | } |
---|
| 218 | |
---|
| 219 | /** return an array containing the command name & its parameters */ |
---|
| 220 | protected abstract String[] getExecString(); |
---|
| 221 | |
---|
| 222 | /** Parse the execution result */ |
---|
| 223 | protected abstract void parseExecResult(BufferedReader lines) |
---|
| 224 | throws IOException; |
---|
| 225 | |
---|
| 226 | /** get the current sub-process executing the given command |
---|
| 227 | * @return process executing the command |
---|
| 228 | */ |
---|
| 229 | public Process getProcess() { |
---|
| 230 | return process; |
---|
| 231 | } |
---|
| 232 | |
---|
| 233 | /** get the exit code |
---|
| 234 | * @return the exit code of the process |
---|
| 235 | */ |
---|
| 236 | public int getExitCode() { |
---|
| 237 | return exitCode; |
---|
| 238 | } |
---|
| 239 | |
---|
| 240 | /** |
---|
| 241 | * This is an IOException with exit code added. |
---|
| 242 | */ |
---|
| 243 | public static class ExitCodeException extends IOException { |
---|
| 244 | int exitCode; |
---|
| 245 | |
---|
| 246 | public ExitCodeException(int exitCode, String message) { |
---|
| 247 | super(message); |
---|
| 248 | this.exitCode = exitCode; |
---|
| 249 | } |
---|
| 250 | |
---|
| 251 | public int getExitCode() { |
---|
| 252 | return exitCode; |
---|
| 253 | } |
---|
| 254 | } |
---|
| 255 | |
---|
| 256 | /** |
---|
| 257 | * A simple shell command executor. |
---|
| 258 | * |
---|
| 259 | * <code>ShellCommandExecutor</code>should be used in cases where the output |
---|
| 260 | * of the command needs no explicit parsing and where the command, working |
---|
| 261 | * directory and the environment remains unchanged. The output of the command |
---|
| 262 | * is stored as-is and is expected to be small. |
---|
| 263 | */ |
---|
| 264 | public static class ShellCommandExecutor extends Shell { |
---|
| 265 | |
---|
| 266 | private String[] command; |
---|
| 267 | private StringBuffer output; |
---|
| 268 | |
---|
| 269 | public ShellCommandExecutor(String[] execString) { |
---|
| 270 | command = execString.clone(); |
---|
| 271 | } |
---|
| 272 | |
---|
| 273 | public ShellCommandExecutor(String[] execString, File dir) { |
---|
| 274 | this(execString); |
---|
| 275 | this.setWorkingDirectory(dir); |
---|
| 276 | } |
---|
| 277 | |
---|
| 278 | public ShellCommandExecutor(String[] execString, File dir, |
---|
| 279 | Map<String, String> env) { |
---|
| 280 | this(execString, dir); |
---|
| 281 | this.setEnvironment(env); |
---|
| 282 | } |
---|
| 283 | |
---|
| 284 | /** Execute the shell command. */ |
---|
| 285 | public void execute() throws IOException { |
---|
| 286 | this.run(); |
---|
| 287 | } |
---|
| 288 | |
---|
| 289 | protected String[] getExecString() { |
---|
| 290 | return command; |
---|
| 291 | } |
---|
| 292 | |
---|
| 293 | protected void parseExecResult(BufferedReader lines) throws IOException { |
---|
| 294 | output = new StringBuffer(); |
---|
| 295 | char[] buf = new char[512]; |
---|
| 296 | int nRead; |
---|
| 297 | while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) { |
---|
| 298 | output.append(buf, 0, nRead); |
---|
| 299 | } |
---|
| 300 | } |
---|
| 301 | |
---|
| 302 | /** Get the output of the shell command.*/ |
---|
| 303 | public String getOutput() { |
---|
| 304 | return (output == null) ? "" : output.toString(); |
---|
| 305 | } |
---|
| 306 | |
---|
| 307 | /** |
---|
| 308 | * Returns the commands of this instance. |
---|
| 309 | * Arguments with spaces in are presented with quotes round; other |
---|
| 310 | * arguments are presented raw |
---|
| 311 | * |
---|
| 312 | * @return a string representation of the object. |
---|
| 313 | */ |
---|
| 314 | public String toString() { |
---|
| 315 | StringBuilder builder = new StringBuilder(); |
---|
| 316 | String[] args = getExecString(); |
---|
| 317 | for (String s : args) { |
---|
| 318 | if (s.indexOf(' ') >= 0) { |
---|
| 319 | builder.append('"').append(s).append('"'); |
---|
| 320 | } else { |
---|
| 321 | builder.append(s); |
---|
| 322 | } |
---|
| 323 | builder.append(' '); |
---|
| 324 | } |
---|
| 325 | return builder.toString(); |
---|
| 326 | } |
---|
| 327 | } |
---|
| 328 | |
---|
| 329 | /** |
---|
| 330 | * Static method to execute a shell command. |
---|
| 331 | * Covers most of the simple cases without requiring the user to implement |
---|
| 332 | * the <code>Shell</code> interface. |
---|
| 333 | * @param cmd shell command to execute. |
---|
| 334 | * @return the output of the executed command. |
---|
| 335 | */ |
---|
| 336 | public static String execCommand(String ... cmd) throws IOException { |
---|
| 337 | return execCommand(null, cmd); |
---|
| 338 | } |
---|
| 339 | |
---|
| 340 | /** |
---|
| 341 | * Static method to execute a shell command. |
---|
| 342 | * Covers most of the simple cases without requiring the user to implement |
---|
| 343 | * the <code>Shell</code> interface. |
---|
| 344 | * @param env the map of environment key=value |
---|
| 345 | * @param cmd shell command to execute. |
---|
| 346 | * @return the output of the executed command. |
---|
| 347 | */ |
---|
| 348 | public static String execCommand(Map<String,String> env, String ... cmd) |
---|
| 349 | throws IOException { |
---|
| 350 | ShellCommandExecutor exec = new ShellCommandExecutor(cmd); |
---|
| 351 | if (env != null) { |
---|
| 352 | exec.setEnvironment(env); |
---|
| 353 | } |
---|
| 354 | exec.execute(); |
---|
| 355 | return exec.getOutput(); |
---|
| 356 | } |
---|
| 357 | } |
---|