PstreamBuffers.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "PstreamBuffers.H"
30 #include "bitSet.H"
31 #include "debug.H"
32 #include "registerSwitch.H"
33 
34 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
35 
37 (
38  // Name may change in the future (JUN-2023)
39  Foam::debug::optimisationSwitch("pbufs.tuning", -1)
40 );
42 (
43  "pbufs.tuning",
44  int,
46 );
47 
48 
// File-local integer selectors for the exchange algorithm
// -------------------------------------------------------

// Traditional PEX
static constexpr int algorithm_PEX_allToAll{-1};

// Possible new default?
//static constexpr int algorithm_PEX_hybrid{0};

// Very experimental
static constexpr int algorithm_full_NBX{1};
54 
55 
56 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
57 
58 void Foam::PstreamBuffers::finalExchange
59 (
60  const bool wait,
61  labelList& recvSizes
62 )
63 {
64  // Could also check that it is not called twice
65  // but that is used for overlapping send/recv (eg, overset)
66  finishedSendsCalled_ = true;
67 
68  if (commsType_ == UPstream::commsTypes::nonBlocking)
69  {
70  if
71  (
72  wait
74  && (UPstream::maxCommsSize <= 0)
75  )
76  {
77  // NBX algorithm (nonblocking exchange)
78  // - when requested and waiting, no data chunking etc
79 
80  PstreamDetail::exchangeConsensus<DynamicList<char>, char>
81  (
82  sendBuffers_,
83  recvBuffers_,
84  recvSizes,
85  (tag_ + 271828), // some unique tag?
86  comm_,
87  wait
88  );
89 
90  return;
91  }
92 
93 
94  // PEX algorithm with two different flavours of exchanging sizes
95 
96  // Assemble the send sizes (cf. Pstream::exchangeSizes)
97  labelList sendSizes(nProcs_);
98  forAll(sendBuffers_, proci)
99  {
100  sendSizes[proci] = sendBuffers_[proci].size();
101  }
102  recvSizes.resize_nocopy(nProcs_);
103 
105  {
106  // PEX stage 1: exchange sizes (all-to-all)
107  UPstream::allToAll(sendSizes, recvSizes, comm_);
108  }
109  else
110  {
111  // PEX stage 1: exchange sizes (non-blocking consensus)
113  (
114  sendSizes,
115  recvSizes,
116  (tag_ + 314159), // some unique tag?
117  comm_
118  );
119  }
120 
121  // PEX stage 2: point-to-point data exchange
122  Pstream::exchange<DynamicList<char>, char>
123  (
124  sendBuffers_,
125  recvSizes,
126  recvBuffers_,
127  tag_,
128  comm_,
129  wait
130  );
131  }
132 }
133 
134 
135 void Foam::PstreamBuffers::finalExchange
136 (
137  const labelUList& sendProcs,
138  const labelUList& recvProcs,
139  const bool wait,
140  labelList& recvSizes
141 )
142 {
143  // Could also check that it is not called twice
144  // but that is used for overlapping send/recv (eg, overset)
145  finishedSendsCalled_ = true;
146 
147  if (commsType_ == UPstream::commsTypes::nonBlocking)
148  {
149  // Preparation. Temporarily abuse recvSizes as logic to clear
150  // send buffers that are not in the neighbourhood connection
151  {
152  recvSizes.resize_nocopy(nProcs_);
153  recvSizes = 0;
154 
155  // Preserve self-send, even if not described by neighbourhood
156  recvSizes[UPstream::myProcNo(comm_)] = 1;
157 
158  for (const label proci : sendProcs)
159  {
160  recvSizes[proci] = 1; // Connected
161  }
162 
163  for (label proci=0; proci < nProcs_; ++proci)
164  {
165  if (!recvSizes[proci]) // Not connected
166  {
167  sendBuffers_[proci].clear();
168  }
169  }
170  }
171 
172  // PEX stage 1: exchange sizes (limited neighbourhood)
174  (
175  sendProcs,
176  recvProcs,
177  sendBuffers_,
178  recvSizes,
179  tag_,
180  comm_
181  );
182 
183  // PEX stage 2: point-to-point data exchange
184  Pstream::exchange<DynamicList<char>, char>
185  (
186  sendBuffers_,
187  recvSizes,
188  recvBuffers_,
189  tag_,
190  comm_,
191  wait
192  );
193  }
194 }
195 
196 
197 void Foam::PstreamBuffers::finalGatherScatter
198 (
199  const bool isGather,
200  const bool wait,
201  labelList& recvSizes
202 )
203 {
204  // Could also check that it is not called twice
205  // but that is used for overlapping send/recv (eg, overset)
206  finishedSendsCalled_ = true;
207 
208  if (isGather)
209  {
210  // gather mode (all-to-one)
211 
212  // Only send to master [0]. Master is also allowed to 'send' to itself
213 
214  for (label proci=1; proci < sendBuffers_.size(); ++proci)
215  {
216  sendBuffers_[proci].clear();
217  }
218  }
219  else
220  {
221  // scatter mode (one-to-all)
222 
223  if (!UPstream::master(comm_))
224  {
225  // Non-master: has no sends
226  clearSends();
227  }
228  }
229 
230 
231  if (commsType_ == UPstream::commsTypes::nonBlocking)
232  {
233  // Use PEX algorithm
234  // - for a non-sparse gather/scatter, it is presumed that
235  // MPI_Gather/MPI_Scatter will be the most efficient way to
236  // communicate the sizes.
237 
238  // PEX stage 1: exchange sizes (gather or scatter)
239  if (isGather)
240  {
241  // gather mode (all-to-one): master [0] <- everyone
242 
243  recvSizes =
244  UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
245 
246  if (!UPstream::master(comm_))
247  {
248  recvSizes.resize_nocopy(nProcs_);
249  recvSizes = Zero;
250  }
251  }
252  else
253  {
254  // scatter mode (one-to-all): master [0] -> everyone
255 
256  recvSizes.resize_nocopy(nProcs_);
257 
258  if (UPstream::master(comm_))
259  {
260  forAll(sendBuffers_, proci)
261  {
262  recvSizes[proci] = sendBuffers_[proci].size();
263  }
264  }
265 
266  const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
267 
268  recvSizes = Zero;
269  recvSizes[0] = myRecv;
270  }
271 
272  // PEX stage 2: point-to-point data exchange
273  Pstream::exchange<DynamicList<char>, char>
274  (
275  sendBuffers_,
276  recvSizes,
277  recvBuffers_,
278  tag_,
279  comm_,
280  wait
281  );
282  }
283 }
284 
285 
286 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
287 
289 (
290  UPstream::commsTypes commsType,
291  int tag,
292  label communicator,
294 )
295 :
296  finishedSendsCalled_(false),
297  allowClearRecv_(true),
298  format_(fmt),
299  commsType_(commsType),
300  tag_(tag),
301  comm_(communicator),
302  nProcs_(UPstream::nProcs(comm_)),
303  sendBuffers_(nProcs_),
304  recvBuffers_(nProcs_),
305  recvPositions_(nProcs_, Zero)
306 {}
307 
308 
309 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
310 
312 {
313  // Check that all data has been consumed.
314  forAll(recvBuffers_, proci)
315  {
316  const label pos = recvPositions_[proci];
317  const label len = recvBuffers_[proci].size();
318 
319  if (pos < len)
320  {
322  << "Message from processor " << proci
323  << " Only consumed " << pos << " of " << len << " bytes" << nl
325  }
326  }
327 }
328 
329 
330 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
331 
332 Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
333 (
334  const label proci
335 )
336 {
337  return sendBuffers_[proci];
338 }
339 
340 
341 Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
342 (
343  const label proci
344 )
345 {
346  return recvBuffers_[proci];
347 }
348 
349 
350 Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
351 {
352  return recvPositions_[proci];
353 }
354 
355 
356 // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
357 
359 {
360  for (DynamicList<char>& buf : sendBuffers_)
361  {
362  buf.clear();
363  }
364 }
365 
366 
368 {
369  for (DynamicList<char>& buf : recvBuffers_)
370  {
371  buf.clear();
372  }
373  recvPositions_ = Zero;
374 }
375 
376 
378 {
379  clearSends();
380  clearRecvs();
381  finishedSendsCalled_ = false;
382 }
383 
385 void Foam::PstreamBuffers::clearSend(const label proci)
386 {
387  sendBuffers_[proci].clear();
388 }
389 
390 
391 void Foam::PstreamBuffers::clearRecv(const label proci)
392 {
393  recvBuffers_[proci].clear();
394  recvPositions_[proci] = 0;
395 }
396 
397 
399 {
400  // Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
401  // Not sure if it makes much difference
402  for (DynamicList<char>& buf : sendBuffers_)
403  {
404  buf.clearStorage();
405  }
406  for (DynamicList<char>& buf : recvBuffers_)
407  {
408  buf.clearStorage();
409  }
410  recvPositions_ = Zero;
411 
412  finishedSendsCalled_ = false;
413 }
414 
415 
417 {
418  for (const DynamicList<char>& buf : sendBuffers_)
419  {
420  if (!buf.empty())
421  {
422  return true;
423  }
424  }
425  return false;
426 }
427 
428 
430 {
431  if (finishedSendsCalled_)
432  {
433  forAll(recvBuffers_, proci)
434  {
435  if (recvPositions_[proci] < recvBuffers_[proci].size())
436  {
437  return true;
438  }
439  }
440  }
441  #ifdef FULLDEBUG
442  else
443  {
445  << "Call finishedSends first" << exit(FatalError);
446  }
447  #endif
448 
449  return false;
450 }
451 
453 Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
454 {
455  return sendBuffers_[proci].size();
456 }
457 
458 
459 Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
460 {
461  if (finishedSendsCalled_)
462  {
463  const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
464 
465  if (len > 0)
466  {
467  return len;
468  }
469  }
470  #ifdef FULLDEBUG
471  else
472  {
474  << "Call finishedSends first" << exit(FatalError);
475  }
476  #endif
477 
478  return 0;
479 }
480 
481 
483 {
484  labelList counts(nProcs_, Zero);
485 
486  if (finishedSendsCalled_)
487  {
488  forAll(recvBuffers_, proci)
489  {
490  const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
491 
492  if (len > 0)
493  {
494  counts[proci] = len;
495  }
496  }
497  }
498  #ifdef FULLDEBUG
499  else
500  {
502  << "Call finishedSends first" << exit(FatalError);
503  }
504  #endif
505 
506  return counts;
507 }
508 
509 
511 (
512  const label excludeProci
513 ) const
514 {
515  label maxLen = 0;
516 
517  if (finishedSendsCalled_)
518  {
519  forAll(recvBuffers_, proci)
520  {
521  if (excludeProci != proci)
522  {
523  label len(recvBuffers_[proci].size() - recvPositions_[proci]);
524  maxLen = max(maxLen, len);
525  }
526  }
527  }
528  #ifdef FULLDEBUG
529  else
530  {
532  << "Call finishedSends first" << exit(FatalError);
533  }
534  #endif
535 
536  return maxLen;
537 }
538 
539 
541 {
542  // Use out-of-range proci to avoid excluding any processor
543  return maxNonLocalRecvCount(-1);
544 }
545 
546 
548 {
549  return maxNonLocalRecvCount(UPstream::myProcNo(comm_));
550 }
551 
552 
553 const Foam::UList<char>
554 Foam::PstreamBuffers::peekRecvData(const label proci) const
555 {
556  if (finishedSendsCalled_)
557  {
558  const label pos = recvPositions_[proci];
559  const label len = recvBuffers_[proci].size();
560 
561  if (pos < len)
562  {
563  return UList<char>
564  (
565  const_cast<char*>(recvBuffers_[proci].cbegin(pos)),
566  (len - pos)
567  );
568  }
569  }
570  #ifdef FULLDEBUG
571  else
572  {
574  << "Call finishedSends first" << exit(FatalError);
575  }
576  #endif
577 
578  return UList<char>();
579 }
580 
581 
583 {
584  bool old(allowClearRecv_);
585  allowClearRecv_ = on;
586  return old;
587 }
588 
589 
590 void Foam::PstreamBuffers::finishedSends(const bool wait)
591 {
592  labelList recvSizes;
593  finalExchange(wait, recvSizes);
594 }
595 
596 
598 (
599  labelList& recvSizes,
600  const bool wait
601 )
602 {
603  // Resize for copying back
604  recvSizes.resize_nocopy(sendBuffers_.size());
605 
606  finalExchange(wait, recvSizes);
607 
608  if (commsType_ != UPstream::commsTypes::nonBlocking)
609  {
611  << "Obtaining sizes not supported in "
612  << UPstream::commsTypeNames[commsType_] << endl
613  << " since transfers already in progress. Use non-blocking instead."
614  << exit(FatalError);
615 
616  // Note: maybe possible only if using different tag from write started
617  // by ~UOPstream. Needs some work.
618  }
619 }
620 
621 
623 (
624  const labelUList& neighProcs,
625  labelList& recvSizes,
626  const bool wait
627 )
628 {
629  finalExchange(neighProcs, neighProcs, wait, recvSizes);
630 }
631 
632 
634 (
635  const labelUList& neighProcs,
636  const bool wait
637 )
638 {
639  labelList recvSizes;
640  finalExchange(neighProcs, neighProcs, wait, recvSizes);
641 }
642 
643 
645 (
646  bitSet& sendConnections,
647  DynamicList<label>& sendProcs,
648  DynamicList<label>& recvProcs,
649  const bool wait
650 )
651 {
652  bool changed = (sendConnections.size() != nProcs());
653 
654  if (changed)
655  {
656  sendConnections.resize(nProcs());
657  }
658 
659  // Update send connections
660  forAll(sendBuffers_, proci)
661  {
662  if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
663  {
664  // The state changed
665  changed = true;
666  }
667  }
668 
669  UPstream::reduceOr(changed, comm_);
670 
671  if (changed)
672  {
673  // Update send/recv topology
674  labelList recvSizes;
675  finishedSends(recvSizes, wait); // eg, using all-to-all
676 
677  // The send ranks
678  sendProcs.clear();
679  forAll(sendBuffers_, proci)
680  {
681  if (!sendBuffers_[proci].empty())
682  {
683  sendProcs.push_back(proci);
684  }
685  }
686 
687  // The recv ranks
688  recvProcs.clear();
689  forAll(recvSizes, proci)
690  {
691  if (recvSizes[proci] > 0)
692  {
693  recvProcs.push_back(proci);
694  }
695  }
696  }
697  else
698  {
699  // Use existing send/recv ranks
700  labelList recvSizes;
701  finalExchange(sendProcs, recvProcs, wait, recvSizes);
702  }
703 
704  return changed;
705 }
706 
707 
709 {
710  labelList recvSizes;
711  finalGatherScatter(true, wait, recvSizes);
712 }
713 
714 
715 void Foam::PstreamBuffers::finishedScatters(const bool wait)
716 {
717  labelList recvSizes;
718  finalGatherScatter(false, wait, recvSizes);
719 }
720 
721 
723 (
724  labelList& recvSizes,
725  const bool wait
726 )
727 {
728  finalGatherScatter(true, wait, recvSizes);
729 
730  if (commsType_ != UPstream::commsTypes::nonBlocking)
731  {
733  << "Obtaining sizes not supported in "
734  << UPstream::commsTypeNames[commsType_] << endl
735  << " since transfers already in progress. Use non-blocking instead."
736  << exit(FatalError);
737 
738  // Note: maybe possible only if using different tag from write started
739  // by ~UOPstream. Needs some work.
740  }
741 }
742 
743 
745 (
746  labelList& recvSizes,
747  const bool wait
748 )
749 {
750  finalGatherScatter(false, wait, recvSizes);
751 
752  if (commsType_ != UPstream::commsTypes::nonBlocking)
753  {
755  << "Obtaining sizes not supported in "
756  << UPstream::commsTypeNames[commsType_] << endl
757  << " since transfers already in progress. Use non-blocking instead."
758  << exit(FatalError);
759 
760  // Note: maybe possible only if using different tag from write started
761  // by ~UOPstream. Needs some work.
762  }
763 }
764 
765 
766 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
void set(const bitSet &bitset)
Set specified bits from another bitset.
Definition: bitSetI.H:571
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:84
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Communications types.
Definition: UPstream.H:74
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:40
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:49
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:392
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
static void reduceOr(bool &value, const label communicator=worldComm)
Logical (or) reduction (MPI_AllReduce)
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:139
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1029
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero integer data with all ranks in the communicator using non-blocking consensus exchan...
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
Definition: PackedListI.H:388
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:414
static constexpr int algorithm_PEX_allToAll
dimensionedScalar pos(const dimensionedScalar &ds)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark sends as done.
static constexpr int algorithm_full_NBX
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
registerOptSwitch("pbufs.tuning", int, Foam::PstreamBuffers::algorithm)
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:99
const direction noexcept
Definition: Scalar.H:258
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:234
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange integer data with all processors (in the communicator).
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark sends as done using subset of send/recv ranks and recover the sizes (bytes) received.
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
Definition: DynamicListI.H:389
void push_back(const T &val)
Copy append an element to the end of this list.
Definition: DynamicListI.H:518
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void clear()
Clear all send/recv buffers and reset states.
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:59
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1037
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
void clearStorage()
Clear storage for all send/recv buffers and reset states.
List< label > labelList
A List of labels.
Definition: List.H:62
label maxRecvCount() const
Maximum receive size from any processor rank. Must call finishedSends() or other finished.. method first!
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Non-copying view of the unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
label size() const noexcept
Number of entries.
Definition: PackedList.H:357
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:133