PstreamBuffers Class Reference

Buffers for inter-processor communications streams (UOPstream, UIPstream). More...

Public Member Functions

 PstreamBuffers (UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
 Construct given communication type (default: nonBlocking), message tag, communicator (default: worldComm), IO format (default: binary) More...
 
 PstreamBuffers (label communicator, UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
 Construct given communicator, communication type (default: nonBlocking), message tag, IO format (default: binary) More...
 
 PstreamBuffers (label communicator, int tag, UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
 Construct given communicator, message tag, communication type (default: nonBlocking), IO format (default: binary) More...
 
 ~PstreamBuffers ()
 Destructor - checks that all data have been consumed. More...
 
IOstreamOption::streamFormat format () const noexcept
 The associated buffer format (ascii | binary) More...
 
UPstream::commsTypes commsType () const noexcept
 The communications type of the stream. More...
 
int tag () const noexcept
 The transfer message tag. More...
 
label comm () const noexcept
 The communicator index. More...
 
label nProcs () const noexcept
 Number of ranks associated with PstreamBuffers. More...
 
UPstream::rangeType allProcs () const noexcept
 Range of ranks indices associated with PstreamBuffers. More...
 
UPstream::rangeType subProcs () const noexcept
 Range of sub-processes indices associated with PstreamBuffers. More...
 
bool finished () const noexcept
 True if finishedSends() or finishedNeighbourSends() has been called. More...
 
bool allowClearRecv () const noexcept
 Is clearStorage of individual receive buffer by external hooks allowed? (default: true) More...
 
bool hasSendData () const
 True if any (local) send buffers have data. More...
 
bool hasRecvData () const
 True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished.. method first! More...
 
label sendDataCount (const label proci) const
 Number of send bytes for the specified processor. More...
 
label recvDataCount (const label proci) const
 Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other finished.. method first! More...
 
labelList recvDataCounts () const
 Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished.. method first! More...
 
label maxRecvCount () const
 Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first! More...
 
label maxNonLocalRecvCount () const
 Maximum receive size, excluding current processor rank Must call finishedSends() or other finished.. method first! More...
 
label maxNonLocalRecvCount (const label excludeProci) const
 Maximum receive size, excluding the specified processor rank Must call finishedSends() or other finished.. method first! More...
 
const UList< char > peekRecvData (const label proci) const
 Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other finished.. method first! More...
 
void clear ()
 Clear all send/recv buffers and reset states. More...
 
void clearSends ()
 Clear all send buffers (does not remove buffer storage) More...
 
void clearRecvs ()
 Clear all recv buffer and positions (does not remove buffer storage) More...
 
void clearSend (const label proci)
 Clear an individual send buffer (eg, data not required) More...
 
void clearRecv (const label proci)
 Clear an individual receive buffer (eg, data not required) More...
 
void clearStorage ()
 Clear storage for all send/recv buffers and reset states. More...
 
bool allowClearRecv (bool on) noexcept
 Change allowClearRecv, return previous value. More...
 
void initRegisterSend ()
 Initialise registerSend() bookkeeping by mark all send buffers as 'unregistered'. More...
 
void registerSend (const label proci, const bool toggleOn=true)
 Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off) More...
 
void clearUnregistered ()
 Clear any 'unregistered' send buffers. More...
 
void finishedSends (const bool wait=true)
 Mark the send phase as being finished. More...
 
void finishedSendsNBX (const bool wait=true)
 Mark the send phase as being finished. More...
 
void finishedSends (labelList &recvSizes, const bool wait=true)
 Mark the send phase as being finished. Recovers the sizes (bytes) received. More...
 
void finishedSendsNBX (labelList &recvSizes, const bool wait=true)
 Mark the send phase as being finished. Recovers the sizes (bytes) received. More...
 
void finishedNeighbourSends (const labelUList &neighProcs, const bool wait=true)
 Mark the send phase as being finished, with communication being limited to a known subset of send/recv ranks. More...
 
void finishedNeighbourSends (const labelUList &neighProcs, labelList &recvSizes, const bool wait=true)
 Mark the send phase as being finished, with communication being limited to a known subset of send/recv ranks. Recovers the sizes (bytes) received. More...
 
bool finishedSends (bitSet &sendConnections, DynamicList< label > &sendProcs, DynamicList< label > &recvProcs, const bool wait=true)
 A caching version that uses a limited send/recv connectivity. More...
 
void finishedGathers (const bool wait=true)
 Mark all sends to master as done. More...
 
void finishedGathers (labelList &recvSizes, const bool wait=true)
 Mark all sends to master as done. Recovers the sizes (bytes) received. More...
 
void finishedScatters (const bool wait=true)
 Mark all sends to sub-procs as done. More...
 
void finishedScatters (labelList &recvSizes, const bool wait=true)
 Mark all sends to sub-procs as done. Recovers the sizes (bytes) received. More...
 

Static Public Attributes

static int algorithm
 Preferred exchange algorithm (may change or be removed in future) More...
 

Friends

class UOPstreamBase
 
class UIPstreamBase
 

Detailed Description

Buffers for inter-processor communications streams (UOPstream, UIPstream).

Use UOPstream to stream data into buffers, call finishedSends() to notify that data is in buffers and then use IUPstream to get data out of received buffers. Works with both blocking and non-blocking. Does not make much sense with scheduled since there you would not need these explicit buffers.

Example usage:

for (const int proci : UPstream::allProcs())
{
if (proci != UPstream::myProcNo())
{
someObject vals;
UOPstream send(proci, pBufs);
send << vals;
}
}
pBufs.finishedSends(); // no-op for blocking
for (const int proci : UPstream::allProcs())
{
if (proci != UPstream::myProcNo())
{
UIPstream recv(proci, pBufs);
someObject vals(recv);
}
}

There are special versions of finishedSends() for restricted neighbour communication as well as for special one-to-all and all-to-one communication patterns. For example,

{
someObject vals;
for (const int proci : UPstream::subProcs())
{
UOPstream send(proci, pBufs);
send << vals;
}
}
pBufs.finishedScatters();
{
UIPstream recv(UPstream::masterNo(), pBufs);
someObject vals(recv);
}

Additionally there are some situations that use speculative sends that may not actually be required. In this case, it is possible to mark all sends as initially unregistered and subsequently mark the "real" sends as registered.

For example,

pBufs.initRegisterSend();
for (const polyPatch& pp : patches)
{
const auto* ppp = isA<processorPolyPatch>(pp);
if (ppp)
{
const label nbrProci = ppp->neighbProcNo();
// Gather some patch information...
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
// The send is needed if patchInfo is non-empty
pBufs.registerSend(nbrProci, !patchInfo.empty());
}
}
// optional: pBufs.clearUnregistered();
pBufs.finishedSends();
...
Source files

Definition at line 147 of file PstreamBuffers.H.

Constructor & Destructor Documentation

◆ PstreamBuffers() [1/3]

Construct given communication type (default: nonBlocking), message tag, communicator (default: worldComm), IO format (default: binary)

Definition at line 282 of file PstreamBuffers.C.

◆ PstreamBuffers() [2/3]

Construct given communicator, communication type (default: nonBlocking), message tag, IO format (default: binary)

Definition at line 312 of file PstreamBuffers.H.

◆ PstreamBuffers() [3/3]

Construct given communicator, message tag, communication type (default: nonBlocking), IO format (default: binary)

Definition at line 327 of file PstreamBuffers.H.

◆ ~PstreamBuffers()

Destructor - checks that all data have been consumed.

Definition at line 304 of file PstreamBuffers.C.

References Foam::abort(), Foam::FatalError, FatalErrorInFunction, forAll, Foam::nl, and Foam::pos().

Here is the call graph for this function:

Member Function Documentation

◆ format()

IOstreamOption::streamFormat format ( ) const
inlinenoexcept

The associated buffer format (ascii | binary)

Definition at line 351 of file PstreamBuffers.H.

◆ commsType()

UPstream::commsTypes commsType ( ) const
inlinenoexcept

◆ tag()

int tag ( ) const
inlinenoexcept

The transfer message tag.

Definition at line 367 of file PstreamBuffers.H.

◆ comm()

label comm ( ) const
inlinenoexcept

The communicator index.

Definition at line 375 of file PstreamBuffers.H.

◆ nProcs()

label nProcs ( ) const
inlinenoexcept

Number of ranks associated with PstreamBuffers.

Definition at line 383 of file PstreamBuffers.H.

Referenced by processorPolyPatch::initGeometry(), processorFaPatch::initGeometry(), processorPolyPatch::initUpdateMesh(), and processorFaPatch::initUpdateMesh().

Here is the caller graph for this function:

◆ allProcs()

UPstream::rangeType allProcs ( ) const
inlinenoexcept

Range of ranks indices associated with PstreamBuffers.

Definition at line 394 of file PstreamBuffers.H.

Referenced by distributedTriSurfaceMesh::distribute(), globalIndex::get(), RecycleInteraction< CloudType >::postEvolve(), and syncObjects::sync().

Here is the caller graph for this function:

◆ subProcs()

UPstream::rangeType subProcs ( ) const
inlinenoexcept

Range of sub-processes indices associated with PstreamBuffers.

Definition at line 403 of file PstreamBuffers.H.

Referenced by masterUncollatedFileOperation::readHeader().

Here is the caller graph for this function:

◆ finished()

bool finished ( ) const
noexcept

True if finishedSends() or finishedNeighbourSends() has been called.

Definition at line 804 of file PstreamBuffers.C.

Referenced by UIPstreamBase::UIPstreamBase().

Here is the caller graph for this function:

◆ allowClearRecv() [1/2]

bool allowClearRecv ( ) const
noexcept

Is clearStorage of individual receive buffer by external hooks allowed? (default: true)

Definition at line 810 of file PstreamBuffers.C.

Referenced by Cloud< passivePositionParticle >::move(), and zoneDistribute::zoneDistribute().

Here is the caller graph for this function:

◆ hasSendData()

bool hasSendData ( ) const

True if any (local) send buffers have data.

Definition at line 451 of file PstreamBuffers.C.

◆ hasRecvData()

bool hasRecvData ( ) const

True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished.. method first!

Definition at line 464 of file PstreamBuffers.C.

References Foam::exit(), Foam::FatalError, FatalErrorInFunction, and forAll.

Referenced by Cloud< passivePositionParticle >::move().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ sendDataCount()

Foam::label sendDataCount ( const label  proci) const

Number of send bytes for the specified processor.

Definition at line 488 of file PstreamBuffers.C.

◆ recvDataCount()

Foam::label recvDataCount ( const label  proci) const

Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other finished.. method first!

Definition at line 494 of file PstreamBuffers.C.

References Foam::exit(), Foam::FatalError, and FatalErrorInFunction.

Referenced by distributedTriSurfaceMesh::distribute(), globalIndex::get(), Cloud< passivePositionParticle >::move(), masterUncollatedFileOperation::NewIFstream(), RecycleInteraction< CloudType >::postEvolve(), masterUncollatedFileOperation::read(), and box::setRefineFlags().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ recvDataCounts()

Foam::labelList recvDataCounts ( ) const

Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished.. method first!

Definition at line 517 of file PstreamBuffers.C.

References Foam::exit(), Foam::FatalError, FatalErrorInFunction, forAll, and Foam::Zero.

Referenced by fvMeshDistribute::distribute().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ maxRecvCount()

Foam::label maxRecvCount ( ) const

Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!

Definition at line 575 of file PstreamBuffers.C.

◆ maxNonLocalRecvCount() [1/2]

Foam::label maxNonLocalRecvCount ( ) const

Maximum receive size, excluding current processor rank Must call finishedSends() or other finished.. method first!

Definition at line 582 of file PstreamBuffers.C.

References UPstream::myProcNo().

Here is the call graph for this function:

◆ maxNonLocalRecvCount() [2/2]

Foam::label maxNonLocalRecvCount ( const label  excludeProci) const

Maximum receive size, excluding the specified processor rank Must call finishedSends() or other finished.. method first!

Definition at line 546 of file PstreamBuffers.C.

References Foam::exit(), Foam::FatalError, FatalErrorInFunction, forAll, and Foam::max().

Here is the call graph for this function:

◆ peekRecvData()

const Foam::UList< char > peekRecvData ( const label  proci) const

Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other finished.. method first!

The method is only useful in limited situations, such as when PstreamBuffers has been used to fill contiguous data (eg, using OPstream::write).

Definition at line 589 of file PstreamBuffers.C.

References Foam::exit(), Foam::FatalError, FatalErrorInFunction, and Foam::pos().

Here is the call graph for this function:

◆ clear()

void clear ( )

Clear all send/recv buffers and reset states.

Does not remove the buffer storage.

Definition at line 370 of file PstreamBuffers.C.

Referenced by fvMeshDistribute::distribute(), trackingInverseDistance::markDonors(), inverseDistance::markDonors(), trackingInverseDistance::markPatchesAsHoles(), inverseDistance::markPatchesAsHoles(), and Cloud< passivePositionParticle >::move().

Here is the caller graph for this function:

◆ clearSends()

void clearSends ( )

Clear all send buffers (does not remove buffer storage)

Definition at line 351 of file PstreamBuffers.C.

◆ clearRecvs()

void clearRecvs ( )

Clear all recv buffer and positions (does not remove buffer storage)

Definition at line 360 of file PstreamBuffers.C.

References Foam::Zero.

◆ clearSend()

void clearSend ( const label  proci)

Clear an individual send buffer (eg, data not required)

Definition at line 391 of file PstreamBuffers.C.

◆ clearRecv()

void clearRecv ( const label  proci)

Clear an individual receive buffer (eg, data not required)

Does not remove the buffer storage.

Definition at line 402 of file PstreamBuffers.C.

◆ clearStorage()

void clearStorage ( )

Clear storage for all send/recv buffers and reset states.

Definition at line 409 of file PstreamBuffers.C.

References Foam::Zero.

◆ allowClearRecv() [2/2]

bool allowClearRecv ( bool  on)
noexcept

Change allowClearRecv, return previous value.

Definition at line 816 of file PstreamBuffers.C.

◆ initRegisterSend()

void initRegisterSend ( )

Initialise registerSend() bookkeeping by mark all send buffers as 'unregistered'.

Usually called immediately after construction or clear().

Definition at line 427 of file PstreamBuffers.C.

◆ registerSend()

void registerSend ( const label  proci,
const bool  toggleOn = true 
)

Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off)

Definition at line 441 of file PstreamBuffers.C.

◆ clearUnregistered()

void clearUnregistered ( )

Clear any 'unregistered' send buffers.

Definition at line 378 of file PstreamBuffers.C.

◆ finishedSends() [1/3]

◆ finishedSendsNBX() [1/2]

void finishedSendsNBX ( const bool  wait = true)

Mark the send phase as being finished.

Non-blocking mode: populates receive buffers using NBX.

Parameters
waitwait for requests to complete (in non-blocking mode)

Definition at line 624 of file PstreamBuffers.C.

◆ finishedSends() [2/3]

void finishedSends ( labelList recvSizes,
const bool  wait = true 
)

Mark the send phase as being finished. Recovers the sizes (bytes) received.

Non-blocking mode: populates receive buffers using all-to-all or NBX (depending on tuning parameters).

Warning
currently only valid for non-blocking comms.
Parameters
[out]recvSizesthe sizes (bytes) received
waitwait for requests to complete (in non-blocking mode)

Definition at line 632 of file PstreamBuffers.C.

References UPstream::commsTypeNames, Foam::endl(), Foam::exit(), Foam::FatalError, FatalErrorInFunction, UPstream::nonBlocking, and List< T >::resize_nocopy().

Here is the call graph for this function:

◆ finishedSendsNBX() [2/2]

void finishedSendsNBX ( labelList recvSizes,
const bool  wait = true 
)

Mark the send phase as being finished. Recovers the sizes (bytes) received.

Non-blocking mode: populates receive buffers using NBX.

Warning
currently only valid for non-blocking comms.
Parameters
[out]recvSizesthe sizes (bytes) received
waitwait for requests to complete (in non-blocking mode)

◆ finishedNeighbourSends() [1/2]

void finishedNeighbourSends ( const labelUList neighProcs,
const bool  wait = true 
)

Mark the send phase as being finished, with communication being limited to a known subset of send/recv ranks.

Non-blocking mode: populates receive buffers.

Warning
currently only valid for non-blocking comms.
Note
Same as finishedSends with identical sendProcs/recvProcs
Parameters
neighProcsranks used for sends/recvs
waitwait for requests to complete (in non-blocking mode)

Definition at line 668 of file PstreamBuffers.C.

Referenced by Cloud< passivePositionParticle >::move(), and syncTools::syncBoundaryFaceList().

Here is the caller graph for this function:

◆ finishedNeighbourSends() [2/2]

void finishedNeighbourSends ( const labelUList neighProcs,
labelList recvSizes,
const bool  wait = true 
)

Mark the send phase as being finished, with communication being limited to a known subset of send/recv ranks. Recovers the sizes (bytes) received.

Non-blocking mode: it will populate receive buffers.

Warning
currently only valid for non-blocking mode.
Parameters
neighProcsranks used for sends/recvs
[out]recvSizesthe sizes (bytes) received
waitwait for requests to complete (in non-blocking mode)

Definition at line 657 of file PstreamBuffers.C.

◆ finishedSends() [3/3]

bool finishedSends ( bitSet sendConnections,
DynamicList< label > &  sendProcs,
DynamicList< label > &  recvProcs,
const bool  wait = true 
)

A caching version that uses a limited send/recv connectivity.

Non-blocking mode: populates receive buffers.

Returns
True if the send/recv connectivity changed
Warning
currently only valid for non-blocking comms.
Parameters
sendConnectionsinter-rank connections (on/off) for sending ranks
sendProcsranks used for sends
recvProcsranks used for recvs
waitwait for requests to complete (in non-blocking mode)

Definition at line 679 of file PstreamBuffers.C.

References DynamicList< T, SizeMin >::clear(), forAll, DynamicList< T, SizeMin >::push_back(), UPstream::reduceOr(), PackedList< Width >::resize(), bitSet::set(), and PackedList< Width >::size().

Here is the call graph for this function:

◆ finishedGathers() [1/2]

void finishedGathers ( const bool  wait = true)

Mark all sends to master as done.

Non-blocking mode: populates receive buffers. Can use recvDataCount, maxRecvCount etc to recover sizes received.

Parameters
waitwait for requests to complete (in non-blocking mode)
Warning
currently only valid for non-blocking comms.

Definition at line 743 of file PstreamBuffers.C.

Referenced by lduPrimitiveMesh::gather(), and energySpectrum::write().

Here is the caller graph for this function:

◆ finishedGathers() [2/2]

void finishedGathers ( labelList recvSizes,
const bool  wait = true 
)

Mark all sends to master as done. Recovers the sizes (bytes) received.

Non-blocking mode: populates receive buffers (all-to-one).

Warning
currently only valid for non-blocking comms.
Parameters
[out]recvSizesthe sizes (bytes) received
waitwait for requests to complete (in non-blocking mode)

Definition at line 758 of file PstreamBuffers.C.

References UPstream::commsTypeNames, Foam::endl(), Foam::exit(), Foam::FatalError, FatalErrorInFunction, and UPstream::nonBlocking.

Here is the call graph for this function:

◆ finishedScatters() [1/2]

void finishedScatters ( const bool  wait = true)

Mark all sends to sub-procs as done.

Non-blocking mode: populates receive buffers. Can use recvDataCount, maxRecvCount etc to recover sizes received.

Parameters
waitwait for requests to complete (in non-blocking mode)
Warning
currently only valid for non-blocking comms.

Definition at line 750 of file PstreamBuffers.C.

Referenced by masterUncollatedFileOperation::read(), decomposedBlockData::readBlocks(), masterUncollatedFileOperation::readHeader(), and masterUncollatedFileOperation::scatterList().

Here is the caller graph for this function:

◆ finishedScatters() [2/2]

void finishedScatters ( labelList recvSizes,
const bool  wait = true 
)

Mark all sends to sub-procs as done. Recovers the sizes (bytes) received.

Non-blocking mode: populates receive buffers (all-to-one).

Warning
currently only valid for non-blocking comms.
Parameters
[out]recvSizesthe sizes (bytes) received
waitwait for requests to complete (in non-blocking mode)

Definition at line 780 of file PstreamBuffers.C.

References UPstream::commsTypeNames, Foam::endl(), Foam::exit(), Foam::FatalError, FatalErrorInFunction, and UPstream::nonBlocking.

Here is the call graph for this function:

Friends And Related Function Documentation

◆ UOPstreamBase

friend class UOPstreamBase
friend

Definition at line 279 of file PstreamBuffers.H.

◆ UIPstreamBase

friend class UIPstreamBase
friend

Definition at line 280 of file PstreamBuffers.H.

Member Data Documentation

◆ algorithm

int algorithm
static

Preferred exchange algorithm (may change or be removed in future)

Definition at line 290 of file PstreamBuffers.H.

Referenced by argList::parse().


The documentation for this class was generated from the following files: