source: proiecte/PDAD/trunk/nodeslocation/mpi/nodesLocation.c @ 154

Last change on this file since 154 was 154, checked in by (none), 14 years ago

PDAD project

File size: 10.0 KB
Line 
1// Frequent (>1000) failure cause for different categories of events grouped by duration (short, medium, long)
2
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <unistd.h>
7#include "mpi.h"
8#include "hashmap_public.h"
9
10#define LINEMAX 512
11#define NAMEMAX 32
12#define CHUNK 20480
13#define ENCODED_NO 33
14#define MAX_UNITS 10
15
16#define JOB_TAG 0
17#define RESPONSE_TAG 1
18#define QUERY_TAG 2
19
20typedef struct location_unit {
21    char location[NAMEMAX]; 
22    unsigned long failures; 
23} location_unit; 
24
25typedef struct resp_payload {
26    int unsigned count; 
27    location_unit units[MAX_UNITS]; 
28} resp_payload; 
29
30typedef struct node_key {
31    int unsigned node_id; 
32    int unsigned platform_id; 
33} node_key; 
34
35typedef struct query_payload {
36    node_key key; 
37} query_payload; 
38
39typedef struct node_value {
40    char location[0]; 
41} node_value; 
42
43uint32_t node_hash(key generic_key) {
44    node_key *k = (node_key *) generic_key; 
45    return (k->node_id * k->platform_id); 
46}
47
48bool node_eq(key a, key b) {
49    node_key *na = (node_key *)a; 
50    node_key *nb = (node_key *)b; 
51    return (na->node_id == nb->node_id) && (na->platform_id == nb->platform_id); 
52}
53
54typedef struct loc_key {
55    char *name; 
56} loc_key; 
57
58typedef struct loc_value {
59    int unsigned failures; 
60} loc_value; 
61
62uint32_t loc_hash(key generic_key) {
63    loc_key *k = (loc_key *) generic_key; 
64    return (uint32_t)k->name[0]; 
65}
66
67bool loc_eq(key a, key b) {
68    loc_key *la = (loc_key *) a; 
69    loc_key *lb = (loc_key *) b; 
70
71    return strcmp(la->name, lb->name) == 0; 
72}
73
74
75/* for master */
76hashmap *nodes_map;
77
78/* for slaves: */
79hashmap *locations; 
80hashmap *nodes_cache; 
81 
82unsigned long* poz;
83unsigned long recv[2];
84unsigned long poz_size = 0;
85unsigned long recv_m[ENCODED_NO];
86unsigned long send_m[ENCODED_NO];
87
88int failure_key[] = {-1, 0, 999, 1999, 2999, 3999, 4999, 5999, 6999, 7000, 7001};
89char* failure_value[] = {"not reported", "reported as undetermined", "infrastructure", "hardware", "IO", "network", "software", "human error",
90                         "user", "end of measurement", "TYPING"};
91long duration_key[] = {1000, 100000, 2147483647};
92char* duration_value[] = {"short", "medium", "long"};
93char* encoded_values[] = {"short-not reported", "medium-not reported", "long-not reported", "short-reported as undetermined",
94                          "medium-reported as undetermined", "long-reported as undetermined", "short-infrastructure", "medium-infrastructure",
95                          "long-infrastructure", "short-hardware", "medium-hardware", "long-hardware", "short-IO", "medium-IO", "long-IO", "short-network",
96                          "medium-network", "long-network", "short-software", "medium-software", "long-software", "short-human error",
97                          "medium-human error", "long-human error", "short-user", "medium-user", "long-user", "short-end of measurement",
98                          "medium-end of measurement", "long-end of measurement", "short-TYPING", "medium-TYPING", "long-TYPING"};
99
100unsigned long file_length(FILE *f) {
101    long pos;
102    long end;
103
104    pos = ftell (f);
105    fseek (f, 0, SEEK_END);
106    end = ftell (f);
107    fseek (f, pos, SEEK_SET);
108
109    return end;
110}
111
112
113void write_result (char *filename) {
114    FILE *f;
115    int i;
116
117    f = fopen(filename, "wt");
118    if (f == NULL) {
119        fprintf(stderr, "Could not open file %s for write\n", filename);
120        MPI_Abort(MPI_COMM_WORLD, 7777);
121    }
122
123    for(i = 0 ; i < ENCODED_NO ; i++) {
124        fprintf(f, "%-40s", encoded_values[i]);
125        fprintf(f, "%lu\n", recv_m[i]);
126    }
127}
128
129void load_nodes_info(char *nodes_filename) {
130    FILE *nodes_file;
131    char buffer[LINEMAX]; 
132    char node_name[NAMEMAX], node_ip[NAMEMAX], node_location[NAMEMAX]; 
133    node_key *node; 
134    node_value *value; 
135
136    /* initialize hashmap */
137    nodes_map = mk_hmap(node_hash, node_eq); 
138
139    nodes_file = fopen(nodes_filename, "rt"); 
140    if (nodes_file == NULL) { 
141        fprintf(stderr, "Could not open nodes file %s \n", nodes_filename); 
142        MPI_Abort(MPI_COMM_WORLD, 7777);
143    }
144
145    while (fgets(buffer, LINEMAX, nodes_file) != NULL) { 
146        /* skip comments */
147        if (buffer[0] == '#') 
148            continue; 
149       
150        node = malloc(sizeof(node_key)); 
151        if (node == NULL) {
152            fprintf(stderr, "Not enough memory \n"); 
153            MPI_Abort(MPI_COMM_WORLD, 7777); 
154        }
155       
156        sscanf(buffer, "%u %u %s %s %s", &node->node_id, &node->platform_id, node_name, node_ip, node_location);
157        value = malloc(sizeof(node_value) + strlen(node_location) + 1); 
158        if (value == NULL) { 
159            fprintf(stderr, "Not enough memory \n"); 
160            MPI_Abort(MPI_COMM_WORLD, 7777); 
161        }
162
163        strcpy(value->location, node_location); 
164        if (hmap_add(nodes_map, node, value) == false) {
165            fprintf(stderr, "Hashmap out of memory \n"); 
166            MPI_Abort(MPI_COMM_WORLD, 7777); 
167        } 
168    }
169
170    fclose(nodes_file); 
171
172    /*
173    node_key k;
174    k.node_id = 160;
175    k.platform_id = 3;
176    value = hmap_get(nodes_map, &k);
177    if (value == NULL) {
178        printf("Error \n");
179    } else {
180        printf("Search result: %s \n", value->location);
181    }
182    */
183}
184
185int init(char *filename) {
186    unsigned long file_len = 0;
187    unsigned long i, j;
188    FILE *f;
189
190    f = fopen(filename, "rt");
191    if (f == NULL) {
192        fprintf(stderr, "Could not open file init.\n");
193        MPI_Abort(MPI_COMM_WORLD, 7777);
194    }
195    file_len = file_length(f);
196    poz_size = file_len / CHUNK + 1;
197
198    poz = (unsigned long*) malloc(poz_size * sizeof(unsigned long));
199    poz[0] = 0;
200    for (i = CHUNK, j = 1; j < poz_size - 1; i += CHUNK, j++) {
201        poz[j] = i;
202    }
203    poz[j] = file_len;
204
205    return 1;
206}
207
208int main(int argc, char** argv) {
209
210    int rank, size;
211    MPI_Status status;
212    MPI_Request req;
213    int i, flag;
214    void *buffer = malloc(sizeof(resp_payload));
215    resp_payload *resp; 
216    query_payload *query; 
217
218    MPI_Init(&argc, &argv);
219    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
220    MPI_Comm_size(MPI_COMM_WORLD, &size);
221
222    if (rank == 0) {
223        // master inits
224        init(argv[1]);
225        load_nodes_info(argv[2]); 
226    }
227
228    MPI_Barrier(MPI_COMM_WORLD);
229
230    // master sends indexes, receives results
231    if(rank == 0) {
232
233        char null_location = 0;
234        for(i = 0 ; i < ENCODED_NO ; i++) {
235            recv_m[i] = 0;
236            send_m[i] = 0;
237        }
238
239        // sends indexes
240        for (i = 0; i < poz_size - 1; i++) {
241            MPI_Isend(poz+i, 2, MPI_UNSIGNED_LONG, i%(size-1)+1, JOB_TAG, MPI_COMM_WORLD, &req);
242        }
243
244        // receive results
245        unsigned long recv_temp[ENCODED_NO];
246        while(1) {
247           
248            for(i = 0 ; i < ENCODED_NO ; i++)
249                recv_temp[i] = 0;
250           
251            MPI_Irecv(buffer, ENCODED_NO, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &req);
252           
253            MPI_Test(&req, &flag, &status);
254            int times = 0;
255            while(!flag && times < 10) {
256                times++;
257                sleep(1);
258                MPI_Test(&req, &flag, &status);
259            }
260            if(!flag)
261                break;
262           
263            if (status.MPI_TAG == RESPONSE_TAG) { 
264                for(i = 0 ; i < ENCODED_NO ; i++) {
265                    recv_m[i] += recv_temp[i];
266                }
267                printf("Response received \n"); 
268
269            } else if (status.MPI_TAG == QUERY_TAG) { 
270                query = (query_payload *)buffer;
271                node_value *value = hmap_get(nodes_map, query); 
272                if (value != NULL) { 
273                    MPI_Send(value->location, NAMEMAX, MPI_CHAR, status.MPI_SOURCE, QUERY_TAG, MPI_COMM_WORLD); 
274                } else {
275                    MPI_Send(&null_location, 1, MPI_CHAR, status.MPI_SOURCE, QUERY_TAG, MPI_COMM_WORLD); 
276                }
277            }
278        }
279
280        // write results
281        write_result(argv[3]);
282        free(poz);
283
284        free_hmap(nodes_map); 
285    }
286
287    // slaves receive indexes, do computation
288    else {
289
290        locations = mk_hmap(loc_hash, loc_eq); 
291        nodes_cache = mk_hmap(node_hash, node_eq); 
292        while(1) {
293           
294            // receive until there's nothing left
295            MPI_Irecv(recv, 2, MPI_UNSIGNED_LONG, 0, JOB_TAG, MPI_COMM_WORLD, &req);
296           
297            MPI_Test(&req, &flag, &status);
298            int times = 0;
299            while(!flag && times < 10) {
300                times++;
301                sleep(1);
302                MPI_Test(&req, &flag, &status);
303            }
304            if(!flag)
305                break;
306
307            // read file and do computation
308            FILE *f;
309
310            f = fopen(argv[1], "rt");
311            if (f == NULL) {
312                fprintf(stderr, "Could not open file.\n");
313                MPI_Abort(MPI_COMM_WORLD, 7777);
314            }
315           
316            fseek (f, recv[0], SEEK_SET);
317
318            // go to the end of the line
319            char * line = NULL;
320            size_t len = 0;
321            ssize_t read;
322            unsigned long count = 0;
323
324            for(i = 0 ; i < ENCODED_NO ; i++) {
325                send_m[i] = 0;
326                recv_m[i] = 0;
327            }
328
329            while ((read = getline(&line, &len, f)) != -1) {
330
331                // skip the comment line and the half lines
332                count++;
333                if(count == 1)
334                    continue;
335
336                // compute
337                //char response;
338
339                node_key node; 
340                char delims[] = " ";
341                char *result = NULL;
342                result = strtok( line, delims );
343                int token_no = 0;
344                while( result != NULL ) {
345                    token_no++;
346                    if(token_no == 2) {
347                        node.node_id = atoi(result);
348                    }
349                    if(token_no == 3) {
350                        node.platform_id = atof(result);
351                    }
352                    result = strtok( NULL, delims );
353                }
354               
355                node_value *value = hmap_get(nodes_cache, &node); 
356               
357                if (value == NULL) { 
358                    char location[NAMEMAX]; 
359                    MPI_Send(&node, sizeof(node_key), MPI_CHAR, 0, QUERY_TAG, MPI_COMM_WORLD); 
360                    MPI_Recv(location, NAMEMAX, MPI_CHAR, 0, QUERY_TAG, MPI_COMM_WORLD, &status); 
361                    value = malloc(sizeof(node_value) * strlen(location) + 1); 
362                    node_key *new_node = malloc(sizeof(node_key)); 
363                    new_node->node_id = node.node_id; 
364                    new_node->platform_id = node.platform_id; 
365                    strcpy(value->location, location); 
366                    hmap_add(nodes_cache, new_node, value); 
367                    if (strlen(location) > 0) { 
368                        /* add to the locations set */
369                        loc_key *loc = malloc(sizeof(loc_key)); 
370                        loc->name = value->location; 
371                        loc_value *locval = malloc(sizeof(loc_value)); 
372                        locval->failures = 1; 
373                        hmap_add(locations, loc, locval); 
374                        printf("Location is %s \n", location); 
375                    }
376                } else {
377                    printf("Found in local cache %s \n", value->location); 
378                    if (strlen(value->location) > 0) { 
379                        loc_key *loc = malloc(sizeof(loc_key)); 
380                        loc->name = value->location; 
381                        loc_value *loc_data = hmap_get(locations, loc); 
382                        if (loc_data != NULL) {
383                            loc_data->failures++; 
384                        }
385                    }
386                }
387                // finished reading the chunk?
388                if(ftell(f) > recv[1])
389                    break;
390               
391            }
392
393            // send result to master
394            MPI_Isend(send_m, ENCODED_NO, MPI_UNSIGNED_LONG, 0, RESPONSE_TAG, MPI_COMM_WORLD, &req);
395        }
396    }
397
398    // block and finish
399    MPI_Barrier(MPI_COMM_WORLD);
400
401   
402    MPI_Finalize();
403    return 0; 
404    } 
405
Note: See TracBrowser for help on using the repository browser.