52 namespace PstreamDetail
60 const std::size_t max_bytes = std::size_t(0)
67 : (max_bytes > std::size_t(INT_MAX))
68 ? (std::size_t(INT_MAX) /
sizeof(Type))
69 : (max_bytes >
sizeof(Type))
70 ? (max_bytes /
sizeof(Type))
84 ? std::size_t(INT_MAX + max_bytes)
85 : std::size_t(max_bytes)
111 if (sends.empty() && recvs.empty())
122 const std::size_t chunkSize
124 PstreamDetail::maxTransferCount<Type>
134 for (
auto& slot : recvs)
137 const auto proci = slot.first;
138 auto& payload = slot.second;
141 if (proci == myProci || payload.empty())
152 payload.data_bytes(),
153 payload.size_bytes(),
161 for (
int iter = 0; ; ++iter)
164 const std::size_t beg = (std::size_t(iter)*chunkSize);
165 const std::size_t
end = (std::size_t(iter+1)*chunkSize);
169 const int msgTagChunk = (tag + iter);
171 if (payload.size() <= beg)
179 (
end < payload.size())
180 ? payload.subspan(beg,
end - beg)
181 : payload.subspan(beg)
200 for (
const auto& slot : sends)
203 const auto proci = slot.first;
204 const auto& payload = slot.second;
207 if (proci == myProci || payload.empty())
218 payload.cdata_bytes(),
219 payload.size_bytes(),
227 <<
"Failure sending message to:" << proci
228 <<
" nBytes:" << label(payload.size_bytes()) <<
nl 235 for (
int iter = 0; ; ++iter)
238 const std::size_t beg = (std::size_t(iter)*chunkSize);
239 const std::size_t
end = (std::size_t(iter+1)*chunkSize);
243 const int msgTagChunk = (tag + iter);
245 if (payload.size() <= beg)
253 (
end < payload.size())
254 ? payload.subspan(beg,
end - beg)
255 : payload.subspan(beg)
262 window.cdata_bytes(),
271 <<
"Failure sending message to:" << proci
272 <<
" nBytes:" << label(window.size_bytes()) <<
nl 286 template<
class Container,
class Type>
312 const std::size_t chunkSize
314 PstreamDetail::maxTransferCount<Type>
329 recvBufs[proci].data(),
330 std::size_t(recvBufs[proci].size())
334 if (proci == myProci || payload.empty())
345 payload.data_bytes(),
346 payload.size_bytes(),
354 for (
int iter = 0; ; ++iter)
357 const std::size_t beg = (std::size_t(iter)*chunkSize);
358 const std::size_t
end = (std::size_t(iter+1)*chunkSize);
362 const int msgTagChunk = (tag + iter);
364 if (payload.size() <= beg)
372 (
end < payload.size())
373 ? payload.subspan(beg,
end - beg)
374 : payload.subspan(beg)
398 sendBufs[proci].cdata(),
399 std::size_t(sendBufs[proci].size())
403 if (proci == myProci || payload.empty())
414 payload.cdata_bytes(),
415 payload.size_bytes(),
423 <<
"Failure sending message to:" << proci
424 <<
" nBytes:" << label(payload.size_bytes()) <<
nl 431 for (
int iter = 0; ; ++iter)
434 const std::size_t beg = (std::size_t(iter)*chunkSize);
435 const std::size_t
end = (std::size_t(iter+1)*chunkSize);
439 const int msgTagChunk = (tag + iter);
441 if (payload.size() <= beg)
449 (
end < payload.size())
450 ? payload.subspan(beg,
end - beg)
451 : payload.subspan(beg)
458 window.cdata_bytes(),
467 <<
"Failure sending message to:" << proci
468 <<
" nBytes:" << label(window.size_bytes()) <<
nl 480 Perr<<
"Pstream::exchange with " 482 <<
" requests" <<
nl;
498 template<
class Container,
class Type>
512 typedef std::pair<int, sendType> sendTuple;
513 typedef std::pair<int, recvType> recvTuple;
523 const auto proci = iter.key();
524 auto& recvData = recvBufs[proci];
529 std::size_t(recvData.size())
533 if (proci != myProci && !payload.empty())
535 recvs.emplace_back(proci, payload);
543 [=](
const recvTuple& a,
const recvTuple&
b)
546 return (a.first <
b.first);
559 const auto proci = iter.key();
560 const auto& sendData = iter.val();
565 std::size_t(sendData.size())
569 if (proci != myProci && !payload.empty())
571 sends.emplace_back(proci, payload);
579 [=](
const sendTuple& a,
const sendTuple&
b)
582 return (a.first <
b.first);
591 PstreamDetail::exchangeBuffers<Type>
605 Perr<<
"Pstream::exchange with " 607 <<
" requests" <<
nl;
623 template<
class Container,
class Type>
626 const UList<Container>& sendBufs,
628 List<Container>& recvBufs,
644 if (sendBufs.size() != numProc)
647 <<
"List size " << sendBufs.size()
648 <<
" != number of ranks " << numProc <<
nl 652 recvBufs.resize_nocopy(numProc);
659 const label
count = recvSizes[proci];
661 if (proci != myProci &&
count > 0)
663 recvBufs[proci].resize_nocopy(
count);
667 recvBufs[proci].clear();
671 PstreamDetail::exchangeContainer<Container, Type>
683 recvBufs[myProci] = sendBufs[myProci];
687 template<
class Container,
class Type>
717 const label proci = iter.key();
718 const label
count = iter.val();
720 if (proci != myProci &&
count > 0)
722 recvBufs(proci).resize_nocopy(
count);
726 PstreamDetail::exchangeContainer<Container, Type>
740 const auto iter = sendBufs.
find(myProci);
742 bool needsCopy = iter.good();
746 const auto& sendData = iter.val();
748 needsCopy = !sendData.empty();
752 recvBufs(myProci) = sendData;
758 recvBufs.
erase(myProci);
764 template<
class Container>
769 const Container& sendBufs,
784 if (sendBufs.size() != numProc)
787 <<
"Container size " << sendBufs.size()
788 <<
" != number of ranks " << numProc <<
nl 793 for (label proci = 0; proci < numProc; ++proci)
795 sendSizes[proci] = sendBufs[proci].
size();
802 recvSizes[myProci] = sendSizes[myProci];
806 for (
const label proci : recvProcs)
808 if (proci != myProci)
814 reinterpret_cast<char*>(&recvSizes[proci]),
822 for (
const label proci : sendProcs)
824 if (proci != myProci)
830 reinterpret_cast<char*>(&sendSizes[proci]),
842 template<
class Container>
846 const Container& sendBufs,
858 Pstream::exchangeSizes<Container>
871 template<
class Container>
874 const Map<Container>& sendBufs,
875 Map<label>& recvSizes,
888 sendSizes.
reserve(sendBufs.size());
892 const label proci = iter.key();
893 const label
count = iter.val().size();
911 template<
class Container>
914 const Container& sendBufs,
927 if (sendBufs.size() != numProc)
930 <<
"Container size " << sendBufs.size()
931 <<
" != number of ranks " << numProc <<
nl 938 sendSizes[proci] = sendBufs[proci].
size();
940 recvSizes.resize_nocopy(sendSizes.
size());
964 template<
class Container,
class Type>
967 const UList<Container>& sendBufs,
968 List<Container>& recvBufs,
983 Pstream::exchange<Container, Type>
995 template<
class Container,
class Type>
1011 Pstream::exchange<Container, Type>
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
void size(const label n)
Older name for setAddressableSize.
Inter-processor communication reduction functions.
bool emplace(const Key &key, Args &&... args)
Emplace insert a new entry, not overwriting existing entries.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
constexpr char nl
The newline '\n' character (0x0a)
static int maxCommsSize
Optional maximum message size (bytes)
bool empty() const noexcept
True if List is empty (ie, size() is zero)
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily preserving old content.
static int & msgType() noexcept
Message tag of standard messages.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero int32_t data between ranks [NBX].
static void waitRequests()
Wait for all requests to finish.
UList< label > labelUList
A UList of labels.
#define forAll(list, i)
Loop across all elements in list.
static int nProcsNonblockingExchange
Number of processors to change to nonBlocking consensual exchange (NBX). Ignored for zero or negative...
label size() const noexcept
The number of elements in table.
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
const dimensionedScalar b
Wien displacement law constant: default SI units: [m.K].
void clear()
Clear the list, i.e. set size to zero.
iterator find(const label &key)
Find and return an iterator set at the hashed entry.
std::size_t maxTransferCount(const std::size_t max_bytes=std::size_t(0)) noexcept
Number of elements corresponding to max byte transfer.
void sort(UList< T > &list)
Sort the list.
#define forAllIters(container, iter)
Iterate across all elements in the container object.
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
static void exchange(const UList< Container > &sendBufs, const labelUList &recvSizes, List< Container > &recvBufs, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool wait=true)
Helper: exchange contiguous data. Sends sendBufs, receives into recvBufs using predetermined receive ...
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange int32_t data with all ranks in communicator.
int debug
Static debugging option.
constexpr auto end(C &c) -> decltype(c.end())
Return iterator to the end of the container c.
void exchangeContainer(const UList< Container > &sendBufs, UList< Container > &recvBufs, const int tag, const label comm, const bool wait, const int64_t maxComms_bytes=UPstream::maxCommsSize)
Exchange contiguous data using point-to-point communication. Sends sendBufs, receives into recvBufs...
bool erase(const iterator &iter)
Erase an entry specified by given iterator.
A template class to specify that a data type can be considered as being contiguous in memory...
void reserve(label numEntries)
Reserve space for at least the specified number of elements (not the number of buckets) and regenerat...
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Rudimentary functionality similar to std::span for holding memory view.
std::size_t maxTransferBytes(const int64_t max_bytes) noexcept
Upper limit on number of transfer bytes.
List< label > labelList
A List of labels.
void exchangeBuffers(const UList< std::pair< int, stdFoam::span< const Type >>> &sends, const UList< std::pair< int, stdFoam::span< Type >>> &recvs, const int tag, const label comm, const int64_t maxComms_bytes=UPstream::maxCommsSize)
Exchange of contiguous data, with or without chunking. Setup sends and receives, each specified as [r...
forAllConstIters(mixture.phases(), phase)
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
A HashTable to objects of type <T> with a label key.