/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "hdfs.h" #include "hdfsJniHelper.h" /* Some frequently used Java paths */ #define HADOOP_CONF "org/apache/hadoop/conf/Configuration" #define HADOOP_PATH "org/apache/hadoop/fs/Path" #define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem" #define HADOOP_FS "org/apache/hadoop/fs/FileSystem" #define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation" #define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem" #define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream" #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream" #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus" #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission" #define HADOOP_UNIX_USER_GROUP_INFO "org/apache/hadoop/security/UnixUserGroupInformation" #define HADOOP_USER_GROUP_INFO "org/apache/hadoop/security/UserGroupInformation" #define JAVA_NET_ISA "java/net/InetSocketAddress" #define JAVA_NET_URI "java/net/URI" #define JAVA_STRING "java/lang/String" #define JAVA_VOID "V" /* Macros for constructing method signatures */ #define JPARAM(X) "L" X ";" #define JARRPARAM(X) "[L" X ";" #define JMETHOD1(X, R) "(" X ")" R #define JMETHOD2(X, Y, R) "(" X Y ")" R #define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R /** * hdfsJniEnv: A wrapper struct to be used as 'value' * while saving thread -> JNIEnv* mappings */ typedef struct { JNIEnv* env; } hdfsJniEnv; /** * Helper function to destroy a local reference of java.lang.Object * @param env: The JNIEnv pointer. * @param jFile: The local reference of java.lang.Object object * @return None. */ static void destroyLocalReference(JNIEnv *env, jobject jObject) { if (jObject) (*env)->DeleteLocalRef(env, jObject); } /** * Helper function to create a org.apache.hadoop.fs.Path object. * @param env: The JNIEnv pointer. * @param path: The file-path for which to construct org.apache.hadoop.fs.Path * object. * @return Returns a jobject on success and NULL on error. */ static jobject constructNewObjectOfPath(JNIEnv *env, const char *path) { //Construct a java.lang.String object jstring jPathString = (*env)->NewStringUTF(env, path); //Construct the org.apache.hadoop.fs.Path object jobject jPath = constructNewObjectOfClass(env, NULL, "org/apache/hadoop/fs/Path", "(Ljava/lang/String;)V", jPathString); if (jPath == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.fs.Path for %s\n", path); errno = EINTERNAL; return NULL; } // Destroy the local reference to the java.lang.String object destroyLocalReference(env, jPathString); return jPath; } /** * Helper function to translate an exception into a meaningful errno value. * @param exc: The exception. * @param env: The JNIEnv Pointer. * @param method: The name of the method that threw the exception. This * may be format string to be used in conjuction with additional arguments. * @return Returns a meaningful errno value if possible, or EINTERNAL if not. */ static int errnoFromException(jthrowable exc, JNIEnv *env, const char *method, ...) { va_list ap; int errnum = 0; char *excClass = NULL; if (exc == NULL) goto default_error; if ((excClass = classNameOfObject((jobject) exc, env)) == NULL) { errnum = EINTERNAL; goto done; } if (!strcmp(excClass, "org.apache.hadoop.security." "AccessControlException")) { errnum = EACCES; goto done; } if (!strcmp(excClass, "org.apache.hadoop.hdfs.protocol." "QuotaExceededException")) { errnum = EDQUOT; goto done; } if (!strcmp(excClass, "java.io.FileNotFoundException")) { errnum = ENOENT; goto done; } //TODO: interpret more exceptions; maybe examine exc.getMessage() default_error: //Can't tell what went wrong, so just punt (*env)->ExceptionDescribe(env); fprintf(stderr, "Call to "); va_start(ap, method); vfprintf(stderr, method, ap); va_end(ap); fprintf(stderr, " failed!\n"); errnum = EINTERNAL; done: (*env)->ExceptionClear(env); if (excClass != NULL) free(excClass); return errnum; } hdfsFS hdfsConnect(const char* host, tPort port) { // conect with NULL as user name/groups return hdfsConnectAsUser(host, port, NULL, NULL, 0); } hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char **groups, int groups_size ) { // JAVA EQUIVALENT: // FileSystem fs = FileSystem.get(new Configuration()); // return fs; JNIEnv *env = 0; jobject jConfiguration = NULL; jobject jFS = NULL; jobject jURI = NULL; jstring jURIString = NULL; jvalue jVal; jthrowable jExc = NULL; char *cURI = 0; jobject gFsRef = NULL; //Get the JNIEnv* corresponding to current thread env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } //Create the org.apache.hadoop.conf.Configuration object jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.conf.Configuration\n"); errno = EINTERNAL; return NULL; } if (user != NULL) { if (groups == NULL || groups_size <= 0) { fprintf(stderr, "ERROR: groups must not be empty/null\n"); errno = EINVAL; return NULL; } jstring jUserString = (*env)->NewStringUTF(env, user); jarray jGroups = constructNewArrayString(env, &jExc, groups, groups_size); if (jGroups == NULL) { errno = EINTERNAL; fprintf(stderr, "ERROR: could not construct groups array\n"); return NULL; } jobject jUgi; if ((jUgi = constructNewObjectOfClass(env, &jExc, HADOOP_UNIX_USER_GROUP_INFO, JMETHOD2(JPARAM(JAVA_STRING), JARRPARAM(JAVA_STRING), JAVA_VOID), jUserString, jGroups)) == NULL) { fprintf(stderr,"failed to construct hadoop user unix group info object\n"); errno = errnoFromException(jExc, env, HADOOP_UNIX_USER_GROUP_INFO, "init"); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jUserString); if (jGroups != NULL) { destroyLocalReference(env, jGroups); } return NULL; } #define USE_UUGI #ifdef USE_UUGI // UnixUserGroupInformation.UGI_PROPERTY_NAME jstring jAttrString = (*env)->NewStringUTF(env,"hadoop.job.ugi"); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_UNIX_USER_GROUP_INFO, "saveToConf", JMETHOD3(JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING), JPARAM(HADOOP_UNIX_USER_GROUP_INFO), JAVA_VOID), jConfiguration, jAttrString, jUgi) != 0) { errno = errnoFromException(jExc, env, HADOOP_FSPERM, "init"); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jUserString); if (jGroups != NULL) { destroyLocalReference(env, jGroups); } destroyLocalReference(env, jUgi); return NULL; } destroyLocalReference(env, jUserString); destroyLocalReference(env, jGroups); destroyLocalReference(env, jUgi); } #else // what does "current" mean in the context of libhdfs ? does it mean for the last hdfs connection we used? // that's why this code cannot be activated. We know the above use of the conf object should work well with // multiple connections. if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_USER_GROUP_INFO, "setCurrentUGI", JMETHOD1(JPARAM(HADOOP_USER_GROUP_INFO), JAVA_VOID), jUgi) != 0) { errno = errnoFromException(jExc, env, HADOOP_USER_GROUP_INFO, "setCurrentUGI"); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jUserString); if (jGroups != NULL) { destroyLocalReference(env, jGroups); } destroyLocalReference(env, jUgi); return NULL; } destroyLocalReference(env, jUserString); destroyLocalReference(env, jGroups); destroyLocalReference(env, jUgi); } #endif //Check what type of FileSystem the caller wants... if (host == NULL) { // fs = FileSytem::getLocal(conf); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getLocal", JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)), jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getLocal"); goto done; } jFS = jVal.l; } else if (!strcmp(host, "default") && port == 0) { //fs = FileSystem::get(conf); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get", JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)), jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::get"); goto done; } jFS = jVal.l; } else { // fs = FileSystem::get(URI, conf); cURI = malloc(strlen(host)+16); sprintf(cURI, "hdfs://%s:%d", host, (int)(port)); jURIString = (*env)->NewStringUTF(env, cURI); if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI, "create", "(Ljava/lang/String;)Ljava/net/URI;", jURIString) != 0) { errno = errnoFromException(jExc, env, "java.net.URI::create"); goto done; } jURI = jVal.l; if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get", JMETHOD2(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF), JPARAM(HADOOP_FS)), jURI, jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Filesystem::get(URI, Configuration)"); goto done; } jFS = jVal.l; } done: // Release unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jURIString); destroyLocalReference(env, jURI); if (cURI) free(cURI); /* Create a global reference for this fs */ if (jFS) { gFsRef = (*env)->NewGlobalRef(env, jFS); destroyLocalReference(env, jFS); } return gFsRef; } int hdfsDisconnect(hdfsFS fs) { // JAVA EQUIVALENT: // fs.close() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -2; } //Parameters jobject jFS = (jobject)fs; //Caught exception jthrowable jExc = NULL; //Sanity check if (fs == NULL) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "close", "()V") != 0) { errno = errnoFromException(jExc, env, "Filesystem::close"); return -1; } //Release unnecessary references (*env)->DeleteGlobalRef(env, fs); return 0; } hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, short replication, tSize blockSize) { /* JAVA EQUIVALENT: File f = new File(path); FSData{Input|Output}Stream f{is|os} = fs.create(f); return f{is|os}; */ /* Get the JNIEnv* corresponding to current thread */ JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; if (flags & O_RDWR) { fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n"); errno = ENOTSUP; return NULL; } if ((flags & O_CREAT) && (flags & O_EXCL)) { fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); } /* The hadoop java api/signature */ const char* method = ((flags & O_WRONLY) == 0) ? "open" : (flags & O_APPEND) ? "append" : "create"; const char* signature = ((flags & O_WRONLY) == 0) ? JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM)) : (flags & O_APPEND) ? JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM)) : JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM)); /* Return value */ hdfsFile file = NULL; /* Create an object of org.apache.hadoop.fs.Path */ jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } /* Get the Configuration object from the FileSystem object */ jvalue jVal; jobject jConfiguration = NULL; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getConf", JMETHOD1("", JPARAM(HADOOP_CONF))) != 0) { errno = errnoFromException(jExc, env, "get configuration object " "from filesystem"); destroyLocalReference(env, jPath); return NULL; } jConfiguration = jVal.l; jint jBufferSize = bufferSize; jshort jReplication = replication; jlong jBlockSize = blockSize; jstring jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size"); jstring jStrReplication = (*env)->NewStringUTF(env, "dfs.replication"); jstring jStrBlockSize = (*env)->NewStringUTF(env, "dfs.block.size"); //bufferSize if (!bufferSize) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", jStrBufferSize, 4096) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "Configuration::getInt"); goto done; } jBufferSize = jVal.i; } if ((flags & O_WRONLY) && (flags & O_APPEND) == 0) { //replication if (!replication) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I", jStrReplication, 1) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "Configuration::getInt"); goto done; } jReplication = jVal.i; } //blockSize if (!blockSize) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration, HADOOP_CONF, "getLong", "(Ljava/lang/String;J)J", jStrBlockSize, 67108864)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } jBlockSize = jVal.j; } } /* Create and return either the FSDataInputStream or FSDataOutputStream references jobject jStream */ // READ? if ((flags & O_WRONLY) == 0) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, method, signature, jPath, jBufferSize)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } } else if ((flags & O_WRONLY) && (flags & O_APPEND)) { // WRITE/APPEND? if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, method, signature, jPath)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } } else { // WRITE/CREATE jboolean jOverWrite = 1; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, method, signature, jPath, jOverWrite, jBufferSize, jReplication, jBlockSize)) { errno = errnoFromException(jExc, env, "org.apache.hadoop.conf." "FileSystem::%s(%s)", method, signature); goto done; } } file = malloc(sizeof(struct hdfsFile_internal)); if (!file) { errno = ENOMEM; return NULL; } file->file = (*env)->NewGlobalRef(env, jVal.l); file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT); destroyLocalReference(env, jVal.l); done: //Delete unnecessary local references destroyLocalReference(env, jStrBufferSize); destroyLocalReference(env, jStrReplication); destroyLocalReference(env, jStrBlockSize); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jPath); return file; } int hdfsCloseFile(hdfsFS fs, hdfsFile file) { // JAVA EQUIVALENT: // file.close //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -2; } //Parameters jobject jStream = (jobject)(file ? file->file : NULL); //Caught exception jthrowable jExc = NULL; //Sanity check if (!file || file->type == UNINITIALIZED) { errno = EBADF; return -1; } //The interface whose 'close' method to be called const char* interface = (file->type == INPUT) ? HADOOP_ISTRM : HADOOP_OSTRM; if (invokeMethod(env, NULL, &jExc, INSTANCE, jStream, interface, "close", "()V") != 0) { errno = errnoFromException(jExc, env, "%s::close", interface); return -1; } //De-allocate memory free(file); (*env)->DeleteGlobalRef(env, jStream); return 0; } int hdfsExists(hdfsFS fs, const char *path) { JNIEnv *env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -2; } jobject jPath = constructNewObjectOfPath(env, path); jvalue jVal; jthrowable jExc = NULL; jobject jFS = (jobject)fs; if (jPath == NULL) { return -1; } if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::exists"); return -1; } return jVal.z ? 0 : -1; } tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(bR); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : NULL); jbyteArray jbRarray; jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } //Error checking... make sure that this file is 'readable' if (f->type != INPUT) { fprintf(stderr, "Cannot read from a non-InputStream object!\n"); errno = EINVAL; return -1; } //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "([B)I", jbRarray) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::read"); noReadBytes = -1; } else { noReadBytes = jVal.i; if (noReadBytes > 0) { (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); } else { //This is a valid case: there aren't any bytes left to read! if (noReadBytes == 0 || noReadBytes < -1) { fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes); } noReadBytes = 0; } errno = 0; } destroyLocalReference(env, jbRarray); return noReadBytes; } tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length) { // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(pos, bR, 0, length); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : NULL); jbyteArray jbRarray; jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } //Error checking... make sure that this file is 'readable' if (f->type != INPUT) { fprintf(stderr, "Cannot read from a non-InputStream object!\n"); errno = EINVAL; return -1; } //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "read", "(J[BII)I", position, jbRarray, 0, length) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::read"); noReadBytes = -1; } else { noReadBytes = jVal.i; if (noReadBytes > 0) { (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); } else { //This is a valid case: there aren't any bytes left to read! if (noReadBytes == 0 || noReadBytes < -1) { fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes); } noReadBytes = 0; } errno = 0; } destroyLocalReference(env, jbRarray); return noReadBytes; } tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) { // JAVA EQUIVALENT // byte b[] = str.getBytes(); // fso.write(b); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jOutputStream = (jobject)(f ? f->file : 0); jbyteArray jbWarray; //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } if (length < 0) { errno = EINVAL; return -1; } //Error checking... make sure that this file is 'writable' if (f->type != OUTPUT) { fprintf(stderr, "Cannot write into a non-OutputStream object!\n"); errno = EINVAL; return -1; } // 'length' equals 'zero' is a valid use-case according to Posix! if (length != 0) { //Write the requisite bytes into the file jbWarray = (*env)->NewByteArray(env, length); (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer); if (invokeMethod(env, NULL, &jExc, INSTANCE, jOutputStream, HADOOP_OSTRM, "write", "([B)V", jbWarray) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataOutputStream::write"); length = -1; } destroyLocalReference(env, jbWarray); } //Return no. of bytes succesfully written (libc way) //i.e. 'length' itself! ;-) return length; } int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos) { // JAVA EQUIVALENT // fis.seek(pos); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != INPUT) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "seek", "(J)V", desiredPos) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::seek"); return -1; } return 0; } tOffset hdfsTell(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // pos = f.getPos(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jStream = (jobject)(f ? f->file : 0); //Sanity check if (!f || f->type == UNINITIALIZED) { errno = EBADF; return -1; } const char* interface = (f->type == INPUT) ? HADOOP_ISTRM : HADOOP_OSTRM; jlong currentPos = -1; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStream, interface, "getPos", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::getPos"); return -1; } currentPos = jVal.j; return (tOffset)currentPos; } int hdfsFlush(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // fos.flush(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jOutputStream = (jobject)(f ? f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != OUTPUT) { errno = EBADF; return -1; } if (invokeMethod(env, NULL, &jExc, INSTANCE, jOutputStream, HADOOP_OSTRM, "flush", "()V") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::flush"); return -1; } return 0; } int hdfsAvailable(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT // fis.available(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jInputStream = (jobject)(f ? f->file : 0); //Caught exception jthrowable jExc = NULL; //Sanity check if (!f || f->type != INPUT) { errno = EBADF; return -1; } jint available = -1; jvalue jVal; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, "available", "()I") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FSDataInputStream::available"); return -1; } available = jVal.i; return available; } int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { //JAVA EQUIVALENT // FileUtil::copy(srcFS, srcPath, dstFS, dstPath, // deleteSource = false, conf) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jSrcFS = (jobject)srcFS; jobject jDstFS = (jobject)dstFS; jobject jSrcPath = NULL; jobject jDstPath = NULL; jSrcPath = constructNewObjectOfPath(env, src); if (jSrcPath == NULL) { return -1; } jDstPath = constructNewObjectOfPath(env, dst); if (jDstPath == NULL) { destroyLocalReference(env, jSrcPath); return -1; } int retval = 0; //Create the org.apache.hadoop.conf.Configuration object jobject jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.conf.Configuration\n"); errno = EINTERNAL; destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); return -1; } //FileUtil::copy jboolean deleteSource = 0; //Only copy jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, "org/apache/hadoop/fs/FileUtil", "copy", "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z", jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource, jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileUtil::copy"); retval = -1; goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); return retval; } int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { //JAVA EQUIVALENT // FileUtil::copy(srcFS, srcPath, dstFS, dstPath, // deleteSource = true, conf) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } //Parameters jobject jSrcFS = (jobject)srcFS; jobject jDstFS = (jobject)dstFS; jobject jSrcPath = NULL; jobject jDstPath = NULL; jSrcPath = constructNewObjectOfPath(env, src); if (jSrcPath == NULL) { return -1; } jDstPath = constructNewObjectOfPath(env, dst); if (jDstPath == NULL) { destroyLocalReference(env, jSrcPath); return -1; } int retval = 0; //Create the org.apache.hadoop.conf.Configuration object jobject jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V"); if (jConfiguration == NULL) { fprintf(stderr, "Can't construct instance of class " "org.apache.hadoop.conf.Configuration\n"); errno = EINTERNAL; destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); return -1; } //FileUtil::copy jboolean deleteSource = 1; //Delete src after copy jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, "org/apache/hadoop/fs/FileUtil", "copy", "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z", jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource, jConfiguration) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileUtil::copy(move)"); retval = -1; goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jSrcPath); destroyLocalReference(env, jDstPath); return retval; } int hdfsDelete(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // File f = new File(path); // bool retval = fs.delete(f); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of java.io.File jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //Delete the file jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "delete", "(Lorg/apache/hadoop/fs/Path;)Z", jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::delete"); return -1; } //Delete unnecessary local references destroyLocalReference(env, jPath); return (jVal.z) ? 0 : -1; } int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { // JAVA EQUIVALENT: // Path old = new Path(oldPath); // Path new = new Path(newPath); // fs.rename(old, new); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create objects of org.apache.hadoop.fs.Path jobject jOldPath = NULL; jobject jNewPath = NULL; jOldPath = constructNewObjectOfPath(env, oldPath); if (jOldPath == NULL) { return -1; } jNewPath = constructNewObjectOfPath(env, newPath); if (jNewPath == NULL) { destroyLocalReference(env, jOldPath); return -1; } //Rename the file jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "rename", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"), jOldPath, jNewPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::rename"); return -1; } //Delete unnecessary local references destroyLocalReference(env, jOldPath); destroyLocalReference(env, jNewPath); return (jVal.z) ? 0 : -1; } char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { // JAVA EQUIVALENT: // Path p = fs.getWorkingDirectory(); // return p.toString() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; jobject jPath = NULL; jvalue jVal; jthrowable jExc = NULL; //FileSystem::getWorkingDirectory() if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getWorkingDirectory", "()Lorg/apache/hadoop/fs/Path;") != 0 || jVal.l == NULL) { errno = errnoFromException(jExc, env, "FileSystem::" "getWorkingDirectory"); return NULL; } jPath = jVal.l; //Path::toString() jstring jPathString; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jPath, "org/apache/hadoop/fs/Path", "toString", "()Ljava/lang/String;") != 0) { errno = errnoFromException(jExc, env, "Path::toString"); destroyLocalReference(env, jPath); return NULL; } jPathString = jVal.l; const char *jPathChars = (const char*) ((*env)->GetStringUTFChars(env, jPathString, NULL)); //Copy to user-provided buffer strncpy(buffer, jPathChars, bufferSize); //Delete unnecessary local references (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars); destroyLocalReference(env, jPathString); destroyLocalReference(env, jPath); return buffer; } int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // fs.setWorkingDirectory(Path(path)); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; int retval = 0; jthrowable jExc = NULL; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //FileSystem::setWorkingDirectory() if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setWorkingDirectory", "(Lorg/apache/hadoop/fs/Path;)V", jPath) != 0) { errno = errnoFromException(jExc, env, "FileSystem::" "setWorkingDirectory"); retval = -1; } //Delete unnecessary local references destroyLocalReference(env, jPath); return retval; } int hdfsCreateDirectory(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // fs.mkdirs(new Path(path)); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //Create the directory jvalue jVal; jVal.z = 0; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z", jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::mkdirs"); goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jPath); return (jVal.z) ? 0 : -1; } int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { // JAVA EQUIVALENT: // fs.setReplication(new Path(path), replication); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } //Create the directory jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z", jPath, replication) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setReplication"); goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jPath); return (jVal.z) ? 0 : -1; } int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) { // JAVA EQUIVALENT: // fs.setOwner(path, owner, group) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } if (owner == NULL && group == NULL) { fprintf(stderr, "Both owner and group cannot be null in chown"); errno = EINVAL; return -1; } jobject jFS = (jobject)fs; jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -1; } jstring jOwnerString = (*env)->NewStringUTF(env, owner); jstring jGroupString = (*env)->NewStringUTF(env, group); //Create the directory int ret = 0; jthrowable jExc = NULL; if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setOwner", JMETHOD3(JPARAM(HADOOP_PATH), JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID), jPath, jOwnerString, jGroupString) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setOwner"); ret = -1; goto done; } done: destroyLocalReference(env, jPath); destroyLocalReference(env, jOwnerString); destroyLocalReference(env, jGroupString); return ret; } int hdfsChmod(hdfsFS fs, const char* path, short mode) { // JAVA EQUIVALENT: // fs.setPermission(path, FsPermission) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; // construct jPerm = FsPermission.createImmutable(short mode); jshort jmode = mode; jobject jPermObj = constructNewObjectOfClass(env, NULL, HADOOP_FSPERM,"(S)V",jmode); if (jPermObj == NULL) { return -2; } //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return -3; } //Create the directory int ret = 0; jthrowable jExc = NULL; if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setPermission", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID), jPath, jPermObj) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setPermission"); ret = -1; goto done; } done: destroyLocalReference(env, jPath); destroyLocalReference(env, jPermObj); return ret; } int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { // JAVA EQUIVALENT: // fs.setTimes(src, mtime, atime) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { fprintf(stderr, "could not construct path object\n"); return -2; } jlong jmtime = mtime * (jlong)1000; jlong jatime = atime * (jlong)1000; int ret = 0; jthrowable jExc = NULL; if (invokeMethod(env, NULL, &jExc, INSTANCE, jFS, HADOOP_FS, "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID), jPath, jmtime, jatime) != 0) { fprintf(stderr, "call to setTime failed\n"); errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::setTimes"); ret = -1; goto done; } done: destroyLocalReference(env, jPath); return ret; } char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { // JAVA EQUIVALENT: // fs.getFileBlockLoctions(new Path(path), start, length); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } jvalue jFSVal; jthrowable jFSExc = NULL; if (invokeMethod(env, &jFSVal, &jFSExc, INSTANCE, jFS, HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)" "Lorg/apache/hadoop/fs/FileStatus;", jPath) != 0) { errno = errnoFromException(jFSExc, env, "org.apache.hadoop.fs." "FileSystem::getFileStatus"); destroyLocalReference(env, jPath); return NULL; } jobject jFileStatus = jFSVal.l; //org.apache.hadoop.fs.FileSystem::getFileBlockLocations char*** blockHosts = NULL; jobjectArray jBlockLocations;; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getFileBlockLocations", "(Lorg/apache/hadoop/fs/FileStatus;JJ)" "[Lorg/apache/hadoop/fs/BlockLocation;", jFileStatus, start, length) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getFileBlockLocations"); destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); return NULL; } jBlockLocations = jVal.l; //Figure out no of entries in jBlockLocations //Allocate memory and add NULL at the end jsize jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations); blockHosts = malloc(sizeof(char**) * (jNumFileBlocks+1)); if (blockHosts == NULL) { errno = ENOMEM; goto done; } blockHosts[jNumFileBlocks] = NULL; if (jNumFileBlocks == 0) { errno = 0; goto done; } //Now parse each block to get hostnames int i = 0; for (i=0; i < jNumFileBlocks; ++i) { jobject jFileBlock = (*env)->GetObjectArrayElement(env, jBlockLocations, i); jvalue jVal; jobjectArray jFileBlockHosts; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFileBlock, HADOOP_BLK_LOC, "getHosts", "()[Ljava/lang/String;") || jVal.l == NULL) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "BlockLocation::getHosts"); destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); destroyLocalReference(env, jBlockLocations); return NULL; } jFileBlockHosts = jVal.l; //Figure out no of hosts in jFileBlockHosts //Allocate memory and add NULL at the end jsize jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts); blockHosts[i] = malloc(sizeof(char*) * (jNumBlockHosts+1)); if (blockHosts[i] == NULL) { int x = 0; for (x=0; x < i; ++x) { free(blockHosts[x]); } free(blockHosts); errno = ENOMEM; goto done; } blockHosts[i][jNumBlockHosts] = NULL; //Now parse each hostname int j = 0; const char *hostName; for (j=0; j < jNumBlockHosts; ++j) { jstring jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j); hostName = (const char*)((*env)->GetStringUTFChars(env, jHost, NULL)); blockHosts[i][j] = strdup(hostName); (*env)->ReleaseStringUTFChars(env, jHost, hostName); destroyLocalReference(env, jHost); } destroyLocalReference(env, jFileBlockHosts); } done: //Delete unnecessary local references destroyLocalReference(env, jPath); destroyLocalReference(env, jFileStatus); destroyLocalReference(env, jBlockLocations); return blockHosts; } void hdfsFreeHosts(char ***blockHosts) { int i, j; for (i=0; blockHosts[i]; i++) { for (j=0; blockHosts[i][j]; j++) { free(blockHosts[i][j]); } free(blockHosts[i]); } free(blockHosts); } tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { // JAVA EQUIVALENT: // fs.getDefaultBlockSize(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; //FileSystem::getDefaultBlockSize() tOffset blockSize = -1; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getDefaultBlockSize", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getDefaultBlockSize"); return -1; } blockSize = jVal.j; return blockSize; } tOffset hdfsGetCapacity(hdfsFS fs) { // JAVA EQUIVALENT: // fs.getRawCapacity(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; if (!((*env)->IsInstanceOf(env, jFS, globalClassReference(HADOOP_DFS, env)))) { fprintf(stderr, "hdfsGetCapacity works only on a " "DistributedFileSystem!\n"); return -1; } //FileSystem::getRawCapacity() jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_DFS, "getRawCapacity", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getRawCapacity"); return -1; } return jVal.j; } tOffset hdfsGetUsed(hdfsFS fs) { // JAVA EQUIVALENT: // fs.getRawUsed(); //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return -1; } jobject jFS = (jobject)fs; if (!((*env)->IsInstanceOf(env, jFS, globalClassReference(HADOOP_DFS, env)))) { fprintf(stderr, "hdfsGetUsed works only on a " "DistributedFileSystem!\n"); return -1; } //FileSystem::getRawUsed() jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_DFS, "getRawUsed", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getRawUsed"); return -1; } return jVal.j; } static int getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo) { jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "isDir", "()Z") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::isDir"); return -1; } fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getReplication", "()S") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getReplication"); return -1; } fileInfo->mReplication = jVal.s; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getBlockSize", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getBlockSize"); return -1; } fileInfo->mBlockSize = jVal.j; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getModificationTime", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getModificationTime"); return -1; } fileInfo->mLastMod = (tTime) (jVal.j / 1000); if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getAccessTime", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getAccessTime"); return -1; } fileInfo->mLastAccess = (tTime) (jVal.j / 1000); if (fileInfo->mKind == kObjectKindFile) { if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getLen", "()J") != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileStatus::getLen"); return -1; } fileInfo->mSize = jVal.j; } jobject jPath; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getPath", "()Lorg/apache/hadoop/fs/Path;") || jVal.l == NULL) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Path::getPath"); return -1; } jPath = jVal.l; jstring jPathName; const char *cPathName; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jPath, HADOOP_PATH, "toString", "()Ljava/lang/String;")) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "Path::toString"); destroyLocalReference(env, jPath); return -1; } jPathName = jVal.l; cPathName = (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL)); fileInfo->mName = strdup(cPathName); (*env)->ReleaseStringUTFChars(env, jPathName, cPathName); destroyLocalReference(env, jPath); destroyLocalReference(env, jPathName); jstring jUserName; const char* cUserName; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getOwner", "()Ljava/lang/String;")) { fprintf(stderr, "Call to org.apache.hadoop.fs." "FileStatus::getOwner failed!\n"); errno = EINTERNAL; return -1; } jUserName = jVal.l; cUserName = (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL)); fileInfo->mOwner = strdup(cUserName); (*env)->ReleaseStringUTFChars(env, jUserName, cUserName); destroyLocalReference(env, jUserName); jstring jGroupName; const char* cGroupName; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getGroup", "()Ljava/lang/String;")) { fprintf(stderr, "Call to org.apache.hadoop.fs." "FileStatus::getGroup failed!\n"); errno = EINTERNAL; return -1; } jGroupName = jVal.l; cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL)); fileInfo->mGroup = strdup(cGroupName); (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName); destroyLocalReference(env, jGroupName); jobject jPermission; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jStat, HADOOP_STAT, "getPermission", "()Lorg/apache/hadoop/fs/permission/FsPermission;") || jVal.l == NULL) { fprintf(stderr, "Call to org.apache.hadoop.fs." "FileStatus::getPermission failed!\n"); errno = EINTERNAL; return -1; } jPermission = jVal.l; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jPermission, HADOOP_FSPERM, "toShort", "()S") != 0) { fprintf(stderr, "Call to org.apache.hadoop.fs.permission." "FsPermission::toShort failed!\n"); errno = EINTERNAL; return -1; } fileInfo->mPermissions = jVal.s; destroyLocalReference(env, jPermission); return 0; } static int getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo *fileInfo) { // JAVA EQUIVALENT: // fs.isDirectory(f) // fs.getModificationTime() // fs.getAccessTime() // fs.getLength(f) // f.getPath() // f.getOwner() // f.getGroup() // f.getPermission().toShort() jobject jStat; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::exists"); return -1; } if (jVal.z == 0) { errno = ENOENT; return -1; } if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_FS, "getFileStatus", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::getFileStatus"); return -1; } jStat = jVal.l; int ret = getFileInfoFromStat(env, jStat, fileInfo); destroyLocalReference(env, jStat); return ret; } hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { // JAVA EQUIVALENT: // Path p(path); // Path []pathList = fs.listPaths(p) // foreach path in pathList // getFileInfo(path) //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } hdfsFileInfo *pathList = 0; jobjectArray jPathList = NULL; jvalue jVal; jthrowable jExc = NULL; if (invokeMethod(env, &jVal, &jExc, INSTANCE, jFS, HADOOP_DFS, "listStatus", JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)), jPath) != 0) { errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "FileSystem::listStatus"); destroyLocalReference(env, jPath); return NULL; } jPathList = jVal.l; //Figure out no of entries in that directory jsize jPathListSize = (*env)->GetArrayLength(env, jPathList); *numEntries = jPathListSize; if (jPathListSize == 0) { errno = 0; goto done; } //Allocate memory pathList = calloc(jPathListSize, sizeof(hdfsFileInfo)); if (pathList == NULL) { errno = ENOMEM; goto done; } //Save path information in pathList jsize i; jobject tmpStat; for (i=0; i < jPathListSize; ++i) { tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i); if (getFileInfoFromStat(env, tmpStat, &pathList[i])) { hdfsFreeFileInfo(pathList, jPathListSize); destroyLocalReference(env, tmpStat); pathList = NULL; goto done; } destroyLocalReference(env, tmpStat); } done: //Delete unnecessary local references destroyLocalReference(env, jPath); destroyLocalReference(env, jPathList); return pathList; } hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { // JAVA EQUIVALENT: // File f(path); // fs.isDirectory(f) // fs.lastModified() ?? // fs.getLength(f) // f.getPath() //Get the JNIEnv* corresponding to current thread JNIEnv* env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; return NULL; } jobject jFS = (jobject)fs; //Create an object of org.apache.hadoop.fs.Path jobject jPath = constructNewObjectOfPath(env, path); if (jPath == NULL) { return NULL; } hdfsFileInfo *fileInfo = calloc(1, sizeof(hdfsFileInfo)); if (getFileInfo(env, jFS, jPath, fileInfo)) { hdfsFreeFileInfo(fileInfo, 1); fileInfo = NULL; goto done; } done: //Delete unnecessary local references destroyLocalReference(env, jPath); return fileInfo; } void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { //Free the mName int i; for (i=0; i < numEntries; ++i) { if (hdfsFileInfo[i].mName) { free(hdfsFileInfo[i].mName); } } //Free entire block free(hdfsFileInfo); } /** * vim: ts=4: sw=4: et: */