[167] | 1 | #include "Master.h" |
---|
| 2 | #include "CommChannelProvider.h" |
---|
| 3 | #include <stdio.h> |
---|
| 4 | #include <string.h> |
---|
| 5 | #include <typeinfo> |
---|
| 6 | |
---|
/**
 * Build a master controller that is not yet running.
 *
 * @param bullet  Environment pointer stored in m_bullet; createProviders()
 *                reads its matrix and joint list through it.
 * @param nprocs  Stored in m_nprocs; getID(), sendAll() and run() iterate over
 *                ranks 1 .. nprocs - 1 (presumably the MPI world size with the
 *                master at rank 0 -- confirm with the caller).
 */
Master::Master(Environment *bullet, int nprocs)
    : m_nprocs(nprocs),
      m_bullet(bullet),
      m_running(false)
{
}
---|
| 13 | |
---|
/**
 * Destructor; performs no cleanup.
 *
 * NOTE(review): createProviders() allocates channels and providers with `new`
 * and stores the providers in xList; nothing is released here. Confirm who is
 * meant to own those objects before adding deletes.
 */
Master::~Master()
{
}
---|
| 17 | |
---|
| 18 | /** Wait for "Hello" - messages from all agents which joined the world */ |
---|
| 19 | void Master::discoverAgents() |
---|
| 20 | { |
---|
| 21 | MPI_Status status; |
---|
| 22 | int i, j; |
---|
| 23 | int id; |
---|
| 24 | for(i = 0; i < WIDTH; i++) |
---|
| 25 | { |
---|
| 26 | for(j = 0; j < HEIGHT; j++) |
---|
| 27 | { |
---|
| 28 | MPI_Recv(&id, 1, MPI_INT, MPI_ANY_SOURCE, HELLO_MSG, MPI_COMM_WORLD, &status); |
---|
| 29 | //printf("Process %d joined the virtual world at %d,%d\n", id, i, j); |
---|
| 30 | positionID.insert(make_pair(id, make_pair(i, j))); |
---|
| 31 | } |
---|
| 32 | } |
---|
| 33 | } |
---|
| 34 | |
---|
| 35 | /** Return the process id associated with a given position in mesh */ |
---|
| 36 | int Master::getID(int i, int j) |
---|
| 37 | { |
---|
| 38 | for(int k = 1; k < m_nprocs; k++) |
---|
| 39 | { |
---|
| 40 | pair<int, int> pos = positionID[k]; |
---|
| 41 | if(i == pos.first && j == pos.second) |
---|
| 42 | return k; |
---|
| 43 | } |
---|
| 44 | |
---|
| 45 | return -1; |
---|
| 46 | } |
---|
| 47 | |
---|
| 48 | void Master::sendHello(int id) |
---|
| 49 | { |
---|
| 50 | vector<channelData> currentList; |
---|
| 51 | int i; |
---|
| 52 | |
---|
| 53 | /// Pack all the sensors/actuators of a certain id |
---|
| 54 | for(i = 0; i < (int) xList.size(); i++) |
---|
| 55 | { |
---|
| 56 | XProvider *provider = xList[i]; |
---|
| 57 | MPIChannel *channel = (MPIChannel *) provider->getChannel(); |
---|
| 58 | int dest = channel->getDest(); |
---|
| 59 | if(dest == id || (!strcmp(typeid(*provider).name(), "19CommChannelProvider") && ((CommChannelProvider *) provider)->getPeer() == id)) { |
---|
| 60 | channelData data; |
---|
| 61 | provider->packData(&data, id); |
---|
| 62 | data.tag = channel->getTag(); |
---|
| 63 | currentList.push_back(data); |
---|
| 64 | } |
---|
| 65 | } |
---|
| 66 | |
---|
| 67 | /// Send to a certain id process the init data for running the agent |
---|
| 68 | |
---|
| 69 | int size = (int) currentList.size(); |
---|
| 70 | int intSize, floatSize; |
---|
| 71 | MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &intSize); |
---|
| 72 | MPI_Pack_size(1, MPI_FLOAT, MPI_COMM_WORLD, &floatSize); |
---|
| 73 | |
---|
| 74 | int bufSize = size * (4 * intSize + 3 * floatSize); |
---|
| 75 | char *buffer = (char*) malloc(bufSize); |
---|
| 76 | int position = 0; |
---|
| 77 | |
---|
| 78 | for(i = 0; i < size; i++) |
---|
| 79 | { |
---|
| 80 | channelData *tmp = ¤tList[i]; |
---|
| 81 | |
---|
| 82 | MPI_Pack(&tmp->tag, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 83 | MPI_Pack(&tmp->type, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 84 | MPI_Pack(&tmp->orient, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 85 | MPI_Pack(&tmp->dest, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 86 | MPI_Pack(&tmp->minAngle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 87 | MPI_Pack(&tmp->maxAngle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 88 | MPI_Pack(&tmp->angle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD); |
---|
| 89 | } |
---|
| 90 | |
---|
| 91 | MPI_Send(&size, 1, MPI_INT, id, CHANNEL_MSG, MPI_COMM_WORLD); |
---|
| 92 | MPI_Send(buffer, position, MPI_PACKED, id, CHANNEL_MSG, MPI_COMM_WORLD); |
---|
| 93 | } |
---|
| 94 | |
---|
| 95 | /** Send to all workers their channels parameters */ |
---|
| 96 | int Master::sendAll() |
---|
| 97 | { |
---|
| 98 | int i; |
---|
| 99 | |
---|
| 100 | for (i = 1; i < m_nprocs; i++) { |
---|
| 101 | sendHello(i); |
---|
| 102 | } |
---|
| 103 | |
---|
| 104 | for (i = 1; i < m_nprocs; i++) { |
---|
| 105 | MPI_Send(&i, 1, MPI_INT, i, CHANNEL_MSG, MPI_COMM_WORLD); |
---|
| 106 | } |
---|
| 107 | |
---|
| 108 | return 1; |
---|
| 109 | } |
---|
| 110 | |
---|
| 111 | void Master::createProviders() |
---|
| 112 | { |
---|
| 113 | int i, j, tag = FIRST_CHANNEL_TAG; |
---|
| 114 | |
---|
| 115 | for(i = 0; i < WIDTH; i++) { |
---|
| 116 | for(j = 0; j < HEIGHT; j++) { |
---|
| 117 | int idx; |
---|
| 118 | |
---|
| 119 | if(i < WIDTH - 1) { |
---|
| 120 | idx = m_bullet->getMatrix()[i * HEIGHT + j][RIGHT]; |
---|
| 121 | //printf("RIGHT: (%d, %d) - %d\n", i, j, right); |
---|
| 122 | btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; |
---|
| 123 | int id = getID(i, j); |
---|
| 124 | Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); |
---|
| 125 | SensorProvider *s = new SensorProvider(channel, joint, RIGHT); |
---|
| 126 | xList.push_back(s); |
---|
| 127 | ActuatorProvider *a = new ActuatorProvider(channel, joint, RIGHT); |
---|
| 128 | xList.push_back(a); |
---|
| 129 | tag++; |
---|
| 130 | } |
---|
| 131 | |
---|
| 132 | if(i > 0) { |
---|
| 133 | idx = m_bullet->getMatrix()[i * HEIGHT + j][LEFT]; |
---|
| 134 | //printf("LEFT: (%d, %d) - %d\n", i, j, left); |
---|
| 135 | btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; |
---|
| 136 | int id = getID(i, j); |
---|
| 137 | Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); |
---|
| 138 | SensorProvider *s = new SensorProvider(channel, joint, LEFT); |
---|
| 139 | xList.push_back(s); |
---|
| 140 | ActuatorProvider *a = new ActuatorProvider(channel, joint, LEFT); |
---|
| 141 | xList.push_back(a); |
---|
| 142 | tag++; |
---|
| 143 | Channel *comm_channel = new MPIChannel(id, tag, MPI_COMM_WORLD); |
---|
| 144 | CommChannelProvider *c = new CommChannelProvider(comm_channel, id, getID(i-1, j), LEFT); |
---|
| 145 | // printf("creating channel: %d - %d\n", id, getID(i-1, j)); |
---|
| 146 | xList.push_back(c); |
---|
| 147 | tag++; |
---|
| 148 | } |
---|
| 149 | |
---|
| 150 | if(j < HEIGHT - 1) { |
---|
| 151 | idx = m_bullet->getMatrix()[i * HEIGHT + j][DOWN]; |
---|
| 152 | //printf("DOWN: (%d, %d) - %d\n", i, j, down); |
---|
| 153 | btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; |
---|
| 154 | int id = getID(i, j); |
---|
| 155 | Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); |
---|
| 156 | SensorProvider *s = new SensorProvider(channel, joint, DOWN); |
---|
| 157 | xList.push_back(s); |
---|
| 158 | ActuatorProvider *a = new ActuatorProvider(channel, joint, DOWN); |
---|
| 159 | xList.push_back(a); |
---|
| 160 | tag++; |
---|
| 161 | } |
---|
| 162 | |
---|
| 163 | if(j > 0) { |
---|
| 164 | idx = m_bullet->getMatrix()[i * HEIGHT + j][UP]; |
---|
| 165 | //printf("UP: (%d, %d) - %d\n", i, j, up); |
---|
| 166 | btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx]; |
---|
| 167 | int id = getID(i, j); |
---|
| 168 | Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD); |
---|
| 169 | SensorProvider *s = new SensorProvider(channel, joint, UP); |
---|
| 170 | xList.push_back(s); |
---|
| 171 | ActuatorProvider *a = new ActuatorProvider(channel, joint, UP); |
---|
| 172 | xList.push_back(a); |
---|
| 173 | tag++; |
---|
| 174 | Channel *comm_channel = new MPIChannel(id, tag, MPI_COMM_WORLD); |
---|
| 175 | CommChannelProvider *c = new CommChannelProvider(comm_channel, id, getID(i, j-1), UP); |
---|
| 176 | // printf("creating channel: %d - %d\n", id, getID(i, j-1)); |
---|
| 177 | xList.push_back(c); |
---|
| 178 | tag++; |
---|
| 179 | } |
---|
| 180 | } |
---|
| 181 | } |
---|
| 182 | } |
---|
| 183 | |
---|
| 184 | void Master::run() |
---|
| 185 | { |
---|
| 186 | int i, dummy, stop = 0; |
---|
| 187 | MPI_Status status; |
---|
| 188 | |
---|
| 189 | m_running = true; |
---|
| 190 | |
---|
| 191 | while (!stop) { |
---|
| 192 | for (i = 0; i < (int) xList.size(); i++) { |
---|
| 193 | XProvider *provider = xList[i]; |
---|
| 194 | provider->updatePre(); |
---|
| 195 | } |
---|
| 196 | |
---|
| 197 | if (!m_running) { |
---|
| 198 | stop = 1; |
---|
| 199 | } |
---|
| 200 | |
---|
| 201 | for (i = 1; i < m_nprocs; i++) { |
---|
| 202 | MPI_Send(&stop, 1, MPI_INT, i, SYNC_MSG, MPI_COMM_WORLD); |
---|
| 203 | } |
---|
| 204 | |
---|
| 205 | for (i = 1; i < m_nprocs; i++) { |
---|
| 206 | MPI_Recv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, SYNC_MSG, MPI_COMM_WORLD, &status); |
---|
| 207 | } |
---|
| 208 | |
---|
| 209 | for (i = 0; i < (int) xList.size(); i++) { |
---|
| 210 | XProvider *provider = xList[i]; |
---|
| 211 | provider->updatePost(); |
---|
| 212 | } |
---|
| 213 | } |
---|
| 214 | } |
---|
| 215 | |
---|
/**
 * Ask the run() loop to terminate.
 *
 * Spins until m_running is true (run() sets it on entry), then clears it.
 * run() tests the flag once per iteration and, once it is cleared, sends the
 * workers a stop value and leaves its loop.
 *
 * NOTE(review): presumably called from a different thread than run(). No
 * synchronisation on m_running is visible in this file -- confirm its declared
 * type in Master.h; a plain bool would make this busy-wait a data race.
 */
void Master::stop()
{
    // Busy-wait so that a stop request issued before run() has started is not
    // overwritten by run()'s own `m_running = true`.
    while (!m_running) ;
    m_running = false;
}
---|