source: proiecte/HadoopJUnit/hadoop-0.20.1/src/contrib/thriftfs/scripts/hdfs.py @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 14.6 KB
Line 
1#!/usr/bin/env python
2
3"""
4  hdfs.py is a python client for the thrift interface to HDFS.
5 
6  Licensed under the Apache License, Version 2.0 (the "License");
7  you may not use this file except in compliance with the License.
8  You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12  Unless required by applicable law or agreed to in writing, software
13  distributed under the License is distributed on an "AS IS" BASIS,
14  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15  implied. See the License for the specific language governing permissions
16  and limitations under the License.
17
18"""
19import sys
20sys.path.append('../gen-py')
21
22from optparse import OptionParser
23from thrift import Thrift
24from thrift.transport import TSocket
25from thrift.transport import TTransport
26from thrift.protocol import TBinaryProtocol
27from hadoopfs import ThriftHadoopFileSystem
28from hadoopfs.ttypes import *
29from readline import *
30from cmd import *
31import os
32import re
33import readline
34import subprocess
35
#
# Location of the FileSystemClientProxy (the Thrift proxy server) and how
# to launch one.  These are only defaults: when a host or a port is given
# on the command line no proxy server is spawned; otherwise one is started
# automatically through proxyStartScript.
#
host = 'localhost'
port = 4677                       # use any port
startServer = True                # shall we start a proxy server?
proxyStartScript = './start_thrift_server.sh'
44
45#
46# The hdfs interactive shell. The Cmd class is a builtin that uses readline + implements
47# a whole bunch of utility stuff like help and custom tab completions.
48# It makes everything real easy.
49#
class hadoopthrift_cli(Cmd):
  """Interactive HDFS shell that talks to a Thrift HDFS proxy server.

  Built on cmd.Cmd: each do_<name> method implements the shell command
  <name> and each help_<name> method prints that command's help text.
  Command handlers return a false value (0) so the command loop keeps
  running; do_quit returns -1 (a true value) to stop it.

  The module-level names proxyStartScript and startServer are read by
  startProxyServer and do_quit respectively.
  """

  # my custom prompt looks better than the default
  prompt = 'hdfs>> '

  #############################
  # Class constructor
  #############################
  def __init__(self, server_name, server_port):
    """Record the proxy server address; no connection is opened here."""
    Cmd.__init__(self)
    self.server_name = server_name
    self.server_port = server_port

  #############################
  # Start the ClientProxy Server if we can find it.
  # Read in its stdout to determine what port it is running on
  #############################
  def startProxyServer(self):
    """Spawn the proxy start script and learn which port it listens on.

    The first line the script writes to stdout must contain the port
    number inside square brackets; that number replaces self.server_port.

    Returns True on success.  On any failure an error is printed and
    False is returned.
    """
    try:
      # The command has to be an argument list: Popen's second positional
      # parameter is bufsize, so passing the port there never reached the
      # script.
      p = subprocess.Popen([proxyStartScript, str(self.server_port)],
                           stdout=subprocess.PIPE,
                           universal_newlines=True)
      content = p.stdout.readline()
      p.stdout.close()

      found = re.search(r'\[(\d+)\]', content)
      if found is None:
        raise ValueError('no [port] found in server output: %r' % (content,))
      self.server_port = int(found.group(1))
      print(self.server_port)
      return True

    except Exception as ex:
      print("ERROR in starting proxy  server " + proxyStartScript)
      # str(ex) rather than ex.message: the latter is empty for OSError.
      print('%s' % (ex,))
      return False

  #############################
  # Connect to clientproxy
  #############################
  def connect(self):
    """Open a buffered binary-protocol Thrift connection to the proxy.

    Sets self.transport, self.protocol and self.client, then asks the
    server to exit after 60 minutes of inactivity.

    Returns True on success, False (after printing the error) when a
    Thrift.TException is raised.
    """
    try:
      # connect to hdfs thrift server
      raw_socket = TSocket.TSocket(self.server_name, self.server_port)
      self.transport = TTransport.TBufferedTransport(raw_socket)
      self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

      # Create a client to use the protocol encoder
      self.client = ThriftHadoopFileSystem.Client(self.protocol)
      self.transport.open()

      # tell the HadoopThrift server to die after 60 minutes of inactivity
      self.client.setInactivityTimeoutPeriod(60*60)
      return True

    except Thrift.TException as tx:
      print("ERROR in connecting to  %s : %s"
            % (self.server_name, self.server_port))
      print('%s' % (tx,))
      return False

  #
  # Disconnect from client proxy
  #
  def shutdown(self):
    """Close the transport.  Returns True on success, False on any error
    (including the case where connect() was never called)."""
    try:
      self.transport.close()
      return True
    except Exception:
      return False

  #############################
  # Internal helpers (names without the do_/help_ prefix are not commands)
  #############################
  def _make_path(self, name):
    """Wrap a path string in the Thrift Pathname structure."""
    path = Pathname()
    path.pathname = name
    return path

  def _usage_error(self, usage):
    """Print the standard usage error for a command, then a blank line."""
    print("  ERROR usage: " + usage + "\n")

  def _two_args(self, line, usage):
    """Split a command line into exactly two whitespace-separated words.

    Returns the two-element list, or None after printing the usage error
    when the word count is not two.  str.split() never yields empty
    strings, so no further emptiness checks are needed.
    """
    params = line.split()
    if len(params) != 2:
      self._usage_error(usage)
      return None
    return params

  #############################
  # Create the specified (empty) file.
  #############################
  def do_create(self, name):
    """create <pathname>: create the file and close its handle at once."""
    if name == "":
      self._usage_error("create <pathname>")
      return 0

    handle = self.client.create(self._make_path(name))
    self.client.close(handle)
    return 0

  #############################
  # Delete the specified file.
  #############################
  def do_rm(self, name):
    """rm <pathname>: delete the path (non-recursively)."""
    if name == "":
      self._usage_error("rm <pathname>")
      return 0

    # second argument False: do not delete recursively
    if not self.client.rm(self._make_path(name), False):
      print("  ERROR in deleting path: " + name)
    return 0

  #############################
  # Rename the specified file/dir
  #############################
  def do_mv(self, line):
    """mv <srcpathname> <destpathname>: rename a file or directory."""
    params = self._two_args(line, "mv <srcpathname> <destpathname>")
    if params is None:
      return 0
    src, dest = params

    renamed = self.client.rename(self._make_path(src), self._make_path(dest))
    if not renamed:
      # report the source path (the old code used an undefined variable)
      print("  ERROR in renaming path: " + src)
    return 0

  #############################
  # Create the specified directory.
  #############################
  def do_mkdirs(self, name):
    """mkdirs <pathname>: create the directory."""
    if name == "":
      self._usage_error("mkdirs <pathname>")
      return 0

    self.client.mkdirs(self._make_path(name))
    return 0

  #############################
  # does the pathname exist?
  #############################
  def do_exists(self, name):
    """exists <pathname>: report whether the path exists."""
    if name == "":
      self._usage_error("exists <pathname>")
      return 0

    if self.client.exists(self._make_path(name)):
      print(name + " exists.")
    else:
      print(name + " does not exist.")
    return 0

  #############################
  # copy local file into hdfs
  #############################
  def do_put(self, line):
    """put <localpathname> <hdfspathname>: upload a local file.

    The file is sent in 1MB chunks.  Both the local file and the HDFS
    handle are closed even when the transfer fails part-way.
    """
    params = self._two_args(line, "put <localpathname> <hdfspathname>")
    if params is None:
      return 0
    local, hdfs = params

    with open(local, 'rb') as source:
      handle = self.client.create(self._make_path(hdfs))
      try:
        # read 1MB at a time and upload to hdfs
        while True:
          chunk = source.read(1024*1024)
          if not chunk:
            break
          self.client.write(handle, chunk)
      finally:
        self.client.close(handle)
    return 0

  #############################
  # copy hdfs file into local
  #############################
  def do_get(self, line):
    """get <hdfspathname> <localpathname>: download an HDFS file.

    The HDFS file is opened before the local file is created, so a
    failure to open the source does not truncate an existing local file.
    Data is fetched in chunks of at most 1MB.
    """
    params = self._two_args(line, "get <hdfspathname> <localpathname>")
    if params is None:
      return 0
    hdfs, local = params

    path = self._make_path(hdfs)
    handle = self.client.open(path)
    try:
      # find size of hdfs file
      filesize = self.client.stat(path).length

      with open(local, 'wb') as sink:
        offset = 0
        chunksize = 1024 * 1024
        while True:
          chunk = self.client.read(handle, offset, chunksize)
          if not chunk:
            break
          sink.write(chunk)
          # advance by what was actually received, so a short read does
          # not skip the bytes that follow it
          offset += len(chunk)
          if offset >= filesize:
            break
    finally:
      self.client.close(handle)
    return 0

  #############################
  # List attributes of this path
  #############################
  def do_ls(self, name):
    """ls <pathname>: print the status of a file, or of each entry of a
    directory."""
    if name == "":
      self._usage_error("ls <pathname>")
      return 0

    path = self._make_path(name)
    status = self.client.stat(path)
    if not status.isdir:
      self.printStatus(status)
      return 0

    # This is a directory, fetch its contents
    for item in self.client.listStatus(path):
      self.printStatus(item)
    return 0

  #############################
  # Set permissions for a file
  #############################
  def do_chmod(self, line):
    """chmod <octal mode> <pathname>: set the permission bits."""
    usage = "chmod 774 <pathname>"
    params = self._two_args(line, usage)
    if params is None:
      return 0
    perm, name = params

    # permissions are given in octal; reject anything else instead of
    # letting the ValueError kill the shell
    try:
      mode = int(perm, 8)
    except ValueError:
      self._usage_error(usage)
      return 0

    self.client.chmod(self._make_path(name), mode)
    return 0

  #############################
  # Set owner for a file. This is not an atomic operation.
  # A change to the group of a file may be overwritten by this one.
  #############################
  def do_chown(self, line):
    """chown <ownername> <pathname>: change the owner, keeping the group
    that stat() currently reports."""
    params = self._two_args(line, "chown <ownername> <pathname>")
    if params is None:
      return 0
    owner, name = params

    # get the current owner and group
    path = self._make_path(name)
    current = self.client.stat(path)

    # set new owner, keep old group
    self.client.chown(path, owner, current.group)
    return 0

  #######################################
  # Set the replication factor for a file
  ######################################
  def do_setreplication(self, line):
    """setreplication <replication factor> <pathname>."""
    usage = "setreplication <replication factor> <pathname>"
    params = self._two_args(line, usage)
    if params is None:
      return 0
    repl, name = params

    try:
      factor = int(repl)
    except ValueError:
      self._usage_error(usage)
      return 0

    self.client.setReplication(self._make_path(name), factor)
    return 0

  #############################
  # Display the locations of the blocks of this file
  #############################
  def do_getlocations(self, name):
    """getlocations <pathname>: print the block locations of the whole
    file (offset 0 up to the length reported by stat())."""
    if name == "":
      self._usage_error("getlocations <pathname>")
      return 0
    path = self._make_path(name)

    # find size of hdfs file
    filesize = self.client.stat(path).length

    for item in self.client.getFileBlockLocations(path, 0, filesize):
      self.printLocations(item)
    return 0

  #############################
  # Utility methods from here
  #############################
  #
  # If I don't do this, the last command is always re-executed which is annoying.
  #
  def emptyline(self):
    """Do nothing on an empty input line (Cmd's default repeats the last
    command)."""
    pass

  #
  # print the status of a path
  #
  def printStatus(self, stat):
    """Print one tab-separated line: replication, length, modification
    time, permission, owner, group, path."""
    print("\t".join([str(stat.block_replication),
                     str(stat.length),
                     str(stat.modification_time),
                     stat.permission,
                     stat.owner,
                     stat.group,
                     stat.path]))

  #
  # print the locations of a block
  #
  def printLocations(self, location):
    """Print one tab-separated line: names, offset, length of a block."""
    print("\t".join([str(location.names),
                     str(location.offset),
                     str(location.length)]))

  #
  # Various ways to exit the hdfs shell
  #
  def do_quit(self, ignored):
    """Leave the shell.  If this session spawned the proxy server, ask it
    to shut down first; any failure doing so is deliberately ignored.
    Always returns -1, which stops the command loop."""
    try:
      if startServer:
        self.client.shutdown(1)
    except Exception:
      pass  # best effort only: we are exiting anyway
    return -1

  def do_q(self, ignored):
    """Alias for quit."""
    return self.do_quit(ignored)

  # ctl-d
  def do_EOF(self, ignored):
    """End of input (ctl-d) quits the shell."""
    return self.do_quit(ignored)

  #
  # Give the user some amount of help - I am a nice guy
  #

  def help_create(self):
    print("create <pathname>")

  def help_rm(self):
    print("rm <pathname>")

  def help_mv(self):
    print("mv <srcpathname> <destpathname>")

  def help_mkdirs(self):
    print("mkdirs <pathname>")

  def help_exists(self):
    print("exists <pathname>")

  def help_put(self):
    print("put <localpathname> <hdfspathname>")

  def help_get(self):
    print("get <hdfspathname> <localpathname>")

  def help_ls(self):
    print("ls <hdfspathname>")

  def help_chmod(self):
    print("chmod 775 <hdfspathname>")

  def help_chown(self):
    print("chown <ownername> <hdfspathname>")

  def help_setreplication(self):
    # the command is called setreplication, not setrep
    print("setreplication <replication factor> <hdfspathname>")

  def help_getlocations(self):
    print("getlocations <pathname>")

  def help_EOF(self):
    print('<ctl-d> will quit this program.')

  def help_quit(self):
    print('if you need to know what quit does, you shouldn\'t be using a computer.')

  def help_q(self):
    print('quit and if you need to know what quit does, you shouldn\'t be using a computer.')

  def help_help(self):
    print('duh')

  @staticmethod
  def usage(exec_name):
    """Print a short command-line synopsis for the program exec_name.

    Declared static because it takes no instance.
    NOTE(review): the synopsis lists positional arguments and -v, while
    the option parser in the script entry point defines -e, -s and -p;
    confirm which is intended before relying on this text.
    """
    print("Usage: ")
    print("  %s [proxyclientname [proxyclientport]]" % exec_name)
    print("  %s -v" % exec_name)
    print("  %s --help" % exec_name)
    print("  %s -h" % exec_name)
