source: proiecte/HadoopJUnit/hadoop-0.20.1/src/c++/pipes/impl/HadoopPipes.cc @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 31.0 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "hadoop/Pipes.hh"
20#include "hadoop/SerialUtils.hh"
21#include "hadoop/StringUtils.hh"
22
23#include <map>
24#include <vector>
25
26#include <errno.h>
27#include <netinet/in.h>
28#include <stdint.h>
29#include <stdio.h>
30#include <stdlib.h>
31#include <strings.h>
32#include <sys/socket.h>
33#include <pthread.h>
34
35using std::map;
36using std::string;
37using std::vector;
38
39using namespace HadoopUtils;
40
41namespace HadoopPipes {
42
43  class JobConfImpl: public JobConf {
44  private:
45    map<string, string> values;
46  public:
47    void set(const string& key, const string& value) {
48      values[key] = value;
49    }
50
51    virtual bool hasKey(const string& key) const {
52      return values.find(key) != values.end();
53    }
54
55    virtual const string& get(const string& key) const {
56      map<string,string>::const_iterator itr = values.find(key);
57      if (itr == values.end()) {
58        throw Error("Key " + key + " not found in JobConf");
59      }
60      return itr->second;
61    }
62
63    virtual int getInt(const string& key) const {
64      const string& val = get(key);
65      return toInt(val);
66    }
67
68    virtual float getFloat(const string& key) const {
69      const string& val = get(key);
70      return toFloat(val);
71    }
72
73    virtual bool getBoolean(const string&key) const {
74      const string& val = get(key);
75      return toBool(val);
76    }
77  };
78
79  class DownwardProtocol {
80  public:
81    virtual void start(int protocol) = 0;
82    virtual void setJobConf(vector<string> values) = 0;
83    virtual void setInputTypes(string keyType, string valueType) = 0;
84    virtual void runMap(string inputSplit, int numReduces, bool pipedInput)= 0;
85    virtual void mapItem(const string& key, const string& value) = 0;
86    virtual void runReduce(int reduce, bool pipedOutput) = 0;
87    virtual void reduceKey(const string& key) = 0;
88    virtual void reduceValue(const string& value) = 0;
89    virtual void close() = 0;
90    virtual void abort() = 0;
91    virtual ~DownwardProtocol() {}
92  };
93
94  class UpwardProtocol {
95  public:
96    virtual void output(const string& key, const string& value) = 0;
97    virtual void partitionedOutput(int reduce, const string& key,
98                                   const string& value) = 0;
99    virtual void status(const string& message) = 0;
100    virtual void progress(float progress) = 0;
101    virtual void done() = 0;
102    virtual void registerCounter(int id, const string& group, 
103                                 const string& name) = 0;
104    virtual void 
105      incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
106    virtual ~UpwardProtocol() {}
107  };
108
109  class Protocol {
110  public:
111    virtual void nextEvent() = 0;
112    virtual UpwardProtocol* getUplink() = 0;
113    virtual ~Protocol() {}
114  };
115
116  class TextUpwardProtocol: public UpwardProtocol {
117  private:
118    FILE* stream;
119    static const char fieldSeparator = '\t';
120    static const char lineSeparator = '\n';
121
122    void writeBuffer(const string& buffer) {
123      fprintf(stream, quoteString(buffer, "\t\n").c_str());
124    }
125
126  public:
127    TextUpwardProtocol(FILE* _stream): stream(_stream) {}
128   
129    virtual void output(const string& key, const string& value) {
130      fprintf(stream, "output%c", fieldSeparator);
131      writeBuffer(key);
132      fprintf(stream, "%c", fieldSeparator);
133      writeBuffer(value);
134      fprintf(stream, "%c", lineSeparator);
135    }
136
137    virtual void partitionedOutput(int reduce, const string& key,
138                                   const string& value) {
139      fprintf(stream, "parititionedOutput%c%d%c", fieldSeparator, reduce, 
140              fieldSeparator);
141      writeBuffer(key);
142      fprintf(stream, "%c", fieldSeparator);
143      writeBuffer(value);
144      fprintf(stream, "%c", lineSeparator);
145    }
146
147    virtual void status(const string& message) {
148      fprintf(stream, "status%c%s%c", fieldSeparator, message.c_str(), 
149              lineSeparator);
150    }
151
152    virtual void progress(float progress) {
153      fprintf(stream, "progress%c%f%c", fieldSeparator, progress, 
154              lineSeparator);
155    }
156
157    virtual void registerCounter(int id, const string& group, 
158                                 const string& name) {
159      fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
160              fieldSeparator, group.c_str(), fieldSeparator, name.c_str(), 
161              lineSeparator);
162    }
163
164    virtual void incrementCounter(const TaskContext::Counter* counter, 
165                                  uint64_t amount) {
166      fprintf(stream, "incrCounter%c%d%c%ld%c", fieldSeparator, counter->getId(), 
167              fieldSeparator, (long)amount, lineSeparator);
168    }
169   
170    virtual void done() {
171      fprintf(stream, "done%c", lineSeparator);
172    }
173  };
174
175  class TextProtocol: public Protocol {
176  private:
177    FILE* downStream;
178    DownwardProtocol* handler;
179    UpwardProtocol* uplink;
180    string key;
181    string value;
182
183    int readUpto(string& buffer, const char* limit) {
184      int ch;
185      buffer.clear();
186      while ((ch = getc(downStream)) != -1) {
187        if (strchr(limit, ch) != NULL) {
188          return ch;
189        }
190        buffer += ch;
191      }
192      return -1;
193    }
194
195    static const char* delim;
196  public:
197
198    TextProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
199      downStream = down;
200      uplink = new TextUpwardProtocol(up);
201      handler = _handler;
202    }
203
204    UpwardProtocol* getUplink() {
205      return uplink;
206    }
207
208    virtual void nextEvent() {
209      string command;
210      string arg;
211      int sep;
212      sep = readUpto(command, delim);
213      if (command == "mapItem") {
214        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
215        sep = readUpto(key, delim);
216        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
217        sep = readUpto(value, delim);
218        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
219        handler->mapItem(key, value);
220      } else if (command == "reduceValue") {
221        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
222        sep = readUpto(value, delim);
223        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
224        handler->reduceValue(value);
225      } else if (command == "reduceKey") {
226        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
227        sep = readUpto(key, delim);
228        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
229        handler->reduceKey(key);
230      } else if (command == "start") {
231        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
232        sep = readUpto(arg, delim);
233        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
234        handler->start(toInt(arg));
235      } else if (command == "setJobConf") {
236        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
237        sep = readUpto(arg, delim);
238        int len = toInt(arg);
239        vector<string> values(len);
240        for(int i=0; i < len; ++i) {
241          HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
242          sep = readUpto(arg, delim);
243          values.push_back(arg);
244        }
245        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
246        handler->setJobConf(values);
247      } else if (command == "setInputTypes") {
248        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
249        sep = readUpto(key, delim);
250        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
251        sep = readUpto(value, delim);
252        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
253        handler->setInputTypes(key, value);
254      } else if (command == "runMap") {
255        string split;
256        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
257        sep = readUpto(split, delim);
258        string reduces;
259        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
260        sep = readUpto(reduces, delim);
261        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
262        sep = readUpto(arg, delim);
263        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
264        handler->runMap(split, toInt(reduces), toBool(arg));
265      } else if (command == "runReduce") {
266        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
267        sep = readUpto(arg, delim);
268        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
269        string piped;
270        sep = readUpto(piped, delim);
271        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
272        handler->runReduce(toInt(arg), toBool(piped));
273      } else if (command == "abort") { 
274        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
275        handler->abort();
276      } else if (command == "close") {
277        HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
278        handler->close();
279      } else {
280        throw Error("Illegal text protocol command " + command);
281      }
282    }
283
284    ~TextProtocol() {
285      delete uplink;
286    }
287  };
288  const char* TextProtocol::delim = "\t\n";
289
290  enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
291                     MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
292                     CLOSE, ABORT, 
293                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
294                     REGISTER_COUNTER, INCREMENT_COUNTER};
295
296  class BinaryUpwardProtocol: public UpwardProtocol {
297  private:
298    FileOutStream* stream;
299  public:
300    BinaryUpwardProtocol(FILE* _stream) {
301      stream = new FileOutStream();
302      HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
303    }
304
305    virtual void output(const string& key, const string& value) {
306      serializeInt(OUTPUT, *stream);
307      serializeString(key, *stream);
308      serializeString(value, *stream);
309    }
310
311    virtual void partitionedOutput(int reduce, const string& key,
312                                   const string& value) {
313      serializeInt(PARTITIONED_OUTPUT, *stream);
314      serializeInt(reduce, *stream);
315      serializeString(key, *stream);
316      serializeString(value, *stream);
317    }
318
319    virtual void status(const string& message) {
320      serializeInt(STATUS, *stream);
321      serializeString(message, *stream);
322    }
323
324    virtual void progress(float progress) {
325      serializeInt(PROGRESS, *stream);
326      serializeFloat(progress, *stream);
327      stream->flush();
328    }
329
330    virtual void done() {
331      serializeInt(DONE, *stream);
332    }
333
334    virtual void registerCounter(int id, const string& group, 
335                                 const string& name) {
336      serializeInt(REGISTER_COUNTER, *stream);
337      serializeInt(id, *stream);
338      serializeString(group, *stream);
339      serializeString(name, *stream);
340    }
341
342    virtual void incrementCounter(const TaskContext::Counter* counter, 
343                                  uint64_t amount) {
344      serializeInt(INCREMENT_COUNTER, *stream);
345      serializeInt(counter->getId(), *stream);
346      serializeLong(amount, *stream);
347    }
348   
349    ~BinaryUpwardProtocol() {
350      delete stream;
351    }
352  };
353
354  class BinaryProtocol: public Protocol {
355  private:
356    FileInStream* downStream;
357    DownwardProtocol* handler;
358    BinaryUpwardProtocol * uplink;
359    string key;
360    string value;
361
362  public:
363    BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
364      downStream = new FileInStream();
365      downStream->open(down);
366      uplink = new BinaryUpwardProtocol(up);
367      handler = _handler;
368    }
369
370    UpwardProtocol* getUplink() {
371      return uplink;
372    }
373
374    virtual void nextEvent() {
375      int32_t cmd;
376      cmd = deserializeInt(*downStream);
377      switch (cmd) {
378      case START_MESSAGE: {
379        int32_t prot;
380        prot = deserializeInt(*downStream);
381        handler->start(prot);
382        break;
383      }
384      case SET_JOB_CONF: {
385        int32_t entries;
386        entries = deserializeInt(*downStream);
387        vector<string> result(entries);
388        for(int i=0; i < entries; ++i) {
389          string item;
390          deserializeString(item, *downStream);
391          result.push_back(item);
392        }
393        handler->setJobConf(result);
394        break;
395      }
396      case SET_INPUT_TYPES: {
397        string keyType;
398        string valueType;
399        deserializeString(keyType, *downStream);
400        deserializeString(valueType, *downStream);
401        handler->setInputTypes(keyType, valueType);
402        break;
403      }
404      case RUN_MAP: {
405        string split;
406        int32_t numReduces;
407        int32_t piped;
408        deserializeString(split, *downStream);
409        numReduces = deserializeInt(*downStream);
410        piped = deserializeInt(*downStream);
411        handler->runMap(split, numReduces, piped);
412        break;
413      }
414      case MAP_ITEM: {
415        deserializeString(key, *downStream);
416        deserializeString(value, *downStream);
417        handler->mapItem(key, value);
418        break;
419      }
420      case RUN_REDUCE: {
421        int32_t reduce;
422        int32_t piped;
423        reduce = deserializeInt(*downStream);
424        piped = deserializeInt(*downStream);
425        handler->runReduce(reduce, piped);
426        break;
427      }
428      case REDUCE_KEY: {
429        deserializeString(key, *downStream);
430        handler->reduceKey(key);
431        break;
432      }
433      case REDUCE_VALUE: {
434        deserializeString(value, *downStream);
435        handler->reduceValue(value);
436        break;
437      }
438      case CLOSE:
439        handler->close();
440        break;
441      case ABORT:
442        handler->abort();
443        break;
444      default:
445        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
446      }
447    }
448
449    virtual ~BinaryProtocol() {
450      delete downStream;
451      delete uplink;
452    }
453  };
454
455  /**
456   * Define a context object to give to combiners that will let them
457   * go through the values and emit their results correctly.
458   */
459  class CombineContext: public ReduceContext {
460  private:
461    ReduceContext* baseContext;
462    Partitioner* partitioner;
463    int numReduces;
464    UpwardProtocol* uplink;
465    bool firstKey;
466    bool firstValue;
467    map<string, vector<string> >::iterator keyItr;
468    map<string, vector<string> >::iterator endKeyItr;
469    vector<string>::iterator valueItr;
470    vector<string>::iterator endValueItr;
471
472  public:
473    CombineContext(ReduceContext* _baseContext,
474                   Partitioner* _partitioner,
475                   int _numReduces,
476                   UpwardProtocol* _uplink,
477                   map<string, vector<string> >& data) {
478      baseContext = _baseContext;
479      partitioner = _partitioner;
480      numReduces = _numReduces;
481      uplink = _uplink;
482      keyItr = data.begin();
483      endKeyItr = data.end();
484      firstKey = true;
485      firstValue = true;
486    }
487
488    virtual const JobConf* getJobConf() {
489      return baseContext->getJobConf();
490    }
491
492    virtual const std::string& getInputKey() {
493      return keyItr->first;
494    }
495
496    virtual const std::string& getInputValue() {
497      return *valueItr;
498    }
499
500    virtual void emit(const std::string& key, const std::string& value) {
501      if (partitioner != NULL) {
502        uplink->partitionedOutput(partitioner->partition(key, numReduces),
503                                  key, value);
504      } else {
505        uplink->output(key, value);
506      }
507    }
508
509    virtual void progress() {
510      baseContext->progress();
511    }
512
513    virtual void setStatus(const std::string& status) {
514      baseContext->setStatus(status);
515    }
516
517    bool nextKey() {
518      if (firstKey) {
519        firstKey = false;
520      } else {
521        ++keyItr;
522      }
523      if (keyItr != endKeyItr) {
524        valueItr = keyItr->second.begin();
525        endValueItr = keyItr->second.end();
526        firstValue = true;
527        return true;
528      }
529      return false;
530    }
531
532    virtual bool nextValue() {
533      if (firstValue) {
534        firstValue = false;
535      } else {
536        ++valueItr;
537      }
538      return valueItr != endValueItr;
539    }
540   
541    virtual Counter* getCounter(const std::string& group, 
542                               const std::string& name) {
543      return baseContext->getCounter(group, name);
544    }
545
546    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
547      baseContext->incrementCounter(counter, amount);
548    }
549  };
550
551  /**
552   * A RecordWriter that will take the map outputs, buffer them up and then
553   * combine then when the buffer is full.
554   */
555  class CombineRunner: public RecordWriter {
556  private:
557    map<string, vector<string> > data;
558    int64_t spillSize;
559    int64_t numBytes;
560    ReduceContext* baseContext;
561    Partitioner* partitioner;
562    int numReduces;
563    UpwardProtocol* uplink;
564    Reducer* combiner;
565  public:
566    CombineRunner(int64_t _spillSize, ReduceContext* _baseContext, 
567                  Reducer* _combiner, UpwardProtocol* _uplink, 
568                  Partitioner* _partitioner, int _numReduces) {
569      numBytes = 0;
570      spillSize = _spillSize;
571      baseContext = _baseContext;
572      partitioner = _partitioner;
573      numReduces = _numReduces;
574      uplink = _uplink;
575      combiner = _combiner;
576    }
577
578    virtual void emit(const std::string& key,
579                      const std::string& value) {
580      numBytes += key.length() + value.length();
581      data[key].push_back(value);
582      if (numBytes >= spillSize) {
583        spillAll();
584      }
585    }
586
587    virtual void close() {
588      spillAll();
589    }
590
591  private:
592    void spillAll() {
593      CombineContext context(baseContext, partitioner, numReduces, 
594                             uplink, data);
595      while (context.nextKey()) {
596        combiner->reduce(context);
597      }
598      data.clear();
599      numBytes = 0;
600    }
601  };
602
603  class TaskContextImpl: public MapContext, public ReduceContext, 
604                         public DownwardProtocol {
605  private:
606    bool done;
607    JobConf* jobConf;
608    string key;
609    const string* newKey;
610    const string* value;
611    bool hasTask;
612    bool isNewKey;
613    bool isNewValue;
614    string* inputKeyClass;
615    string* inputValueClass;
616    string status;
617    float progressFloat;
618    uint64_t lastProgress;
619    bool statusSet;
620    Protocol* protocol;
621    UpwardProtocol *uplink;
622    string* inputSplit;
623    RecordReader* reader;
624    Mapper* mapper;
625    Reducer* reducer;
626    RecordWriter* writer;
627    Partitioner* partitioner;
628    int numReduces;
629    const Factory* factory;
630    pthread_mutex_t mutexDone;
631    std::vector<int> registeredCounterIds;
632
633  public:
634
635    TaskContextImpl(const Factory& _factory) {
636      statusSet = false;
637      done = false;
638      newKey = NULL;
639      factory = &_factory;
640      jobConf = NULL;
641      inputKeyClass = NULL;
642      inputValueClass = NULL;
643      inputSplit = NULL;
644      mapper = NULL;
645      reducer = NULL;
646      reader = NULL;
647      writer = NULL;
648      partitioner = NULL;
649      protocol = NULL;
650      isNewKey = false;
651      isNewValue = false;
652      lastProgress = 0;
653      progressFloat = 0.0f;
654      hasTask = false;
655      pthread_mutex_init(&mutexDone, NULL);
656    }
657
658    void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
659
660      protocol = _protocol;
661      uplink = _uplink;
662    }
663
664    virtual void start(int protocol) {
665      if (protocol != 0) {
666        throw Error("Protocol version " + toString(protocol) + 
667                    " not supported");
668      }
669    }
670
671    virtual void setJobConf(vector<string> values) {
672      int len = values.size();
673      JobConfImpl* result = new JobConfImpl();
674      HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
675      for(int i=0; i < len; i += 2) {
676        result->set(values[i], values[i+1]);
677      }
678      jobConf = result;
679    }
680
681    virtual void setInputTypes(string keyType, string valueType) {
682      inputKeyClass = new string(keyType);
683      inputValueClass = new string(valueType);
684    }
685
686    virtual void runMap(string _inputSplit, int _numReduces, bool pipedInput) {
687      inputSplit = new string(_inputSplit);
688      reader = factory->createRecordReader(*this);
689      HADOOP_ASSERT((reader == NULL) == pipedInput,
690                    pipedInput ? "RecordReader defined when not needed.":
691                    "RecordReader not defined");
692      if (reader != NULL) {
693        value = new string();
694      }
695      mapper = factory->createMapper(*this);
696      numReduces = _numReduces;
697      if (numReduces != 0) { 
698        reducer = factory->createCombiner(*this);
699        partitioner = factory->createPartitioner(*this);
700      }
701      if (reducer != NULL) {
702        int64_t spillSize = 100;
703        if (jobConf->hasKey("io.sort.mb")) {
704          spillSize = jobConf->getInt("io.sort.mb");
705        }
706        writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer, 
707                                   uplink, partitioner, numReduces);
708      }
709      hasTask = true;
710    }
711
712    virtual void mapItem(const string& _key, const string& _value) {
713      newKey = &_key;
714      value = &_value;
715      isNewKey = true;
716    }
717
718    virtual void runReduce(int reduce, bool pipedOutput) {
719      reducer = factory->createReducer(*this);
720      writer = factory->createRecordWriter(*this);
721      HADOOP_ASSERT((writer == NULL) == pipedOutput,
722                    pipedOutput ? "RecordWriter defined when not needed.":
723                    "RecordWriter not defined");
724      hasTask = true;
725    }
726
727    virtual void reduceKey(const string& _key) {
728      isNewKey = true;
729      newKey = &_key;
730    }
731
732    virtual void reduceValue(const string& _value) {
733      isNewValue = true;
734      value = &_value;
735    }
736   
737    virtual bool isDone() {
738      pthread_mutex_lock(&mutexDone);
739      bool doneCopy = done;
740      pthread_mutex_unlock(&mutexDone);
741      return doneCopy;
742    }
743
744    virtual void close() {
745      pthread_mutex_lock(&mutexDone);
746      done = true;
747      pthread_mutex_unlock(&mutexDone);
748    }
749
750    virtual void abort() {
751      throw Error("Aborted by driver");
752    }
753
754    void waitForTask() {
755      while (!done && !hasTask) {
756        protocol->nextEvent();
757      }
758    }
759
760    bool nextKey() {
761      if (reader == NULL) {
762        while (!isNewKey) {
763          nextValue();
764          if (done) {
765            return false;
766          }
767        }
768        key = *newKey;
769      } else {
770        if (!reader->next(key, const_cast<string&>(*value))) {
771          pthread_mutex_lock(&mutexDone);
772          done = true;
773          pthread_mutex_unlock(&mutexDone);
774          return false;
775        }
776        progressFloat = reader->getProgress();
777      }
778      isNewKey = false;
779      if (mapper != NULL) {
780        mapper->map(*this);
781      } else {
782        reducer->reduce(*this);
783      }
784      return true;
785    }
786
787    /**
788     * Advance to the next value.
789     */
790    virtual bool nextValue() {
791      if (isNewKey || done) {
792        return false;
793      }
794      isNewValue = false;
795      progress();
796      protocol->nextEvent();
797      return isNewValue;
798    }
799
800    /**
801     * Get the JobConf for the current task.
802     */
803    virtual JobConf* getJobConf() {
804      return jobConf;
805    }
806
807    /**
808     * Get the current key.
809     * @return the current key or NULL if called before the first map or reduce
810     */
811    virtual const string& getInputKey() {
812      return key;
813    }
814
815    /**
816     * Get the current value.
817     * @return the current value or NULL if called before the first map or
818     *    reduce
819     */
820    virtual const string& getInputValue() {
821      return *value;
822    }
823
824    /**
825     * Mark your task as having made progress without changing the status
826     * message.
827     */
828    virtual void progress() {
829      if (uplink != 0) {
830        uint64_t now = getCurrentMillis();
831        if (now - lastProgress > 1000) {
832          lastProgress = now;
833          if (statusSet) {
834            uplink->status(status);
835            statusSet = false;
836          }
837          uplink->progress(progressFloat);
838        }
839      }
840    }
841
842    /**
843     * Set the status message and call progress.
844     */
845    virtual void setStatus(const string& status) {
846      this->status = status;
847      statusSet = true;
848      progress();
849    }
850
851    /**
852     * Get the name of the key class of the input to this task.
853     */
854    virtual const string& getInputKeyClass() {
855      return *inputKeyClass;
856    }
857
858    /**
859     * Get the name of the value class of the input to this task.
860     */
861    virtual const string& getInputValueClass() {
862      return *inputValueClass;
863    }
864
865    /**
866     * Access the InputSplit of the mapper.
867     */
868    virtual const std::string& getInputSplit() {
869      return *inputSplit;
870    }
871
872    virtual void emit(const string& key, const string& value) {
873      progress();
874      if (writer != NULL) {
875        writer->emit(key, value);
876      } else if (partitioner != NULL) {
877        int part = partitioner->partition(key, numReduces);
878        uplink->partitionedOutput(part, key, value);
879      } else {
880        uplink->output(key, value);
881      }
882    }
883
884    /**
885     * Register a counter with the given group and name.
886     */
887    virtual Counter* getCounter(const std::string& group, 
888                               const std::string& name) {
889      int id = registeredCounterIds.size();
890      registeredCounterIds.push_back(id);
891      uplink->registerCounter(id, group, name);
892      return new Counter(id);
893    }
894
895    /**
896     * Increment the value of the counter with the given amount.
897     */
898    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
899      uplink->incrementCounter(counter, amount); 
900    }
901
902    void closeAll() {
903      if (reader) {
904        reader->close();
905      }
906      if (mapper) {
907        mapper->close();
908      }
909      if (reducer) {
910        reducer->close();
911      }
912      if (writer) {
913        writer->close();
914      }
915    }
916
917    virtual ~TaskContextImpl() {
918      delete jobConf;
919      delete inputKeyClass;
920      delete inputValueClass;
921      delete inputSplit;
922      if (reader) {
923        delete value;
924      }
925      delete reader;
926      delete mapper;
927      delete reducer;
928      delete writer;
929      delete partitioner;
930      pthread_mutex_destroy(&mutexDone);
931    }
932  };
933
934  /**
935   * Ping the parent every 5 seconds to know if it is alive
936   */
937  void* ping(void* ptr) {
938    TaskContextImpl* context = (TaskContextImpl*) ptr;
939    char* portStr = getenv("hadoop.pipes.command.port");
940    int MAX_RETRIES = 3;
941    int remaining_retries = MAX_RETRIES;
942    while (!context->isDone()) {
943      try{
944        sleep(5);
945        int sock = -1;
946        if (portStr) {
947          sock = socket(PF_INET, SOCK_STREAM, 0);
948          HADOOP_ASSERT(sock != - 1,
949                        string("problem creating socket: ") + strerror(errno));
950          sockaddr_in addr;
951          addr.sin_family = AF_INET;
952          addr.sin_port = htons(toInt(portStr));
953          addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
954          HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
955                        string("problem connecting command socket: ") +
956                        strerror(errno));
957
958        }
959        if (sock != -1) {
960          int result = shutdown(sock, SHUT_RDWR);
961          HADOOP_ASSERT(result == 0, "problem shutting socket");
962          result = close(sock);
963          HADOOP_ASSERT(result == 0, "problem closing socket");
964        }
965        remaining_retries = MAX_RETRIES;
966      } catch (Error& err) {
967        if (!context->isDone()) {
968          fprintf(stderr, "Hadoop Pipes Exception: in ping %s\n", 
969                err.getMessage().c_str());
970          remaining_retries -= 1;
971          if (remaining_retries == 0) {
972            exit(1);
973          }
974        } else {
975          return NULL;
976        }
977      }
978    }
979    return NULL;
980  }
981
982  /**
983   * Run the assigned task in the framework.
984   * The user's main function should set the various functions using the
985   * set* functions above and then call this.
986   * @return true, if the task succeeded.
987   */
988  bool runTask(const Factory& factory) {
989    try {
990      TaskContextImpl* context = new TaskContextImpl(factory);
991      Protocol* connection;
992      char* portStr = getenv("hadoop.pipes.command.port");
993      int sock = -1;
994      FILE* stream = NULL;
995      FILE* outStream = NULL;
996      char *bufin = NULL;
997      char *bufout = NULL;
998      if (portStr) {
999        sock = socket(PF_INET, SOCK_STREAM, 0);
1000        HADOOP_ASSERT(sock != - 1,
1001                      string("problem creating socket: ") + strerror(errno));
1002        sockaddr_in addr;
1003        addr.sin_family = AF_INET;
1004        addr.sin_port = htons(toInt(portStr));
1005        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1006        HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
1007                      string("problem connecting command socket: ") +
1008                      strerror(errno));
1009
1010        stream = fdopen(sock, "r");
1011        outStream = fdopen(sock, "w");
1012
1013        // increase buffer size
1014        int bufsize = 128*1024;
1015        int setbuf;
1016        bufin = new char[bufsize];
1017        bufout = new char[bufsize];
1018        setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
1019        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
1020                                     + strerror(errno));
1021        setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
1022        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
1023                                     + strerror(errno));
1024
1025        connection = new BinaryProtocol(stream, context, outStream);
1026      } else if (getenv("hadoop.pipes.command.file")) {
1027        char* filename = getenv("hadoop.pipes.command.file");
1028        string outFilename = filename;
1029        outFilename += ".out";
1030        stream = fopen(filename, "r");
1031        outStream = fopen(outFilename.c_str(), "w");
1032        connection = new BinaryProtocol(stream, context, outStream);
1033      } else {
1034        connection = new TextProtocol(stdin, context, stdout);
1035      }
1036      context->setProtocol(connection, connection->getUplink());
1037      pthread_t pingThread;
1038      pthread_create(&pingThread, NULL, ping, (void*)(context));
1039      context->waitForTask();
1040      while (!context->isDone()) {
1041        context->nextKey();
1042      }
1043      context->closeAll();
1044      connection->getUplink()->done();
1045      pthread_join(pingThread,NULL);
1046      delete context;
1047      delete connection;
1048      if (stream != NULL) {
1049        fflush(stream);
1050      }
1051      if (outStream != NULL) {
1052        fflush(outStream);
1053      }
1054      fflush(stdout);
1055      if (sock != -1) {
1056        int result = shutdown(sock, SHUT_RDWR);
1057        HADOOP_ASSERT(result == 0, "problem shutting socket");
1058        result = close(sock);
1059        HADOOP_ASSERT(result == 0, "problem closing socket");
1060      }
1061      if (stream != NULL) {
1062        //fclose(stream);
1063      }
1064      if (outStream != NULL) {
1065        //fclose(outStream);
1066      } 
1067      delete bufin;
1068      delete bufout;
1069      return true;
1070    } catch (Error& err) {
1071      fprintf(stderr, "Hadoop Pipes Exception: %s\n", 
1072              err.getMessage().c_str());
1073      return false;
1074    }
1075  }
1076}
1077
Note: See TracBrowser for help on using the repository browser.