OPAL (Object Oriented Parallel Accelerator Library) 2024.2
OPAL
Communicate.cpp
Go to the documentation of this file.
1// -*- C++ -*-
2/***************************************************************************
3 *
4 * The IPPL Framework
5 *
6 * This program was prepared by PSI.
7 * All rights in the program are reserved by PSI.
8 * Neither PSI nor the author(s)
9 * makes any warranty, express or implied, or assumes any liability or
10 * responsibility for the use of this software
11 *
12 * Visit www.amas.web.psi for more details
13 *
14 ***************************************************************************/
15
16// -*- C++ -*-
17/***************************************************************************
18 *
19 * The IPPL Framework
20 *
21 *
22 * Visit http://people.web.psi.ch/adelmann/ for more details
23 *
24 ***************************************************************************/
25
// Communicate - common member functions for the Communicate object.
// On-node traffic is handled here, and architecture-specific routines are
// called for off-node traffic. This is the base class for all
// comm-lib-specific subclasses.
32
33// include files
34#include "Message/Communicate.h"
35#include "Message/Message.h"
36#include "Message/CRC.h"
38
39#include "Utility/IpplInfo.h"
40#include "Utility/IpplStats.h"
41#include "Utility/PAssert.h"
42
43#include <cstdlib>
44#include <cstring>
45#include <map>
46#include <ostream>
47#include <utility>
48#include <vector>
49
51// print summary of this class to the given output stream
52std::ostream& operator<<(std::ostream& o, const Communicate& c)
53{
54 o << "Parallel communication method: " << c.name() << "\n";
55 o << " Total nodes: " << c.getNodes() << ", Current node: ";
56 o << c.myNode() << "\n";
57 o << " Queued received messages: ";
58 o << c.getReceived() <<"\n";
59
60 return o;
61}
62
63
65// Constructor.
66// arguments: command-line args, and number of processes
67// to start (if < 0, start the 'default' number, i.e. the number of
68// hosts
69// Note: The base-class constructor does not need the argument info or
70// the number of nodes, it just by default sets the number of nodes=1
71// Also note: the derived classes should erase Contexts and Processes, and
72// put in the proper values.
73Communicate::Communicate(int, char **, int)
74 : TotalNodes(1),
75 myHost(0),
76 ErrorStatus(COMM_NOERROR),
77 Contexts(1, 1),
78 Processes(1, std::vector<int>(1, 1)),
79 nextMsgNum(1)
80{
81}
82
84// Destructor. Nothing to do at present.
86{
87 // delete the cached messages
88 SentCache_t::iterator cachei = sentMsgCache.begin();
89 for ( ; cachei != sentMsgCache.end(); ++cachei)
90 (*cachei).second.freebuf();
91}
92
94// Add a new on-node message to the linked list. Return success.
95bool Communicate::add_msg(Message *msg, int node, int tag)
96{
97 recMsgList.push_back(MessageData(node, tag, msg));
98 return true;
99}
100
102// Looks for a message in the message queue from the specified node
103// and tag. This understands wildcards for node and tag.
104// Returns a pointer to the Message object found, and sets node and
105// tag equal to the proper values. Also, this will remove the item from
106// the queue.
107Message* Communicate::find_msg(int& node, int& tag)
108{
109 // just find the first message that meets the criteria
110 std::vector<MessageData>::iterator qi = recMsgList.begin();
111 std::vector<MessageData>::iterator qend = recMsgList.end();
112 for ( ; qi != qend ; ++qi)
113 {
114 if ((node == COMM_ANY_NODE || (*qi).node == node) &&
115 (tag == COMM_ANY_TAG || (*qi).tag == tag))
116 {
117 node = (*qi).node;
118 tag = (*qi).tag;
119 Message *retval = (*qi).msg;
120 recMsgList.erase(qi);
121 //INCIPPLSTAT(incMessageReceived);
122 //INCIPPLSTAT(incMessageReceivedFromQueue);
123 return retval;
124 }
125 }
126
127 // if we're here, no message was found
128 return 0;
129}
130
132// Default version of virtual send function ... here, does nothing.
133bool Communicate::mysend(Message *, int, int, int)
134{
135 // just return false, since we cannot send a message with this function
136 return false;
137}
138
140// Default version of virtual receive function ... here, does nothing.
142{
143 // just return NULL, since we cannot find a message with this function
144 return 0;
145}
146
148// Default version of virtual barrier function ... here, does nothing.
150{
151 // just return NULL, since we cannot find a message with this function
152 return;
153}
154
156// resent a message buffer that has been previously packed and copied
157// into the provided buffer. Return success.
158bool Communicate::resend(void *, int, int, int)
159{
160 // just return false, since we cannot resend a message with this function
161 return false;
162}
163
165// Send data to the given node, with given tag. If delmsg==true, the
166// message will be deleted after it is sent, otherwise it will be left alone.
167bool Communicate::send(Message *msg, int node, int tag, bool delmsg)
168{
169 bool retval;
170
171 // process list of resend requests
172 //process_resend_requests();
173
174 // check for problems ...
175 if ( node < 0 || node >= getNodes() || tag < 0 )
176 {
177 ERRORMSG("Communicate: illegal send node " << node << endl);
179 return false;
180 }
181
182 // if the message is addressed to this node, put it in the local receive
183 // queue immediately
184 if ( node == myNode() )
185 {
186 retval = add_msg(msg, node, tag);
187 //INCIPPLSTAT(incMessageSent);
188 //INCIPPLSTAT(incMessageSentToSelf);
189 }
190 else
191 {
192 // the message must be sent elsewhere ... call the proper function
193 retval = mysend(msg, node, tag, COMM_SEND_TAG);
194
195 // if the send was successful, delete the message if requested
196 if (retval)
197 {
198 //INCIPPLSTAT(incMessageSent);
199 //INCIPPLSTAT(incMessageSentToOthers);
200 if (delmsg)
201 delete msg;
202 }
203 }
204
205 // set error code
206 ErrorStatus = (retval != 0 ? COMM_NOERROR : COMM_NOSEND);
207
208 // return the success of the operation
209 return retval;
210}
211
213// Receive data from another node. Returns newly created Message object
214// with received message, or NULL if no message is available.
215// If node == COMM_ANY_NODE, this will receive the next message with the given
216// tag from any node.
217// If tag == COMM_ANY_TAG, this will receive the next message with
218// any tag from the given node. If both are wildcards, this will receive the
219// next message, period. node and tag are passed by reference; if either
220// is a wildcard, and a message is received, they are changed to their actual
221// values.
222// Messages are searched for in this order (if node == COMM_ANY_NODE) :
223// 1. Pending in network
224// 2. In receive queue
225Message* Communicate::receive(int& node, int& tag)
226{
227 //Inform dbgmsg("Comm::receive", INFORM_ALL_NODES);
228 //dbgmsg << "Doing receive from node " << node << ", tag " << tag << endl;
229
230 // process list of resend requests
232
233 // do a check for a message from another node
234 //dbgmsg << "Checking for queued message ..." << endl;
235 Message *msg = find_msg(node, tag);
236 //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
237
238 if (msg == 0 && myNode() != node)
239 {
240 int checknode = node;
241 int checktag = tag;
242 //INCIPPLSTAT(incMessageReceiveChecks);
243 //dbgmsg << "Checking for remote message ..." << endl;
244 if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
245 {
246 // see if the message matches our criteria for searching
247 //dbgmsg << "Message found from node " << checknode << " with tag ";
248 //dbgmsg << checktag << endl;
249 if ((node != COMM_ANY_NODE && node != checknode) ||
250 (tag != COMM_ANY_TAG && tag != checktag ))
251 {
252 // the message does not match; queue it and report no msg found
253 //dbgmsg << "But it's not what we want." << endl;
254 add_msg(msg, checknode, checktag);
255 msg = 0;
256 }
257 else
258 {
259 // the message matches; save the node and tag and return the msg
260 //dbgmsg << "And it is what we want!" << endl;
261 node = checknode;
262 tag = checktag;
263 //INCIPPLSTAT(incMessageReceived);
264 //INCIPPLSTAT(incMessageReceivedFromNetwork);
265 }
266 }
267 else
268 {
269 //INCIPPLSTAT(incMessageReceiveChecksFailed);
270 }
271 }
272
273 // set error code
274 ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
275
276 // return the message, or NULL if none was found
277 return msg;
278}
279
281// A blocking version of receive.
283{
284 // process list of resend requests
286
287 //Inform dbgmsg("Comm::receive_block", INFORM_ALL_NODES);
288 //dbgmsg << "Doing blocking receive from node " << node;
289 //dbgmsg << ", tag " << tag << endl;
290
291 // If we haven't already found a message, check the local messages
292 //dbgmsg << "Checking for queued message ..." << endl;
293
294 Message *msg = find_msg(node, tag);
295
296 //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
297
298 // keep checking for remote msgs until we get one
299
300 if (myNode() != node)
301 {
302 while (msg == 0)
303 {
304 // process list of resend requests
306
307 int checknode = node;
308 int checktag = tag;
309 //INCIPPLSTAT(incMessageReceiveChecks);
310 //dbgmsg << "Checking for remote message ..." << endl;
311 if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
312 {
313 // see if the message matches our criteria for searching
314 //dbgmsg << "Message found from node " << checknode << " with tag ";
315 //dbgmsg << checktag << endl;
316 if ((node != COMM_ANY_NODE && node != checknode) ||
317 (tag != COMM_ANY_TAG && tag != checktag ))
318 {
319 // the message does not match; queue it and report no msg found
320 //dbgmsg << "But it's not what we want." << endl;
321 add_msg(msg, checknode, checktag);
322 msg = 0;
323 }
324 else
325 {
326 // the message matches; save the node and tag and return the msg
327 //dbgmsg << "And it is what we want!" << endl;
328 node = checknode;
329 tag = checktag;
330 //INCIPPLSTAT(incMessageReceived);
331 //INCIPPLSTAT(incMessageReceivedFromNetwork);
332 }
333 }
334 else
335 {
336 //INCIPPLSTAT(incMessageReceiveChecksFailed);
337 if (Ippl::retransmit())
338 msg = find_msg(node, tag);
339 }
340 }
341 }
342
343 // If we're on just one node, and we did not find a message, this is
344 // a big problem.
345 PInsist(!(myNode() == node && msg == 0),
346 "Local message not found in Communicate::receive_block!!");
347
348 // set error code
349 ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
350
351 // return the message, or NULL if none was found
352 return msg;
353}
354
356// Broadcast the given message to ALL nodes, including this node.
357// Return number of nodes sent to.
358// Arguments are the Message, and the tag for the message.
360{
361 int i; // loop variable
362
363 // send message to all other nodes
364 for (i=(getNodes() - 1); i >= 0; i--)
365 {
366 if (i != myNode())
367 {
368 mysend(msg, i, tag, COMM_SEND_TAG);
369 //INCIPPLSTAT(incMessageSent);
370 //INCIPPLSTAT(incMessageSentToOthers);
371 }
372 }
373
374 // send message to this node; since we do this, don't need to delete msg
375 add_msg(msg, myNode(), tag);
376 //INCIPPLSTAT(incMessageSent);
377 //INCIPPLSTAT(incMessageSentToSelf);
378
379 return getNodes();
380}
381
383// Broadcast the given message to all OTHER nodes, but not this node.
384// Return number of nodes sent to.
385// Arguments are the Message, and the tag for the message, and whether
386// we should delete the given message object.
387int Communicate::broadcast_others(Message *msg, int tag, bool delmsg)
388{
389 int i; // loop variable
390
391 // send message to all other nodes
392 for (i=(getNodes() - 1); i >= 0; i--)
393 {
394 if (i != myNode())
395 {
396 mysend(msg, i, tag, COMM_SEND_TAG);
397 //INCIPPLSTAT(incMessageSent);
398 //INCIPPLSTAT(incMessageSentToOthers);
399 }
400 }
401
402 // delete message
403 if (delmsg)
404 delete msg;
405
406 return getNodes() - 1;
407}
408
410// Synchronize all processors (everybody waits for everybody
411// else to get here before returning to calling function).
413{
414 mybarrier();
415 //INCIPPLSTAT(incBarriers);
416}
417
419// clean up after a Message has been used (called by Message). By
420// default, does nothing.
422
424// calculate how big the buffer must be to send the given message
426{
427 static const unsigned int longsize = wordround(sizeof(MsgNum_t));
428 static const unsigned int intsize4 = wordround(4 * sizeof(int));
429 static const unsigned int intsize2 = wordround(2 * sizeof(int));
430
431 // the message contains a long and three integers at the start with the
432 // msg num, node, tag, and number of items
433 unsigned int buffsize = longsize + intsize4;
434
435 // now include the sizes of the elements themselves. For each item,
436 // we also include two integers with size information.
437 int nitems = msg.size();
438 for (int i=0; i < nitems; ++i)
439 buffsize += (intsize2 + wordround(msg.item(i).numBytes()));
440
441 // if checksums are to be performed, add in space for the 32-bit checksum
442 if (Ippl::useChecksums())
443 buffsize += sizeof(CRCTYPE);
444
445 return buffsize;
446}
447
449// put data from the given Message into the given buffer
450void Communicate::fill_msg_buffer(void *buffer, Message &msg, int tag,
451 int bufsize, int node)
452{
453 void *pos = buffer; // location in buffer to pack data
454 int nitems = msg.size(); // Number of items in Message
455 int mdata[4]; // Array to store msg header info
456 MsgNum_t mnum = (nextMsgNum++); // Message ID
457
458 //Inform dbgmsg("***Communicate::fill_msg_buffer", INFORM_ALL_NODES);
459 //dbgmsg << "Preparing to send out message " << mnum;
460 //dbgmsg << " with tag " << tag << " of size " << bufsize << endl;
461
462 // we must make sure to zero out the buffer if we're using checksums,
463 // so that random data values do not occur in the spaces where word
464 // alignment padding is used
465 if (Ippl::useChecksums())
466 memset(pos, 0, bufsize);
467
468 // put message ID info into the buffer
469 pack(&mnum, pos, sizeof(MsgNum_t));
470
471 // put message header info into the buffer
472 mdata[0] = tag;
473 mdata[1] = myNode();
474 mdata[2] = nitems;
475 mdata[3] = bufsize;
476 pack(mdata, pos, 4*sizeof(int));
477
478 // finally pack in the data
479 for (int i=0; i < nitems; ++i)
480 {
481 Message::MsgItem &msgitem = msg.item(i);
482 mdata[0] = msgitem.numElems();
483 mdata[1] = msgitem.numBytes();
484 pack(mdata, pos, 2*sizeof(int));
485 if (mdata[1] > 0)
486 pack(msgitem.data(), pos, mdata[1]);
487 }
488
489 // if checksums are on, find the checksum and append it to the buffer
490 if (Ippl::useChecksums())
491 {
492 // calculate the crc
493 int crcsize = bufsize - sizeof(CRCTYPE);
494 CRCTYPE crcval = crc(buffer, crcsize);
495
496 // append it to the end of the buffer
497 *(static_cast<CRCTYPE *>(pos)) = crcval;
498
499 // if we're trying to retransmit, cache the message
500 if (Ippl::retransmit())
501 {
502 if (tag != IPPL_RETRANSMIT_TAG && tag != IPPL_MSG_OK_TAG)
503 {
504 //dbgmsg << "Adding message " << mnum << " of size " << bufsize;
505 //dbgmsg << " with tag " << tag << " to sent cache." << endl;
506 add_to_send_cache(buffer, mnum, bufsize, node);
507 }
508 else
509 {
510 //dbgmsg << "NOT adding msg with tag " << tag << " to cache" << endl;
511 }
512 }
513 }
514
515 ADDIPPLSTAT(incMessageBytesSent,bufsize);
516}
517
519// get data out of a buffer and create a Message
520Message* Communicate::unpack_message(int &node, int &tag, void *buffer)
521{
522 Message *newmsg = 0;
523
524 // pos will always point to the next location in the buffer to get data
525 void *pos = buffer;
526
527 // get the message ID number
528 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
529 addwordround(pos, sizeof(MsgNum_t));
530
531 // get the tag, sender, and number of messages
532 int *mdata = static_cast<int *>(pos);
533 tag = mdata[0];
534 node = mdata[1];
535 int nitems = mdata[2];
536 int bufsize = mdata[3];
537 addwordround(pos, 4*sizeof(int));
538
539 //WARNMSG("Received message " << mnum << " from node " << node);
540 //WARNMSG(" with tag " << tag << " of size " << bufsize << endl);
541
542 // check for special tags, to abort, retransmit, or just receive
543 if (tag == IPPL_ABORT_TAG)
544 {
545 ERRORMSG("Stopping due to abort request sent from node " << node << endl);
546 ::abort();
547 }
548 else if (tag == IPPL_EXIT_TAG)
549 {
550 ERRORMSG("Exiting due to exit request sent from node " << node << endl);
551 ::exit(1);
552 }
553 else if (tag == IPPL_RETRANSMIT_TAG)
554 {
555 // get the retransmit message number and tag out of the current buffer
557 }
558 else if (tag == IPPL_MSG_OK_TAG)
559 {
560 // clear out the messages that this message lists are OK to be deleted
561 clear_ok_messages(nitems, pos);
562 }
563 else
564 {
565 // this is just a regular message
566
567 // do checksum comparison, if checksums are on
568 if (Ippl::useChecksums())
569 {
570 // calculate the crc
571 int crcsize = bufsize - sizeof(CRCTYPE);
572 CRCTYPE crcval = crc(buffer, crcsize);
573
574 // as a test, randomly change crcval
575 //if (IpplRandom() < 0.1)
576 // crcval += 1;
577
578 // compare this crc to the original one
579 void *origloc = static_cast<void *>(static_cast<char *>(buffer)+crcsize);
580 CRCTYPE origcrc = *(static_cast<CRCTYPE *>(origloc));
581 if (crcval != origcrc)
582 {
583 ERRORMSG("Failed CRC check (" << crcval << " != " << origcrc);
584 ERRORMSG(") on node " << Ippl::myNode());
585 ERRORMSG(" for message " << mnum << " of size " << bufsize);
586 ERRORMSG(" bytes sent from node ");
587 ERRORMSG(node << " with tag " << tag << endl);
588 if (Ippl::retransmit())
589 {
590 // send off a request to have message 'mnum' resent to us by 'node'
591 requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
592 }
593 else
594 {
595 // since we're not trying to retransmit, we just quit.
596 PInsist(crcval == origcrc, "Exiting due to CRC check failure.");
597 }
598
599 // and then return 0 so that the caller knows there was a problem
600 return 0;
601 }
602 }
603
604 // if we're here, the checksums (if enabled) were OK, so receive the
605 // message
606
607 // create data structure for this message
608 newmsg = new Message(nitems);
609
610 // get all the items and add to the message
611 for (int j = 0; j < nitems; j++)
612 {
613 int *hdr = static_cast<int *>(pos);
614 int elements = hdr[0];
615 int bytesize = hdr[1];
616 addwordround(pos, 2*sizeof(int));
617
618 // for each item, find the pointer to the actual data and give
619 // that pointer to the Message object. The Message object then
620 // does not delete the data until the very end, when the Message
621 // is deleted.
622 if (bytesize > 0 && elements > 0)
623 {
624 newmsg->setCopy(false);
625 newmsg->setDelete(false);
626 newmsg->putmsg(pos, bytesize/elements, elements);
627 addwordround(pos, bytesize);
628 }
629 }
630
631 // indicate we've received a normal message
632 ADDIPPLSTAT(incMessageBytesReceived,bufsize);
633
634 // tell the sender that we received this message OK
635 if (Ippl::retransmit())
636 informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
637 }
638
639 // return the new message, or zero to indicate the buffer contained
640 // something else than an actual message
641 return newmsg;
642}
643
644
646// put the given message buffer in the sent-message cache, as a new
647// CommSendInfo object storing the buffer and other information.
648void Communicate::add_to_send_cache(void *msgbuf, MsgNum_t mnum, int msgsize,
649 int node)
650{
651 // make sure we do not already have this message
652 SentCache_t::iterator senti = sentMsgCache.find(mnum);
653 if (senti != sentMsgCache.end())
654 {
655 ERRORMSG("ERROR: Trying to cache an already-cached message with num = ");
656 ERRORMSG(mnum << endl);
657 return;
658 }
659
660 //Inform dbgmsg("***Communicate::add_to_send_cache", INFORM_ALL_NODES);
661 //dbgmsg << "Adding message " << mnum << " to cache with size = " << msgsize;
662 //dbgmsg << endl;
663
664 // make a copy of the message
665 char *copybuf = new char[msgsize];
666 memcpy(copybuf, msgbuf, msgsize);
667
668 // add the message to the cache list
669 CommSendInfo csi(msgsize, copybuf, node);
670 sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
671
672 //dbgmsg<<"Sent message cached; now " << sentMsgCache.size() << " buffers ";
673 //dbgmsg << "in the cache." << endl;
674}
675
676
678// send off a request to have this message retransmitted to us
680{
681 Inform dbgmsg("***Communicate::request_retransmission", INFORM_ALL_NODES);
682 dbgmsg << "Requesting retransmission of message " << mnum << " from node ";
683 dbgmsg << node << endl;
684
685 // create a regular message, but with the
686 // special retransmit tag and the message number as the one item
687 // in the Message
688 Message msg(1);
689 msg.put(mnum);
690 send(&msg, node, IPPL_RETRANSMIT_TAG, false); // does not delete message
691}
692
693
695// get the resend information from a buffer sent in a message requesting
696// retransmission
698{
699 Inform dbgmsg("***Communicate::unpack_retrans_req", INFORM_ALL_NODES);
700 //dbgmsg << "Unpacking retransmission request ..." << endl;
701
702 // retransmission messages have the following information as separate
703 // items:
704 // message number to retransmit (type == MsgNum_t)
705 // so, nitems should be one, and the bytesize should match
706 PInsist(nitems == 1, "Wrong number of items in retransmit request.");
707
708 // get the retransmit message number item header info
709 int *hdr = static_cast<int *>(pos);
710 PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
711 "Wrong message info for retransmit message number.");
712 addwordround(pos, 2*sizeof(int));
713
714 // get the actual retransmit message number
715 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
716 dbgmsg << "Received request to resend message " << mnum << endl;
717 resendList.push_back(mnum);
718}
719
720
722// for message mnum, resend the data
724{
725 // get the message info from our cache.
726 SentCache_t::iterator senti = sentMsgCache.find(mnum);
727 PInsist(senti != sentMsgCache.end(),
728 "Could not find message in local sent cache to retransmit.");
729
730 // get the node and size
731 int size = (*senti).second.size();
732 int node = (*senti).second.node();
733
734 // resend the data
735 ERRORMSG("WARNING: Resending message number " << mnum);
736 ERRORMSG(" of size " << size << " from node ");
737 ERRORMSG(myNode() << " to node " << node << " ..." << endl);
738 resend((*senti).second.buf(), size, node, COMM_SEND_TAG);
739}
740
742// tell the sender that we received this message OK
744{
745 Inform dbgmsg("***Communicate::send_ok_message", INFORM_ALL_NODES);
746 dbgmsg << "Informing node " << node << " that message " << mnum;
747 dbgmsg << " was received ok." << endl;
748
749 Message msg(1);
750 msg.put(mnum); // the list of message numbers, one at a time
751
752 send(&msg, node, IPPL_MSG_OK_TAG, false); // does not delete message
753}
754
756// unpack message with a list of OK message numbers, and delete them
757// from our cache
758void Communicate::clear_ok_messages(int nitems, void *pos)
759{
760 Inform dbgmsg("***Communicate::clear_ok_messages", INFORM_ALL_NODES);
761 //dbgmsg << "Unpacking messages-ok information for " << nitems;
762 //dbgmsg << " messages ..." << endl;
763
764 // message-ok messages have the following information as separate
765 // items:
766 // the number of OK messages (type == int)
767 // the first OK message number (type == MsgNum_t)
768 // the second OK message number (type == MsgNum_t)
769 // etc
770 PInsist(nitems >= 1, "Wrong number of items in retransmit request.");
771
772 // loop through the list of items, get the message number from each,
773 // and remove that message from our queue
774 for (int i=0; i < nitems; ++i)
775 {
776 // get the message-ok header
777 int *hdr = static_cast<int *>(pos);
778 PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
779 "Wrong message info for message-ok number.");
780 addwordround(pos, 2*sizeof(int));
781
782 // get the message-ok number
783 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
784 addwordround(pos, sizeof(MsgNum_t));
785
786 // add this number to our list of messages to say are OK
787 dbgmsg << "Will clear message " << mnum << " as OK." << endl;
788 sentOKList.push_back(mnum);
789 }
790}
791
793// unpack message with a list of OK message numbers, and delete them
794// from our cache
796{
797 Inform dbgmsg("***Communicate::remove_single_ok_message", INFORM_ALL_NODES);
798
799 // check if we have that message
800 SentCache_t::iterator senti = sentMsgCache.find(mnum);
801 if (senti == sentMsgCache.end())
802 {
803 // we do not have it; print an error message
804 ERRORMSG("ERROR: Received 'message ok' for message " << mnum);
805 ERRORMSG(", but this node does not have that message in cache." << endl);
806
807 }
808 else
809 {
810 // we have it, so remove it after freeing the buffer
811 (*senti).second.freebuf();
812 sentMsgCache.erase(senti);
813 dbgmsg << "Removed message " << mnum << " from send cache; now ";
814 dbgmsg << sentMsgCache.size() << " messages in list." << endl;
815 }
816}
817
818
820// process list of resend requests
822{
823 if (!resendList.empty())
824 {
825 Inform dbgmsg("***Communicate::process_resend_reqs", INFORM_ALL_NODES);
826 dbgmsg << "Clearing " << sentOKList.size() << " and resending ";
827 dbgmsg << resendList.size() << " messages ..." << endl;
828 }
829
830 // clear out OK messages
831 while (!sentOKList.empty())
832 {
833 MsgNum_t mnum = *(sentOKList.begin());
834 sentOKList.erase(sentOKList.begin());
836 }
837
838 // resend a message, if necessary
839 while (!resendList.empty())
840 {
841 MsgNum_t mnum = *(resendList.begin());
842 resendList.erase(resendList.begin());
843 perform_resend(mnum);
844 }
845
846 // inform other nodes that we've received their messages ok
847 while (!informOKList.empty())
848 {
849 int node = (*(informOKList.begin())).first;
850 MsgNum_t mnum = (*(informOKList.begin())).second;
851 informOKList.erase(informOKList.begin());
852 send_ok_message(node, mnum);
853 }
854
855 // request resends from other nodes
856 while (!requestList.empty())
857 {
858 int node = (*(requestList.begin())).first;
859 MsgNum_t mnum = (*(requestList.begin())).second;
860 requestList.erase(requestList.begin());
861 request_retransmission(node, mnum);
862 }
863}
elements
Definition IndexMap.cpp:163
unsigned int CRCTYPE
Definition CRC.h:25
std::ostream & operator<<(std::ostream &o, const Communicate &c)
CRCTYPE crc(void *icp, int icnt)
Definition CRC.cpp:106
#define IPPL_EXIT_TAG
Definition Tags.h:26
#define IPPL_RETRANSMIT_TAG
Definition Tags.h:27
#define IPPL_MSG_OK_TAG
Definition Tags.h:28
#define IPPL_ABORT_TAG
Definition Tags.h:25
#define PInsist(c, m)
Definition PAssert.h:120
Inform & endl(Inform &inf)
Definition Inform.cpp:42
#define ADDIPPLSTAT(stat, amount)
Definition IpplStats.h:237
#define ERRORMSG(msg)
Definition IpplInfo.h:350
#define INFORM_ALL_NODES
Definition Inform.h:39
virtual bool mysend(Message *, int node, int utag, int etag)
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
std::vector< MessageData > recMsgList
static void addwordround(void *&pos, int size)
Message * receive(int &node, int &tag)
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
std::vector< std::pair< int, MsgNum_t > > informOKList
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
std::vector< std::pair< int, MsgNum_t > > requestList
static unsigned int wordround(int size)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
MsgNum_t nextMsgNum
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
void barrier(void)
virtual void mybarrier(void)
int getNodes() const
virtual ~Communicate(void)
virtual int broadcast_all(Message *, int)
std::vector< MsgNum_t > sentOKList
bool add_msg(Message *, int, int)
static void pack(void *packdata, void *&pos, int size)
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
int myNode() const
SentCache_t sentMsgCache
void fill_msg_buffer(void *, Message &, int, int, int)
size_t size() const
Definition Message.h:292
Message & putmsg(void *, int, int=0)
Message & setCopy(const bool c)
Definition Message.h:319
Message & put(const T &val)
Definition Message.h:406
MsgItem & item(size_t n)
Definition Message.h:308
Message & setDelete(const bool c)
Definition Message.h:331
unsigned int numBytes() const
Definition Message.h:218
unsigned int numElems() const
Definition Message.h:222
void * data()
Definition Message.h:244
static bool retransmit()
Definition IpplInfo.h:195
static bool useChecksums()
Definition IpplInfo.h:192
static int myNode()
Definition IpplInfo.cpp:691