51 inline void Foam::PstreamBuffers::setFinished(
bool on)
noexcept 53 finishedSendsCalled_ = on;
57 inline void Foam::PstreamBuffers::initFinalExchange()
67 void Foam::PstreamBuffers::finalExchange
79 case modeOption::DEFAULT :
85 ? modeOption::ALL_TO_ALL
91 case modeOption::GATHER :
97 for (label proci = 1; proci < sendBuffers_.size(); ++proci)
99 sendBuffers_[proci].clear();
104 case modeOption::SCATTER :
130 case modeOption::GATHER :
140 recvSizes.resize_nocopy(nProcs_);
147 case modeOption::SCATTER :
152 recvSizes.resize_nocopy(nProcs_);
156 forAll(sendBuffers_, proci)
158 recvSizes[proci] = sendBuffers_[proci].size();
168 recvSizes[0] = myRecv;
173 case modeOption::NBX_PEX :
176 sendSizes.resize_nocopy(nProcs_);
177 forAll(sendBuffers_, proci)
179 sendSizes[proci] = sendBuffers_[proci].size();
181 recvSizes.resize_nocopy(nProcs_);
194 case modeOption::DEFAULT :
195 case modeOption::ALL_TO_ALL :
198 sendSizes.resize_nocopy(nProcs_);
199 forAll(sendBuffers_, proci)
201 sendSizes[proci] = sendBuffers_[proci].size();
203 recvSizes.resize_nocopy(nProcs_);
213 Pstream::exchange<DynamicList<char>,
char>
226 void Foam::PstreamBuffers::finalExchange
241 recvSizes.resize_nocopy(nProcs_);
247 for (
const label proci : sendProcs)
249 recvSizes[proci] = 1;
252 for (label proci = 0; proci < nProcs_; ++proci)
254 if (!recvSizes[proci])
256 sendBuffers_[proci].clear();
273 Pstream::exchange<DynamicList<char>,
char>
296 finishedSendsCalled_(false),
297 allowClearRecv_(true),
299 commsType_(commsType),
302 nProcs_(UPstream::nProcs(comm_)),
303 sendBuffers_(nProcs_),
304 recvBuffers_(nProcs_),
305 recvPositions_(nProcs_,
Zero)
314 forAll(recvBuffers_, proci)
316 const label
pos = recvPositions_[proci];
317 const label len = recvBuffers_[proci].size();
319 if (
pos >= 0 &&
pos < len)
322 <<
"Message from processor " << proci
323 <<
" Only consumed " <<
pos <<
" of " << len <<
" bytes" <<
nl 337 return sendBuffers_[proci];
346 return recvBuffers_[proci];
350 Foam::label& Foam::PstreamBuffers::accessRecvPosition(
const label proci)
352 return recvPositions_[proci];
369 for (DynamicList<char>& buf : recvBuffers_)
373 recvPositions_ =
Zero;
387 for (label proci = 0; proci < nProcs_; ++proci)
389 if (recvPositions_[proci] < 0)
391 recvPositions_[proci] = 0;
392 sendBuffers_[proci].clear();
400 sendBuffers_[proci].clear();
401 if (recvPositions_[proci] < 0)
404 recvPositions_[proci] = 0;
411 recvBuffers_[proci].clear();
412 recvPositions_[proci] = 0;
424 for (DynamicList<char>& buf : recvBuffers_)
428 recvPositions_ =
Zero;
438 for (label proci = 0; proci < nProcs_; ++proci)
440 sendBuffers_[proci].
clear();
442 recvPositions_[proci] = -1;
451 if (toggleOn && recvPositions_[proci] < 0)
453 recvPositions_[proci] = 0;
460 for (
const DynamicList<char>& buf : sendBuffers_)
475 forAll(recvBuffers_, proci)
477 if (recvPositions_[proci] < recvBuffers_[proci].size())
497 return sendBuffers_[proci].size();
505 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
530 forAll(recvBuffers_, proci)
532 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
554 const label excludeProci
561 forAll(recvBuffers_, proci)
563 if (excludeProci != proci)
565 label len(recvBuffers_[proci].size() - recvPositions_[proci]);
585 return maxNonLocalRecvCount(-1);
600 const label
pos = recvPositions_[proci];
601 const label len = recvBuffers_[proci].size();
607 const_cast<char*
>(recvBuffers_[proci].cbegin(
pos)),
627 finalExchange(modeOption::DEFAULT, wait, recvSizes);
634 finalExchange(modeOption::NBX_PEX, wait, recvSizes);
647 finalExchange(modeOption::DEFAULT, wait, recvSizes);
652 <<
"Obtaining sizes not supported in " 654 <<
" since transfers already in progress. Use non-blocking instead." 670 finalExchange(neighProcs, neighProcs, wait, recvSizes);
681 finalExchange(neighProcs, neighProcs, wait, recvSizes);
693 bool changed = (sendConnections.
size() != nProcs());
697 sendConnections.
resize(nProcs());
701 forAll(sendBuffers_, proci)
703 if (sendConnections.
set(proci, !sendBuffers_[proci].empty()))
717 finishedSends(recvSizes, wait);
721 forAll(sendBuffers_, proci)
723 if (!sendBuffers_[proci].empty())
733 if (recvSizes[proci] > 0)
743 finalExchange(sendProcs, recvProcs, wait, recvSizes);
753 finalExchange(modeOption::GATHER, wait, recvSizes);
760 finalExchange(modeOption::SCATTER, wait, recvSizes);
770 finalExchange(modeOption::GATHER, wait, recvSizes);
775 <<
"Obtaining sizes not supported in " 777 <<
" since transfers already in progress. Use non-blocking instead." 792 finalExchange(modeOption::SCATTER, wait, recvSizes);
797 <<
"Obtaining sizes not supported in " 799 <<
" since transfers already in progress. Use non-blocking instead." 813 return finishedSendsCalled_;
819 return allowClearRecv_;
825 bool old(allowClearRecv_);
826 allowClearRecv_ = on;
void set(const bitSet &bitset)
Set specified bits from another bitset.
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
errorManipArg< error, int > exit(error &err, const int errNo=1)
commsTypes
Communications types.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
constexpr char nl
The newline '\n' character (0x0a)
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static void reduceOr(bool &value, const label communicator=worldComm)
Logical (or) reduction (MPI_AllReduce)
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero int32_t data between ranks [NBX].
UList< label > labelUList
A UList of labels.
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
#define forAll(list, i)
Loop across all elements in list.
dimensionedScalar pos(const dimensionedScalar &ds)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clearUnregistered()
Clear any 'unregistered' send buffers.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
registerOptSwitch("pbufs.tuning", int, Foam::PstreamBuffers::algorithm)
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange int32_t data with all ranks in communicator.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void push_back(const T &val)
Copy append an element to the end of this list.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
void clear()
Clear all send/recv buffers and reset states.
void initRegisterSend()
Initialise registerSend() bookkeeping by mark all send buffers as 'unregistered'. ...
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off) ...
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
streamFormat
Data format (ascii | binary)
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
void clearStorage()
Clear storage for all send/recv buffers and reset states.
List< label > labelList
A List of labels.
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
label maxRecvCount() const
Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
label size() const noexcept
Number of entries.
static constexpr const zero Zero
Global zero (0)