492
if __name__ == "__main__":

  #
  # Command line processing.
  #
  parser = OptionParser()
  parser.add_option("-e", "--execute", dest="command_str",
                    help="execute this command and exit")
  parser.add_option("-s", "--proxyclient", dest="host",
                    help="the proxyclient's hostname")
  # type="int" keeps the port the same type as the built-in default and
  # lets the parser reject a non-numeric value with a proper message.
  parser.add_option("-p", "--port", dest="port", type="int",
                    help="the proxyclient's port number")

  (options, args) = parser.parse_args()

  #
  # Save host and port information of the proxy server.  Naming either
  # one means an existing server is to be used, so none is spawned.
  #
  if options.host:
    host = options.host
    startServer = False
  if options.port is not None:
    port = options.port
    startServer = False

  #
  # Retrieve the user's readline history.
  #
  historyFileName = os.path.expanduser("~/.hdfs_history")
  if os.path.exists(historyFileName):
    readline.read_history_file(historyFileName)

  #
  # Create class and connect to proxy server
  #
  c = hadoopthrift_cli(host, port)

  if startServer:
    if c.startProxyServer() == False:
      sys.exit(1)
  if c.connect() == False:
    sys.exit(1)

  #
  # If this utility was invoked with a command to execute, run it,
  # close the connection and exit without entering the interactive loop.
  #
  if options.command_str:
    c.onecmd(options.command_str)
    c.shutdown()
    sys.exit(0)

  #
  # Start looping over user commands.
  #
  c.cmdloop('Welcome to the Thrift interactive shell for Hadoop File System. - how can I help you? ' + '\n'
      'Press tab twice to see the list of commands. ' + '\n' +
      'To complete the name of a command press tab once. \n'
      )
  c.shutdown()

  readline.write_history_file(historyFileName)
  print('') # I am nothing if not courteous.
  sys.exit(0)
Note: See TracBrowser for help on using the repository browser.