PstreamBuffers.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2025 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "PstreamBuffers.H"
30 #include "bitSet.H"
31 #include "debug.H"
32 #include "registerSwitch.H"
33 
34 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
35 
37 (
38  // Name may change in the future (JUN-2023)
39  Foam::debug::optimisationSwitch("pbufs.tuning", 0)
40 );
42 (
43  "pbufs.tuning",
44  int,
46 );
47 
48 namespace Foam
49 {
50  defineTypeNameAndDebug(PstreamBuffers, 0);
51 }
52 
53 
54 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
55 
56 inline void Foam::PstreamBuffers::setFinished(bool on) noexcept
57 {
58  finishedSendsCalled_ = on;
59 }
60 
61 
62 inline void Foam::PstreamBuffers::initFinalExchange()
63 {
64  // Could also check that it is not called twice
65  // but that is used for overlapping send/recv (eg, overset)
66  setFinished(true);
67 
69 }
70 
71 
72 void Foam::PstreamBuffers::finalExchange
73 (
74  enum modeOption mode,
75  const bool wait,
76  labelList& recvSizes
77 )
78 {
79  initFinalExchange();
80 
81  // Pre-flight checks
82  switch (mode)
83  {
84  case modeOption::DEFAULT :
85  {
86  // Choose (ALL_TO_ALL | NBX_PEX) from static settings
87  mode =
88  (
89  (algorithm <= 0)
90  ? modeOption::ALL_TO_ALL
91  : modeOption::NBX_PEX
92  );
93  break;
94  }
95 
96  case modeOption::GATHER :
97  {
98  // gather mode (all-to-one) : master [0] <- everyone
99  // - only send to master [0]
100  // note: master [0] is also allowed to 'send' to itself
101 
102  for (label proci = 1; proci < sendBuffers_.size(); ++proci)
103  {
104  sendBuffers_[proci].clear();
105  }
106  break;
107  }
108 
109  case modeOption::SCATTER :
110  {
111  // scatter mode (one-to-all) : master [0] -> everyone
112 
113  if (!UPstream::master(comm_))
114  {
115  // Non-master: has no sends
116  clearSends();
117  }
118  break;
119  }
120 
121  default :
122  break;
123  }
124 
125 
126  if (commsType_ == UPstream::commsTypes::nonBlocking)
127  {
128  // PEX algorithm with different flavours of exchanging sizes
129  // PEX stage 1: exchange sizes
130 
131  labelList sendSizes; // Not used by gather/scatter
132 
133  switch (mode)
134  {
135  case modeOption::GATHER :
136  {
137  // gather mode (all-to-one): master [0] <- everyone
138  // - presumed that MPI_Gather will be the most efficient
139 
140  recvSizes =
141  UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
142 
143  if (!UPstream::master(comm_))
144  {
145  recvSizes.resize_nocopy(nProcs_);
146  recvSizes = Zero;
147  }
148 
149  break;
150  }
151 
152  case modeOption::SCATTER :
153  {
154  // scatter mode (one-to-all): master [0] -> everyone
155  // - presumed that MPI_Scatter will be the most efficient
156 
157  recvSizes.resize_nocopy(nProcs_);
158 
159  if (UPstream::master(comm_))
160  {
161  forAll(sendBuffers_, proci)
162  {
163  recvSizes[proci] = sendBuffers_[proci].size();
164  }
165  }
166 
167  const label myRecv
168  (
169  UPstream::listScatterValues(recvSizes, comm_)
170  );
171 
172  recvSizes = Zero;
173  recvSizes[0] = myRecv;
174 
175  break;
176  }
177 
178  case modeOption::NBX_PEX :
179  {
180  // Assemble the send sizes (cf. Pstream::exchangeSizes)
181  sendSizes.resize_nocopy(nProcs_);
182  forAll(sendBuffers_, proci)
183  {
184  sendSizes[proci] = sendBuffers_[proci].size();
185  }
186  recvSizes.resize_nocopy(nProcs_);
187 
188  // Exchange sizes (non-blocking consensus)
190  (
191  sendSizes,
192  recvSizes,
193  (tag_ + 314159), // some unique tag?
194  comm_
195  );
196  break;
197  }
198 
199  case modeOption::DEFAULT :
200  case modeOption::ALL_TO_ALL :
201  {
202  // Assemble the send sizes (cf. Pstream::exchangeSizes)
203  sendSizes.resize_nocopy(nProcs_);
204  forAll(sendBuffers_, proci)
205  {
206  sendSizes[proci] = sendBuffers_[proci].size();
207  }
208  recvSizes.resize_nocopy(nProcs_);
209 
210  // Exchange sizes (all-to-all)
211  UPstream::allToAll(sendSizes, recvSizes, comm_);
212  break;
213  }
214  }
215 
216 
217  // PEX stage 2: point-to-point data exchange
218  Pstream::exchange<DynamicList<char>, char>
219  (
220  sendBuffers_,
221  recvSizes,
222  recvBuffers_,
223  tag_,
224  comm_,
225  wait
226  );
227  }
228 }
229 
230 
231 void Foam::PstreamBuffers::finalExchange
232 (
233  const labelUList& sendProcs,
234  const labelUList& recvProcs,
235  const bool wait,
236  labelList& recvSizes
237 )
238 {
240  << "tag:" << tag_
241  << " comm:" << comm_
242  << " nProcs:" << nProcs_
243  << endl;
244 
245  initFinalExchange();
246 
247  if (commsType_ == UPstream::commsTypes::nonBlocking)
248  {
249  // Preparation. Temporarily abuse recvSizes as logic to clear
250  // send buffers that are not in the neighbourhood connection
251  {
252  recvSizes.resize_nocopy(nProcs_);
253  recvSizes = 0;
254 
255  // Preserve self-send, even if not described by neighbourhood
256  recvSizes[UPstream::myProcNo(comm_)] = 1;
257 
258  for (const label proci : sendProcs)
259  {
260  recvSizes[proci] = 1; // Connected
261  }
262 
263  for (label proci = 0; proci < nProcs_; ++proci)
264  {
265  if (!recvSizes[proci]) // Not connected
266  {
267  sendBuffers_[proci].clear();
268  }
269  }
270  }
271 
272  // PEX stage 1: exchange sizes (limited neighbourhood)
274  (
275  sendProcs,
276  recvProcs,
277  sendBuffers_,
278  recvSizes,
279  tag_,
280  comm_
281  );
282 
283  // PEX stage 2: point-to-point data exchange
284  Pstream::exchange<DynamicList<char>, char>
285  (
286  sendBuffers_,
287  recvSizes,
288  recvBuffers_,
289  tag_,
290  comm_,
291  wait
292  );
293  }
294 }
295 
296 
297 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
298 
300 (
301  UPstream::commsTypes commsType,
302  int tag,
303  int communicator,
305 )
306 :
307  finishedSendsCalled_(false),
308  allowClearRecv_(true),
309  format_(fmt),
310  commsType_(commsType),
311  tag_(tag),
312  comm_(communicator),
313  nProcs_(UPstream::nProcs(comm_)),
314  sendBuffers_(nProcs_),
315  recvBuffers_(nProcs_),
316  recvPositions_(nProcs_, Foam::zero{})
317 {
319  << "tag:" << tag_
320  << " comm:" << comm_
321  << " nProcs:" << nProcs_
322  << endl;
323 }
324 
325 
326 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
327 
329 {
331  << "tag:" << tag_
332  << " comm:" << comm_
333  << " nProcs:" << nProcs_
334  << endl;
335 
336  // Check that all data has been consumed.
337  forAll(recvBuffers_, proci)
338  {
339  const label pos = recvPositions_[proci];
340  const label len = recvBuffers_[proci].size();
341 
342  if (pos >= 0 && pos < len)
343  {
345  << "Message from processor " << proci
346  << " Only consumed " << pos << " of " << len << " bytes" << nl
347  << " comm " << comm_ << " tag " << tag_ << nl
349  }
350  }
351 }
352 
353 
354 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
355 
356 Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
357 (
358  const label proci
359 )
360 {
361  return sendBuffers_[proci];
362 }
363 
364 
365 Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
366 (
367  const label proci
368 )
369 {
370  return recvBuffers_[proci];
371 }
372 
373 
374 Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
375 {
376  return recvPositions_[proci];
377 }
378 
379 
380 // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
381 
383 {
384  for (DynamicList<char>& buf : sendBuffers_)
385  {
386  buf.clear();
387  }
388 }
389 
390 
392 {
393  for (DynamicList<char>& buf : recvBuffers_)
394  {
395  buf.clear();
396  }
397  recvPositions_ = Zero;
398 }
399 
400 
402 {
403  clearSends();
404  clearRecvs();
405  setFinished(false);
406 }
407 
408 
410 {
411  for (label proci = 0; proci < nProcs_; ++proci)
412  {
413  if (recvPositions_[proci] < 0)
414  {
415  recvPositions_[proci] = 0;
416  sendBuffers_[proci].clear();
417  }
418  }
419 }
420 
421 
422 void Foam::PstreamBuffers::clearSend(const label proci)
423 {
424  sendBuffers_[proci].clear();
425  if (recvPositions_[proci] < 0)
426  {
427  // Reset the unregistered flag
428  recvPositions_[proci] = 0;
429  }
430 }
431 
432 
433 void Foam::PstreamBuffers::clearRecv(const label proci)
434 {
435  recvBuffers_[proci].clear();
436  recvPositions_[proci] = 0;
437 }
438 
439 
441 {
442  // Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
443  // Not sure if it makes much difference
444  for (DynamicList<char>& buf : sendBuffers_)
445  {
446  buf.clearStorage();
447  }
448  for (DynamicList<char>& buf : recvBuffers_)
449  {
450  buf.clearStorage();
451  }
452  recvPositions_ = Zero;
453 
454  setFinished(false);
455 }
456 
457 
459 {
460  if (!finished())
461  {
462  for (label proci = 0; proci < nProcs_; ++proci)
463  {
464  sendBuffers_[proci].clear();
465  // Mark send buffer as 'unregistered'
466  recvPositions_[proci] = -1;
467  }
468  }
469 }
470 
471 
472 void Foam::PstreamBuffers::registerSend(const label proci, const bool toggleOn)
473 {
474  // Clear the 'unregistered' flag
475  if (toggleOn && recvPositions_[proci] < 0)
476  {
477  recvPositions_[proci] = 0;
478  }
479 }
480 
481 
483 {
484  for (const DynamicList<char>& buf : sendBuffers_)
485  {
486  if (!buf.empty())
487  {
488  return true;
489  }
490  }
491  return false;
492 }
493 
494 
496 {
497  if (finished())
498  {
499  forAll(recvBuffers_, proci)
500  {
501  if (recvPositions_[proci] < recvBuffers_[proci].size())
502  {
503  return true;
504  }
505  }
506  }
507  #ifdef FULLDEBUG
508  else
509  {
511  << "Call finishedSends first" << exit(FatalError);
512  }
513  #endif
514 
515  return false;
516 }
517 
519 Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
520 {
521  return sendBuffers_[proci].size();
522 }
523 
524 
525 Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
526 {
527  if (finished())
528  {
529  const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
530 
531  if (len > 0)
532  {
533  return len;
534  }
535  }
536  #ifdef FULLDEBUG
537  else
538  {
540  << "Call finishedSends first" << exit(FatalError);
541  }
542  #endif
543 
544  return 0;
545 }
546 
547 
549 {
550  labelList counts(nProcs_, Zero);
551 
552  if (finished())
553  {
554  forAll(recvBuffers_, proci)
555  {
556  const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
557 
558  if (len > 0)
559  {
560  counts[proci] = len;
561  }
562  }
563  }
564  #ifdef FULLDEBUG
565  else
566  {
568  << "Call finishedSends first" << exit(FatalError);
569  }
570  #endif
571 
572  return counts;
573 }
574 
575 
577 (
578  const label excludeProci
579 ) const
580 {
581  label maxLen = 0;
582 
583  if (finished())
584  {
585  forAll(recvBuffers_, proci)
586  {
587  if (excludeProci != proci)
588  {
589  label len(recvBuffers_[proci].size() - recvPositions_[proci]);
590  maxLen = Foam::max(maxLen, len);
591  }
592  }
593  }
594  #ifdef FULLDEBUG
595  else
596  {
598  << "Call finishedSends first" << exit(FatalError);
599  }
600  #endif
601 
602  return maxLen;
603 }
604 
605 
607 {
608  // Use out-of-range proci to avoid excluding any processor
609  return maxNonLocalRecvCount(-1);
610 }
611 
612 
614 {
615  return maxNonLocalRecvCount(UPstream::myProcNo(comm_));
616 }
617 
618 
619 const Foam::UList<char>
620 Foam::PstreamBuffers::peekRecvData(const label proci) const
621 {
622  if (finished())
623  {
624  const label pos = recvPositions_[proci];
625  const label len = recvBuffers_[proci].size();
626 
627  if (pos < len)
628  {
629  return UList<char>
630  (
631  const_cast<char*>(recvBuffers_[proci].cbegin(pos)),
632  (len - pos)
633  );
634  }
635  }
636  #ifdef FULLDEBUG
637  else
638  {
640  << "Call finishedSends first" << exit(FatalError);
641  }
642  #endif
643 
644  return UList<char>();
645 }
646 
647 
648 void Foam::PstreamBuffers::finishedSends(const bool wait)
649 {
651  << "tag:" << tag_
652  << " comm:" << comm_
653  << " nProcs:" << nProcs_
654  << endl;
655 
656  labelList recvSizes;
657  finalExchange(modeOption::DEFAULT, wait, recvSizes);
658 }
659 
660 
661 void Foam::PstreamBuffers::finishedSendsNBX(const bool wait)
662 {
664  << "tag:" << tag_
665  << " comm:" << comm_
666  << " nProcs:" << nProcs_
667  << endl;
669  labelList recvSizes;
670  finalExchange(modeOption::NBX_PEX, wait, recvSizes);
671 }
672 
673 
675 (
676  labelList& recvSizes,
677  const bool wait
678 )
679 {
681  << "tag:" << tag_
682  << " comm:" << comm_
683  << " nProcs:" << nProcs_
684  << endl;
685 
686  // Resize for copying back
687  recvSizes.resize_nocopy(sendBuffers_.size());
688 
689  finalExchange(modeOption::DEFAULT, wait, recvSizes);
690 
691  if (commsType_ != UPstream::commsTypes::nonBlocking)
692  {
694  << "Obtaining sizes not supported in "
695  << UPstream::commsTypeNames[commsType_] << endl
696  << " since transfers already in progress. Use non-blocking instead."
697  << exit(FatalError);
698 
699  // Note: maybe possible only if using different tag from write started
700  // by ~UOPstream. Needs some work.
701  }
702 }
703 
704 
706 (
707  const labelUList& neighProcs,
708  labelList& recvSizes,
709  const bool wait
710 )
711 {
712  finalExchange(neighProcs, neighProcs, wait, recvSizes);
713 }
714 
715 
717 (
718  const labelUList& neighProcs,
719  const bool wait
720 )
721 {
722  labelList recvSizes;
723  finalExchange(neighProcs, neighProcs, wait, recvSizes);
724 }
725 
726 
728 (
729  bitSet& sendConnections,
730  DynamicList<label>& sendProcs,
731  DynamicList<label>& recvProcs,
732  const bool wait
733 )
734 {
735  bool changed = (sendConnections.size() != nProcs());
736 
737  if (changed)
738  {
739  sendConnections.resize(nProcs());
740  }
741 
742  // Update send connections
743  forAll(sendBuffers_, proci)
744  {
745  if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
746  {
747  // The state changed
748  changed = true;
749  }
750  }
751 
752  UPstream::reduceOr(changed, comm_);
753 
754  if (changed)
755  {
756  // Update send/recv topology
757 
758  labelList recvSizes;
759  finishedSends(recvSizes, wait); // modeOption::DEFAULT (eg all-to-all)
760 
761  // The send ranks
762  sendProcs.clear();
763  forAll(sendBuffers_, proci)
764  {
765  if (!sendBuffers_[proci].empty())
766  {
767  sendProcs.push_back(proci);
768  }
769  }
770 
771  // The recv ranks
772  recvProcs.clear();
773  forAll(recvSizes, proci)
774  {
775  if (recvSizes[proci] > 0)
776  {
777  recvProcs.push_back(proci);
778  }
779  }
780  }
781  else
782  {
783  // Use existing send/recv ranks
784  labelList recvSizes;
785  finalExchange(sendProcs, recvProcs, wait, recvSizes);
786  }
787 
788  return changed;
789 }
790 
791 
793 {
794  labelList recvSizes;
795  finalExchange(modeOption::GATHER, wait, recvSizes);
796 }
797 
798 
799 void Foam::PstreamBuffers::finishedScatters(const bool wait)
800 {
801  labelList recvSizes;
802  finalExchange(modeOption::SCATTER, wait, recvSizes);
803 }
804 
805 
807 (
808  labelList& recvSizes,
809  const bool wait
810 )
811 {
812  finalExchange(modeOption::GATHER, wait, recvSizes);
813 
814  if (commsType_ != UPstream::commsTypes::nonBlocking)
815  {
817  << "Obtaining sizes not supported in "
818  << UPstream::commsTypeNames[commsType_] << endl
819  << " since transfers already in progress. Use non-blocking instead."
820  << exit(FatalError);
821 
822  // Note: maybe possible only if using different tag from write started
823  // by ~UOPstream. Needs some work.
824  }
825 }
826 
827 
829 (
830  labelList& recvSizes,
831  const bool wait
832 )
833 {
834  finalExchange(modeOption::SCATTER, wait, recvSizes);
835 
836  if (commsType_ != UPstream::commsTypes::nonBlocking)
837  {
839  << "Obtaining sizes not supported in "
840  << UPstream::commsTypeNames[commsType_] << endl
841  << " since transfers already in progress. Use non-blocking instead."
842  << exit(FatalError);
843 
844  // Note: maybe possible only if using different tag from write started
845  // by ~UOPstream. Needs some work.
846  }
847 }
848 
849 
850 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
851 // Controls
854 {
855  return finishedSendsCalled_;
856 }
857 
860 {
861  return allowClearRecv_;
862 }
863 
864 
866 {
867  bool old(allowClearRecv_);
868  allowClearRecv_ = on;
869  return old;
870 }
871 
872 
873 // ************************************************************************* //
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const int tag=UPstream::msgType(), const int comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
void set(const bitSet &bitset)
Set specified bits from another bitset.
Definition: bitSetI.H:502
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:93
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Communications types.
Definition: UPstream.H:81
static T listScatterValues(const UList< T > &allValues, const int communicator=UPstream::worldComm)
Scatter individual values from list locations.
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:652
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:40
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:50
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:518
static int myProcNo(label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a...
Definition: UPstream.H:1799
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int communicator=UPstream::worldComm)
Exchange int32_t data with all ranks in communicator.
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:171
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static void reduceOr(bool &value, const int communicator=UPstream::worldComm)
Logical (or) reduction (MPI_AllReduce)
UList< label > labelUList
A UList of labels.
Definition: UList.H:76
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
Definition: PackedListI.H:455
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:400
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
dimensionedScalar pos(const dimensionedScalar &ds)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clearUnregistered()
Clear any &#39;unregistered&#39; send buffers.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
registerOptSwitch("pbufs.tuning", int, Foam::PstreamBuffers::algorithm)
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
const direction noexcept
Definition: scalarImpl.H:265
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:234
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
defineTypeNameAndDebug(combustionModel, 0)
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
Definition: DynamicListI.H:451
void push_back(const T &val)
Copy append an element to the end of this list.
Definition: DynamicListI.H:599
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), int communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void clear()
Clear all send/recv buffers and reset states.
decomposeUsingBbs false
Use bounding boxes (default) or unique decomposition of triangles (i.e. do not duplicate triangles) ...
void initRegisterSend()
Initialise registerSend() bookkeeping by mark all send buffers as &#39;unregistered&#39;. ...
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:58
void clearSends()
Clear all send buffers (does not remove buffer storage)
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as &#39;registered&#39;. The setting is sticky (does not turn off) ...
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const int communicator=UPstream::worldComm)
Exchange non-zero int32_t data between ranks [NBX].
streamFormat
Data format (ascii | binary | coherent)
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static bool master(label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1807
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
void clearStorage()
Clear storage for all send/recv buffers and reset states.
List< label > labelList
A List of labels.
Definition: List.H:61
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Definition: POSIX.C:779
label maxRecvCount() const
Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
label size() const noexcept
Number of entries.
Definition: PackedList.H:392
Namespace for OpenFOAM.
#define DebugPoutInFunction
Report an information message using Foam::Pout.
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:127