source: tradir/charm/pp/hello.C @ 177

Last change on this file since 177 was 177, checked in by (none), 14 years ago
File size: 10.8 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <string.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h> 
9#include <stdarg.h>
10#include <assert.h>
11#include <errno.h>
12#include "hello.decl.h"
13
14/*readonly*/ CProxy_Main mainProxy;
15/*readonly*/ int nElements;
16/*readonly*/ struct sockaddr_in serv_addr;
17char **edges;
18
19#define UBYTE   0x07
20#define BYTE    0x08
21#define INTEGER 0x09
22#define FLOAT   0x0a
23#define DOUBLE  0x0b
24#define STRING  0x0c
25#define LIST    0x0e
26
27struct message {
28        int32_t size;
29        int32_t pos;
30        uint8_t *data;
31};
32
33class Base {
34    public :
35        int sockfd;
36        struct message msg;
37
38        void error(const char *msg) {
39            perror(msg);
40            CkExit();
41        }
42
43        void msg_new() {
44                msg.size = 0;
45                msg.pos = 0;
46                msg.data = (uint8_t*) calloc(255, sizeof(char));
47        }
48
49        void msg_free() {
50                free(msg.data);
51        }
52
53        void msg_write(uint8_t cmd, const char *format, ...) {
54                va_list ap;
55                char *str;
56                int len;
57                int old = msg.size;
58                int i;
59
60                msg.size++;
61                msg.data[msg.size++] = cmd;
62                va_start(ap, format);
63                for (i = 0; i < strlen(format); i++) {
64                        switch (format[i]) {
65                                case 'b' :
66                                        msg.data[msg.size++] = (uint8_t) va_arg(ap, int);
67                                        break;
68                                case 'i' :
69                                        *(uint32_t*) &msg.data[msg.size] = htonl(va_arg(ap, uint32_t));
70                                        msg.size += 4;
71                                        break;
72                                case 'd' :
73                                        *(double*) &msg.data[msg.size] = va_arg(ap, double);
74                                        msg.size += 8;
75                                        break;
76                                case 's' :
77                                        str = va_arg(ap, char*);
78                                        len = strlen(str);
79                                        *(uint32_t*) &msg.data[msg.size] = htonl(len);
80                                        msg.size += 4;
81                                        memcpy(&msg.data[msg.size], str, len);
82                                        msg.size += len;
83                                        break;
84                                default :
85                                        break;
86                        }
87                }
88                va_end(ap);
89                msg.data[old] = (uint8_t) (msg.size - old);
90        }
91
92        void msg_send() {
93                int32_t size;
94                int res;
95
96                size = htonl(msg.size + 4);
97                res = write(sockfd, (char*) &size, 4);
98                if (res < 0) error("ERROR sending message size");
99                res = write(sockfd, msg.data, msg.size);
100                if (res < 0) error("ERROR sending message data");
101                free(msg.data);
102        }
103
104        void msg_recv() {
105                int32_t size;
106                int res;
107        //      int i;
108
109                res = read(sockfd, &size, 4);
110                if (res < 0) error("ERROR receiving message size");
111                msg.size = ntohl(size) - 4;
112                msg.data = (uint8_t*) calloc(msg.size, sizeof(char));
113                res = read(sockfd, msg.data, msg.size);
114                if (res < 0) error("ERROR receiving message data");
115        //      printf("### ");
116        //      for (i = 0; i < res; i++) {
117        //              printf("%02x-", msg.data[i]);
118        //      }
119        //      printf("\n");
120        }
121
122        void msg_read(uint8_t cmd, const char *format, ...) {
123                va_list ap;
124                char *s;
125                uint8_t** str;
126                uint8_t*** lst;
127                uint32_t host;
128                int cnt;
129                int len;
130                int old = msg.pos;
131                uint32_t size;
132                int i;
133                int j;
134
135                size = (uint8_t) msg.data[msg.pos++];
136                if (!size) {
137                        size = ntohl(*(uint32_t*) &msg.data[msg.pos]);
138                        msg.pos += 4;
139                }
140        //      printf("CMD: %02X DATA: %02X\n", cmd, msg.data[msg.pos]);
141                assert(cmd == msg.data[msg.pos++]);
142                va_start(ap, format);
143                for (i = 0; i < strlen(format); i++) {
144                        switch (format[i]) {
145                                case 'B' :
146                                        assert(va_arg(ap, int) == msg.data[msg.pos++]);
147                                        break;
148                                case 'b' :
149                                        *va_arg(ap, uint8_t*) = msg.data[msg.pos++];
150                                        break;
151                                case 'f' :
152                                        host = ntohl(*(uint32_t*) &msg.data[msg.pos]);
153                                        msg.pos += 4;
154                                        *va_arg(ap, float*) = *(float*) &host;
155                                        break;
156                                case 'S' :
157                                        len = ntohl(*(int32_t*) &msg.data[msg.pos]);
158                                        msg.pos += 4;
159                                        s = va_arg(ap, char*);
160                                        assert(len == strlen(s) && !strncmp((char*) &msg.data[msg.pos], s, len));
161                                        msg.pos += len;
162                                        break;
163                                case 's' :
164                                        len = ntohl(*(int32_t*) &msg.data[msg.pos]);
165                                        msg.pos += 4;
166                                        str = va_arg(ap, uint8_t**);
167                                        *str = (uint8_t*) calloc(1 + len, sizeof(uint8_t));
168                                        memcpy(*str, &msg.data[msg.pos], len);
169                                        msg.pos += len;
170                                        break;
171                                case 'l' :
172                                        cnt = ntohl(*(uint32_t*) &msg.data[msg.pos]);
173                                        msg.pos += 4;
174                                        lst = va_arg(ap, uint8_t***);
175                                        *lst = (uint8_t**) calloc(1 + cnt, sizeof(uint8_t*));
176                                        for (j = 0; j < cnt; j++) {
177                                                len = ntohl(*(uint32_t*) &msg.data[msg.pos]);
178                                                msg.pos += 4;
179                                                (*lst)[j] = (uint8_t*) calloc(1 + len, sizeof(uint8_t));
180                                                memcpy((*lst)[j], &msg.data[msg.pos], len);
181                                                msg.pos += len;
182                                        }
183                                case '#' :
184                                        assert(msg.pos - old == size);
185                                        break;
186                                default :
187                                        break;
188                        }
189                }
190                va_end(ap);
191        }
192
193
194        void do_close() {
195                char *desc;
196
197                msg_new();
198                msg_write(0x7f, ""); 
199                msg_send();
200                msg_recv();
201
202                msg_read(0x7f, "Bs#", 0x00, &desc);
203        //      printf("CLOSE: 0x%02x-%s\n", res, desc);
204                msg_free();
205        }
206
207        void do_step() {
208                char *desc;
209
210                msg_new();
211                msg_write(0x01, "db", 100000.0, 0); 
212                msg_send();
213                msg_recv();
214                msg_read(0x01, "Bs#", 0x00, &desc);
215        //      printf("STEP: #%s#\n", desc);
216                msg_free();
217        }
218
219        void do_steps(int n) {
220                int i;
221                for (i = 0; i < n; i++) {
222                        do_step();
223                }
224        }
225
226        char **get_edges() {
227                char *desc;
228                char **edges;
229        //      int i = 0;
230
231                msg_new();
232                msg_write(0xaa, "bs", 0x00, "");
233                msg_send();
234                msg_recv();
235                msg_read(0xaa, "Bs", 0x00, &desc);
236                msg_read(0xba, "BSBl#", 0x00, "", LIST, &edges);
237        //      printf("EDGES: ");
238        //      while (edges[i]) {
239        //              printf("[%s] ", edges[i]);
240        //              i++;
241        //      }
242        //      printf("\n");
243                msg_free();
244
245                return edges;
246        }
247
248        char **get_lanes() {
249                char *desc;
250                char **lanes;
251        //      int i = 0;
252
253                msg_new();
254                msg_write(0xa3, "bs", 0x00, "");
255                msg_send();
256                msg_recv();
257                msg_read(0xa3, "Bs", 0x00, &desc);
258                msg_read(0xb3, "BSBl#", 0x00, "", LIST, &lanes);
259        //      printf("LANES: ");
260        //      while (lanes[i]) {
261        //              printf("{%s} ", lanes[i]);
262        //              i++;
263        //      }
264        //      printf("\n");
265                msg_free();
266
267                return lanes;
268        }
269
270        char *get_lane_edge(char *lane) {
271                char *desc;
272                char *edge;
273
274                msg_new();
275                msg_write(0xa3, "bs", 0x31, lane);
276                msg_send();
277                msg_recv();
278                msg_read(0xa3, "Bs", 0x00, &desc);
279                msg_read(0xb3, "BSBs#", 0x31, lane, STRING, &edge);
280                msg_free();
281
282                return edge;
283        }
284
285        float get_lane_length(char *lane) {
286                char *desc;
287                float length;
288
289                msg_new();
290                msg_write(0xa3, "bs", 0x44, lane);
291                msg_send();
292                msg_recv();
293                msg_read(0xa3, "Bs", 0x00, &desc);
294                msg_read(0xb3, "BSBf#", 0x44, lane, FLOAT, &length);
295                msg_free();
296
297                return length;
298        }
299
300        float *get_edge_lengths(char **edges) {
301                int count;
302                float *lengths;
303                char **lanes;
304                int i, j;
305
306                printf("HERE\n");
307                lanes = get_lanes();
308                count = 0;
309                while (edges[count++]);
310                printf("HERE\n");
311                lengths = (float*) calloc(count, sizeof(float));
312                for (i = 0; lanes[i]; i++) {
313                        char *edge = get_lane_edge(lanes[i]);
314                        float length = get_lane_length(lanes[i]);
315                        for (j = 0; edges[j]; j++) {
316                                if (!strcmp(edges[j], edge)) {
317                                        lengths[j] = length;
318                                        break;
319                                }
320                        }
321                }
322                for (i = 0; edges[i]; i++) {
323                        printf("LENGTH OF [%s] is %f\n", edges[i], lengths[i]);
324                }
325
326                return lengths;
327        }
328
329        char **get_edge_vehicles(char *edge) {
330                char *desc;
331                char **vehicles;
332                int i = 0;
333
334                msg_new();
335                msg_write(0xaa, "bs", 0x12, edge);
336                msg_send();
337                msg_recv();
338                msg_read(0xaa, "Bs", 0x00, &desc);
339                msg_read(0xba, "BSBl#", 0x12, edge, LIST, &vehicles);
340        //      printf("VEHICLES: 0x%02x-#%s#\n", res, desc);
341        //      printf("ON #%s#: ", edge);
342        //      while (vehicles[i]) {
343        //              printf("<%s> ", vehicles[i]);
344        //              i++;
345        //      }
346        //      printf("\n");
347                msg_free();
348
349                return vehicles;
350        }
351
352        float get_vehicle_speed(char *vehicle) {
353                char *desc;
354                float speed;
355
356                msg_new();
357                msg_write(0xa4, "bs", 0x40, vehicle);
358                msg_send();
359                msg_recv();
360                msg_read(0xa4, "Bs", 0x00, &desc);
361                msg_read(0xb4, "BSBf#", 0x40, vehicle, FLOAT, &speed);
362        //      printf("SPEED: 0x%02x-#%s#\n", res, desc);
363        //      printf("speed is %f\n", speed);
364                msg_free();
365
366                return speed;
367        }
368};
369
370/*array [1D]*/
371class Hello : public CBase_Hello, public Base {
372    private :
373        char *edge;
374    public:
375        Hello() {
376                sockfd = socket(AF_INET, SOCK_STREAM, 0);
377                if (sockfd < 0) 
378                        error("ERROR opening socket");
379retry:         
380                if (connect(sockfd,(struct sockaddr*) &serv_addr,sizeof(serv_addr)) < 0)
381                        if (errno == EINTR || errno == EAGAIN)
382                                goto retry; 
383                        else 
384                                error("ERROR connecting");
385                edge = edges[CkMyPe()];
386        }
387
388        Hello(CkMigrateMessage *m) {}
389
390        void work() {
391                int j;
392                float speed = 0;
393                char **vehicles = get_edge_vehicles(edge);
394                for (j = 0; vehicles[j]; j++) {
395                        speed += get_vehicle_speed(vehicles[j]);
396                }
397                free(vehicles);
398                speed /= j;
399                CkPrintf("speed is %f\n", speed);
400
401                mainProxy.done();
402        }
403};
404
405/*mainchare*/
406class Main : public CBase_Main, public Base {
407    private :
408        CProxy_Hello arr;
409        int count;
410        int step;
411    public:
412        Main(CkArgMsg* m) {
413                uint16_t port;
414                struct hostent *server;
415
416                count = 0;
417                step = 0;
418
419                if (m->argc < 3) {
420                        fprintf(stderr,"usage %s hostname port\n", m->argv[0]);
421                        exit(0);
422                }
423                port = htons(atoi(m->argv[2]));
424                sockfd = socket(AF_INET, SOCK_STREAM, 0);
425                if (sockfd < 0) 
426                        error("ERROR opening socket");
427                server = gethostbyname(m->argv[1]);
428                if (server == NULL) {
429                        fprintf(stderr,"ERROR, no such host\n");
430                        exit(0);
431                }
432                bzero((char *) &serv_addr, sizeof(serv_addr));
433                serv_addr.sin_family = AF_INET;
434                bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length);
435                serv_addr.sin_port = port;
436                if (connect(sockfd,(struct sockaddr*) &serv_addr,sizeof(serv_addr)) < 0) 
437                        error("ERROR connecting");
438
439                edges = get_edges();
440                nElements = 0;
441                while (edges[nElements++]);
442
443                //Start the computation
444                CkPrintf("Running Hello on %d processors for %d elements\n", CkNumPes(),nElements);
445                mainProxy = thisProxy;
446
447                arr = CProxy_Hello::ckNew(nElements);
448                for (int i = 0; i < nElements; i++) {
449                        arr[i].work();
450                }
451        }
452
453        void done(void) {
454                count++;
455                if (count == nElements) {
456                        do_step();
457                        step++;
458                        if (step == 20) {
459                                CkExit();
460                        } else {
461                                count = 0;
462                                for (int i = 0; i < nElements; i++) {
463                                        arr[i].work();
464                                }
465                        }
466                }
467        }
468};
469
470
471
472
473/*
474int main(int argc, char* argv[]) {
475        FILE *fout;
476
477        int portno;
478        struct sockaddr_in serv_addr;
479        struct hostent *server;
480
481        int step;
482
483        char **edges;
484        char **vehicles;
485        float speed;
486        float *lengths;
487        int i, j;
488
489        if (argc < 3) {
490                fprintf(stderr,"usage %s hostname port\n", argv[0]);
491                exit(0);
492        }
493        portno = atoi(argv[2]);
494        sockfd = socket(AF_INET, SOCK_STREAM, 0);
495        if (sockfd < 0)
496                error("ERROR opening socket");
497        server = gethostbyname(argv[1]);
498        if (server == NULL) {
499                fprintf(stderr,"ERROR, no such host\n");
500                exit(0);
501        }
502        bzero((char *) &serv_addr, sizeof(serv_addr));
503        serv_addr.sin_family = AF_INET;
504        bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length);
505        serv_addr.sin_port = htons(portno);
506        if (connect(sockfd,(struct sockaddr*) &serv_addr,sizeof(serv_addr)) < 0)
507                error("ERROR connecting");
508
509        fout = fopen("results.txt", "w");
510
511        edges = get_edges();
512        lengths = get_edge_lengths(edges);
513        for (step = 0; step < 1; step++) {
514                printf("Step %03i ... ", step + 1);
515                do_step();
516                for (i = 0; edges[i] && i < 20; i++) {
517                        speed = 0;
518                        vehicles = get_edge_vehicles(edges[i]);
519                        for (j = 0; vehicles[j]; j++) {
520                                speed += get_vehicle_speed(vehicles[j]);
521                        }
522                        free(vehicles);
523                        speed /= j;
524                        fprintf(fout, "%f\t", speed);
525                }
526                free(edges);
527                fprintf(fout, "\n");
528                printf("OK\n");
529        }
530        do_close();
531
532        fclose(fout);
533
534        return 0;
535}
536
537*/
538
539
540
541
542
543
544
545#include "hello.def.h"
Note: See TracBrowser for help on using the repository browser.