// Frequent (>1000) failure cause for different categories of events grouped by duration (short, medium, long) #include #include #include #include #include "mpi.h" #include "hashmap_public.h" #define LINEMAX 512 #define NAMEMAX 32 #define CHUNK 20480 #define ENCODED_NO 33 #define MAX_UNITS 10 #define JOB_TAG 0 #define RESPONSE_TAG 1 #define QUERY_TAG 2 typedef struct location_unit { char location[NAMEMAX]; unsigned long failures; } location_unit; typedef struct resp_payload { int unsigned count; location_unit units[MAX_UNITS]; } resp_payload; typedef struct node_key { int unsigned node_id; int unsigned platform_id; } node_key; typedef struct query_payload { node_key key; } query_payload; typedef struct node_value { char location[0]; } node_value; uint32_t node_hash(key generic_key) { node_key *k = (node_key *) generic_key; return (k->node_id * k->platform_id); } bool node_eq(key a, key b) { node_key *na = (node_key *)a; node_key *nb = (node_key *)b; return (na->node_id == nb->node_id) && (na->platform_id == nb->platform_id); } typedef struct loc_key { char *name; } loc_key; typedef struct loc_value { int unsigned failures; } loc_value; uint32_t loc_hash(key generic_key) { loc_key *k = (loc_key *) generic_key; return (uint32_t)k->name[0]; } bool loc_eq(key a, key b) { loc_key *la = (loc_key *) a; loc_key *lb = (loc_key *) b; return strcmp(la->name, lb->name) == 0; } /* for master */ hashmap *nodes_map; /* for slaves: */ hashmap *locations; hashmap *nodes_cache; unsigned long* poz; unsigned long recv[2]; unsigned long poz_size = 0; unsigned long recv_m[ENCODED_NO]; unsigned long send_m[ENCODED_NO]; int failure_key[] = {-1, 0, 999, 1999, 2999, 3999, 4999, 5999, 6999, 7000, 7001}; char* failure_value[] = {"not reported", "reported as undetermined", "infrastructure", "hardware", "IO", "network", "software", "human error", "user", "end of measurement", "TYPING"}; long duration_key[] = {1000, 100000, 2147483647}; char* duration_value[] = {"short", "medium", "long"}; char* encoded_values[] = {"short-not reported", "medium-not reported", "long-not reported", "short-reported as undetermined", "medium-reported as undetermined", "long-reported as undetermined", "short-infrastructure", "medium-infrastructure", "long-infrastructure", "short-hardware", "medium-hardware", "long-hardware", "short-IO", "medium-IO", "long-IO", "short-network", "medium-network", "long-network", "short-software", "medium-software", "long-software", "short-human error", "medium-human error", "long-human error", "short-user", "medium-user", "long-user", "short-end of measurement", "medium-end of measurement", "long-end of measurement", "short-TYPING", "medium-TYPING", "long-TYPING"}; unsigned long file_length(FILE *f) { long pos; long end; pos = ftell (f); fseek (f, 0, SEEK_END); end = ftell (f); fseek (f, pos, SEEK_SET); return end; } void write_result (char *filename) { FILE *f; int i; f = fopen(filename, "wt"); if (f == NULL) { fprintf(stderr, "Could not open file %s for write\n", filename); MPI_Abort(MPI_COMM_WORLD, 7777); } for(i = 0 ; i < ENCODED_NO ; i++) { fprintf(f, "%-40s", encoded_values[i]); fprintf(f, "%lu\n", recv_m[i]); } } void load_nodes_info(char *nodes_filename) { FILE *nodes_file; char buffer[LINEMAX]; char node_name[NAMEMAX], node_ip[NAMEMAX], node_location[NAMEMAX]; node_key *node; node_value *value; /* initialize hashmap */ nodes_map = mk_hmap(node_hash, node_eq); nodes_file = fopen(nodes_filename, "rt"); if (nodes_file == NULL) { fprintf(stderr, "Could not open nodes file %s \n", nodes_filename); MPI_Abort(MPI_COMM_WORLD, 7777); } while (fgets(buffer, LINEMAX, nodes_file) != NULL) { /* skip comments */ if (buffer[0] == '#') continue; node = malloc(sizeof(node_key)); if (node == NULL) { fprintf(stderr, "Not enough memory \n"); MPI_Abort(MPI_COMM_WORLD, 7777); } sscanf(buffer, "%u %u %s %s %s", &node->node_id, &node->platform_id, node_name, node_ip, node_location); value = malloc(sizeof(node_value) + strlen(node_location) + 1); if (value == NULL) { fprintf(stderr, "Not enough memory \n"); MPI_Abort(MPI_COMM_WORLD, 7777); } strcpy(value->location, node_location); if (hmap_add(nodes_map, node, value) == false) { fprintf(stderr, "Hashmap out of memory \n"); MPI_Abort(MPI_COMM_WORLD, 7777); } } fclose(nodes_file); /* node_key k; k.node_id = 160; k.platform_id = 3; value = hmap_get(nodes_map, &k); if (value == NULL) { printf("Error \n"); } else { printf("Search result: %s \n", value->location); } */ } int init(char *filename) { unsigned long file_len = 0; unsigned long i, j; FILE *f; f = fopen(filename, "rt"); if (f == NULL) { fprintf(stderr, "Could not open file init.\n"); MPI_Abort(MPI_COMM_WORLD, 7777); } file_len = file_length(f); poz_size = file_len / CHUNK + 1; poz = (unsigned long*) malloc(poz_size * sizeof(unsigned long)); poz[0] = 0; for (i = CHUNK, j = 1; j < poz_size - 1; i += CHUNK, j++) { poz[j] = i; } poz[j] = file_len; return 1; } int main(int argc, char** argv) { int rank, size; MPI_Status status; MPI_Request req; int i, flag; void *buffer = malloc(sizeof(resp_payload)); resp_payload *resp; query_payload *query; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); if (rank == 0) { // master inits init(argv[1]); load_nodes_info(argv[2]); } MPI_Barrier(MPI_COMM_WORLD); // master sends indexes, receives results if(rank == 0) { char null_location = 0; for(i = 0 ; i < ENCODED_NO ; i++) { recv_m[i] = 0; send_m[i] = 0; } // sends indexes for (i = 0; i < poz_size - 1; i++) { MPI_Isend(poz+i, 2, MPI_UNSIGNED_LONG, i%(size-1)+1, JOB_TAG, MPI_COMM_WORLD, &req); } // receive results unsigned long recv_temp[ENCODED_NO]; while(1) { for(i = 0 ; i < ENCODED_NO ; i++) recv_temp[i] = 0; MPI_Irecv(buffer, ENCODED_NO, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &req); MPI_Test(&req, &flag, &status); int times = 0; while(!flag && times < 10) { times++; sleep(1); MPI_Test(&req, &flag, &status); } if(!flag) break; if (status.MPI_TAG == RESPONSE_TAG) { for(i = 0 ; i < ENCODED_NO ; i++) { recv_m[i] += recv_temp[i]; } printf("Response received \n"); } else if (status.MPI_TAG == QUERY_TAG) { query = (query_payload *)buffer; node_value *value = hmap_get(nodes_map, query); if (value != NULL) { MPI_Send(value->location, NAMEMAX, MPI_CHAR, status.MPI_SOURCE, QUERY_TAG, MPI_COMM_WORLD); } else { MPI_Send(&null_location, 1, MPI_CHAR, status.MPI_SOURCE, QUERY_TAG, MPI_COMM_WORLD); } } } // write results write_result(argv[3]); free(poz); free_hmap(nodes_map); } // slaves receive indexes, do computation else { locations = mk_hmap(loc_hash, loc_eq); nodes_cache = mk_hmap(node_hash, node_eq); while(1) { // receive until there's nothing left MPI_Irecv(recv, 2, MPI_UNSIGNED_LONG, 0, JOB_TAG, MPI_COMM_WORLD, &req); MPI_Test(&req, &flag, &status); int times = 0; while(!flag && times < 10) { times++; sleep(1); MPI_Test(&req, &flag, &status); } if(!flag) break; // read file and do computation FILE *f; f = fopen(argv[1], "rt"); if (f == NULL) { fprintf(stderr, "Could not open file.\n"); MPI_Abort(MPI_COMM_WORLD, 7777); } fseek (f, recv[0], SEEK_SET); // go to the end of the line char * line = NULL; size_t len = 0; ssize_t read; unsigned long count = 0; for(i = 0 ; i < ENCODED_NO ; i++) { send_m[i] = 0; recv_m[i] = 0; } while ((read = getline(&line, &len, f)) != -1) { // skip the comment line and the half lines count++; if(count == 1) continue; // compute //char response; node_key node; char delims[] = " "; char *result = NULL; result = strtok( line, delims ); int token_no = 0; while( result != NULL ) { token_no++; if(token_no == 2) { node.node_id = atoi(result); } if(token_no == 3) { node.platform_id = atof(result); } result = strtok( NULL, delims ); } node_value *value = hmap_get(nodes_cache, &node); if (value == NULL) { char location[NAMEMAX]; MPI_Send(&node, sizeof(node_key), MPI_CHAR, 0, QUERY_TAG, MPI_COMM_WORLD); MPI_Recv(location, NAMEMAX, MPI_CHAR, 0, QUERY_TAG, MPI_COMM_WORLD, &status); value = malloc(sizeof(node_value) * strlen(location) + 1); node_key *new_node = malloc(sizeof(node_key)); new_node->node_id = node.node_id; new_node->platform_id = node.platform_id; strcpy(value->location, location); hmap_add(nodes_cache, new_node, value); if (strlen(location) > 0) { /* add to the locations set */ loc_key *loc = malloc(sizeof(loc_key)); loc->name = value->location; loc_value *locval = malloc(sizeof(loc_value)); locval->failures = 1; hmap_add(locations, loc, locval); printf("Location is %s \n", location); } } else { printf("Found in local cache %s \n", value->location); if (strlen(value->location) > 0) { loc_key *loc = malloc(sizeof(loc_key)); loc->name = value->location; loc_value *loc_data = hmap_get(locations, loc); if (loc_data != NULL) { loc_data->failures++; } } } // finished reading the chunk? if(ftell(f) > recv[1]) break; } // send result to master MPI_Isend(send_m, ENCODED_NO, MPI_UNSIGNED_LONG, 0, RESPONSE_TAG, MPI_COMM_WORLD, &req); } } // block and finish MPI_Barrier(MPI_COMM_WORLD); MPI_Finalize(); return 0; }