source: proiecte/HadoopJUnit/hadoop-0.20.1/src/c++/librecordio/binarchive.cc @ 141

Last change on this file since 141 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUnit Project

  • Property svn:executable set to *
File size: 7.3 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "binarchive.hh"
20#include <rpc/types.h>
21#include <rpc/xdr.h>
22
23
24using namespace hadoop;
25
26template <typename T>
27static void serialize(T t, OutStream& stream)
28{
29  if (sizeof(T) != stream.write((const void *) &t, sizeof(T))) {
30    throw new IOException("Error serializing data.");
31  }
32}
33
34template <typename T>
35static void deserialize(T& t, InStream& stream)
36{
37  if (sizeof(T) != stream.read((void *) &t, sizeof(T))) {
38    throw new IOException("Error deserializing data.");
39  }
40}
41
42static void serializeLong(int64_t t, OutStream& stream)
43{
44  if (t >= -112 && t <= 127) {
45    int8_t b = t;
46    stream.write(&b, 1);
47    return;
48  }
49       
50  int8_t len = -112;
51  if (t < 0) {
52    t ^= 0xFFFFFFFFFFFFFFFFLL; // take one's complement
53    len = -120;
54  }
55       
56  uint64_t tmp = t;
57  while (tmp != 0) {
58    tmp = tmp >> 8;
59    len--;
60  }
61 
62  stream.write(&len, 1);
63       
64  len = (len < -120) ? -(len + 120) : -(len + 112);
65       
66  for (uint32_t idx = len; idx != 0; idx--) {
67    uint32_t shiftbits = (idx - 1) * 8;
68    uint64_t mask = 0xFFLL << shiftbits;
69    uint8_t b = (t & mask) >> shiftbits;
70    stream.write(&b, 1);
71  }
72}
73
74static void deserializeLong(int64_t& t, InStream& stream)
75{
76  int8_t b;
77  if (1 != stream.read(&b, 1)) {
78    throw new IOException("Error deserializing long.");
79  }
80  if (b >= -112) {
81    t = b;
82    return;
83  }
84  bool isNegative = (b < -120);
85  b = isNegative ? -(b + 120) : -(b + 112);
86  uint8_t barr[b];
87  if (b != stream.read(barr, b)) {
88    throw new IOException("Error deserializing long.");
89  }
90  t = 0;
91  for (int idx = 0; idx < b; idx++) {
92    t = t << 8;
93    t |= (barr[idx] & 0xFF);
94  }
95  if (isNegative) {
96    t ^= 0xFFFFFFFFFFFFFFFFLL;
97  }
98}
99
100static void serializeInt(int32_t t, OutStream& stream)
101{
102  int64_t longVal = t;
103  ::serializeLong(longVal, stream);
104}
105
106static void deserializeInt(int32_t& t, InStream& stream)
107{
108  int64_t longVal;
109  ::deserializeLong(longVal, stream);
110  t = longVal;
111}
112
113static void serializeFloat(float t, OutStream& stream)
114{
115  char buf[sizeof(float)];
116  XDR xdrs;
117  xdrmem_create(&xdrs, buf, sizeof(float), XDR_ENCODE);
118  xdr_float(&xdrs, &t);
119  stream.write(buf, sizeof(float));
120}
121
122static void deserializeFloat(float& t, InStream& stream)
123{
124  char buf[sizeof(float)];
125  if (sizeof(float) != stream.read(buf, sizeof(float))) {
126    throw new IOException("Error deserializing float.");
127  }
128  XDR xdrs;
129  xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
130  xdr_float(&xdrs, &t);
131}
132
133static void serializeDouble(double t, OutStream& stream)
134{
135  char buf[sizeof(double)];
136  XDR xdrs;
137  xdrmem_create(&xdrs, buf, sizeof(double), XDR_ENCODE);
138  xdr_double(&xdrs, &t);
139  stream.write(buf, sizeof(double));
140}
141
142static void deserializeDouble(double& t, InStream& stream)
143{
144  char buf[sizeof(double)];
145  stream.read(buf, sizeof(double));
146  XDR xdrs;
147  xdrmem_create(&xdrs, buf, sizeof(double), XDR_DECODE);
148  xdr_double(&xdrs, &t);
149}
150
151static void serializeString(const std::string& t, OutStream& stream)
152{
153  ::serializeInt(t.length(), stream);
154  if (t.length() > 0) {
155    stream.write(t.data(), t.length());
156  }
157}
158
159static void deserializeString(std::string& t, InStream& stream)
160{
161  int32_t len = 0;
162  ::deserializeInt(len, stream);
163  if (len > 0) {
164    // resize the string to the right length
165    t.resize(len);
166    // read into the string in 64k chunks
167    const int bufSize = 65536;
168    int offset = 0;
169    char buf[bufSize];
170    while (len > 0) {
171      int chunkLength = len > bufSize ? bufSize : len;
172      stream.read((void *)buf, chunkLength);
173      t.replace(offset, chunkLength, buf, chunkLength);
174      offset += chunkLength;
175      len -= chunkLength;
176    }
177  }
178}
179
180void hadoop::IBinArchive::deserialize(int8_t& t, const char* tag)
181{
182  ::deserialize(t, stream);
183}
184
185void hadoop::IBinArchive::deserialize(bool& t, const char* tag)
186{
187  ::deserialize(t, stream);
188}
189
190void hadoop::IBinArchive::deserialize(int32_t& t, const char* tag)
191{
192  int64_t longVal = 0LL;
193  ::deserializeLong(longVal, stream);
194  t = longVal;
195}
196
197void hadoop::IBinArchive::deserialize(int64_t& t, const char* tag)
198{
199  ::deserializeLong(t, stream);
200}
201
202void hadoop::IBinArchive::deserialize(float& t, const char* tag)
203{
204  ::deserializeFloat(t, stream);
205}
206
207void hadoop::IBinArchive::deserialize(double& t, const char* tag)
208{
209  ::deserializeDouble(t, stream);
210}
211
212void hadoop::IBinArchive::deserialize(std::string& t, const char* tag)
213{
214  ::deserializeString(t, stream);
215}
216
217void hadoop::IBinArchive::deserialize(std::string& t, size_t& len, const char* tag)
218{
219  ::deserializeString(t, stream);
220  len = t.length();
221}
222
223void hadoop::IBinArchive::startRecord(Record& s, const char* tag)
224{
225}
226
227void hadoop::IBinArchive::endRecord(Record& s, const char* tag)
228{
229}
230
231Index* hadoop::IBinArchive::startVector(const char* tag)
232{
233  int32_t len;
234  ::deserializeInt(len, stream);
235  BinIndex *idx = new BinIndex((size_t) len);
236  return idx;
237}
238
239void hadoop::IBinArchive::endVector(Index* idx, const char* tag)
240{
241  delete idx;
242}
243
244Index* hadoop::IBinArchive::startMap(const char* tag)
245{
246  int32_t len;
247  ::deserializeInt(len, stream);
248  BinIndex *idx = new BinIndex((size_t) len);
249  return idx;
250}
251
252void hadoop::IBinArchive::endMap(Index* idx, const char* tag)
253{
254  delete idx;
255}
256
257hadoop::IBinArchive::~IBinArchive()
258{
259}
260
261void hadoop::OBinArchive::serialize(int8_t t, const char* tag)
262{
263  ::serialize(t, stream);
264}
265
266void hadoop::OBinArchive::serialize(bool t, const char* tag)
267{
268  ::serialize(t, stream);
269}
270
271void hadoop::OBinArchive::serialize(int32_t t, const char* tag)
272{
273  int64_t longVal = t;
274  ::serializeLong(longVal, stream);
275}
276
277void hadoop::OBinArchive::serialize(int64_t t, const char* tag)
278{
279  ::serializeLong(t, stream);
280}
281
282void hadoop::OBinArchive::serialize(float t, const char* tag)
283{
284  ::serializeFloat(t, stream);
285}
286
287void hadoop::OBinArchive::serialize(double t, const char* tag)
288{
289  ::serializeDouble(t, stream);
290}
291
292void hadoop::OBinArchive::serialize(const std::string& t, const char* tag)
293{
294  ::serializeString(t, stream);
295}
296
297void hadoop::OBinArchive::serialize(const std::string& t, size_t len, const char* tag)
298{
299  ::serializeString(t, stream);
300}
301
302void hadoop::OBinArchive::startRecord(const Record& s, const char* tag)
303{
304}
305
306void hadoop::OBinArchive::endRecord(const Record& s, const char* tag)
307{
308}
309
310void hadoop::OBinArchive::startVector(size_t len, const char* tag)
311{
312  ::serializeInt(len, stream);
313}
314
315void hadoop::OBinArchive::endVector(size_t len, const char* tag)
316{
317}
318
319void hadoop::OBinArchive::startMap(size_t len, const char* tag)
320{
321  ::serializeInt(len, stream);
322}
323
324void hadoop::OBinArchive::endMap(size_t len, const char* tag)
325{
326}
327
328hadoop::OBinArchive::~OBinArchive()
329{
330}
Note: See TracBrowser for help on using the repository browser.