OPAL (Object Oriented Parallel Accelerator Library) 2024.2
OPAL
Communicate.h
Go to the documentation of this file.
1// -*- C++ -*-
2/***************************************************************************
3 *
4 * The IPPL Framework
5 *
6 *
7 * Visit http://people.web.psi.ch/adelmann/ for more details
8 *
9 ***************************************************************************/
10
11#ifndef COMMUNICATE_H
12#define COMMUNICATE_H
13
14/***************************************************************************
15 * Communicate.h - communications object for use with Ippl framework. Allows
16 * user to establish id's for available nodes, establish connections, and
17 * send/receive data.
18 ***************************************************************************/
19
20// include files
21#include "Message/TagMaker.h"
22#include "Message/Tags.h"
23
24#include <cstddef>
25#include <cstdlib>
26#include <cstring>
27#include <iostream>
28#include <map>
29#include <utility>
30#include <vector>
31
32#include <mpi.h>
33
34// forward declarations
35class Message;
36class Communicate;
37std::ostream& operator<<(std::ostream&, const Communicate&);
38
39// special codes used as 'wildcards' to match any node or tag
40const int COMM_ANY_NODE = (-1);
41const int COMM_ANY_TAG = (-1);
42
43
44// A simple class used to store information for caching sent messages. This
45// is only used if the 'retransmit' option is active.
47{
48public:
50 : size_m(0), buf_m(0)
51 {
52 }
53
54 CommSendInfo(int size, char *buf, int node)
56 {
57 }
58
61 {
62 }
63
65 {
66 // the user is actually responsible for freeing the buffer. We
67 // do not do this automatically here
68 }
69
71 {
72 size_m = c.size_m;
73 buf_m = c.buf_m;
74 node_m = c.node_m;
75 return *this;
76 }
77
78 int size() const
79 {
80 return size_m;
81 }
82
83 int node() const
84 {
85 return node_m;
86 }
87
88 char *buf()
89 {
90 return buf_m;
91 }
92 const char *buf() const
93 {
94 return buf_m;
95 }
96
97 void freebuf()
98 {
99 if (buf_m != 0)
100 delete [] buf_m;
101 buf_m = 0;
102 }
103
104private:
107 char *buf_m;
108};
109
110
111// The base class for all specific Communicate objects
112class Communicate : public TagMaker
113{
114
115public:
116 // default error codes, may be overridden by inherited classes.
118
119 // special tags used by this class ... 32000 is arbitrary
121
122 // special codes used as 'wildcards' to match any node or tag
123 // These are listed again because they should be here, but the global
124 // values are kept for compatibility.
125 enum CommCodes { COMM_ANY_NODE = (-1), COMM_ANY_TAG = (-1) };
126
127public:
128 // constructor and destructor
129 // constructor arguments: command-line args, and number of processes
130 // to start (if < 0, start the 'default' number, i.e. the number of
131 // hosts in a PVM virtual machine, the number of nodes in an O2K, etc)
132 Communicate(int argc = 0, char** argv = NULL, int procs = (-1));
133 virtual ~Communicate(void);
134
135 // return the name of this item
136 virtual const char *name() const
137 {
138 return "Serial";
139 }
140
141 // return info about connections in general
142 int getNodes() const
143 {
144 return TotalNodes;
145 }
146 int getContexts(const int n) const
147 {
148 return Contexts[n];
149 }
150 int getProcesses(const int n, const int c) const
151 {
152 return Processes[n][c];
153 }
154 int myNode() const
155 {
156 return myHost;
157 }
158 int getError() const
159 {
160 return ErrorStatus;
161 }
162 int getReceived() const
163 {
164 return recMsgList.size();
165 }
166
167 //
168 // nonvirtual routines to send/receive data
169 //
170
171 // send data to another node. Returns success (T or F).
172 // last argument specifies whether to delete the Message after sending
173 // (if message is for another node). Note that if the send is not
174 // successful, the message will NOT be deleted, regardless of delmsg.
175 bool send(Message *, int node, int tag, bool delmsg = true);
176
177 // receive data from another node. Returns newly created Message object
178 // with received message, or NULL if no message is available.
179 // If node is < 0, this will receive the next message with the given tag
180 // from any node. If tag < 0, this will receive the next message with
181 // any tag from the given node. If both are < 0, this will receive the
182 // next message, period. node and tag are passed by reference; if either
183 // is < 0, and a message is received, they are changed to their actual
184 // values.
185 Message *receive(int& node, int& tag);
186
187 // a blocking version of receive;
188 Message *receive_block(int& node, int& tag);
189
190 //send and receive for raw data
191 virtual bool raw_send(void *, int , int , int )
192 {
193 return false;
194 }
195 virtual MPI_Request raw_isend(void *, int , int , int )
196 {
197 return MPI_Request();
198 }
199 virtual int raw_receive(char *, int , int &, int &)
200 {
201 return 0;
202 }
203 virtual MPI_Request raw_ireceive(char *, int , int , int )
204 {
205 return MPI_Request();
206 }
207 virtual int raw_probe_receive(char *&, int &, int &)
208 {
209 return 0;
210 }
211
212 //
213 // virtual routines to broadcast data
214 //
215
216 // broadcast the current message to other nodes.
217 // Return number of nodes actually sent to.
218 // The first version sends to all nodes including this node.
219 // The second version sends to all nodes except this node.
220 // The first argument is the Message; the last argument is the tag.
221 virtual int broadcast_all(Message *, int);
222 virtual int broadcast_others(Message *, int, bool delmsg=true);
223
224 //
225 // routines to synchronize processors at a barrier
226 //
227
228 // Synchronize all processors (everybody waits for everybody
229 // else to get here before returning to calling function).
230 void barrier(void);
231
232 //
233 // virtual routines to deal with memory management
234 //
235
236 // clean up after a Message has been used (called by Message). By
237 // default, does nothing.
238 virtual void cleanupMessage(void *);
239
240protected:
241 // struct used to store messages, tags, and nodes
243 {
244 int node; // sending/receiving node
245 int tag; // tag of the message
246 Message *msg; // pointer to the message itself
247 MessageData(int n, int t, Message *m) : node(n),tag(t),msg(m) {}
248 MessageData() : node(-1), tag(-1), msg(0) { }
249 MessageData(const MessageData& m) : node(m.node),tag(m.tag),msg(m.msg) {}
251 };
252
253 // a list of messages which have already been received, but not yet
254 // delivered
255 std::vector<MessageData> recMsgList;
256
257 // the following items should be filled in by the derived classes
258 int TotalNodes; // number of nodes available (0 ... # nodes-1)
259 int myHost; // which node am I?
260 int ErrorStatus; // error code, from above enumeration
261 std::vector<int> Contexts; // the number of contexts per node
262 std::vector< std::vector<int> > Processes; // number of running processes per context
263
264 // An integer message number identifier; this is included in each
265 // message, and continually increases as more messages are sent.
266 typedef long MsgNum_t;
268
269 // An optional sent-message cache, used to attempt to retransmit
270 // messages if they are corrupted in-transit. Messages are keyed on
271 // a message number, which is unique for each message.
272 typedef std::map<MsgNum_t, CommSendInfo> SentCache_t;
274
275 // a list of things to resend at the next opportunity
276 std::vector<MsgNum_t> resendList;
277
278 // a list of messages which have been received OK
279 std::vector<MsgNum_t> sentOKList;
280
281 // a list of messages which should be cleared out on other nodes
282 std::vector<std::pair<int,MsgNum_t> > informOKList;
283
284 // a list of requests we must make to other nodes to resend messages
285 std::vector<std::pair<int,MsgNum_t> > requestList;
286
287 // add a new message to the received message queues. Return success.
288 // arguments: message, sending node, tag
289 bool add_msg(Message *, int, int);
290
291 // Looks for a message in the message queue from the specified node
292 // and tag. This understands wildcards for node and tag.
293 // Returns a pointer to the Message object found, and sets node and
294 // tag equal to the proper values. Also, this will remove the item from
295 // the queue.
296 Message* find_msg(int&, int&);
297
298 //
299 // implementation-specific routines (which begin with 'my')
300 // these should be provided in a derived class, and contain the
301 // comm-library-specific code
302 //
303
304 // send a message ... arguments are the Message itself, the
305 // destination node, the 'user' tag, and the 'encoding' tag.
306 // Messages should be sent via the underlying mechanism by using the
307 // encoding tag (one of the COMM_ tags),
308 // and should embed the information about what the user
309 // tag is in the data sent between nodes. Return success.
310 virtual bool mysend(Message *, int node, int utag, int etag);
311
312 // receive a message from the given node and user tag. Return a NEW
313 // Message object if a message arrives, or NULL if no message available.
314 // node will be set to the node from which the message was sent.
315 // tag will be set to the 'user tag' for that message.
316 // etag is the 'encoding' tag, and must be one of the COMM_ tags.
317 // Only messages sent via the underlying mechanism with the
318 // given etag are checked. When one is found, the user tag and sending
319 // node are extracted from the sent data.
320 // If node = COMM_ANY_NODE, checks for messages from any node.
321 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
322 virtual Message *myreceive(int& node, int& tag, int etag);
323
324 // Synchronize all processors (everybody waits for everybody
325 // else to get here before returning to calling function).
326 virtual void mybarrier(void);
327
328 // resend a message buffer that has been previously packed and copied
329 // into the provided buffer. Return success.
330 virtual bool resend(void *buf, int size, int node, int etag);
331
332 //
333 // utility functions used to serialize data into and out of byte buffers
334 //
335 // standard way to create and free buffer storage
// Allocate 'size' bytes of raw storage with malloc and return the pointer
// (null if malloc fails). Storage obtained here must be released with
// freebuffer()/free, not with delete.
static inline void *makebuffer(int size)
{
    void *const storage = malloc(size);
    return storage;
}
// Release storage through free(). Passing a null pointer is harmless,
// since free() ignores it.
static inline void freebuffer(void *buf)
{
    void *const storage = buf;
    free(storage);
}
344
// Round 'size' up to the next multiple of sizeof(long long), so that
// successive items placed in a buffer stay word-aligned.
// The arithmetic is carried out in the unsigned type of sizeof and the
// result is narrowed to unsigned int on return; a negative 'size' is
// therefore not meaningful.
static inline unsigned int wordround(int size)
{
    const std::size_t word = sizeof(long long);
    const std::size_t wordsNeeded = (size + word - 1) / word;
    return word * wordsNeeded;
}
352
353 // compute a wordround value for 'size' bytes, then add that to the
354 // given 'pos' pointer
355 static inline void addwordround(void * &pos, int size)
356 {
357 pos = static_cast<void *>(wordround(size) + static_cast<char *>(pos));
358 }
359
360 // memcpy data into the given location, and then increment the pointer
361 static inline void pack(void *packdata, void * &pos, int size)
362 {
363 memcpy(pos, packdata, size);
364 addwordround(pos, size);
365 }
366
367 // memcpy data out of a given location to another, updating 'pos'
368 static inline void unpack(void * &pos, void *packdata, int size)
369 {
370 memcpy(packdata, pos, size);
371 addwordround(pos, size);
372 }
373
374 //
375 // utility functions used in packing and unpacking Message data
376 //
377
378 // calculate how big the buffer must be to send the given message
380
381 // put data from the given Message into the given buffer, with tag value.
382 // the final arguments are the buffer size, in bytes, and the dest node.
383 void fill_msg_buffer(void *, Message &, int, int, int);
384
385 // take data out of the current receive buf and create a new Message
386 Message *unpack_message(int &node, int &tag, void *pos);
387
388 //
389 // utility functions used for message caching/retransmit
390 //
391
392 // put the given message buffer in the sent-message cache, as a new
393 // CommSendInfo object storing the buffer and other information.
394 void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node);
395
396 // send off a request to have this message retransmitted to us
397 void request_retransmission(int node, MsgNum_t mnum);
398
399 // resend the data for message mnum ... calls the virtual 'resend'
400 void perform_resend(MsgNum_t mnum);
401
402 // get the resend information from a buffer sent in a message requesting
403 // retransmission
404 void unpack_retransmission_request(int nitems, void *pos);
405
406 // tell the sender that we received this message OK
407 void send_ok_message(int node, MsgNum_t mnum);
408
409 // unpack message with a list of OK message numbers, and delete them
410 // from our cache
411 void clear_ok_messages(int nitems, void *pos);
412
413 // remove a single OK message
415
416 // process list of resend requests
418};
419
420#endif // COMMUNICATE_H
const int COMM_ANY_NODE
Definition Communicate.h:40
std::ostream & operator<<(std::ostream &, const Communicate &)
const int COMM_ANY_TAG
Definition Communicate.h:41
CommSendInfo & operator=(const CommSendInfo &c)
Definition Communicate.h:70
char * buf()
Definition Communicate.h:88
int node() const
Definition Communicate.h:83
void freebuf()
Definition Communicate.h:97
const char * buf() const
Definition Communicate.h:92
CommSendInfo(const CommSendInfo &c)
Definition Communicate.h:59
CommSendInfo(int size, char *buf, int node)
Definition Communicate.h:54
int size() const
Definition Communicate.h:78
virtual MPI_Request raw_isend(void *, int, int, int)
virtual bool mysend(Message *, int node, int utag, int etag)
std::vector< int > Contexts
std::map< MsgNum_t, CommSendInfo > SentCache_t
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
static void * makebuffer(int size)
int getProcesses(const int n, const int c) const
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
virtual MPI_Request raw_ireceive(char *, int, int, int)
std::vector< MessageData > recMsgList
static void freebuffer(void *buf)
static void addwordround(void *&pos, int size)
Message * receive(int &node, int &tag)
int getReceived() const
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
std::vector< std::pair< int, MsgNum_t > > informOKList
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
virtual int raw_probe_receive(char *&, int &, int &)
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
int getContexts(const int n) const
std::vector< std::pair< int, MsgNum_t > > requestList
static unsigned int wordround(int size)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
MsgNum_t nextMsgNum
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
virtual const char * name() const
void barrier(void)
virtual void mybarrier(void)
std::vector< std::vector< int > > Processes
static void unpack(void *&pos, void *packdata, int size)
int getNodes() const
int getError() const
virtual ~Communicate(void)
virtual int broadcast_all(Message *, int)
virtual bool raw_send(void *, int, int, int)
std::vector< MsgNum_t > sentOKList
bool add_msg(Message *, int, int)
static void pack(void *packdata, void *&pos, int size)
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
int myNode() const
SentCache_t sentMsgCache
virtual int raw_receive(char *, int, int &, int &)
void fill_msg_buffer(void *, Message &, int, int, int)
MessageData(int n, int t, Message *m)
MessageData(const MessageData &m)