/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hadoop/Pipes.hh"
#include "hadoop/SerialUtils.hh"
#include "hadoop/StringUtils.hh"

#include <map>
#include <string>
#include <vector>

#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <unistd.h>

using std::map;
using std::string;
using std::vector;

using namespace HadoopUtils;

namespace HadoopPipes {

  /**
   * JobConf implementation backed by a string-to-string map.
   */
  class JobConfImpl: public JobConf {
  private:
    map<string, string> values;
  public:
    /** Store (or overwrite) the value associated with key. */
    void set(const string& key, const string& value) {
      values[key] = value;
    }

    /** @return true if key has been set. */
    virtual bool hasKey(const string& key) const {
      return values.find(key) != values.end();
    }

    /**
     * Look up the value stored for key.
     * @throws Error if the key is not present
     */
    virtual const string& get(const string& key) const {
      map<string, string>::const_iterator itr = values.find(key);
      if (itr == values.end()) {
        throw Error("Key " + key + " not found in JobConf");
      }
      return itr->second;
    }

    /** Fetch the value for key (see get) and convert it with toInt. */
    virtual int getInt(const string& key) const {
      const string& val = get(key);
      return toInt(val);
    }

    /** Fetch the value for key (see get) and convert it with toFloat. */
    virtual float getFloat(const string& key) const {
      const string& val = get(key);
      return toFloat(val);
    }

    /** Fetch the value for key (see get) and convert it with toBool. */
    virtual bool getBoolean(const string& key) const {
      const string& val = get(key);
      return toBool(val);
    }
  };

  /**
   * Interface for the events that a Protocol delivers to its handler.
   */
  class DownwardProtocol {
  public:
    virtual void start(int protocol) = 0;
    virtual void setJobConf(vector<string> values) = 0;
    virtual void setInputTypes(string keyType, string valueType) = 0;
    virtual void runMap(string inputSplit, int numReduces, bool pipedInput)= 0;
    virtual void mapItem(const string& key, const string& value) = 0;
    virtual void runReduce(int reduce, bool pipedOutput) = 0;
    virtual void reduceKey(const string& key) = 0;
    virtual void reduceValue(const string& value) = 0;
    virtual void close() = 0;
    virtual void abort() = 0;
    virtual ~DownwardProtocol() {}
  };

  /**
   * Interface for the messages that the task sends back through a Protocol.
   */
  class UpwardProtocol {
  public:
    virtual void output(const string& key, const string& value) = 0;
    virtual void partitionedOutput(int reduce, const string& key,
                                   const string& value) = 0;
    virtual void status(const string& message) = 0;
    virtual void progress(float progress) = 0;
    virtual void done() = 0;
    virtual void registerCounter(int id, const string& group,
                                 const string& name) = 0;
    virtual void incrementCounter(const TaskContext::Counter* counter,
                                  uint64_t amount) = 0;
    virtual ~UpwardProtocol() {}
  };

  /**
   * A connection: reads one downward event at a time and exposes the
   * matching uplink.
   */
  class Protocol {
  public:
    virtual void nextEvent() = 0;
    virtual UpwardProtocol* getUplink() = 0;
    virtual ~Protocol() {}
  };

  /**
   * UpwardProtocol that writes tab-separated, newline-terminated text
   * records to a stdio stream.
   */
  class TextUpwardProtocol: public UpwardProtocol {
  private:
    FILE* stream;
    static const char fieldSeparator = '\t';
    static const char lineSeparator = '\n';

    /**
     * Write buffer with tabs and newlines quoted by quoteString.
     * The data is passed as an argument to a fixed "%s" format; it must
     * never be used as the format string itself, because any '%' in the
     * key or value would then be interpreted as a conversion.
     */
    void writeBuffer(const string& buffer) {
      fprintf(stream, "%s", quoteString(buffer, "\t\n").c_str());
    }

  public:
    TextUpwardProtocol(FILE* _stream): stream(_stream) {}

