PstreamBuffers.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::PstreamBuffers
29 
30 Description
31  Buffers for inter-processor communications streams (UOPstream, UIPstream).
32 
33  Use UOPstream to stream data into buffers, call finishedSends() to
34  notify that data is in buffers and then use UIPstream to get data out
35  of received buffers. Works with both blocking and non-blocking. Does
36  not make much sense with scheduled since there you would not need these
37  explicit buffers.
38 
39  Example usage:
40  \code
41  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
42 
43  for (const int proci : UPstream::allProcs())
44  {
45  if (proci != UPstream::myProcNo())
46  {
47  someObject vals;
48 
49  UOPstream send(proci, pBufs);
50  send << vals;
51  }
52  }
53 
54  pBufs.finishedSends(); // no-op for blocking
55 
56  for (const int proci : UPstream::allProcs())
57  {
58  if (proci != UPstream::myProcNo())
59  {
60  UIPstream recv(proci, pBufs);
61  someObject vals(recv);
62  }
63  }
64  \endcode
65 
66  There are special versions of finishedSends() for
67  restricted neighbour communication as well as for special
68  one-to-all and all-to-one communication patterns.
69  For example,
70  \code
71  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
72 
73  if (UPstream::master())
74  {
75  someObject vals;
76  for (const int proci : UPstream::subProcs())
77  {
78  UOPstream send(proci, pBufs);
79  send << vals;
80  }
81  }
82 
83  pBufs.finishedScatters();
84 
85  if (!UPstream::master())
86  {
87  UIPstream recv(UPstream::masterNo(), pBufs);
88  someObject vals(recv);
89  }
90  \endcode
91 
92  Additionally there are some situations that use speculative sends
93  that may not actually be required. In this case, it is possible to
94  mark all sends as initially \em unregistered and subsequently
95  mark the "real" sends as \em registered.
96 
97  For example,
98  \code
99  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
100 
101  pBufs.initRegisterSend();
102 
103  for (const polyPatch& pp : patches)
104  {
105  const auto* ppp = isA<processorPolyPatch>(pp);
106  if (ppp)
107  {
108  const label nbrProci = ppp->neighbProcNo();
109 
110  // Gather some patch information...
111  UOPstream toNbr(nbrProci, pBufs);
112  toNbr << patchInfo;
113 
114  // The send is needed if patchInfo is non-empty
115  pBufs.registerSend(nbrProci, !patchInfo.empty());
116  }
117  }
118 
119  // optional: pBufs.clearUnregistered();
120 
121  pBufs.finishedSends();
122 
123  ...
124  \endcode
125 
126 SourceFiles
127  PstreamBuffers.C
128 
129 \*---------------------------------------------------------------------------*/
130 
131 #include "Pstream.H"
132 
133 #ifndef Foam_PstreamBuffers_H
134 #define Foam_PstreamBuffers_H
135 
136 #include "DynamicList.H"
137 #include "UPstream.H"
138 #include "IOstream.H"
139 
140 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
141 
142 namespace Foam
143 {
144 
145 // Forward Declarations
146 class bitSet;
148 /*---------------------------------------------------------------------------*\
149  Class PstreamBuffers Declaration
150 \*---------------------------------------------------------------------------*/
151 
152 class PstreamBuffers
153 {
154  // Private Data Types
155 
156  //- Private enumeration for handling PEX stage 1 (sizing) modes
157  enum class modeOption : unsigned char
158  {
159  DEFAULT,
160  GATHER,
161  SCATTER,
162  ALL_TO_ALL,
163  NBX_PEX
164  };
165 
166 
167  // Private Data
168 
169  //- Track if sends are complete
170  bool finishedSendsCalled_;
171 
172  //- Permit clear of individual receive buffer by external access
173  bool allowClearRecv_;
174 
175  //- Buffer format (ascii | binary)
176  const IOstreamOption::streamFormat format_;
177 
178  //- Communications type of this stream
179  const UPstream::commsTypes commsType_;
180 
181  //- The transfer message type
182  const int tag_;
183 
184  //- Communicator
185  const label comm_;
186 
187  //- Number of ranks associated with PstreamBuffers (at construction)
188  const label nProcs_;
189 
190 
191  // Buffer storage
192 
193  //- Send buffers. Size is nProcs()
194  List<DynamicList<char>> sendBuffers_;
195 
196  //- Receive buffers. Size is nProcs()
197  List<DynamicList<char>> recvBuffers_;
198 
199  //- Current read positions within recvBuffers_. Size is nProcs()
200  // This list is also misused for registerSend() bookkeeping
201  labelList recvPositions_;
202 
203 
204  // Private Member Functions
205 
206  //- Change status of finished sends called
207  inline void setFinished(bool on) noexcept;
208 
209  //- Clear 'unregistered' send buffers, tag as being send-ready
210  inline void initFinalExchange();
211 
212  //- Mark all sends as having been done.
213  // This will start receives (non-blocking comms).
214  void finalExchange
215  (
216  enum modeOption mode,
217  const bool wait,
218  labelList& recvSizes
219  );
220 
221  //- Mark sends as done.
222  // Only exchange sizes using the neighbour ranks
223  // (non-blocking comms).
224  void finalExchange
225  (
226  const labelUList& sendProcs,
227  const labelUList& recvProcs,
228  const bool wait,
229  labelList& recvSizes
230  );
231 
232 
233  // Friendship Access
234 
235  //- Access a send buffer for given proc (in range 0-nProcs)
236  DynamicList<char>& accessSendBuffer(const label proci);
237 
238  //- Access a recv buffer for given proc (in range 0-nProcs).
239  DynamicList<char>& accessRecvBuffer(const label proci);
240 
241  //- Access the recv position within recv buffer for given proc
242  //- (in range 0-nProcs).
243  label& accessRecvPosition(const label proci);
244 
245  friend class UOPstreamBase; // accessSendBuffer()
246  friend class UIPstreamBase; // accessRecvBuffer(), accessRecvPosition()
247 
248 
249 public:
250 
251  // Static Data
252 
253  //- Preferred exchange algorithm (may change or be removed in future)
254  static int algorithm;
255 
256 
257  // Constructors
258 
259  //- Construct given communication type (default: nonBlocking), message
260  //- tag, communicator (default: worldComm), IO format (default: binary)
261  explicit PstreamBuffers
262  (
264  int tag = UPstream::msgType(),
265  label communicator = UPstream::worldComm,
267  );
268 
269  //- Construct given communicator, communication type
270  //- (default: nonBlocking), message tag, IO format (default: binary)
271  explicit PstreamBuffers
272  (
273  label communicator,
275  int tag = UPstream::msgType(),
277  )
278  :
279  PstreamBuffers(commsType, tag, communicator, fmt)
280  {}
281 
282  //- Construct given communicator, message tag, communication type
283  //- (default: nonBlocking), IO format (default: binary)
285  (
286  label communicator,
287  int tag,
290  )
291  :
292  PstreamBuffers(commsType, tag, communicator, fmt)
293  {}
294 
295 
296  //- Destructor - checks that all data have been consumed
297  ~PstreamBuffers();
298 
299 
300  // Member Functions
301 
302  // Attributes
303 
304  //- The associated buffer format (ascii | binary)
306  {
307  return format_;
308  }
309 
310  //- The communications type of the stream
312  {
313  return commsType_;
314  }
315 
316  //- The transfer message tag
317  int tag() const noexcept
318  {
319  return tag_;
320  }
321 
322  //- The communicator index
323  label comm() const noexcept
324  {
325  return comm_;
326  }
328  //- Number of ranks associated with PstreamBuffers
329  label nProcs() const noexcept
330  {
331  return nProcs_;
332  }
333 
334 
335  // Sizing
336 
337  //- Range of ranks indices associated with PstreamBuffers
339  {
340  // Proc 0 -> nProcs (int value)
341  return UPstream::rangeType(static_cast<int>(nProcs_));
342  }
343 
344  //- Range of sub-processes indices associated with PstreamBuffers
346  {
347  // Proc 1 -> nProcs (int value)
348  return UPstream::rangeType(1, static_cast<int>(nProcs_-1));
349  }
350 
352  // Queries
353 
354  //- True if finishedSends() or finishedNeighbourSends() has been called
355  bool finished() const noexcept;
356 
357  //- Is clearStorage of individual receive buffer by external hooks
358  //- allowed? (default: true)
359  bool allowClearRecv() const noexcept;
360 
361  //- True if any (local) send buffers have data
362  bool hasSendData() const;
363 
364  //- True if any (local) recv buffers have unconsumed data.
365  //- Must call finishedSends() or other finished.. method first!
366  bool hasRecvData() const;
368  //- Number of send bytes for the specified processor.
369  label sendDataCount(const label proci) const;
370 
371  //- Number of unconsumed receive bytes for the specified processor.
372  //- Must call finishedSends() or other finished.. method first!
373  label recvDataCount(const label proci) const;
374 
375  //- Number of unconsumed receive bytes for all processors.
376  //- Must call finishedSends() or other finished.. method first!
377  labelList recvDataCounts() const;
378 
379  //- Maximum receive size from any processor rank.
380  //- Must call finishedSends() or other finished.. method first!
381  label maxRecvCount() const;
382 
383  //- Maximum receive size, excluding current processor rank
384  //- Must call finishedSends() or other finished.. method first!
385  label maxNonLocalRecvCount() const;
386 
387  //- Maximum receive size, excluding the specified processor rank
388  //- Must call finishedSends() or other finished.. method first!
389  label maxNonLocalRecvCount(const label excludeProci) const;
390 
391  //- View of the (unconsumed) receive data for the specified processor.
392  //- Must call finishedSends() or other finished.. method first!
393  // The method is only useful in limited situations, such as when
394  // PstreamBuffers has been used to fill contiguous data
395  // (eg, using OPstream::write).
396  const UList<char> peekRecvData(const label proci) const;
397 
398 
399  // Edit
400 
401  //- Clear all send/recv buffers and reset states.
402  // Does not remove the buffer storage.
403  void clear();
404 
405  //- Clear all send buffers (does not remove buffer storage)
406  void clearSends();
407 
408  //- Clear all recv buffer and positions (does not remove buffer storage)
409  void clearRecvs();
410 
411  //- Clear an individual send buffer (eg, data not required)
412  void clearSend(const label proci);
413 
414  //- Clear an individual receive buffer (eg, data not required)
415  // Does not remove the buffer storage.
416  void clearRecv(const label proci);
417 
418  //- Clear storage for all send/recv buffers and reset states.
419  void clearStorage();
420 
421  //- Change allowClearRecv, return previous value
422  bool allowClearRecv(bool on) noexcept;
423 
424 
425  // Registered Sending
426 
427  //- Initialise registerSend() bookkeeping by marking all send buffers
428  //- as 'unregistered'
429  // Usually called immediately after construction or clear().
430  void initRegisterSend();
431 
432  //- Toggle an individual send buffer as 'registered'.
433  //- The setting is sticky (does not turn off)
434  void registerSend(const label proci, const bool toggleOn = true);
435 
436  //- Clear any 'unregistered' send buffers.
437  void clearUnregistered();
438 
439 
440  // Regular Functions
441 
442  //- Mark the send phase as being finished.
443  //
444  // Non-blocking mode: populates receive buffers using all-to-all
445  // or NBX (depending on tuning parameters).
446  // \param wait wait for requests to complete (in non-blocking mode)
447  void finishedSends(const bool wait = true);
448 
449  //- Mark the send phase as being finished.
450  //
451  // Non-blocking mode: populates receive buffers using NBX.
452  // \param wait wait for requests to complete (in non-blocking mode)
453  void finishedSendsNBX(const bool wait = true);
454 
455  //- Mark the send phase as being finished.
456  //- Recovers the sizes (bytes) received.
457  //
458  // Non-blocking mode: populates receive buffers using all-to-all
459  // or NBX (depending on tuning parameters).
460  // \warning currently only valid for non-blocking comms.
461  void finishedSends
462  (
464  labelList& recvSizes,
466  const bool wait = true
467  );
468 
469  //- Mark the send phase as being finished.
470  //- Recovers the sizes (bytes) received.
471  //
472  // Non-blocking mode: populates receive buffers using NBX.
473  // \warning currently only valid for non-blocking comms.
474  void finishedSendsNBX
475  (
477  labelList& recvSizes,
479  const bool wait = true
480  );
481 
482 
483  // Functions with restricted neighbours
484 
485  //- Mark the send phase as being finished, with communication
486  //- being limited to a known subset of send/recv ranks.
487  //
488  // Non-blocking mode: populates receive buffers.
489  //
490  // \warning currently only valid for non-blocking comms.
491  // \note Same as finishedSends with identical sendProcs/recvProcs
493  (
495  const labelUList& neighProcs,
497  const bool wait = true
498  );
499 
500  //- Mark the send phase as being finished, with communication
501  //- being limited to a known subset of send/recv ranks.
502  //- Recovers the sizes (bytes) received.
503  //
504  // Non-blocking mode: it will populate receive buffers.
505  //
506  // \warning currently only valid for non-blocking mode.
508  (
510  const labelUList& neighProcs,
512  labelList& recvSizes,
514  const bool wait = true
515  );
516 
517  //- A caching version that uses a limited send/recv connectivity.
518  //
519  // Non-blocking mode: populates receive buffers.
520  // \return True if the send/recv connectivity changed
521  //
522  // \warning currently only valid for non-blocking comms.
523  bool finishedSends
524  (
526  bitSet& sendConnections,
528  DynamicList<label>& sendProcs,
530  DynamicList<label>& recvProcs,
532  const bool wait = true
533  );
534 
535 
536  // Gather/scatter modes
537 
538  //- Mark all sends to master as done.
539  //
540  // Non-blocking mode: populates receive buffers.
541  // Can use recvDataCount, maxRecvCount etc to recover sizes received.
542  //
543  // \param wait wait for requests to complete (in non-blocking mode)
544  //
545  // \warning currently only valid for non-blocking comms.
546  void finishedGathers(const bool wait = true);
547 
548  //- Mark all sends to master as done.
549  //- Recovers the sizes (bytes) received.
550  //
551  // Non-blocking mode: populates receive buffers (all-to-one).
552  // \warning currently only valid for non-blocking comms.
553  void finishedGathers
554  (
556  labelList& recvSizes,
558  const bool wait = true
559  );
560 
561  //- Mark all sends to sub-procs as done.
562  //
563  // Non-blocking mode: populates receive buffers.
564  // Can use recvDataCount, maxRecvCount etc to recover sizes received.
565  //
566  // \param wait wait for requests to complete (in non-blocking mode)
567  //
568  // \warning currently only valid for non-blocking comms.
569  void finishedScatters(const bool wait = true);
570 
571  //- Mark all sends to sub-procs as done.
572  //- Recovers the sizes (bytes) received.
573  //
574  // Non-blocking mode: populates receive buffers (one-to-all).
575  // \warning currently only valid for non-blocking comms.
576  void finishedScatters
577  (
579  labelList& recvSizes,
581  const bool wait = true
582  );
583 };
584 
585 
586 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
587 
588 } // End namespace Foam
589 
590 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
591 
592 #endif
593 
594 // ************************************************************************* //
int tag() const noexcept
The transfer message tag.
commsTypes
Communications types.
Definition: UPstream.H:72
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
Base class for input inter-processor communications stream (ie, parallel streams). Not to be used directly, thus constructors are protected.
Definition: UIPstream.H:54
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
An interval of (signed) integers defined by a start and a size.
Definition: IntRange.H:59
IntRange< int > rangeType
Int ranges are used for MPI ranks (processes)
Definition: UPstream.H:67
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
IOstreamOption::streamFormat format() const noexcept
The associated buffer format (ascii | binary)
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1229
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:409
label comm() const noexcept
The communicator index.
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clearUnregistered()
Clear any 'unregistered' send buffers.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
UPstream::rangeType subProcs() const noexcept
Range of sub-processes indices associated with PstreamBuffers.
const direction noexcept
Definition: Scalar.H:258
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
Buffers for inter-processor communications streams (UOPstream, UIPstream).
UPstream::commsTypes commsType() const noexcept
The communications type of the stream.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
void clear()
Clear all send/recv buffers and reset states.
void initRegisterSend()
Initialise registerSend() bookkeeping by marking all send buffers as 'unregistered'. ...
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:59
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off) ...
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
void clearStorage()
Clear storage for all send/recv buffers and reset states.
Base class for output inter-processor communications stream (ie, parallel streams). Not to be used directly, thus constructors are protected.
Definition: UOPstream.H:54
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Definition: POSIX.C:773
label maxRecvCount() const
Maximum receive size from any processor rank. Must call finishedSends() or other finished.. method first!
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
View of the (unconsumed) receive data for the specified processor. Must call finishedSends() or other fi...
UPstream::rangeType allProcs() const noexcept
Range of ranks indices associated with PstreamBuffers.
Namespace for OpenFOAM.