source: proiecte/SIMEO/Simeo/src/SimeoEngine/Master.cpp @ 167

Last change on this file since 167 was 167, checked in by (none), 14 years ago

Simeo: added final project and also older proof of concept code.

We used Git for version control, so look at the Git repo
in SIMEO/Simeo/ for more info.

File size: 6.1 KB
Line 
1#include "Master.h"
2#include "CommChannelProvider.h"
3#include <stdio.h>
4#include <string.h>
5#include <typeinfo>
6
7Master::Master(Environment *bullet, int nprocs)
8: m_nprocs(nprocs),
9m_bullet(bullet),
10m_running(false)
11{
12}
13
14Master::~Master()
15{
16}
17
18/** Wait for "Hello" - messages from all agents which joined the world */
19void Master::discoverAgents()
20{
21        MPI_Status status;
22        int i, j;
23        int id;
24        for(i = 0; i < WIDTH; i++)
25        {
26                for(j = 0; j < HEIGHT; j++)
27                {
28                        MPI_Recv(&id, 1, MPI_INT, MPI_ANY_SOURCE, HELLO_MSG, MPI_COMM_WORLD, &status);
29                        //printf("Process %d joined the virtual world at %d,%d\n", id, i, j);
30                        positionID.insert(make_pair(id, make_pair(i, j)));
31                }
32        }
33}
34
35/** Return the process id associated with a given position in mesh */
36int Master::getID(int i, int j)
37{
38        for(int k = 1; k < m_nprocs; k++)
39        {
40                pair<int, int> pos = positionID[k];
41                if(i == pos.first && j == pos.second)
42                        return k;
43        }
44
45        return -1;
46}
47
48void Master::sendHello(int id)
49{
50        vector<channelData> currentList;
51        int i;
52
53        /// Pack all the sensors/actuators of a certain id
54        for(i = 0; i < (int) xList.size(); i++)
55        {
56                XProvider *provider = xList[i];
57                MPIChannel *channel = (MPIChannel *) provider->getChannel();
58                int dest = channel->getDest();
59                if(dest == id || (!strcmp(typeid(*provider).name(), "19CommChannelProvider") && ((CommChannelProvider *) provider)->getPeer() == id)) {
60                        channelData data;
61                        provider->packData(&data, id);
62                        data.tag = channel->getTag();
63                        currentList.push_back(data);
64                }
65        }
66
67        /// Send to a certain id process the init data for running the agent
68
69        int size = (int) currentList.size();
70        int intSize, floatSize;
71        MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &intSize);
72        MPI_Pack_size(1, MPI_FLOAT, MPI_COMM_WORLD, &floatSize);
73
74        int bufSize = size * (4 * intSize + 3 * floatSize);
75        char *buffer = (char*) malloc(bufSize);
76        int position = 0;
77
78        for(i = 0; i < size; i++)
79        {
80                channelData *tmp = &currentList[i];
81
82                MPI_Pack(&tmp->tag, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD);
83                MPI_Pack(&tmp->type, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD);
84                MPI_Pack(&tmp->orient, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD);
85                MPI_Pack(&tmp->dest, 1, MPI_INT, buffer, bufSize, &position, MPI_COMM_WORLD);
86                MPI_Pack(&tmp->minAngle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD);
87                MPI_Pack(&tmp->maxAngle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD);
88                MPI_Pack(&tmp->angle, 1, MPI_FLOAT, buffer, bufSize, &position, MPI_COMM_WORLD);
89        }
90
91        MPI_Send(&size, 1, MPI_INT, id, CHANNEL_MSG, MPI_COMM_WORLD);
92        MPI_Send(buffer, position, MPI_PACKED, id, CHANNEL_MSG, MPI_COMM_WORLD);
93}
94
95/** Send to all workers their channels parameters */
96int Master::sendAll()
97{
98        int i;
99
100        for (i = 1; i < m_nprocs; i++) {
101                sendHello(i);
102        }
103
104        for (i = 1; i < m_nprocs; i++) {
105                MPI_Send(&i, 1, MPI_INT, i, CHANNEL_MSG, MPI_COMM_WORLD);
106        }
107
108        return 1;
109}
110
111void Master::createProviders()
112{
113        int i, j, tag = FIRST_CHANNEL_TAG;
114
115        for(i = 0; i < WIDTH; i++) {
116                for(j = 0; j < HEIGHT; j++) {
117                        int idx;
118
119                        if(i < WIDTH - 1) {
120                                idx = m_bullet->getMatrix()[i * HEIGHT + j][RIGHT];
121                                //printf("RIGHT: (%d, %d) - %d\n", i, j, right);
122                                btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx];
123                                int id = getID(i, j);
124                                Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD);
125                                SensorProvider *s = new SensorProvider(channel, joint, RIGHT);
126                                xList.push_back(s);
127                                ActuatorProvider *a = new ActuatorProvider(channel, joint, RIGHT);
128                                xList.push_back(a);
129                                tag++;
130                        }
131
132                        if(i > 0) {
133                                idx = m_bullet->getMatrix()[i * HEIGHT + j][LEFT];
134                                //printf("LEFT: (%d, %d) - %d\n", i, j, left);
135                                btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx];
136                                int id = getID(i, j);
137                                Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD);
138                                SensorProvider *s = new SensorProvider(channel, joint, LEFT);
139                                xList.push_back(s);
140                                ActuatorProvider *a = new ActuatorProvider(channel, joint, LEFT);
141                                xList.push_back(a);
142                                tag++;
143                                Channel *comm_channel = new MPIChannel(id, tag, MPI_COMM_WORLD);
144                                CommChannelProvider *c = new CommChannelProvider(comm_channel, id, getID(i-1, j), LEFT);
145//                              printf("creating channel: %d - %d\n", id, getID(i-1, j));
146                                xList.push_back(c);
147                                tag++;
148                        }
149
150                        if(j < HEIGHT - 1) {
151                                idx = m_bullet->getMatrix()[i * HEIGHT + j][DOWN];
152                                //printf("DOWN: (%d, %d) - %d\n", i, j, down);
153                                btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx];
154                                int id = getID(i, j);
155                                Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD);
156                                SensorProvider *s = new SensorProvider(channel, joint, DOWN);
157                                xList.push_back(s);
158                                ActuatorProvider *a = new ActuatorProvider(channel, joint, DOWN);
159                                xList.push_back(a);
160                                tag++;
161                        }
162
163                        if(j > 0) {
164                                idx = m_bullet->getMatrix()[i * HEIGHT + j][UP];
165                                //printf("UP: (%d, %d) - %d\n", i, j, up);
166                                btHingeConstraint *joint = (btHingeConstraint*)m_bullet->getJoints()[idx];
167                                int id = getID(i, j);
168                                Channel *channel = new MPIChannel(id, tag, MPI_COMM_WORLD);
169                                SensorProvider *s = new SensorProvider(channel, joint, UP);
170                                xList.push_back(s);
171                                ActuatorProvider *a = new ActuatorProvider(channel, joint, UP);
172                                xList.push_back(a);
173                                tag++;
174                                Channel *comm_channel = new MPIChannel(id, tag, MPI_COMM_WORLD);
175                                CommChannelProvider *c = new CommChannelProvider(comm_channel, id, getID(i, j-1), UP);
176//                              printf("creating channel: %d - %d\n", id, getID(i, j-1));
177                                xList.push_back(c);
178                                tag++;
179                        }
180                }
181        }
182}
183
184void Master::run()
185{
186        int i, dummy, stop = 0;
187        MPI_Status status;
188
189        m_running = true;
190
191        while (!stop) {
192                for (i = 0; i < (int) xList.size(); i++) {
193                        XProvider *provider = xList[i];
194                        provider->updatePre();
195                }
196
197                if (!m_running) {
198                        stop = 1;
199                }
200
201                for (i = 1; i < m_nprocs; i++) {
202                        MPI_Send(&stop, 1, MPI_INT, i, SYNC_MSG, MPI_COMM_WORLD);
203                }
204
205                for (i = 1; i < m_nprocs; i++) {
206                        MPI_Recv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, SYNC_MSG, MPI_COMM_WORLD, &status);
207                }
208
209                for (i = 0; i < (int) xList.size(); i++) {
210                        XProvider *provider = xList[i];
211                        provider->updatePost();
212                }
213        }
214}
215
216void Master::stop()
217{
218        while (!m_running) ;
219        m_running = false;
220}
Note: See TracBrowser for help on using the repository browser.