58 void Foam::PstreamBuffers::finalExchange
66 finishedSendsCalled_ =
true;
80 PstreamDetail::exchangeConsensus<DynamicList<char>,
char>
98 forAll(sendBuffers_, proci)
100 sendSizes[proci] = sendBuffers_[proci].size();
102 recvSizes.resize_nocopy(nProcs_);
122 Pstream::exchange<DynamicList<char>,
char>
135 void Foam::PstreamBuffers::finalExchange
145 finishedSendsCalled_ =
true;
152 recvSizes.resize_nocopy(nProcs_);
158 for (
const label proci : sendProcs)
160 recvSizes[proci] = 1;
163 for (label proci=0; proci < nProcs_; ++proci)
165 if (!recvSizes[proci])
167 sendBuffers_[proci].clear();
184 Pstream::exchange<DynamicList<char>,
char>
197 void Foam::PstreamBuffers::finalGatherScatter
206 finishedSendsCalled_ =
true;
214 for (label proci=1; proci < sendBuffers_.size(); ++proci)
216 sendBuffers_[proci].clear();
248 recvSizes.resize_nocopy(nProcs_);
256 recvSizes.resize_nocopy(nProcs_);
260 forAll(sendBuffers_, proci)
262 recvSizes[proci] = sendBuffers_[proci].size();
269 recvSizes[0] = myRecv;
273 Pstream::exchange<DynamicList<char>,
char>
296 finishedSendsCalled_(false),
297 allowClearRecv_(true),
299 commsType_(commsType),
302 nProcs_(UPstream::nProcs(comm_)),
303 sendBuffers_(nProcs_),
304 recvBuffers_(nProcs_),
305 recvPositions_(nProcs_,
Zero)
314 forAll(recvBuffers_, proci)
316 const label
pos = recvPositions_[proci];
317 const label len = recvBuffers_[proci].size();
322 <<
"Message from processor " << proci
323 <<
" Only consumed " <<
pos <<
" of " << len <<
" bytes" <<
nl 337 return sendBuffers_[proci];
346 return recvBuffers_[proci];
350 Foam::label& Foam::PstreamBuffers::accessRecvPosition(
const label proci)
352 return recvPositions_[proci];
369 for (DynamicList<char>& buf : recvBuffers_)
373 recvPositions_ =
Zero;
381 finishedSendsCalled_ =
false;
387 sendBuffers_[proci].clear();
393 recvBuffers_[proci].clear();
394 recvPositions_[proci] = 0;
406 for (DynamicList<char>& buf : recvBuffers_)
410 recvPositions_ =
Zero;
412 finishedSendsCalled_ =
false;
418 for (
const DynamicList<char>& buf : sendBuffers_)
431 if (finishedSendsCalled_)
433 forAll(recvBuffers_, proci)
435 if (recvPositions_[proci] < recvBuffers_[proci].size())
455 return sendBuffers_[proci].
size();
461 if (finishedSendsCalled_)
463 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
486 if (finishedSendsCalled_)
488 forAll(recvBuffers_, proci)
490 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
512 const label excludeProci
517 if (finishedSendsCalled_)
519 forAll(recvBuffers_, proci)
521 if (excludeProci != proci)
523 label len(recvBuffers_[proci].size() - recvPositions_[proci]);
524 maxLen =
max(maxLen, len);
543 return maxNonLocalRecvCount(-1);
556 if (finishedSendsCalled_)
558 const label
pos = recvPositions_[proci];
559 const label len = recvBuffers_[proci].size();
565 const_cast<char*
>(recvBuffers_[proci].cbegin(
pos)),
584 bool old(allowClearRecv_);
585 allowClearRecv_ = on;
593 finalExchange(wait, recvSizes);
606 finalExchange(wait, recvSizes);
611 <<
"Obtaining sizes not supported in " 613 <<
" since transfers already in progress. Use non-blocking instead." 629 finalExchange(neighProcs, neighProcs, wait, recvSizes);
640 finalExchange(neighProcs, neighProcs, wait, recvSizes);
652 bool changed = (sendConnections.
size() != nProcs());
656 sendConnections.
resize(nProcs());
660 forAll(sendBuffers_, proci)
662 if (sendConnections.
set(proci, !sendBuffers_[proci].empty()))
675 finishedSends(recvSizes, wait);
679 forAll(sendBuffers_, proci)
681 if (!sendBuffers_[proci].empty())
691 if (recvSizes[proci] > 0)
701 finalExchange(sendProcs, recvProcs, wait, recvSizes);
711 finalGatherScatter(
true, wait, recvSizes);
718 finalGatherScatter(
false, wait, recvSizes);
728 finalGatherScatter(
true, wait, recvSizes);
733 <<
"Obtaining sizes not supported in " 735 <<
" since transfers already in progress. Use non-blocking instead." 750 finalGatherScatter(
false, wait, recvSizes);
755 <<
"Obtaining sizes not supported in " 757 <<
" since transfers already in progress. Use non-blocking instead." void size(const label n)
Older name for setAddressableSize.
void set(const bitSet &bitset)
Set specified bits from another bitset.
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
errorManipArg< error, int > exit(error &err, const int errNo=1)
commsTypes
Communications types.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
constexpr char nl
The newline '\n' character (0x0a)
static int maxCommsSize
Optional maximum message size (bytes)
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static void reduceOr(bool &value, const label communicator=worldComm)
Logical (or) reduction (MPI_AllReduce)
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero integer data with all ranks in the communicator using non-blocking consensus exchan...
UList< label > labelUList
A UList of labels.
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
#define forAll(list, i)
Loop across all elements in list.
static constexpr int algorithm_PEX_allToAll
dimensionedScalar pos(const dimensionedScalar &ds)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark sends as done.
static constexpr int algorithm_full_NBX
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
registerOptSwitch("pbufs.tuning", int, Foam::PstreamBuffers::algorithm)
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange integer data with all processors (in the communicator).
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark sends as done using subset of send/recv ranks and recover the sizes (bytes) received.
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void push_back(const T &val)
Copy append an element to the end of this list.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void clear()
Clear all send/recv buffers and reset states.
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
void clearStorage()
Clear storage for all send/recv buffers and reset states.
List< label > labelList
A List of labels.
label maxRecvCount() const
Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
label size() const noexcept
Number of entries.
static constexpr const zero Zero
Global zero (0)