54 o <<
"Parallel communication method: " << c.name() <<
"\n";
55 o <<
" Total nodes: " << c.getNodes() <<
", Current node: ";
56 o << c.myNode() <<
"\n";
57 o <<
" Queued received messages: ";
58 o << c.getReceived() <<
"\n";
76 ErrorStatus(COMM_NOERROR),
78 Processes(1,
std::vector<int>(1, 1)),
90 (*cachei).second.freebuf();
110 std::vector<MessageData>::iterator qi =
recMsgList.begin();
111 std::vector<MessageData>::iterator qend =
recMsgList.end();
112 for ( ; qi != qend ; ++qi)
175 if ( node < 0 || node >=
getNodes() || tag < 0 )
177 ERRORMSG(
"Communicate: illegal send node " << node <<
endl);
186 retval =
add_msg(msg, node, tag);
238 if (msg == 0 &&
myNode() != node)
240 int checknode = node;
254 add_msg(msg, checknode, checktag);
307 int checknode = node;
321 add_msg(msg, checknode, checktag);
346 "Local message not found in Communicate::receive_block!!");
364 for (i=(
getNodes() - 1); i >= 0; i--)
392 for (i=(
getNodes() - 1); i >= 0; i--)
428 static const unsigned int intsize4 =
wordround(4 *
sizeof(
int));
429 static const unsigned int intsize2 =
wordround(2 *
sizeof(
int));
433 unsigned int buffsize = longsize + intsize4;
437 int nitems = msg.
size();
438 for (
int i=0; i < nitems; ++i)
451 int bufsize,
int node)
454 int nitems = msg.
size();
466 memset(pos, 0, bufsize);
476 pack(mdata, pos, 4*
sizeof(
int));
479 for (
int i=0; i < nitems; ++i)
484 pack(mdata, pos, 2*
sizeof(
int));
486 pack(msgitem.
data(), pos, mdata[1]);
493 int crcsize = bufsize -
sizeof(
CRCTYPE);
497 *(
static_cast<CRCTYPE *
>(pos)) = crcval;
532 int *mdata =
static_cast<int *
>(pos);
535 int nitems = mdata[2];
536 int bufsize = mdata[3];
545 ERRORMSG(
"Stopping due to abort request sent from node " << node <<
endl);
550 ERRORMSG(
"Exiting due to exit request sent from node " << node <<
endl);
571 int crcsize = bufsize -
sizeof(
CRCTYPE);
579 void *origloc =
static_cast<void *
>(
static_cast<char *
>(buffer)+crcsize);
581 if (crcval != origcrc)
583 ERRORMSG(
"Failed CRC check (" << crcval <<
" != " << origcrc);
585 ERRORMSG(
" for message " << mnum <<
" of size " << bufsize);
591 requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
596 PInsist(crcval == origcrc,
"Exiting due to CRC check failure.");
611 for (
int j = 0; j < nitems; j++)
613 int *hdr =
static_cast<int *
>(pos);
615 int bytesize = hdr[1];
636 informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
655 ERRORMSG(
"ERROR: Trying to cache an already-cached message with num = ");
665 char *copybuf =
new char[msgsize];
666 memcpy(copybuf, msgbuf, msgsize);
670 sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
682 dbgmsg <<
"Requesting retransmission of message " << mnum <<
" from node ";
683 dbgmsg << node <<
endl;
706 PInsist(nitems == 1,
"Wrong number of items in retransmit request.");
709 int *hdr =
static_cast<int *
>(pos);
711 "Wrong message info for retransmit message number.");
716 dbgmsg <<
"Received request to resend message " << mnum <<
endl;
728 "Could not find message in local sent cache to retransmit.");
731 int size = (*senti).second.size();
732 int node = (*senti).second.node();
735 ERRORMSG(
"WARNING: Resending message number " << mnum);
736 ERRORMSG(
" of size " << size <<
" from node ");
746 dbgmsg <<
"Informing node " << node <<
" that message " << mnum;
747 dbgmsg <<
" was received ok." <<
endl;
770 PInsist(nitems >= 1,
"Wrong number of items in retransmit request.");
774 for (
int i=0; i < nitems; ++i)
777 int *hdr =
static_cast<int *
>(pos);
779 "Wrong message info for message-ok number.");
787 dbgmsg <<
"Will clear message " << mnum <<
" as OK." <<
endl;
804 ERRORMSG(
"ERROR: Received 'message ok' for message " << mnum);
805 ERRORMSG(
", but this node does not have that message in cache." <<
endl);
811 (*senti).second.freebuf();
813 dbgmsg <<
"Removed message " << mnum <<
" from send cache; now ";
826 dbgmsg <<
"Clearing " <<
sentOKList.size() <<
" and resending ";
std::ostream & operator<<(std::ostream &o, const Communicate &c)
CRCTYPE crc(void *icp, int icnt)
#define IPPL_RETRANSMIT_TAG
Inform & endl(Inform &inf)
#define ADDIPPLSTAT(stat, amount)
virtual bool mysend(Message *, int node, int utag, int etag)
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
std::vector< MessageData > recMsgList
static void addwordround(void *&pos, int size)
Message * receive(int &node, int &tag)
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
std::vector< std::pair< int, MsgNum_t > > informOKList
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
std::vector< std::pair< int, MsgNum_t > > requestList
static unsigned int wordround(int size)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
virtual void mybarrier(void)
virtual ~Communicate(void)
virtual int broadcast_all(Message *, int)
std::vector< MsgNum_t > sentOKList
bool add_msg(Message *, int, int)
static void pack(void *packdata, void *&pos, int size)
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
void fill_msg_buffer(void *, Message &, int, int, int)
Message & putmsg(void *, int, int=0)
Message & setCopy(const bool c)
Message & put(const T &val)
Message & setDelete(const bool c)
unsigned int numBytes() const
unsigned int numElems() const
static bool useChecksums()