source: proiecte/HadoopJUnit/hadoop-0.20.1/src/c++/utils/impl/SerialUtils.cc @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 6.4 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18#include "hadoop/SerialUtils.hh"
19#include "hadoop/StringUtils.hh"
20
21#include <errno.h>
22#include <rpc/types.h>
23#include <rpc/xdr.h>
24#include <string>
25
26using std::string;
27
28namespace HadoopUtils {
29
30  Error::Error(const std::string& msg): error(msg) {
31  }
32
33  Error::Error(const std::string& msg, 
34               const std::string& file, int line, 
35               const std::string& function) {
36    error = msg + " at " + file + ":" + toString(line) + 
37            " in " + function;
38  }
39
40  const std::string& Error::getMessage() const {
41    return error;
42  }
43
44  FileInStream::FileInStream()
45  {
46    mFile = NULL;
47    isOwned = false;
48  }
49
50  bool FileInStream::open(const std::string& name)
51  {
52    mFile = fopen(name.c_str(), "rb");
53    isOwned = true;
54    return (mFile != NULL);
55  }
56
57  bool FileInStream::open(FILE* file)
58  {
59    mFile = file;
60    isOwned = false;
61    return (mFile != NULL);
62  }
63
64  void FileInStream::read(void *buf, size_t len)
65  {
66    size_t result = fread(buf, len, 1, mFile);
67    if (result == 0) {
68      if (feof(mFile)) {
69        HADOOP_ASSERT(false, "end of file");
70      } else {
71        HADOOP_ASSERT(false, string("read error on file: ") + strerror(errno));
72      }
73    }
74  }
75
76  bool FileInStream::skip(size_t nbytes)
77  {
78    return (0==fseek(mFile, nbytes, SEEK_CUR));
79  }
80
81  bool FileInStream::close()
82  {
83    int ret = 0;
84    if (mFile != NULL && isOwned) {
85      ret = fclose(mFile);
86    }
87    mFile = NULL;
88    return (ret==0);
89  }
90
91  FileInStream::~FileInStream()
92  {
93    if (mFile != NULL) {
94      close();
95    }
96  }
97
98  FileOutStream::FileOutStream()
99  {
100    mFile = NULL;
101    isOwned = false;
102  }
103
104  bool FileOutStream::open(const std::string& name, bool overwrite)
105  {
106    if (!overwrite) {
107      mFile = fopen(name.c_str(), "rb");
108      if (mFile != NULL) {
109        fclose(mFile);
110        return false;
111      }
112    }
113    mFile = fopen(name.c_str(), "wb");
114    isOwned = true;
115    return (mFile != NULL);
116  }
117
118  bool FileOutStream::open(FILE* file)
119  {
120    mFile = file;
121    isOwned = false;
122    return (mFile != NULL);
123  }
124
125  void FileOutStream::write(const void* buf, size_t len)
126  {
127    size_t result = fwrite(buf, len, 1, mFile);
128    HADOOP_ASSERT(result == 1,
129                  string("write error to file: ") + strerror(errno));
130  }
131
132  bool FileOutStream::advance(size_t nbytes)
133  {
134    return (0==fseek(mFile, nbytes, SEEK_CUR));
135  }
136
137  bool FileOutStream::close()
138  {
139    int ret = 0;
140    if (mFile != NULL && isOwned) {
141      ret = fclose(mFile);
142    }
143    mFile = NULL;
144    return (ret == 0);
145  }
146
147  void FileOutStream::flush()
148  {
149    fflush(mFile);
150  }
151
152  FileOutStream::~FileOutStream()
153  {
154    if (mFile != NULL) {
155      close();
156    }
157  }
158
159  StringInStream::StringInStream(const std::string& str): buffer(str) {
160    itr = buffer.begin();
161  }
162
163  void StringInStream::read(void *buf, size_t buflen) {
164    size_t bytes = 0;
165    char* output = (char*) buf;
166    std::string::const_iterator end = buffer.end();
167    while (bytes < buflen) {
168      output[bytes++] = *itr;
169      ++itr;
170      if (itr == end) {
171        break;
172      }
173    }
174    HADOOP_ASSERT(bytes == buflen, "unexpected end of string reached");
175  }
176
177  void serializeInt(int32_t t, OutStream& stream) {
178    serializeLong(t,stream);
179  }
180
181  void serializeLong(int64_t t, OutStream& stream)
182  {
183    if (t >= -112 && t <= 127) {
184      int8_t b = t;
185      stream.write(&b, 1);
186      return;
187    }
188       
189    int8_t len = -112;
190    if (t < 0) {
191      t ^= -1ll; // reset the sign bit
192      len = -120;
193    }
194       
195    uint64_t tmp = t;
196    while (tmp != 0) {
197      tmp = tmp >> 8;
198      len--;
199    }
200 
201    stream.write(&len, 1);     
202    len = (len < -120) ? -(len + 120) : -(len + 112);
203       
204    for (uint32_t idx = len; idx != 0; idx--) {
205      uint32_t shiftbits = (idx - 1) * 8;
206      uint64_t mask = 0xFFll << shiftbits;
207      uint8_t b = (t & mask) >> shiftbits;
208      stream.write(&b, 1);
209    }
210  }
211
212  int32_t deserializeInt(InStream& stream) {
213    return deserializeLong(stream);
214  }
215
216  int64_t deserializeLong(InStream& stream)
217  {
218    int8_t b;
219    stream.read(&b, 1);
220    if (b >= -112) {
221      return b;
222    }
223    bool negative;
224    int len;
225    if (b < -120) {
226      negative = true;
227      len = -120 - b;
228    } else {
229      negative = false;
230      len = -112 - b;
231    }
232    uint8_t barr[len];
233    stream.read(barr, len);
234    int64_t t = 0;
235    for (int idx = 0; idx < len; idx++) {
236      t = t << 8;
237      t |= (barr[idx] & 0xFF);
238    }
239    if (negative) {
240      t ^= -1ll;
241    }
242    return t;
243  }
244
245  void serializeFloat(float t, OutStream& stream)
246  {
247    char buf[sizeof(float)];
248    XDR xdrs;
249    xdrmem_create(&xdrs, buf, sizeof(float), XDR_ENCODE);
250    xdr_float(&xdrs, &t);
251    stream.write(buf, sizeof(float));
252  }
253
254  void deserializeFloat(float& t, InStream& stream)
255  {
256    char buf[sizeof(float)];
257    stream.read(buf, sizeof(float));
258    XDR xdrs;
259    xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
260    xdr_float(&xdrs, &t);
261  }
262
263  void serializeString(const std::string& t, OutStream& stream)
264  {
265    serializeInt(t.length(), stream);
266    if (t.length() > 0) {
267      stream.write(t.data(), t.length());
268    }
269  }
270
271  void deserializeString(std::string& t, InStream& stream)
272  {
273    int32_t len = deserializeInt(stream);
274    if (len > 0) {
275      // resize the string to the right length
276      t.resize(len);
277      // read into the string in 64k chunks
278      const int bufSize = 65536;
279      int offset = 0;
280      char buf[bufSize];
281      while (len > 0) {
282        int chunkLength = len > bufSize ? bufSize : len;
283        stream.read(buf, chunkLength);
284        t.replace(offset, chunkLength, buf, chunkLength);
285        offset += chunkLength;
286        len -= chunkLength;
287      }
288    } else {
289      t.clear();
290    }
291  }
292
293}
Note: See TracBrowser for help on using the repository browser.