PstreamBuffers.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "PstreamBuffers.H"
30 #include "bitSet.H"
31 #include "debug.H"
32 #include "registerSwitch.H"
33 
34 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
35 
37 (
38  // Name may change in the future (JUN-2023)
39  Foam::debug::optimisationSwitch("pbufs.tuning", 0)
40 );
42 (
43  "pbufs.tuning",
44  int,
46 );
47 
48 
49 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
50 
51 inline void Foam::PstreamBuffers::setFinished(bool on) noexcept
52 {
53  finishedSendsCalled_ = on;
54 }
55 
56 
57 inline void Foam::PstreamBuffers::initFinalExchange()
58 {
59  // Could also check that it is not called twice
60  // but that is used for overlapping send/recv (eg, overset)
61  setFinished(true);
62 
64 }
65 
66 
67 void Foam::PstreamBuffers::finalExchange
68 (
69  enum modeOption mode,
70  const bool wait,
71  labelList& recvSizes
72 )
73 {
74  initFinalExchange();
75 
76  // Pre-flight checks
77  switch (mode)
78  {
79  case modeOption::DEFAULT :
80  {
81  // Choose (ALL_TO_ALL | NBX_PEX) from static settings
82  mode =
83  (
84  (algorithm <= 0)
85  ? modeOption::ALL_TO_ALL
86  : modeOption::NBX_PEX
87  );
88  break;
89  }
90 
91  case modeOption::GATHER :
92  {
93  // gather mode (all-to-one) : master [0] <- everyone
94  // - only send to master [0]
95  // note: master [0] is also allowed to 'send' to itself
96 
97  for (label proci = 1; proci < sendBuffers_.size(); ++proci)
98  {
99  sendBuffers_[proci].clear();
100  }
101  break;
102  }
103 
104  case modeOption::SCATTER :
105  {
106  // scatter mode (one-to-all) : master [0] -> everyone
107 
108  if (!UPstream::master(comm_))
109  {
110  // Non-master: has no sends
111  clearSends();
112  }
113  break;
114  }
115 
116  default :
117  break;
118  }
119 
120 
121  if (commsType_ == UPstream::commsTypes::nonBlocking)
122  {
123  // PEX algorithm with different flavours of exchanging sizes
124  // PEX stage 1: exchange sizes
125 
126  labelList sendSizes; // Not used by gather/scatter
127 
128  switch (mode)
129  {
130  case modeOption::GATHER :
131  {
132  // gather mode (all-to-one): master [0] <- everyone
133  // - presumed that MPI_Gather will be the most efficient
134 
135  recvSizes =
136  UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
137 
138  if (!UPstream::master(comm_))
139  {
140  recvSizes.resize_nocopy(nProcs_);
141  recvSizes = Zero;
142  }
143 
144  break;
145  }
146 
147  case modeOption::SCATTER :
148  {
149  // scatter mode (one-to-all): master [0] -> everyone
150  // - presumed that MPI_Scatter will be the most efficient
151 
152  recvSizes.resize_nocopy(nProcs_);
153 
154  if (UPstream::master(comm_))
155  {
156  forAll(sendBuffers_, proci)
157  {
158  recvSizes[proci] = sendBuffers_[proci].size();
159  }
160  }
161 
162  const label myRecv
163  (
164  UPstream::listScatterValues(recvSizes, comm_)
165  );
166 
167  recvSizes = Zero;
168  recvSizes[0] = myRecv;
169 
170  break;
171  }
172 
173  case modeOption::NBX_PEX :
174  {
175  // Assemble the send sizes (cf. Pstream::exchangeSizes)
176  sendSizes.resize_nocopy(nProcs_);
177  forAll(sendBuffers_, proci)
178  {
179  sendSizes[proci] = sendBuffers_[proci].size();
180  }
181  recvSizes.resize_nocopy(nProcs_);
182 
183  // Exchange sizes (non-blocking consensus)
185  (
186  sendSizes,
187  recvSizes,
188  (tag_ + 314159), // some unique tag?
189  comm_
190  );
191  break;
192  }
193 
194  case modeOption::DEFAULT :
195  case modeOption::ALL_TO_ALL :
196  {
197  // Assemble the send sizes (cf. Pstream::exchangeSizes)
198  sendSizes.resize_nocopy(nProcs_);
199  forAll(sendBuffers_, proci)
200  {
201  sendSizes[proci] = sendBuffers_[proci].size();
202  }
203  recvSizes.resize_nocopy(nProcs_);
204 
205  // Exchange sizes (all-to-all)
206  UPstream::allToAll(sendSizes, recvSizes, comm_);
207  break;
208  }
209  }
210 
211 
212  // PEX stage 2: point-to-point data exchange
213  Pstream::exchange<DynamicList<char>, char>
214  (
215  sendBuffers_,
216  recvSizes,
217  recvBuffers_,
218  tag_,
219  comm_,
220  wait
221  );
222  }
223 }
224 
225 
226 void Foam::PstreamBuffers::finalExchange
227 (
228  const labelUList& sendProcs,
229  const labelUList& recvProcs,
230  const bool wait,
231  labelList& recvSizes
232 )
233 {
234  initFinalExchange();
235 
236  if (commsType_ == UPstream::commsTypes::nonBlocking)
237  {
238  // Preparation. Temporarily abuse recvSizes as logic to clear
239  // send buffers that are not in the neighbourhood connection
240  {
241  recvSizes.resize_nocopy(nProcs_);
242  recvSizes = 0;
243 
244  // Preserve self-send, even if not described by neighbourhood
245  recvSizes[UPstream::myProcNo(comm_)] = 1;
246 
247  for (const label proci : sendProcs)
248  {
249  recvSizes[proci] = 1; // Connected
250  }
251 
252  for (label proci = 0; proci < nProcs_; ++proci)
253  {
254  if (!recvSizes[proci]) // Not connected
255  {
256  sendBuffers_[proci].clear();
257  }
258  }
259  }
260 
261  // PEX stage 1: exchange sizes (limited neighbourhood)
263  (
264  sendProcs,
265  recvProcs,
266  sendBuffers_,
267  recvSizes,
268  tag_,
269  comm_
270  );
271 
272  // PEX stage 2: point-to-point data exchange
273  Pstream::exchange<DynamicList<char>, char>
274  (
275  sendBuffers_,
276  recvSizes,
277  recvBuffers_,
278  tag_,
279  comm_,
280  wait
281  );
282  }
283 }
284 
285 
286 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
287 
289 (
290  UPstream::commsTypes commsType,
291  int tag,
292  label communicator,
294 )
295 :
296  finishedSendsCalled_(false),
297  allowClearRecv_(true),
298  format_(fmt),
299  commsType_(commsType),
300  tag_(tag),
301  comm_(communicator),
302  nProcs_(UPstream::nProcs(comm_)),
303  sendBuffers_(nProcs_),
304  recvBuffers_(nProcs_),
305  recvPositions_(nProcs_, Zero)
306 {}
307 
308 
309 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
310 
312 {
313  // Check that all data has been consumed.
314  forAll(recvBuffers_, proci)
315  {
316  const label pos = recvPositions_[proci];
317  const label len = recvBuffers_[proci].size();
318 
319  if (pos >= 0 && pos < len)
320  {
322  << "Message from processor " << proci
323  << " Only consumed " << pos << " of " << len << " bytes" << nl
325  }
326  }
327 }
328 
329 
330 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
331 
332 Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
333 (
334  const label proci
335 )
336 {
337  return sendBuffers_[proci];
338 }
339 
340 
341 Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
342 (
343  const label proci
344 )
345 {
346  return recvBuffers_[proci];
347 }
348 
349 
350 Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
351 {
352  return recvPositions_[proci];
353 }
354 
355 
356 // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
357 
359 {
360  for (DynamicList<char>& buf : sendBuffers_)
361  {
362  buf.clear();
363  }
364 }
365 
366 
368 {
369  for (DynamicList<char>& buf : recvBuffers_)
370  {
371  buf.clear();
372  }
373  recvPositions_ = Zero;
374 }
375 
376 
378 {
379  clearSends();
380  clearRecvs();
381  setFinished(false);
382 }
383 
384 
386 {
387  for (label proci = 0; proci < nProcs_; ++proci)
388  {
389  if (recvPositions_[proci] < 0)
390  {
391  recvPositions_[proci] = 0;
392  sendBuffers_[proci].clear();
393  }
394  }
395 }
396 
397 
398 void Foam::PstreamBuffers::clearSend(const label proci)
399 {
400  sendBuffers_[proci].clear();
401  if (recvPositions_[proci] < 0)
402  {
403  // Reset the unregistered flag
404  recvPositions_[proci] = 0;
405  }
406 }
407 
408 
409 void Foam::PstreamBuffers::clearRecv(const label proci)
410 {
411  recvBuffers_[proci].clear();
412  recvPositions_[proci] = 0;
413 }
414 
415 
417 {
418  // Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
419  // Not sure if it makes much difference
420  for (DynamicList<char>& buf : sendBuffers_)
421  {
422  buf.clearStorage();
423  }
424  for (DynamicList<char>& buf : recvBuffers_)
425  {
426  buf.clearStorage();
427  }
428  recvPositions_ = Zero;
429 
430  setFinished(false);
431 }
432 
433 
435 {
436  if (!finished())
437  {
438  for (label proci = 0; proci < nProcs_; ++proci)
439  {
440  sendBuffers_[proci].clear();
441  // Mark send buffer as 'unregistered'
442  recvPositions_[proci] = -1;
443  }
444  }
445 }
446 
447 
448 void Foam::PstreamBuffers::registerSend(const label proci, const bool toggleOn)
449 {
450  // Clear the 'unregistered' flag
451  if (toggleOn && recvPositions_[proci] < 0)
452  {
453  recvPositions_[proci] = 0;
454  }
455 }
456 
457 
459 {
460  for (const DynamicList<char>& buf : sendBuffers_)
461  {
462  if (!buf.empty())
463  {
464  return true;
465  }
466  }
467  return false;
468 }
469 
470 
472 {
473  if (finished())
474  {
475  forAll(recvBuffers_, proci)
476  {
477  if (recvPositions_[proci] < recvBuffers_[proci].size())
478  {
479  return true;
480  }
481  }
482  }
483  #ifdef FULLDEBUG
484  else
485  {
487  << "Call finishedSends first" << exit(FatalError);
488  }
489  #endif
490 
491  return false;
492 }
493 
495 Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
496 {
497  return sendBuffers_[proci].size();
498 }
499 
500 
501 Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
502 {
503  if (finished())
504  {
505  const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
506 
507  if (len > 0)
508  {
509  return len;
510  }
511  }
512  #ifdef FULLDEBUG
513  else
514  {
516  << "Call finishedSends first" << exit(FatalError);
517  }
518  #endif
519 
520  return 0;
521 }
522 
523 
525 {
526  labelList counts(nProcs_, Zero);
527 
528  if (finished())
529  {
530  forAll(recvBuffers_, proci)
531  {
532  const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
533 
534  if (len > 0)
535  {
536  counts[proci] = len;
537  }
538  }
539  }
540  #ifdef FULLDEBUG
541  else
542  {
544  << "Call finishedSends first" << exit(FatalError);
545  }
546  #endif
547 
548  return counts;
549 }
550 
551 
553 (
554  const label excludeProci
555 ) const
556 {
557  label maxLen = 0;
558 
559  if (finished())
560  {
561  forAll(recvBuffers_, proci)
562  {
563  if (excludeProci != proci)
564  {
565  label len(recvBuffers_[proci].size() - recvPositions_[proci]);
566  maxLen = max(maxLen, len);
567  }
568  }
569  }
570  #ifdef FULLDEBUG
571  else
572  {
574  << "Call finishedSends first" << exit(FatalError);
575  }
576  #endif
577 
578  return maxLen;
579 }
580 
581 
583 {
584  // Use out-of-range proci to avoid excluding any processor
585  return maxNonLocalRecvCount(-1);
586 }
587 
588 
590 {
591  return maxNonLocalRecvCount(UPstream::myProcNo(comm_));
592 }
593 
594 
595 const Foam::UList<char>
596 Foam::PstreamBuffers::peekRecvData(const label proci) const
597 {
598  if (finished())
599  {
600  const label pos = recvPositions_[proci];
601  const label len = recvBuffers_[proci].size();
602 
603  if (pos < len)
604  {
605  return UList<char>
606  (
607  const_cast<char*>(recvBuffers_[proci].cbegin(pos)),
608  (len - pos)
609  );
610  }
611  }
612  #ifdef FULLDEBUG
613  else
614  {
616  << "Call finishedSends first" << exit(FatalError);
617  }
618  #endif
619 
620  return UList<char>();
621 }
622 
623 
625 {
626  labelList recvSizes;
627  finalExchange(modeOption::DEFAULT, wait, recvSizes);
628 }
629 
630 
631 void Foam::PstreamBuffers::finishedSendsNBX(const bool wait)
632 {
633  labelList recvSizes;
634  finalExchange(modeOption::NBX_PEX, wait, recvSizes);
635 }
636 
637 
639 (
640  labelList& recvSizes,
641  const bool wait
642 )
643 {
644  // Resize for copying back
645  recvSizes.resize_nocopy(sendBuffers_.size());
646 
647  finalExchange(modeOption::DEFAULT, wait, recvSizes);
648 
649  if (commsType_ != UPstream::commsTypes::nonBlocking)
650  {
652  << "Obtaining sizes not supported in "
653  << UPstream::commsTypeNames[commsType_] << endl
654  << " since transfers already in progress. Use non-blocking instead."
655  << exit(FatalError);
656 
657  // Note: maybe possible only if using different tag from write started
658  // by ~UOPstream. Needs some work.
659  }
660 }
661 
662 
664 (
665  const labelUList& neighProcs,
666  labelList& recvSizes,
667  const bool wait
668 )
669 {
670  finalExchange(neighProcs, neighProcs, wait, recvSizes);
671 }
672 
673 
675 (
676  const labelUList& neighProcs,
677  const bool wait
678 )
679 {
680  labelList recvSizes;
681  finalExchange(neighProcs, neighProcs, wait, recvSizes);
682 }
683 
684 
686 (
687  bitSet& sendConnections,
688  DynamicList<label>& sendProcs,
689  DynamicList<label>& recvProcs,
690  const bool wait
691 )
692 {
693  bool changed = (sendConnections.size() != nProcs());
694 
695  if (changed)
696  {
697  sendConnections.resize(nProcs());
698  }
699 
700  // Update send connections
701  forAll(sendBuffers_, proci)
702  {
703  if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
704  {
705  // The state changed
706  changed = true;
707  }
708  }
709 
710  UPstream::reduceOr(changed, comm_);
711 
712  if (changed)
713  {
714  // Update send/recv topology
715 
716  labelList recvSizes;
717  finishedSends(recvSizes, wait); // modeOption::DEFAULT (eg all-to-all)
718 
719  // The send ranks
720  sendProcs.clear();
721  forAll(sendBuffers_, proci)
722  {
723  if (!sendBuffers_[proci].empty())
724  {
725  sendProcs.push_back(proci);
726  }
727  }
728 
729  // The recv ranks
730  recvProcs.clear();
731  forAll(recvSizes, proci)
732  {
733  if (recvSizes[proci] > 0)
734  {
735  recvProcs.push_back(proci);
736  }
737  }
738  }
739  else
740  {
741  // Use existing send/recv ranks
742  labelList recvSizes;
743  finalExchange(sendProcs, recvProcs, wait, recvSizes);
744  }
745 
746  return changed;
747 }
748 
749 
751 {
752  labelList recvSizes;
753  finalExchange(modeOption::GATHER, wait, recvSizes);
754 }
755 
756 
757 void Foam::PstreamBuffers::finishedScatters(const bool wait)
758 {
759  labelList recvSizes;
760  finalExchange(modeOption::SCATTER, wait, recvSizes);
761 }
762 
763 
765 (
766  labelList& recvSizes,
767  const bool wait
768 )
769 {
770  finalExchange(modeOption::GATHER, wait, recvSizes);
771 
772  if (commsType_ != UPstream::commsTypes::nonBlocking)
773  {
775  << "Obtaining sizes not supported in "
776  << UPstream::commsTypeNames[commsType_] << endl
777  << " since transfers already in progress. Use non-blocking instead."
778  << exit(FatalError);
779 
780  // Note: maybe possible only if using different tag from write started
781  // by ~UOPstream. Needs some work.
782  }
783 }
784 
785 
787 (
788  labelList& recvSizes,
789  const bool wait
790 )
791 {
792  finalExchange(modeOption::SCATTER, wait, recvSizes);
793 
794  if (commsType_ != UPstream::commsTypes::nonBlocking)
795  {
797  << "Obtaining sizes not supported in "
798  << UPstream::commsTypeNames[commsType_] << endl
799  << " since transfers already in progress. Use non-blocking instead."
800  << exit(FatalError);
801 
802  // Note: maybe possible only if using different tag from write started
803  // by ~UOPstream. Needs some work.
804  }
805 }
806 
807 
808 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
809 // Controls
812 {
813  return finishedSendsCalled_;
814 }
815 
818 {
819  return allowClearRecv_;
820 }
821 
822 
824 {
825  bool old(allowClearRecv_);
826  allowClearRecv_ = on;
827  return old;
828 }
829 
830 
831 // ************************************************************************* //
void set(const bitSet &bitset)
Set specified bits from another bitset.
Definition: bitSetI.H:504
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:82
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Communications types.
Definition: UPstream.H:72
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:40
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:50
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static void reduceOr(bool &value, const label communicator=worldComm)
Logical (or) reduction (MPI_AllReduce)
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:175
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1074
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero int32_t data between ranks [NBX].
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
Definition: PackedListI.H:455
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
dimensionedScalar pos(const dimensionedScalar &ds)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clearUnregistered()
Clear any 'unregistered' send buffers.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
registerOptSwitch("pbufs.tuning", int, Foam::PstreamBuffers::algorithm)
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
const direction noexcept
Definition: Scalar.H:258
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:234
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange int32_t data with all ranks in communicator.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
Definition: DynamicListI.H:405
void push_back(const T &val)
Copy append an element to the end of this list.
Definition: DynamicListI.H:555
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
void clear()
Clear all send/recv buffers and reset states.
void initRegisterSend()
Initialise registerSend() bookkeeping by marking all send buffers as 'unregistered'. ...
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:59
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off) ...
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1082
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank. Must call finishedSends() or other finished...
void clearStorage()
Clear storage for all send/recv buffers and reset states.
List< label > labelList
A List of labels.
Definition: List.H:62
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Definition: POSIX.C:773
label maxRecvCount() const
Maximum receive size from any processor rank. Must call finishedSends() or other finished.. method first!
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
View of the unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
label size() const noexcept
Number of entries.
Definition: PackedList.H:371
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:127