34 void Foam::PstreamBuffers::finalExchange
42 finishedSendsCalled_ =
true;
49 Pstream::exchange<DynamicList<char>,
char>
62 void Foam::PstreamBuffers::finalExchange
72 finishedSendsCalled_ =
true;
86 Pstream::exchange<DynamicList<char>,
char>
99 void Foam::PstreamBuffers::finalExchangeGatherScatter
107 finishedSendsCalled_ =
true;
121 recvSizes.resize_nocopy(recvBuf_.size());
129 recvSizes.resize_nocopy(sendBuf_.size());
135 recvSizes[proci] = sendBuf_[proci].size();
142 recvSizes[0] = myRecv;
146 Pstream::exchange<DynamicList<char>,
char>
169 finishedSendsCalled_(false),
170 allowClearRecv_(true),
172 commsType_(commsType),
189 finishedSendsCalled_(false),
190 allowClearRecv_(true),
192 commsType_(commsType),
206 forAll(recvBufPos_, proci)
208 if (recvBufPos_[proci] < recvBuf_[proci].size())
211 <<
"Message from processor " << proci
212 <<
" Only consumed " << recvBufPos_[proci] <<
" of " 213 << recvBuf_[proci].size() <<
" bytes" <<
nl 224 for (DynamicList<char>& buf : sendBuf_)
228 for (DynamicList<char>& buf : recvBuf_)
234 finishedSendsCalled_ =
false;
240 recvBuf_[proci].clear();
241 recvBufPos_[proci] = 0;
253 for (DynamicList<char>& buf : recvBuf_)
259 finishedSendsCalled_ =
false;
265 for (
const DynamicList<char>& buf : sendBuf_)
278 if (finishedSendsCalled_)
280 forAll(recvBufPos_, proci)
282 if (recvBuf_[proci].size() > recvBufPos_[proci])
302 return sendBuf_[proci].
size();
308 if (finishedSendsCalled_)
310 const label len(recvBuf_[proci].size() > recvBufPos_[proci]);
333 if (finishedSendsCalled_)
335 forAll(recvBufPos_, proci)
337 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
360 if (finishedSendsCalled_)
362 const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
368 const_cast<char*
>(&recvBuf_[proci][recvBufPos_[proci]]),
387 bool old(allowClearRecv_);
388 allowClearRecv_ = on;
396 finalExchange(recvSizes, wait);
406 finalExchange(recvSizes, wait);
411 <<
"Obtaining sizes not supported in " 413 <<
" since transfers already in progress. Use non-blocking instead." 430 finalExchange(sendProcs, recvProcs, recvSizes, wait);
442 finalExchange(sendProcs, recvProcs, recvSizes, wait);
447 <<
"Obtaining sizes not supported in " 449 <<
" since transfers already in progress. Use non-blocking instead." 460 bitSet& sendConnections,
461 DynamicList<label>& sendProcs,
462 DynamicList<label>& recvProcs,
466 bool changed = (sendConnections.size() != nProcs());
470 sendConnections.resize(nProcs());
478 if (sendConnections.set(proci, !sendBuf_[proci].empty()))
496 if (!sendBuf_[proci].empty())
498 sendProcs.append(proci);
509 if (!recvBuf_[proci].empty())
511 recvProcs.append(proci);
519 finishedSends(sendProcs, recvProcs, wait);
528 finalExchangeGatherScatter(
true, wait);
534 finalExchangeGatherScatter(
false, wait);
544 finalExchangeGatherScatter(
true, wait);
549 <<
"Obtaining sizes not supported in " 551 <<
" since transfers already in progress. Use non-blocking instead." 561 recvSizes = recvDataCounts();
571 finalExchangeGatherScatter(
false, wait);
576 <<
"Obtaining sizes not supported in " 578 <<
" since transfers already in progress. Use non-blocking instead." 588 recvSizes = recvDataCounts();
void size(const label n)
Older name for setAddressableSize.
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
errorManipArg< error, int > exit(error &err, const int errNo=1)
commsTypes
Types of communications.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
constexpr char nl
The newline '\n' character (0x0a)
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
Ostream & endl(Ostream &os)
Add newline and flush stream.
PstreamBuffers(const UPstream::commsTypes commsType, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given comms type, message tag, communicator, IO format.
static void reduceOr(bool &value, const label communicator=worldComm)
Logical (or) reduction (cf. MPI AllReduce)
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
UList< label > labelUList
A UList of labels.
#define forAll(list, i)
Loop across all elements in list.
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark sends as done.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
void clear()
Clear individual buffers and reset states.
static bool master(const label communicator=worldComm)
Am I the master rank.
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendData, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendData for specified set of send/receive processes.
void clearStorage()
Clear individual buffer storage and reset states.
List< label > labelList
A List of labels.
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
Inter-processor communications stream.
static constexpr const zero Zero
Global zero (0)