    virtual void output(const string& key, const string& value) {
      fprintf(stream, "output%c", fieldSeparator);
      writeBuffer(key);
      fprintf(stream, "%c", fieldSeparator);
      writeBuffer(value);
      fprintf(stream, "%c", lineSeparator);
    }

    virtual void partitionedOutput(int reduce, const string& key,
                                   const string& value) {
      // NOTE(review): the command name below is misspelled; it is runtime
      // protocol text and is therefore left unchanged.
      fprintf(stream, "parititionedOutput%c%d%c", fieldSeparator, reduce,
              fieldSeparator);
      writeBuffer(key);
      fprintf(stream, "%c", fieldSeparator);
      writeBuffer(value);
      fprintf(stream, "%c", lineSeparator);
    }

    virtual void status(const string& message) {
      fprintf(stream, "status%c%s%c", fieldSeparator, message.c_str(),
              lineSeparator);
    }

    virtual void progress(float progress) {
      fprintf(stream, "progress%c%f%c", fieldSeparator, progress,
              lineSeparator);
    }

    virtual void registerCounter(int id, const string& group,
                                 const string& name) {
      fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
              fieldSeparator, group.c_str(), fieldSeparator, name.c_str(),
              lineSeparator);
    }

    virtual void incrementCounter(const TaskContext::Counter* counter,
                                  uint64_t amount) {
      // Print the full unsigned 64-bit amount; a cast to long would
      // truncate where long is 32 bits and print large values as negative.
      fprintf(stream, "incrCounter%c%d%c%llu%c", fieldSeparator,
              counter->getId(), fieldSeparator,
              static_cast<unsigned long long>(amount), lineSeparator);
    }

