56 inline void Foam::PstreamBuffers::setFinished(
bool on)
noexcept 58 finishedSendsCalled_ = on;
62 inline void Foam::PstreamBuffers::initFinalExchange()
72 void Foam::PstreamBuffers::finalExchange
84 case modeOption::DEFAULT :
90 ? modeOption::ALL_TO_ALL
96 case modeOption::GATHER :
102 for (label proci = 1; proci < sendBuffers_.size(); ++proci)
104 sendBuffers_[proci].clear();
109 case modeOption::SCATTER :
135 case modeOption::GATHER :
145 recvSizes.resize_nocopy(nProcs_);
152 case modeOption::SCATTER :
157 recvSizes.resize_nocopy(nProcs_);
161 forAll(sendBuffers_, proci)
163 recvSizes[proci] = sendBuffers_[proci].size();
173 recvSizes[0] = myRecv;
178 case modeOption::NBX_PEX :
181 sendSizes.resize_nocopy(nProcs_);
182 forAll(sendBuffers_, proci)
184 sendSizes[proci] = sendBuffers_[proci].size();
186 recvSizes.resize_nocopy(nProcs_);
199 case modeOption::DEFAULT :
200 case modeOption::ALL_TO_ALL :
203 sendSizes.resize_nocopy(nProcs_);
204 forAll(sendBuffers_, proci)
206 sendSizes[proci] = sendBuffers_[proci].size();
208 recvSizes.resize_nocopy(nProcs_);
218 Pstream::exchange<DynamicList<char>,
char>
231 void Foam::PstreamBuffers::finalExchange
242 <<
" nProcs:" << nProcs_
252 recvSizes.resize_nocopy(nProcs_);
258 for (
const label proci : sendProcs)
260 recvSizes[proci] = 1;
263 for (label proci = 0; proci < nProcs_; ++proci)
265 if (!recvSizes[proci])
267 sendBuffers_[proci].clear();
284 Pstream::exchange<DynamicList<char>,
char>
307 finishedSendsCalled_(
false),
308 allowClearRecv_(true),
310 commsType_(commsType),
313 nProcs_(UPstream::nProcs(comm_)),
314 sendBuffers_(nProcs_),
315 recvBuffers_(nProcs_),
316 recvPositions_(nProcs_,
Foam::zero{})
321 <<
" nProcs:" << nProcs_
333 <<
" nProcs:" << nProcs_
337 forAll(recvBuffers_, proci)
339 const label
pos = recvPositions_[proci];
340 const label len = recvBuffers_[proci].size();
342 if (
pos >= 0 &&
pos < len)
345 <<
"Message from processor " << proci
346 <<
" Only consumed " <<
pos <<
" of " << len <<
" bytes" <<
nl 347 <<
" comm " << comm_ <<
" tag " << tag_ <<
nl 361 return sendBuffers_[proci];
370 return recvBuffers_[proci];
374 Foam::label& Foam::PstreamBuffers::accessRecvPosition(
const label proci)
376 return recvPositions_[proci];
393 for (DynamicList<char>& buf : recvBuffers_)
397 recvPositions_ =
Zero;
411 for (label proci = 0; proci < nProcs_; ++proci)
413 if (recvPositions_[proci] < 0)
415 recvPositions_[proci] = 0;
416 sendBuffers_[proci].clear();
424 sendBuffers_[proci].clear();
425 if (recvPositions_[proci] < 0)
428 recvPositions_[proci] = 0;
435 recvBuffers_[proci].clear();
436 recvPositions_[proci] = 0;
448 for (DynamicList<char>& buf : recvBuffers_)
452 recvPositions_ =
Zero;
462 for (label proci = 0; proci < nProcs_; ++proci)
464 sendBuffers_[proci].
clear();
466 recvPositions_[proci] = -1;
475 if (toggleOn && recvPositions_[proci] < 0)
477 recvPositions_[proci] = 0;
484 for (
const DynamicList<char>& buf : sendBuffers_)
499 forAll(recvBuffers_, proci)
501 if (recvPositions_[proci] < recvBuffers_[proci].size())
521 return sendBuffers_[proci].size();
529 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
554 forAll(recvBuffers_, proci)
556 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
578 const label excludeProci
585 forAll(recvBuffers_, proci)
587 if (excludeProci != proci)
589 label len(recvBuffers_[proci].size() - recvPositions_[proci]);
609 return maxNonLocalRecvCount(-1);
624 const label
pos = recvPositions_[proci];
625 const label len = recvBuffers_[proci].size();
631 const_cast<char*
>(recvBuffers_[proci].cbegin(
pos)),
653 <<
" nProcs:" << nProcs_
657 finalExchange(modeOption::DEFAULT, wait, recvSizes);
666 <<
" nProcs:" << nProcs_
670 finalExchange(modeOption::NBX_PEX, wait, recvSizes);
683 <<
" nProcs:" << nProcs_
689 finalExchange(modeOption::DEFAULT, wait, recvSizes);
694 <<
"Obtaining sizes not supported in " 696 <<
" since transfers already in progress. Use non-blocking instead." 712 finalExchange(neighProcs, neighProcs, wait, recvSizes);
723 finalExchange(neighProcs, neighProcs, wait, recvSizes);
735 bool changed = (sendConnections.
size() != nProcs());
739 sendConnections.
resize(nProcs());
743 forAll(sendBuffers_, proci)
745 if (sendConnections.
set(proci, !sendBuffers_[proci].empty()))
759 finishedSends(recvSizes, wait);
763 forAll(sendBuffers_, proci)
765 if (!sendBuffers_[proci].empty())
775 if (recvSizes[proci] > 0)
785 finalExchange(sendProcs, recvProcs, wait, recvSizes);
795 finalExchange(modeOption::GATHER, wait, recvSizes);
802 finalExchange(modeOption::SCATTER, wait, recvSizes);
812 finalExchange(modeOption::GATHER, wait, recvSizes);
817 <<
"Obtaining sizes not supported in " 819 <<
" since transfers already in progress. Use non-blocking instead." 834 finalExchange(modeOption::SCATTER, wait, recvSizes);
839 <<
"Obtaining sizes not supported in " 841 <<
" since transfers already in progress. Use non-blocking instead." 855 return finishedSendsCalled_;
861 return allowClearRecv_;
867 bool old(allowClearRecv_);
868 allowClearRecv_ = on;
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const int tag=UPstream::msgType(), const int comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
void set(const bitSet &bitset)
Set specified bits from another bitset.
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
errorManipArg< error, int > exit(error &err, const int errNo=1)
commsTypes
Communications types.
static T listScatterValues(const UList< T > &allValues, const int communicator=UPstream::worldComm)
Scatter individual values from list locations.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
constexpr char nl
The newline '\n' character (0x0a)
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
Ostream & endl(Ostream &os)
Add newline and flush stream.
static int myProcNo(label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a...
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int communicator=UPstream::worldComm)
Exchange int32_t data with all ranks in communicator.
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static void reduceOr(bool &value, const int communicator=UPstream::worldComm)
Logical (or) reduction (MPI_AllReduce)
UList< label > labelUList
A UList of labels.
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
#define forAll(list, i)
Loop across all elements in list.
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
dimensionedScalar pos(const dimensionedScalar &ds)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clearUnregistered()
Clear any 'unregistered' send buffers.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
registerOptSwitch("pbufs.tuning", int, Foam::PstreamBuffers::algorithm)
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
defineTypeNameAndDebug(combustionModel, 0)
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void push_back(const T &val)
Copy append an element to the end of this list.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), int communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void clear()
Clear all send/recv buffers and reset states.
decomposeUsingBbs false
Use bounding boxes (default) or unique decomposition of triangles (i.e. do not duplicate triangles) ...
void initRegisterSend()
Initialise registerSend() bookkeeping by mark all send buffers as 'unregistered'. ...
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
void clearSends()
Clear all send buffers (does not remove buffer storage)
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off) ...
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const int communicator=UPstream::worldComm)
Exchange non-zero int32_t data between ranks [NBX].
streamFormat
Data format (ascii | binary | coherent)
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static bool master(label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
void clearStorage()
Clear storage for all send/recv buffers and reset states.
List< label > labelList
A List of labels.
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
label maxRecvCount() const
Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
label size() const noexcept
Number of entries.
#define DebugPoutInFunction
Report an information message using Foam::Pout.
static constexpr const zero Zero
Global zero (0)