// Frequent (>1000) failure cause for different categories of events grouped by duration (short, medium, long) #include #include #include #include "mpi.h" #define CHUNK 20480 #define ENCODED_NO 33 unsigned long* poz; unsigned long recv[2]; unsigned long poz_size = 0; unsigned long recv_m[ENCODED_NO]; unsigned long send_m[ENCODED_NO]; int failure_key[] = {-1, 0, 999, 1999, 2999, 3999, 4999, 5999, 6999, 7000, 7001}; char* failure_value[] = {"not reported", "reported as undetermined", "infrastructure", "hardware", "IO", "network", "software", "human error", "user", "end of measurement", "TYPING"}; long duration_key[] = {1000, 100000, 2147483647}; char* duration_value[] = {"short", "medium", "long"}; char* encoded_values[] = {"short-not reported", "medium-not reported", "long-not reported", "short-reported as undetermined", "medium-reported as undetermined", "long-reported as undetermined", "short-infrastructure", "medium-infrastructure", "long-infrastructure", "short-hardware", "medium-hardware", "long-hardware", "short-IO", "medium-IO", "long-IO", "short-network", "medium-network", "long-network", "short-software", "medium-software", "long-software", "short-human error", "medium-human error", "long-human error", "short-user", "medium-user", "long-user", "short-end of measurement", "medium-end of measurement", "long-end of measurement", "short-TYPING", "medium-TYPING", "long-TYPING"}; unsigned long file_length(FILE *f) { long pos; long end; pos = ftell (f); fseek (f, 0, SEEK_END); end = ftell (f); fseek (f, pos, SEEK_SET); return end; } void write_result (char *filename) { FILE *f; int i; f = fopen(filename, "wt"); if (f == NULL) { fprintf(stderr, "Could not open file write.\n"); MPI_Abort(MPI_COMM_WORLD, 7777); } for(i = 0 ; i < ENCODED_NO ; i++) { fprintf(f, "%-40s", encoded_values[i]); fprintf(f, "%i\n", recv_m[i]); } } int init(char *filename) { unsigned long file_len = 0; unsigned long i, j; FILE *f; size_t read = 0; f = fopen(filename, "rt"); if (f == NULL) { fprintf(stderr, "Could not open file init.\n"); MPI_Abort(MPI_COMM_WORLD, 7777); } file_len = file_length(f); poz_size = file_len / CHUNK + 1; poz = (unsigned long*) malloc(poz_size * sizeof(unsigned long)); poz[0] = 0; for (i = CHUNK, j = 1; j < poz_size - 1; i += CHUNK, j++) { poz[j] = i; } poz[j] = file_len; return 1; } int main(int argc, char** argv) { int rank, size; MPI_Status status; MPI_Request request; int i, flag, j; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); if (rank == 0) { // master inits init(argv[1]); } MPI_Barrier(MPI_COMM_WORLD); // master sends indexes, receives results if(rank == 0) { for(i = 0 ; i < ENCODED_NO ; i++) { recv_m[i] = 0; send_m[i] = 0; } // sends indexes for (i = 0; i < poz_size - 1; i++) { MPI_Isend(poz+i, 2, MPI_UNSIGNED_LONG, i%(size-1)+1, 0, MPI_COMM_WORLD, &request); } // receive results unsigned long recv_temp[ENCODED_NO]; while(1) { for(i = 0 ; i < ENCODED_NO ; i++) recv_temp[i] = 0; MPI_Irecv(recv_temp, ENCODED_NO, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &request); MPI_Test(&request, &flag, &status); int times = 0; while(!flag && times < 10) { times++; sleep(1); MPI_Test(&request, &flag, &status); } if(!flag) break; for(i = 0 ; i < ENCODED_NO ; i++) { recv_m[i] += recv_temp[i]; } } // write results write_result(argv[2]); free(poz); } // slaves receive indexes, do computation else { while(1) { // receive until there's nothing left MPI_Irecv(recv, 2, MPI_UNSIGNED_LONG, 0, 0, MPI_COMM_WORLD, &request); MPI_Test(&request, &flag, &status); int times = 0; while(!flag && times < 10) { times++; sleep(1); MPI_Test(&request, &flag, &status); } if(!flag) break; // read file and do computation FILE *f; f = fopen(argv[1], "rt"); if (f == NULL) { fprintf(stderr, "Could not open file.\n"); MPI_Abort(MPI_COMM_WORLD, 7777); } fseek (f, recv[0], SEEK_SET); // go to the end of the line char * line = NULL; size_t len = 0; ssize_t read; unsigned long count = 0; for(i = 0 ; i < ENCODED_NO ; i++) { send_m[i] = 0; recv_m[i] = 0; } while ((read = getline(&line, &len, f)) != -1) { // skip the comment line and the half lines count++; if(count == 1) continue; // compute char response; double start, end, duration; int fault = -2; char delims[] = " "; char *result = NULL; result = strtok( line, delims ); int token_no = 0; while( result != NULL ) { token_no++; if(token_no == 7) { start = atof(result); } if(token_no == 8) { end = atof(result); } if(token_no == 9) { if(strncmp(result, "NULL", 4) == 0) fault = -1; else fault = atoi(result); } result = strtok( NULL, delims ); } int tag = 0; if(fault > failure_key[10]) { // detect typing fault fault = 7001; } for(i = 0 ; i < 11 ; i++) { if(fault <= failure_key[i]) { tag = i * 3; break; } } duration = end - start; for(i = 0 ; i < 3 ; i++) { if(duration <= duration_key[i]) { tag += i; break; } } send_m[tag]++; // finished reading chunk? if(ftell(f) > recv[1]) break; } // send result to master MPI_Isend(send_m, ENCODED_NO, MPI_UNSIGNED_LONG, 0, 0, MPI_COMM_WORLD, &request); } } // block and finish MPI_Barrier(MPI_COMM_WORLD); MPI_Finalize(); }