#include "Master.h" #include "CommChannelProvider.h" #include #include #include Master::Master(Environment *bullet, int nprocs) : m_nprocs(nprocs), m_bullet(bullet), m_running(false) { } Master::~Master() { } /** Wait for "Hello" - messages from all agents which joined the world */ void Master::discoverAgents() { MPI_Status status; int i, j; int id; for(i = 0; i < WIDTH; i++) { for(j = 0; j < HEIGHT; j++) { MPI_Recv(&id, 1, MPI_INT, MPI_ANY_SOURCE, HELLO_MSG, MPI_COMM_WORLD, &status); //printf("Process %d joined the virtual world at %d,%d\n", id, i, j); positionID.insert(make_pair(id, make_pair(i, j))); } } } /** Return the process id associated with a given position in mesh */ int Master::getID(int i, int j) { for(int k = 1; k < m_nprocs; k++) { pair pos = positionID[k]; if(i == pos.first && j == pos.second) return k; } return -1; } void Master::sendHello(int id) { vector currentList; int i; /// Pack all the sensors/actuators of a certain id for(i = 0; i < (int) xList.size(); i++) { XProvider *provider = xList[i]; MPIChannel *channel = (MPIChannel *) provider->getChannel(); int dest = channel->getDest(); if(dest == id || (!strcmp(typeid(*provider).name(), "19CommChannelProvider") && ((CommChannelProvider *) provider)->getPeer() == id)) { channelData data; provider->packData(&data, id); data.tag = channel->getTag(); currentList.push_back(data); } } /// Send to a certain id process the init data for running the agent int size = (int) currentList.size(); int intSize, floatSize; MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &intSize); MPI_Pack_size(1, MPI_FLOAT, MPI_COMM_WORLD, &floatSize); int bufSize = size * (4 * intSize + 3 * floatSize); char *buffer = (char*) malloc(bufSize); int position = 0; for(i = 0; i < size; i++) { channelData *tmp = ¤tList[i]; MPI_Pack(&tmp->tag, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); MPI_Pack(&tmp->type, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); MPI_Pack(&tmp->orient, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); MPI_Pack(&tmp->dest, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); MPI_Pack(&tmp->minAngle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD); MPI_Pack(&tmp->maxAngle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD); MPI_Pack(&tmp->angle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD); } MPI_Send(&size, 1, MPI_INT, id, CHANNEL_MSG, MPI_COMM_WORLD); MPI_Send(buffer, position, MPI_PACKED, id, CHANNEL_MSG, MPI_COMM_WORLD); } /** Send to all workers their channels parameters */ int Master::sendAll() { int i; for (i = 1; i < m_nprocs; i++) { sendHello(i); } for (i = 1; i < m_nprocs; i++) { MPI_Send(&i, 1, MPI_INT, i, CHANNEL_MSG, MPI_COMM_WORLD); } return 1; } void Master::createProviders() { int i, j, tag = FIRST_CHANNEL_TAG; for(i = 0; i < WIDTH; i++) { for(j = 0; j < HEIGHT; j++) { int idx; if(i < WIDTH - 1) { idx = m_bullet->getMatrix()[i * HEIGHT + j][RIGHT]; //printf("RIGHT: (%d, %d) - %d\n", i, j, right); btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; int id = getID(i, j); Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); SensorProvider *s = new SensorProvider(channel, joint, RIGHT); xList.push_back(s); ActuatorProvider *a = new ActuatorProvider(channel, joint, RIGHT); xList.push_back(a); tag++; } if(i > 0) { idx = m_bullet->getMatrix()[i * HEIGHT + j][LEFT]; //printf("LEFT: (%d, %d) - %d\n", i, j, left); btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; int id = getID(i, j); Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); SensorProvider *s = new SensorProvider(channel, joint, LEFT); xList.push_back(s); ActuatorProvider *a = new ActuatorProvider(channel, joint, LEFT); xList.push_back(a); tag++; Channel *comm_channel = new MPIChannel(id, tag, MPI_COMM_WORLD); CommChannelProvider *c = new CommChannelProvider(comm_channel, id, getID(i-1, j), LEFT); // printf("creating channel: %d - %d\n", id, getID(i-1, j)); xList.push_back(c); tag++; } if(j < HEIGHT - 1) { idx = m_bullet->getMatrix()[i * HEIGHT + j][DOWN]; //printf("DOWN: (%d, %d) - %d\n", i, j, down); btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; int id = getID(i, j); Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); SensorProvider *s = new SensorProvider(channel, joint, DOWN); xList.push_back(s); ActuatorProvider *a = new ActuatorProvider(channel, joint, DOWN); xList.push_back(a); tag++; } if(j > 0) { idx = m_bullet->getMatrix()[i * HEIGHT + j][UP]; //printf("UP: (%d, %d) - %d\n", i, j, up); btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; int id = getID(i, j); Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); SensorProvider *s = new SensorProvider(channel, joint, UP); xList.push_back(s); ActuatorProvider *a = new ActuatorProvider(channel, joint, UP); xList.push_back(a); tag++; Channel *comm_channel = new MPIChannel(id, tag, MPI_COMM_WORLD); CommChannelProvider *c = new CommChannelProvider(comm_channel, id, getID(i, j-1), UP); // printf("creating channel: %d - %d\n", id, getID(i, j-1)); xList.push_back(c); tag++; } } } } void Master::run() { int i, dummy, stop = 0; MPI_Status status; m_running = true; while (!stop) { for (i = 0; i < (int) xList.size(); i++) { XProvider *provider = xList[i]; provider->updatePre(); } if (!m_running) { stop = 1; } for (i = 1; i < m_nprocs; i++) { MPI_Send(&stop, 1, MPI_INT, i, SYNC_MSG, MPI_COMM_WORLD); } for (i = 1; i < m_nprocs; i++) { MPI_Recv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, SYNC_MSG, MPI_COMM_WORLD, &status); } for (i = 0; i < (int) xList.size(); i++) { XProvider *provider = xList[i]; provider->updatePost(); } } } void Master::stop() { while (!m_running) ; m_running = false; }