    virtual void done() {
      fprintf(stream, "done%c", lineSeparator);
    }
  };

  /**
   * Protocol that parses tab-separated, newline-terminated text commands
   * from a stdio stream and forwards them to a DownwardProtocol handler.
   */
  class TextProtocol: public Protocol {
  private:
    FILE* downStream;
    DownwardProtocol* handler;
    UpwardProtocol* uplink;
    string key;
    string value;

    /**
     * Read characters into buffer until one of the characters in limit
     * is seen.
     * @param buffer cleared, then filled with the characters read
     * @param limit the set of terminating characters
     * @return the terminating character, or -1 if the end of the stream
     *         was reached first
     */
    int readUpto(string& buffer, const char* limit) {
      int ch;
      buffer.clear();
      while ((ch = getc(downStream)) != EOF) {
        if (strchr(limit, ch) != NULL) {
          return ch;
        }
        buffer += static_cast<char>(ch);
      }
      return -1;
    }

    static const char* delim;
  public:

    TextProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
      downStream = down;
      uplink = new TextUpwardProtocol(up);
      handler = _handler;
    }

    UpwardProtocol* getUplink() {
      return uplink;
    }

    /**
     * Read one command line and dispatch it to the handler.
     * A '\t' separator means more fields follow; '\n' ends the command.
     * @throws Error for an unknown command; the HADOOP_ASSERT checks fire
     *         when a command has too few or too many fields
     */
    virtual void nextEvent() {
      string command;
      string arg;
      int sep;
      sep = readUpto(command, delim);
      if (command == "mapItem") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(key, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(value, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->mapItem(key, value);
      } else if (command == "reduceValue") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(value, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->reduceValue(value);
      } else if (command == "reduceKey") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(key, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->reduceKey(key);
      } else if (command == "start") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->start(toInt(arg));
      } else if (command == "setJobConf") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        int len = toInt(arg);
        HADOOP_ASSERT(len >= 0, "Negative length in text protocol command " +
                      command);
        // Reserve rather than pre-size: constructing the vector with len
        // elements and then appending would yield 2*len entries, the
        // first len of them empty.
        vector<string> values;
        values.reserve(len);
        for(int i=0; i < len; ++i) {
          HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
          sep = readUpto(arg, delim);
          values.push_back(arg);
        }
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->setJobConf(values);
      } else if (command == "setInputTypes") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(key, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(value, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->setInputTypes(key, value);
      } else if (command == "runMap") {
        string split;
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(split, delim);
        string reduces;
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(reduces, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->runMap(split, toInt(reduces), toBool(arg));
      } else if (command == "runReduce") {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        string piped;
        sep = readUpto(piped, delim);
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->runReduce(toInt(arg), toBool(piped));
      } else if (command == "abort") {
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->abort();
      } else if (command == "close") {
        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
        handler->close();
      } else {
        throw Error("Illegal text protocol command " + command);
      }
    }

    ~TextProtocol() {
      delete uplink;
    }
  };
  const char* TextProtocol::delim = "\t\n";

  /**
   * Message codes of the binary protocol. Downward commands start at 0;
   * upward messages start at 50.
   */
  enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP,
                     MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE,
                     CLOSE, ABORT,
                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
                     REGISTER_COUNTER, INCREMENT_COUNTER};

  /**
   * UpwardProtocol that serializes each message as a MESSAGE_TYPE code
   * followed by its arguments.
   */
  class BinaryUpwardProtocol: public UpwardProtocol {
  private:
    FileOutStream* stream;
  public:
    BinaryUpwardProtocol(FILE* _stream) {
      stream = new FileOutStream();
      HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
    }

    virtual void output(const string& key, const string& value) {
      serializeInt(OUTPUT, *stream);
      serializeString(key, *stream);
      serializeString(value, *stream);
    }

    virtual void partitionedOutput(int reduce, const string& key,
                                   const string& value) {
      serializeInt(PARTITIONED_OUTPUT, *stream);
      serializeInt(reduce, *stream);
      serializeString(key, *stream);
      serializeString(value, *stream);
    }

    virtual void status(const string& message) {
      serializeInt(STATUS, *stream);
      serializeString(message, *stream);
    }

    /** Send a progress message and flush the stream. */
    virtual void progress(float progress) {
      serializeInt(PROGRESS, *stream);
      serializeFloat(progress, *stream);
      stream->flush();
    }

    virtual void done() {
      serializeInt(DONE, *stream);
    }

    virtual void registerCounter(int id, const string& group,
                                 const string& name) {
      serializeInt(REGISTER_COUNTER, *stream);
      serializeInt(id, *stream);
      serializeString(group, *stream);
      serializeString(name, *stream);
    }

    virtual void incrementCounter(const TaskContext::Counter* counter,
                                  uint64_t amount) {
      serializeInt(INCREMENT_COUNTER, *stream);
      serializeInt(counter->getId(), *stream);
      serializeLong(amount, *stream);
    }

    ~BinaryUpwardProtocol() {
      delete stream;
    }
  };

  /**
   * Protocol that reads serialized commands (a MESSAGE_TYPE code followed
   * by its arguments) and forwards them to a DownwardProtocol handler.
   */
  class BinaryProtocol: public Protocol {
  private:
    FileInStream* downStream;
    DownwardProtocol* handler;
    BinaryUpwardProtocol * uplink;
    string key;
    string value;

  public:
    BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
      downStream = new FileInStream();
      downStream->open(down);
      uplink = new BinaryUpwardProtocol(up);
      handler = _handler;
    }

    UpwardProtocol* getUplink() {
      return uplink;
    }

    /**
     * Read one command from the stream and dispatch it to the handler.
     * An unrecognized command code fails a HADOOP_ASSERT.
     */
    virtual void nextEvent() {
      int32_t cmd;
      cmd = deserializeInt(*downStream);
      switch (cmd) {
      case START_MESSAGE: {
        int32_t prot;
        prot = deserializeInt(*downStream);
        handler->start(prot);
        break;
      }
      case SET_JOB_CONF: {
        int32_t entries;
        entries = deserializeInt(*downStream);
        HADOOP_ASSERT(entries >= 0, "Negative number of job conf entries");
        // Reserve rather than pre-size: constructing the vector with
        // `entries` elements and then appending would yield 2*entries
        // items, the first half of them empty.
        vector<string> result;
        result.reserve(entries);
        for(int i=0; i < entries; ++i) {
          string item;
          deserializeString(item, *downStream);
          result.push_back(item);
        }
        handler->setJobConf(result);
        break;
      }
      case SET_INPUT_TYPES: {
        string keyType;
        string valueType;
        deserializeString(keyType, *downStream);
        deserializeString(valueType, *downStream);
        handler->setInputTypes(keyType, valueType);
        break;
      }
      case RUN_MAP: {
        string split;
        int32_t numReduces;
        int32_t piped;
        deserializeString(split, *downStream);
        numReduces = deserializeInt(*downStream);
        piped = deserializeInt(*downStream);
        handler->runMap(split, numReduces, piped);
        break;
      }
      case MAP_ITEM: {
        deserializeString(key, *downStream);
        deserializeString(value, *downStream);
        handler->mapItem(key, value);
        break;
      }
      case RUN_REDUCE: {
        int32_t reduce;
        int32_t piped;
        reduce = deserializeInt(*downStream);
        piped = deserializeInt(*downStream);
        handler->runReduce(reduce, piped);
        break;
      }
      case REDUCE_KEY: {
        deserializeString(key, *downStream);
        handler->reduceKey(key);
        break;
      }
      case REDUCE_VALUE: {
        deserializeString(value, *downStream);
        handler->reduceValue(value);
        break;
      }
      case CLOSE:
        handler->close();
        break;
      case ABORT:
        handler->abort();
        break;
      default:
        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
      }
    }

    virtual ~BinaryProtocol() {
      delete downStream;
      delete uplink;
    }
  };

  /**
   * Define a context object to give to combiners that will let them
   * go through the values and emit their results
correctly. */
  class CombineContext: public ReduceContext {
  private:
    ReduceContext* baseContext;
    Partitioner* partitioner;
    int numReduces;
    UpwardProtocol* uplink;
    bool firstKey;
    bool firstValue;
    map<string, vector<string> >::iterator keyItr;
    map<string, vector<string> >::iterator endKeyItr;
    vector<string>::iterator valueItr;
    vector<string>::iterator endValueItr;

  public:
    /**
     * @param _baseContext context that job conf, progress, status and
     *        counter calls are delegated to
     * @param _partitioner used to pick the partition of each emitted
     *        record; may be NULL, in which case records are sent
     *        unpartitioned
     * @param _numReduces passed to the partitioner
     * @param _uplink destination of the emitted records
     * @param data the buffered keys and values to iterate over; only
     *        iterators into it are kept, so it must not be modified while
     *        this context is in use
     */
    CombineContext(ReduceContext* _baseContext,
                   Partitioner* _partitioner,
                   int _numReduces,
                   UpwardProtocol* _uplink,
                   map<string, vector<string> >& data)
      : baseContext(_baseContext),
        partitioner(_partitioner),
        numReduces(_numReduces),
        uplink(_uplink),
        firstKey(true),
        firstValue(true),
        keyItr(data.begin()),
        endKeyItr(data.end()) {
    }

    virtual const JobConf* getJobConf() {
      return baseContext->getJobConf();
    }

    /** @return the key selected by the last successful nextKey call. */
    virtual const std::string& getInputKey() {
      return keyItr->first;
    }

    /** @return the value selected by the last successful nextValue call. */
    virtual const std::string& getInputValue() {
      return *valueItr;
    }

    /**
     * Send a record to the uplink, partitioned when a partitioner was
     * supplied.
     */
    virtual void emit(const std::string& key, const std::string& value) {
      if (partitioner != NULL) {
        uplink->partitionedOutput(partitioner->partition(key, numReduces),
                                  key, value);
      } else {
        uplink->output(key, value);
      }
    }

    virtual void progress() {
      baseContext->progress();
    }

    virtual void setStatus(const std::string& status) {
      baseContext->setStatus(status);
    }

    /**
     * Advance to the next buffered key and reset the value iteration.
     * The first call selects the first key instead of advancing.
     * @return false once all keys have been visited
     */
    bool nextKey() {
      if (firstKey) {
        firstKey = false;
      } else {
        ++keyItr;
      }
      if (keyItr == endKeyItr) {
        return false;
      }
      valueItr = keyItr->second.begin();
      endValueItr = keyItr->second.end();
      firstValue = true;
      return true;
    }

    /**
     * Advance to the next value of the current key. The first call after
     * nextKey selects the first value instead of advancing.
     * @return false once all values of the key have been visited
     */
    virtual bool nextValue() {
      if (firstValue) {
        firstValue = false;
      } else {
        ++valueItr;
      }
      return valueItr != endValueItr;
    }

    virtual Counter* getCounter(const std::string& group,
                                const std::string& name) {
      return baseContext->getCounter(group, name);
    }

    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
      baseContext->incrementCounter(counter, amount);
    }
  };

  /**
   * A RecordWriter that will take the map outputs, buffer them up and then
   * combine them when the buffer is full.
*/ class CombineRunner: public RecordWriter { private: map > data; int64_t spillSize; int64_t numBytes; ReduceContext* baseContext; Partitioner* partitioner; int numReduces; UpwardProtocol* uplink; Reducer* combiner; public: CombineRunner(int64_t _spillSize, ReduceContext* _baseContext, Reducer* _combiner, UpwardProtocol* _uplink, Partitioner* _partitioner, int _numReduces) { numBytes = 0; spillSize = _spillSize; baseContext = _baseContext; partitioner = _partitioner; numReduces = _numReduces; uplink = _uplink; combiner = _combiner; } virtual void emit(const std::string& key, const std::string& value) { numBytes += key.length() + value.length(); data[key].push_back(value); if (numBytes >= spillSize) { spillAll(); } } virtual void close() { spillAll(); } private: void spillAll() { CombineContext context(baseContext, partitioner, numReduces, uplink, data); while (context.nextKey()) { combiner->reduce(context); } data.clear(); numBytes = 0; } }; class TaskContextImpl: public MapContext, public ReduceContext, public DownwardProtocol { private: bool done; JobConf* jobConf; string key; const string* newKey; const string* value; bool hasTask; bool isNewKey; bool isNewValue; string* inputKeyClass; string* inputValueClass; string status; float progressFloat; uint64_t lastProgress; bool statusSet; Protocol* protocol; UpwardProtocol *uplink; string* inputSplit; RecordReader* reader; Mapper* mapper; Reducer* reducer; RecordWriter* writer; Partitioner* partitioner; int numReduces; const Factory* factory; pthread_mutex_t mutexDone; std::vector registeredCounterIds; public: TaskContextImpl(const Factory& _factory) { statusSet = false; done = false; newKey = NULL; factory = &_factory; jobConf = NULL; inputKeyClass = NULL; inputValueClass = NULL; inputSplit = NULL; mapper = NULL; reducer = NULL; reader = NULL; writer = NULL; partitioner = NULL; protocol = NULL; isNewKey = false; isNewValue = false; lastProgress = 0; progressFloat = 0.0f; hasTask = false; 
pthread_mutex_init(&mutexDone, NULL); } void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) { protocol = _protocol; uplink = _uplink; } virtual void start(int protocol) { if (protocol != 0) { throw Error("Protocol version " + toString(protocol) + " not supported"); } } virtual void setJobConf(vector values) { int len = values.size(); JobConfImpl* result = new JobConfImpl(); HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values"); for(int i=0; i < len; i += 2) { result->set(values[i], values[i+1]); } jobConf = result; } virtual void setInputTypes(string keyType, string valueType) { inputKeyClass = new string(keyType); inputValueClass = new string(valueType); } virtual void runMap(string _inputSplit, int _numReduces, bool pipedInput) { inputSplit = new string(_inputSplit); reader = factory->createRecordReader(*this); HADOOP_ASSERT((reader == NULL) == pipedInput, pipedInput ? "RecordReader defined when not needed.": "RecordReader not defined"); if (reader != NULL) { value = new string(); } mapper = factory->createMapper(*this); numReduces = _numReduces; if (numReduces != 0) { reducer = factory->createCombiner(*this); partitioner = factory->createPartitioner(*this); } if (reducer != NULL) { int64_t spillSize = 100; if (jobConf->hasKey("io.sort.mb")) { spillSize = jobConf->getInt("io.sort.mb"); } writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer, uplink, partitioner, numReduces); } hasTask = true; } virtual void mapItem(const string& _key, const string& _value) { newKey = &_key; value = &_value; isNewKey = true; } virtual void runReduce(int reduce, bool pipedOutput) { reducer = factory->createReducer(*this); writer = factory->createRecordWriter(*this); HADOOP_ASSERT((writer == NULL) == pipedOutput, pipedOutput ? 
"RecordWriter defined when not needed.": "RecordWriter not defined"); hasTask = true; } virtual void reduceKey(const string& _key) { isNewKey = true; newKey = &_key; } virtual void reduceValue(const string& _value) { isNewValue = true; value = &_value; } virtual bool isDone() { pthread_mutex_lock(&mutexDone); bool doneCopy = done; pthread_mutex_unlock(&mutexDone); return doneCopy; } virtual void close() { pthread_mutex_lock(&mutexDone); done = true; pthread_mutex_unlock(&mutexDone); } virtual void abort() { throw Error("Aborted by driver"); } void waitForTask() { while (!done && !hasTask) { protocol->nextEvent(); } } bool nextKey() { if (reader == NULL) { while (!isNewKey) { nextValue(); if (done) { return false; } } key = *newKey; } else { if (!reader->next(key, const_cast(*value))) { pthread_mutex_lock(&mutexDone); done = true; pthread_mutex_unlock(&mutexDone); return false; } progressFloat = reader->getProgress(); } isNewKey = false; if (mapper != NULL) { mapper->map(*this); } else { reducer->reduce(*this); } return true; } /** * Advance to the next value. */ virtual bool nextValue() { if (isNewKey || done) { return false; } isNewValue = false; progress(); protocol->nextEvent(); return isNewValue; } /** * Get the JobConf for the current task. */ virtual JobConf* getJobConf() { return jobConf; } /** * Get the current key. * @return the current key or NULL if called before the first map or reduce */ virtual const string& getInputKey() { return key; } /** * Get the current value. * @return the current value or NULL if called before the first map or * reduce */ virtual const string& getInputValue() { return *value; } /** * Mark your task as having made progress without changing the status * message. 
*/ virtual void progress() { if (uplink != 0) { uint64_t now = getCurrentMillis(); if (now - lastProgress > 1000) { lastProgress = now; if (statusSet) { uplink->status(status); statusSet = false; } uplink->progress(progressFloat); } } } /** * Set the status message and call progress. */ virtual void setStatus(const string& status) { this->status = status; statusSet = true; progress(); } /** * Get the name of the key class of the input to this task. */ virtual const string& getInputKeyClass() { return *inputKeyClass; } /** * Get the name of the value class of the input to this task. */ virtual const string& getInputValueClass() { return *inputValueClass; } /** * Access the InputSplit of the mapper. */ virtual const std::string& getInputSplit() { return *inputSplit; } virtual void emit(const string& key, const string& value) { progress(); if (writer != NULL) { writer->emit(key, value); } else if (partitioner != NULL) { int part = partitioner->partition(key, numReduces); uplink->partitionedOutput(part, key, value); } else { uplink->output(key, value); } } /** * Register a counter with the given group and name. */ virtual Counter* getCounter(const std::string& group, const std::string& name) { int id = registeredCounterIds.size(); registeredCounterIds.push_back(id); uplink->registerCounter(id, group, name); return new Counter(id); } /** * Increment the value of the counter with the given amount. 
*/ virtual void incrementCounter(const Counter* counter, uint64_t amount) { uplink->incrementCounter(counter, amount); } void closeAll() { if (reader) { reader->close(); } if (mapper) { mapper->close(); } if (reducer) { reducer->close(); } if (writer) { writer->close(); } } virtual ~TaskContextImpl() { delete jobConf; delete inputKeyClass; delete inputValueClass; delete inputSplit; if (reader) { delete value; } delete reader; delete mapper; delete reducer; delete writer; delete partitioner; pthread_mutex_destroy(&mutexDone); } }; /** * Ping the parent every 5 seconds to know if it is alive */ void* ping(void* ptr) { TaskContextImpl* context = (TaskContextImpl*) ptr; char* portStr = getenv("hadoop.pipes.command.port"); int MAX_RETRIES = 3; int remaining_retries = MAX_RETRIES; while (!context->isDone()) { try{ sleep(5); int sock = -1; if (portStr) { sock = socket(PF_INET, SOCK_STREAM, 0); HADOOP_ASSERT(sock != - 1, string("problem creating socket: ") + strerror(errno)); sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(toInt(portStr)); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, string("problem connecting command socket: ") + strerror(errno)); } if (sock != -1) { int result = shutdown(sock, SHUT_RDWR); HADOOP_ASSERT(result == 0, "problem shutting socket"); result = close(sock); HADOOP_ASSERT(result == 0, "problem closing socket"); } remaining_retries = MAX_RETRIES; } catch (Error& err) { if (!context->isDone()) { fprintf(stderr, "Hadoop Pipes Exception: in ping %s\n", err.getMessage().c_str()); remaining_retries -= 1; if (remaining_retries == 0) { exit(1); } } else { return NULL; } } } return NULL; } /** * Run the assigned task in the framework. * The user's main function should set the various functions using the * set* functions above and then call this. * @return true, if the task succeeded. 
*/ bool runTask(const Factory& factory) { try { TaskContextImpl* context = new TaskContextImpl(factory); Protocol* connection; char* portStr = getenv("hadoop.pipes.command.port"); int sock = -1; FILE* stream = NULL; FILE* outStream = NULL; char *bufin = NULL; char *bufout = NULL; if (portStr) { sock = socket(PF_INET, SOCK_STREAM, 0); HADOOP_ASSERT(sock != - 1, string("problem creating socket: ") + strerror(errno)); sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(toInt(portStr)); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, string("problem connecting command socket: ") + strerror(errno)); stream = fdopen(sock, "r"); outStream = fdopen(sock, "w"); // increase buffer size int bufsize = 128*1024; int setbuf; bufin = new char[bufsize]; bufout = new char[bufsize]; setbuf = setvbuf(stream, bufin, _IOFBF, bufsize); HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ") + strerror(errno)); setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize); HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ") + strerror(errno)); connection = new BinaryProtocol(stream, context, outStream); } else if (getenv("hadoop.pipes.command.file")) { char* filename = getenv("hadoop.pipes.command.file"); string outFilename = filename; outFilename += ".out"; stream = fopen(filename, "r"); outStream = fopen(outFilename.c_str(), "w"); connection = new BinaryProtocol(stream, context, outStream); } else { connection = new TextProtocol(stdin, context, stdout); } context->setProtocol(connection, connection->getUplink()); pthread_t pingThread; pthread_create(&pingThread, NULL, ping, (void*)(context)); context->waitForTask(); while (!context->isDone()) { context->nextKey(); } context->closeAll(); connection->getUplink()->done(); pthread_join(pingThread,NULL); delete context; delete connection; if (stream != NULL) { fflush(stream); } if (outStream != NULL) { fflush(outStream); } 
fflush(stdout); if (sock != -1) { int result = shutdown(sock, SHUT_RDWR); HADOOP_ASSERT(result == 0, "problem shutting socket"); result = close(sock); HADOOP_ASSERT(result == 0, "problem closing socket"); } if (stream != NULL) { //fclose(stream); } if (outStream != NULL) { //fclose(outStream); } delete bufin; delete bufout; return true; } catch (Error& err) { fprintf(stderr, "Hadoop Pipes Exception: %s\n", err.getMessage().c_str()); return false; } } }