37 namespace PstreamDetail
60 for (
auto& slot : recvs)
63 const auto proci = slot.first;
64 auto& payload = slot.second;
66 if (proci != myProci && !payload.empty())
83 for (
const auto& slot : sends)
86 const auto proci = slot.first;
87 const auto& payload = slot.second;
89 if (proci != myProci && !payload.empty())
97 payload.cdata_bytes(),
105 <<
"Cannot send outgoing message to:" 106 << proci <<
" nBytes:" 107 << label(payload.size_bytes())
138 typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
139 typedef std::pair<int, stdFoam::span<Type>> recvTuple;
157 const label maxChunkSize =
161 static_cast<label>(1),
173 for (
const auto& slot : sends)
176 const auto proci = slot.first;
177 const auto count = slot.second.size();
179 if (proci != myProci &&
count > maxCount)
190 nChunks = 1 + label(maxCount/maxChunkSize);
203 for (label iter = 0; iter < nChunks; ++iter)
206 const auto beg =
static_cast<std::size_t
>(iter*maxChunkSize);
207 const auto end =
static_cast<std::size_t
>((iter+1)*maxChunkSize);
211 const auto& baseline = sends[sloti].second;
212 auto& payload = sendChunks[sloti].second;
215 if (beg < baseline.size())
219 (
end < baseline.size())
220 ? baseline.subspan(beg,
end - beg)
221 : baseline.subspan(beg)
226 payload = baseline.
first(0);
232 const auto& baseline = recvs[sloti].second;
233 auto& payload = recvChunks[sloti].second;
236 if (beg < baseline.size())
240 (
end < baseline.size())
241 ? baseline.subspan(beg,
end - beg)
242 : baseline.subspan(beg)
247 payload = baseline.
first(0);
253 PstreamDetail::exchangeBuf<Type>
263 #if 0 // ifdef Foam_PstreamExchange_debug_chunks 271 const auto& baseline = sends[sloti].second;
272 const auto& payload = sendChunks[sloti].second;
274 sendStarts[sloti] = (payload.data() - baseline.data());
275 sendCounts[sloti] = (payload.size());
278 Info<<
"iter " << iter
294 template<
class Container,
class Type>
312 auto& recvData = recvBufs[proci];
314 if (proci != myProci && !recvData.empty())
320 recvData.data_bytes(),
321 recvData.size_bytes(),
334 const auto& sendData = sendBufs[proci];
336 if (proci != myProci && !sendData.empty())
344 sendData.cdata_bytes(),
345 sendData.size_bytes(),
352 <<
"Cannot send outgoing message. " 353 <<
"to:" << proci <<
" nBytes:" 354 << label(sendData.size_bytes())
376 template<
class Container,
class Type>
394 const label proci = iter.key();
395 auto& recvData = iter.val();
397 if (proci != myProci && !recvData.empty())
403 recvData.data_bytes(),
404 recvData.size_bytes(),
417 const label proci = iter.key();
418 const auto& sendData = iter.val();
420 if (proci != myProci && !sendData.empty())
428 sendData.cdata_bytes(),
429 sendData.size_bytes(),
436 <<
"Cannot send outgoing message to:" 437 << proci <<
" nBytes:" 438 << label(sendData.size_bytes())
460 template<
class Container,
class Type>
463 const UList<Container>& sendBufs,
465 List<Container>& recvBufs,
481 if (sendBufs.size() != numProcs)
484 <<
"Size of list " << sendBufs.size()
485 <<
" does not equal the number of processors " << numProcs
489 recvBufs.resize_nocopy(numProcs);
496 const label
count = recvSizes[proci];
498 if (proci != myProci &&
count > 0)
500 recvBufs[proci].resize_nocopy(
count);
504 recvBufs[proci].clear();
508 typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
509 typedef std::pair<int, stdFoam::span<Type>> recvTuple;
514 PstreamDetail::exchangeContainer<Container, Type>
530 const auto& sendData = sendBufs[proci];
532 if (proci != myProci && !sendData.empty())
539 { sendData.cdata(), std::size_t(sendData.size()) }
549 auto& recvData = recvBufs[proci];
551 if (proci != myProci && !recvData.empty())
558 { recvData.data(), std::size_t(recvData.size()) }
565 PstreamDetail::exchangeChunkedBuf<Type>
577 recvBufs[myProci] = sendBufs[myProci];
581 template<
class Container,
class Type>
611 const label proci = iter.key();
612 const label
count = iter.val();
614 if (proci != myProci &&
count > 0)
616 recvBufs(proci).resize_nocopy(
count);
624 typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
625 typedef std::pair<int, stdFoam::span<Type>> recvTuple;
631 const auto proci = iter.key();
632 const auto& sendData = iter.val();
634 if (proci != myProci && !sendData.empty())
641 { sendData.cdata(), std::size_t(sendData.size()) }
652 [=](
const sendTuple& a,
const sendTuple&
b)
654 return (a.second.size() <
b.second.size());
662 const auto proci = iter.key();
663 auto& recvData = recvBufs[proci];
665 if (proci != myProci && !recvData.empty())
672 { recvData.data(), std::size_t(recvData.size()) }
683 [=](
const recvTuple& a,
const recvTuple&
b)
685 return (a.second.size() <
b.second.size());
693 PstreamDetail::exchangeBuf<Type>
705 PstreamDetail::exchangeChunkedBuf<Type>
719 const auto iter = sendBufs.
find(myProci);
721 bool needsCopy = iter.good();
725 const auto& sendData = iter.val();
727 needsCopy = !sendData.empty();
731 recvBufs(myProci) = sendData;
737 recvBufs.
erase(myProci);
743 template<
class Container>
748 const Container& sendBufs,
763 if (sendBufs.size() != numProcs)
766 <<
"Size of container " << sendBufs.size()
767 <<
" does not equal the number of processors " << numProcs
772 for (label proci = 0; proci < numProcs; ++proci)
774 sendSizes[proci] = sendBufs[proci].
size();
781 recvSizes[myProci] = sendSizes[myProci];
785 for (
const label proci : recvProcs)
787 if (proci != myProci)
793 reinterpret_cast<char*>(&recvSizes[proci]),
801 for (
const label proci : sendProcs)
803 if (proci != myProci)
809 reinterpret_cast<char*>(&sendSizes[proci]),
821 template<
class Container>
825 const Container& sendBufs,
837 Pstream::exchangeSizes<Container>
850 template<
class Container>
869 const label proci = iter.key();
870 const label
count = iter.val().size();
874 sendSizes.emplace(proci,
count);
888 template<
class Container>
891 const Container& sendBufs,
904 if (sendBufs.size() != numProcs)
907 <<
"Size of container " << sendBufs.size()
908 <<
" does not equal the number of processors " << numProcs
915 sendSizes[proci] = sendBufs[proci].
size();
941 template<
class Container,
class Type>
958 exchangeSizes(sendBufs, recvSizes, comm);
960 exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait);
964 template<
class Container,
class Type>
978 exchangeSizes(sendBufs, recvSizes, tag, comm);
980 exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait);
void size(const label n)
Older name for setAddressableSize.
Inter-processor communication reduction functions.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and stack trace.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bounds checking, etc.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
static int maxCommsSize
Optional maximum message size (bytes)
T & first()
Access first element of the list, position [0].
Ostream & endl(Ostream &os)
Add newline and flush stream.
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily preserving old content.
static int & msgType() noexcept
Message tag of standard messages.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process is not a rank in the communicator.
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero integer data with all ranks in the communicator using non-blocking consensus exchange.
static void waitRequests()
Wait for all requests to finish.
UList< label > labelUList
A UList of labels.
#define forAll(list, i)
Loop across all elements in list.
static int nProcsNonblockingExchange
Number of processors to change to nonBlocking consensual exchange (NBX). Ignored for zero or negative values.
label size() const noexcept
The number of elements in table.
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
const dimensionedScalar b
Wien displacement law constant: default SI units: [m.K].
void clear()
Clear the list, i.e. set size to zero.
iterator find(const label &key)
Find and return an iterator set at the hashed entry.
void sort(UList< T > &list)
Sort the list.
void clear()
Clear all entries from table.
void exchangeBuf(const UList< std::pair< int, stdFoam::span< const Type >>> &sends, const UList< std::pair< int, stdFoam::span< Type >>> &recvs, const int tag, const label comm, const bool wait)
Setup sends and receives, each specified as [rank, span] tuple.
#define forAllIters(container, iter)
Iterate across all elements in the container object.
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
void exchangeChunkedBuf(const UList< std::pair< int, stdFoam::span< const Type >>> &sends, const UList< std::pair< int, stdFoam::span< Type >>> &recvs, const int tag, const label comm, const bool wait)
Chunked exchange of contiguous data. Setup sends and receives, each specified as [rank, span] tuple.
static void exchange(const UList< Container > &sendBufs, const labelUList &recvSizes, List< Container > &recvBufs, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool wait=true)
Helper: exchange contiguous data. Sends sendBufs, receives into recvBufs using predetermined receive sizing.
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange integer data with all processors (in the communicator).
constexpr auto end(C &c) -> decltype(c.end())
Return iterator to the end of the container c.
bool erase(const iterator &iter)
Erase an entry specified by given iterator.
void exchangeContainer(const UList< Container > &sendBufs, UList< Container > &recvBufs, const int tag, const label comm, const bool wait)
Exchange contiguous data using point-to-point communication. Sends sendBufs, receives into recvBufs...
A template class to specify that a data type can be considered as being contiguous in memory...
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
void reduce(const List< UPstream::commsStruct > &comms, T &value, const BinaryOp &bop, const int tag, const label comm)
Reduce inplace (cf. MPI Allreduce) using specified communication schedule.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
Rudimentary functionality similar to std::span for holding memory view.
messageStream Info
Information stream (stdout output on master, null elsewhere)
forAllConstIters(mixture.phases(), phase)
A HashTable to objects of type <T> with a label key.
FlatOutput::OutputAdaptor< Container, Delimiters > flatOutput(const Container &obj, Delimiters delim)
Global flatOutput() function with specified output delimiters.