PstreamBuffers.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::PstreamBuffers
29 
30 Description
31  Buffers for inter-processor communications streams (UOPstream, UIPstream).
32 
33  Use UOPstream to stream data into buffers, call finishedSends() to
34  notify that data is in buffers and then use IUPstream to get data out
35  of received buffers. Works with both buffered and non-blocking. Does
36  not make much sense with scheduled since there you would not need these
37  explicit buffers.
38 
39  Example usage:
40  \code
41  PstreamBuffers pBufs;
42 
43  for (const int proci : UPstream::allProcs())
44  {
45  if (proci != UPstream::myProcNo())
46  {
47  someObject vals;
48 
49  UOPstream send(proci, pBufs);
50  send << vals;
51  }
52  }
53 
54  pBufs.finishedSends(); // no-op for buffered
55 
56  for (const int proci : UPstream::allProcs())
57  {
58  if (proci != UPstream::myProcNo())
59  {
60  UIPstream recv(proci, pBufs);
61  someObject vals(recv);
62  }
63  }
64  \endcode
65 
66  There are special versions of finishedSends() for
67  restricted neighbour communication as well as for special
68  one-to-all and all-to-one communication patterns.
69  For example,
70  \code
71  PstreamBuffers pBufs;
72 
73  if (UPstream::master())
74  {
75  someObject vals;
76  for (const int proci : UPstream::subProcs())
77  {
78  UOPstream send(proci, pBufs);
79  send << vals;
80  }
81  }
82 
83  pBufs.finishedScatters();
84 
85  if (!UPstream::master())
86  {
87  UIPstream recv(UPstream::masterNo(), pBufs);
88  someObject vals(recv);
89  }
90  \endcode
91 
92  Additionally there are some situations that use speculative sends
93  that may not actually be required. In this case, it is possible to
94  mark all sends as initially \em unregistered and subsequently
95  mark the "real" sends as \em registered.
96 
97  For example,
98  \code
99  PstreamBuffers pBufs;
100 
101  pBufs.initRegisterSend();
102 
103  for (const polyPatch& pp : patches)
104  {
105  const auto* ppp = isA<processorPolyPatch>(pp);
106  if (ppp)
107  {
108  const label nbrProci = ppp->neighbProcNo();
109 
110  // Gather some patch information...
111  UOPstream toNbr(nbrProci, pBufs);
112  toNbr << patchInfo;
113 
114  // The send is needed if patchInfo is non-empty
115  pBufs.registerSend(nbrProci, !patchInfo.empty());
116  }
117  }
118 
119  // optional: pBufs.clearUnregistered();
120 
121  pBufs.finishedSends();
122 
123  ...
124  \endcode
125 
126 SourceFiles
127  PstreamBuffers.C
128 
129 \*---------------------------------------------------------------------------*/
130 
131 #include "Pstream.H"
132 
133 #ifndef Foam_PstreamBuffers_H
134 #define Foam_PstreamBuffers_H
135 
136 #include "DynamicList.H"
137 #include "UPstream.H"
138 #include "IOstream.H"
139 
140 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
141 
142 namespace Foam
143 {
144 
145 // Forward Declarations
146 class bitSet;
148 /*---------------------------------------------------------------------------*\
149  Class PstreamBuffers Declaration
150 \*---------------------------------------------------------------------------*/
151 
152 class PstreamBuffers
153 {
154  // Private Data Types
155 
156  //- Private enumeration for handling PEX stage 1 (sizing) modes
157  enum class modeOption : unsigned char
158  {
159  DEFAULT,
160  GATHER,
161  SCATTER,
162  ALL_TO_ALL,
163  NBX_PEX
164  };
165 
166 
167  // Private Data
168 
169  //- Track if sends are complete
170  bool finishedSendsCalled_;
171 
172  //- Permit clear of individual receive buffer by external access
173  bool allowClearRecv_;
174 
175  //- Buffer format (ascii | binary)
176  const IOstreamOption::streamFormat format_;
177 
178  //- Communications type of this stream
179  const UPstream::commsTypes commsType_;
180 
181  //- The transfer message type
182  const int tag_;
183 
184  //- Communicator
185  const label comm_;
186 
187  //- Number of ranks associated with PstreamBuffers (at construction)
188  const label nProcs_;
189 
190 
191  // Buffer storage
192 
193  //- Send buffers. Size is nProcs()
194  List<DynamicList<char>> sendBuffers_;
195 
196  //- Receive buffers. Size is nProcs()
197  List<DynamicList<char>> recvBuffers_;
198 
199  //- Current read positions within recvBuffers_. Size is nProcs()
200  // This list is also misused for registerSend() bookkeeping
201  labelList recvPositions_;
202 
203 
204  // Private Member Functions
205 
206  //- Change status of finished sends called
207  inline void setFinished(bool on) noexcept;
208 
209  //- Clear 'unregistered' send buffers, tag as being send-ready
210  inline void initFinalExchange();
211 
212  //- Mark all sends as having been done.
213  // This will start receives (non-blocking comms).
214  void finalExchange
215  (
216  enum modeOption mode,
217  const bool wait,
218  labelList& recvSizes
219  );
220 
221  //- Mark sends as done.
222  // Only exchange sizes using the neighbour ranks
223  // (non-blocking comms).
224  void finalExchange
225  (
226  const labelUList& sendProcs,
227  const labelUList& recvProcs,
228  const bool wait,
229  labelList& recvSizes
230  );
231 
232 
233  // Friendship Access
234 
235  //- Access a send buffer for given proc (in range 0-nProcs)
236  DynamicList<char>& accessSendBuffer(const label proci);
237 
238  //- Access a recv buffer for given proc (in range 0-nProcs).
239  DynamicList<char>& accessRecvBuffer(const label proci);
240 
241  //- Access the recv position within recv buffer for given proc
242  //- (in range 0-nProcs).
243  label& accessRecvPosition(const label proci);
244 
245  friend class UOPstreamBase; // accessSendBuffer()
246  friend class UIPstreamBase; // accessRecvBuffer(), accessRecvPosition()
247 
248 
249 public:
250 
251  // Declare name of the class and its debug switch
252  ClassName("PstreamBuffers");
253 
254 
255  // Static Data
256 
257  //- Preferred exchange algorithm (may change or be removed in future)
258  static int algorithm;
259 
260 
261  // Constructors
262 
263  //- Construct given communication type (default: nonBlocking), message
264  //- tag, communicator (default: worldComm), IO format (default: binary)
265  explicit PstreamBuffers
266  (
268  int tag = UPstream::msgType(),
269  label communicator = UPstream::worldComm,
271  );
272 
273  //- Construct given communicator, communication type
274  //- (default: nonBlocking), message tag, IO format (default: binary)
275  explicit PstreamBuffers
276  (
277  label communicator,
281  )
282  :
283  PstreamBuffers(commsType, tag, communicator, fmt)
284  {}
285 
286  //- Construct given communicator, message tag, communication type
287  //- (default: nonBlocking), IO format (default: binary)
289  (
290  label communicator,
291  int tag,
294  )
295  :
296  PstreamBuffers(commsType, tag, communicator, fmt)
297  {}
298 
299 
300  //- Destructor - checks that all data have been consumed
301  ~PstreamBuffers();
302 
303 
304  // Member Functions
305 
306  // Attributes
307 
308  //- The associated buffer format (ascii | binary)
310  {
311  return format_;
312  }
313 
314  //- The communications type of the stream
316  {
317  return commsType_;
318  }
319 
320  //- The transfer message tag
321  int tag() const noexcept
322  {
323  return tag_;
324  }
325 
326  //- The communicator index
327  label comm() const noexcept
328  {
329  return comm_;
330  }
332  //- Number of ranks associated with PstreamBuffers
333  label nProcs() const noexcept
334  {
335  return nProcs_;
336  }
337 
338 
339  // Sizing
340 
341  //- Range of ranks indices associated with PstreamBuffers
343  {
344  // Proc 0 -> nProcs (int value)
345  return UPstream::rangeType(static_cast<int>(nProcs_));
346  }
347 
348  //- Range of sub-processes indices associated with PstreamBuffers
350  {
351  // Proc 1 -> nProcs (int value)
352  return UPstream::rangeType(1, static_cast<int>(nProcs_-1));
353  }
354 
356  // Queries
357 
358  //- True if finishedSends() or finishedNeighbourSends() has been called
359  bool finished() const noexcept;
360 
361  //- Is clearStorage of individual receive buffer by external hooks
362  //- allowed? (default: true)
363  bool allowClearRecv() const noexcept;
364 
365  //- True if any (local) send buffers have data
366  bool hasSendData() const;
367 
368  //- True if any (local) recv buffers have unconsumed data.
369  //- Must call finishedSends() or other finished.. method first!
370  bool hasRecvData() const;
372  //- Number of send bytes for the specified processor.
373  label sendDataCount(const label proci) const;
374 
375  //- Number of unconsumed receive bytes for the specified processor.
376  //- Must call finishedSends() or other finished.. method first!
377  label recvDataCount(const label proci) const;
378 
379  //- Number of unconsumed receive bytes for all processors.
380  //- Must call finishedSends() or other finished.. method first!
381  labelList recvDataCounts() const;
382 
383  //- Maximum receive size from any rocessor rank.
384  //- Must call finishedSends() or other finished.. method first!
385  label maxRecvCount() const;
386 
387  //- Maximum receive size, excluding current processor rank
388  //- Must call finishedSends() or other finished.. method first!
389  label maxNonLocalRecvCount() const;
390 
391  //- Maximum receive size, excluding the specified processor rank
392  //- Must call finishedSends() or other finished.. method first!
393  label maxNonLocalRecvCount(const label excludeProci) const;
394 
395  //- Number of unconsumed receive bytes for the specified processor.
396  //- Must call finishedSends() or other finished.. method first!
397  // The method is only useful in limited situations, such as when
398  // PstreamBuffers has been used to fill contiguous data
399  // (eg, using OPstream::write).
400  const UList<char> peekRecvData(const label proci) const;
401 
402 
403  // Edit
404 
405  //- Clear all send/recv buffers and reset states.
406  // Does not remove the buffer storage.
407  void clear();
408 
409  //- Clear all send buffers (does not remove buffer storage)
410  void clearSends();
411 
412  //- Clear all recv buffer and positions (does not remove buffer storage)
413  void clearRecvs();
414 
415  //- Clear an individual send buffer (eg, data not required)
416  void clearSend(const label proci);
417 
418  //- Clear an individual receive buffer (eg, data not required)
419  // Does not remove the buffer storage.
420  void clearRecv(const label proci);
421 
422  //- Clear storage for all send/recv buffers and reset states.
423  void clearStorage();
424 
425  //- Change allowClearRecv, return previous value
426  bool allowClearRecv(bool on) noexcept;
427 
428 
429  // Registered Sending
430 
431  //- Initialise registerSend() bookkeeping by mark all send buffers
432  //- as 'unregistered'
433  // Usually called immediately after construction or clear().
434  void initRegisterSend();
435 
436  //- Toggle an individual send buffer as 'registered'.
437  //- The setting is sticky (does not turn off)
438  void registerSend(const label proci, const bool toggleOn = true);
439 
440  //- Clear any 'unregistered' send buffers.
441  void clearUnregistered();
442 
443 
444  // Regular Functions
445 
446  //- Mark the send phase as being finished.
447  //
448  // Non-blocking mode: populates receive buffers using all-to-all
449  // or NBX (depending on tuning parameters).
450  // \param wait wait for requests to complete (in non-blocking mode)
451  void finishedSends(const bool wait = true);
452 
453  //- Mark the send phase as being finished.
454  //
455  // Non-blocking mode: populates receive buffers using NBX.
456  // \param wait wait for requests to complete (in non-blocking mode)
457  void finishedSendsNBX(const bool wait = true);
458 
459  //- Mark the send phase as being finished.
460  //- Recovers the sizes (bytes) received.
461  //
462  // Non-blocking mode: populates receive buffers using all-to-all
463  // or NBX (depending on tuning parameters).
464  // \warning currently only valid for non-blocking comms.
465  void finishedSends
466  (
468  labelList& recvSizes,
470  const bool wait = true
471  );
472 
473  //- Mark the send phase as being finished.
474  //- Recovers the sizes (bytes) received.
475  //
476  // Non-blocking mode: populates receive buffers using NBX.
477  // \warning currently only valid for non-blocking comms.
478  void finishedSendsNBX
479  (
481  labelList& recvSizes,
483  const bool wait = true
484  );
485 
486 
487  // Functions with restricted neighbours
488 
489  //- Mark the send phase as being finished, with communication
490  //- being limited to a known subset of send/recv ranks.
491  //
492  // Non-blocking mode: populates receive buffers.
493  //
494  // \warning currently only valid for non-blocking comms.
495  // \note Same as finishedSends with identical sendProcs/recvProcs
497  (
499  const labelUList& neighProcs,
501  const bool wait = true
502  );
503 
504  //- Mark the send phase as being finished, with communication
505  //- being limited to a known subset of send/recv ranks.
506  //- Recovers the sizes (bytes) received.
507  //
508  // Non-blocking mode: it will populate receive buffers.
509  //
510  // \warning currently only valid for non-blocking mode.
512  (
514  const labelUList& neighProcs,
516  labelList& recvSizes,
518  const bool wait = true
519  );
520 
521  //- A caching version that uses a limited send/recv connectivity.
522  //
523  // Non-blocking mode: populates receive buffers.
524  // \return True if the send/recv connectivity changed
525  //
526  // \warning currently only valid for non-blocking comms.
527  bool finishedSends
528  (
530  bitSet& sendConnections,
532  DynamicList<label>& sendProcs,
534  DynamicList<label>& recvProcs,
536  const bool wait = true
537  );
538 
539 
540  // Gather/scatter modes
541 
542  //- Mark all sends to master as done.
543  //
544  // Non-blocking mode: populates receive buffers.
545  // Can use recvDataCount, maxRecvCount etc to recover sizes received.
546  //
547  // \param wait wait for requests to complete (in non-blocking mode)
548  //
549  // \warning currently only valid for non-blocking comms.
550  void finishedGathers(const bool wait = true);
551 
552  //- Mark all sends to master as done.
553  //- Recovers the sizes (bytes) received.
554  //
555  // Non-blocking mode: populates receive buffers (all-to-one).
556  // \warning currently only valid for non-blocking comms.
557  void finishedGathers
558  (
560  labelList& recvSizes,
562  const bool wait = true
563  );
564 
565  //- Mark all sends to sub-procs as done.
566  //
567  // Non-blocking mode: populates receive buffers.
568  // Can use recvDataCount, maxRecvCount etc to recover sizes received.
569  //
570  // \param wait wait for requests to complete (in non-blocking mode)
571  //
572  // \warning currently only valid for non-blocking comms.
573  void finishedScatters(const bool wait = true);
574 
575  //- Mark all sends to sub-procs as done.
576  //- Recovers the sizes (bytes) received.
577  //
578  // Non-blocking mode: populates receive buffers (all-to-one).
579  // \warning currently only valid for non-blocking comms.
580  void finishedScatters
581  (
583  labelList& recvSizes,
585  const bool wait = true
586  );
587 };
588 
589 
590 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
591 
592 } // End namespace Foam
593 
594 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
595 
596 #endif
597 
598 // ************************************************************************* //
int tag() const noexcept
The transfer message tag.
commsTypes
Communications types.
Definition: UPstream.H:77
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
Base class for input inter-processor communications stream (ie, parallel streams). Not to be used directly, thus contructors are protected.
Definition: UIPstream.H:54
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
An interval of (signed) integers defined by a start and a size.
Definition: IntRange.H:59
IntRange< int > rangeType
Int ranges are used for MPI ranks (processes)
Definition: UPstream.H:72
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
IOstreamOption::streamFormat format() const noexcept
The associated buffer format (ascii | binary)
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1252
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:421
label comm() const noexcept
The communicator index.
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clearUnregistered()
Clear any &#39;unregistered&#39; send buffers.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
ClassName("PstreamBuffers")
UPstream::rangeType subProcs() const noexcept
Range of sub-processes indices associated with PstreamBuffers.
const direction noexcept
Definition: Scalar.H:258
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
Buffers for inter-processor communications streams (UOPstream, UIPstream).
UPstream::commsTypes commsType() const noexcept
The communications type of the stream.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
void clear()
Clear all send/recv buffers and reset states.
void initRegisterSend()
Initialise registerSend() bookkeeping by mark all send buffers as &#39;unregistered&#39;. ...
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:59
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as &#39;registered&#39;. The setting is sticky (does not turn off) ...
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
streamFormat
Data format (ascii | binary)
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
void clearStorage()
Clear storage for all send/recv buffers and reset states.
Base class for output inter-processor communications stream (ie, parallel streams). Not to be used directly, thus contructors are protected.
Definition: UOPstream.H:54
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Definition: POSIX.C:773
label maxRecvCount() const
Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
UPstream::rangeType allProcs() const noexcept
Range of ranks indices associated with PstreamBuffers.
Namespace for OpenFOAM.