PstreamBuffers.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::PstreamBuffers
29 
30 Description
31  Buffers for inter-processor communications streams (UOPstream, UIPstream).
32 
33  Use UOPstream to stream data into buffers, call finishedSends() to
34  notify that data is in buffers and then use IUPstream to get data out
35  of received buffers. Works with both blocking and nonBlocking. Does
36  not make much sense with scheduled since there you would not need these
37  explicit buffers.
38 
39  Example usage:
40  \code
41  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
42 
43  for (const int proci : UPstream::allProcs())
44  {
45  if (proci != UPstream::myProcNo())
46  {
47  someObject vals;
48 
49  UOPstream send(proci, pBufs);
50  send << vals;
51  }
52  }
53 
54  pBufs.finishedSends(); // no-op for blocking
55 
56  for (const int proci : UPstream::allProcs())
57  {
58  if (proci != UPstream::myProcNo())
59  {
60  UIPstream recv(proci, pBufs);
61  someObject vals(recv);
62  }
63  }
64  \endcode
65 
66  There are additional special versions of finishedSends() for
67  restricted neighbour communication as well as for special
68  one-to-all and all-to-one communication patterns.
69  For example,
70  \code
71  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
72 
73  if (UPstream::master())
74  {
75  someObject vals;
76  for (const int proci : UPstream::subProcs())
77  {
78  UOPstream send(proci, pBufs);
79  send << vals;
80  }
81  }
82 
83  pBufs.finishedScatters();
84 
85  if (!UPstream::master())
86  {
87  UIPstream recv(UPstream::masterNo(), pBufs);
88  someObject vals(recv);
89  }
90  \endcode
91 
92 SourceFiles
93  PstreamBuffers.C
94 
95 \*---------------------------------------------------------------------------*/
96 
97 #include "Pstream.H"
98 
99 #ifndef Foam_PstreamBuffers_H
100 #define Foam_PstreamBuffers_H
101 
102 #include "DynamicList.H"
103 #include "UPstream.H"
104 #include "IOstream.H"
105 
106 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
107 
108 namespace Foam
109 {
110 
111 // Forward Declarations
112 class bitSet;
114 /*---------------------------------------------------------------------------*\
115  Class PstreamBuffers Declaration
116 \*---------------------------------------------------------------------------*/
117 
118 class PstreamBuffers
119 {
120  // Private Data
121 
122  //- Track if sends are complete
123  bool finishedSendsCalled_;
124 
125  //- Permit clear of individual receive buffer by external access
126  bool allowClearRecv_;
127 
128  //- Buffer format (ascii | binary)
129  const IOstreamOption::streamFormat format_;
130 
131  //- Communications type of this stream
132  const UPstream::commsTypes commsType_;
133 
134  //- The transfer message type
135  const int tag_;
136 
137  //- Communicator
138  const label comm_;
139 
140  //- Number of ranks associated with PstreamBuffers (at construction)
141  const label nProcs_;
142 
143 
144  // Buffer storage
145 
146  //- Send buffers. Size is nProcs()
147  List<DynamicList<char>> sendBuffers_;
148 
149  //- Receive buffers. Size is nProcs()
150  List<DynamicList<char>> recvBuffers_;
151 
152  //- Current read positions within recvBuffers_. Size is nProcs()
153  labelList recvPositions_;
154 
155 
156  // Private Member Functions
157 
158  //- Mark all sends as having been done.
159  // This will start receives (nonBlocking comms).
160  void finalExchange
161  (
162  const bool wait,
163  labelList& recvSizes
164  );
165 
166  //- Mark sends as done.
167  // Only exchange sizes using the neighbour ranks
168  // (nonBlocking comms).
169  void finalExchange
170  (
171  const labelUList& sendProcs,
172  const labelUList& recvProcs,
173  const bool wait,
174  labelList& recvSizes
175  );
176 
177  //- For all-to-one or one-to-all
178  void finalGatherScatter
179  (
180  const bool isGather,
181  const bool wait,
182  labelList& recvSizes
183  );
184 
185 
186  // Friendship Access
187 
188  //- Access a send buffer for given proc (in range 0-nProcs)
189  DynamicList<char>& accessSendBuffer(const label proci);
190 
191  //- Access a recv buffer for given proc (in range 0-nProcs).
192  DynamicList<char>& accessRecvBuffer(const label proci);
193 
194  //- Access the recv position within recv buffer for given proc
195  //- (in range 0-nProcs).
196  label& accessRecvPosition(const label proci);
197 
198  friend class UOPstreamBase; // accessSendBuffer()
199  friend class UIPstreamBase; // accessRecvBuffer(), accessRecvPosition()
200 
201 
202 public:
203 
204  // Static Data
205 
206  //- Preferred exchange algorithm (may change or be removed in future)
207  static int algorithm;
208 
209 
210  // Constructors
211 
212  //- Construct given communication type (default: nonBlocking), message
213  //- tag, communicator (default: worldComm), IO format (default: binary)
214  explicit PstreamBuffers
215  (
217  int tag = UPstream::msgType(),
218  label communicator = UPstream::worldComm,
220  );
221 
222  //- Construct given communicator, communication type
223  //- (default: nonBlocking), message tag, IO format (default: binary)
224  explicit PstreamBuffers
225  (
226  label communicator,
230  )
231  :
232  PstreamBuffers(commsType, tag, communicator, fmt)
233  {}
234 
235  //- Construct given communicator, message tag, communication type
236  //- (default: nonBlocking), IO format (default: binary)
238  (
239  label communicator,
240  int tag,
243  )
244  :
245  PstreamBuffers(commsType, tag, communicator, fmt)
246  {}
247 
248 
249  //- Destructor - checks that all data have been consumed
250  ~PstreamBuffers();
251 
252 
253  // Member Functions
254 
255  // Attributes
256 
257  //- The associated buffer format (ascii | binary)
259  {
260  return format_;
261  }
262 
263  //- The communications type of the stream
265  {
266  return commsType_;
267  }
268 
269  //- The transfer message tag
270  int tag() const noexcept
271  {
272  return tag_;
273  }
274 
275  //- The communicator index
276  label comm() const noexcept
277  {
278  return comm_;
279  }
280 
281  //- Number of ranks associated with PstreamBuffers
282  label nProcs() const noexcept
283  {
284  return nProcs_;
285  }
286 
287 
288  // Sizing
289 
290  //- Range of ranks indices associated with PstreamBuffers
292  {
293  // Proc 0 -> nProcs (int value)
294  return UPstream::rangeType(static_cast<int>(nProcs_));
295  }
296 
297  //- Range of sub-processes indices associated with PstreamBuffers
299  {
300  // Proc 1 -> nProcs (int value)
301  return UPstream::rangeType(1, static_cast<int>(nProcs_-1));
302  }
303 
304 
305  // Queries
306 
307  //- True if finishedSends() or finishedNeighbourSends() has been called
308  bool finished() const noexcept
309  {
310  return finishedSendsCalled_;
311  }
312 
313  //- Is clearStorage of individual receive buffer by external hooks
314  //- allowed? (default: true)
316  {
317  return allowClearRecv_;
318  }
319 
320  //- True if any (local) send buffers have data
321  bool hasSendData() const;
322 
323  //- True if any (local) recv buffers have unconsumed data.
324  //- Must call finishedSends() or other finished.. method first!
325  bool hasRecvData() const;
326 
327  //- Number of send bytes for the specified processor.
328  label sendDataCount(const label proci) const;
329 
330  //- Number of unconsumed receive bytes for the specified processor.
331  //- Must call finishedSends() or other finished.. method first!
332  label recvDataCount(const label proci) const;
333 
334  //- Number of unconsumed receive bytes for all processors.
335  //- Must call finishedSends() or other finished.. method first!
336  labelList recvDataCounts() const;
337 
338  //- Maximum receive size from any rocessor rank.
339  //- Must call finishedSends() or other finished.. method first!
340  label maxRecvCount() const;
341 
342  //- Maximum receive size, excluding current processor rank
343  //- Must call finishedSends() or other finished.. method first!
344  label maxNonLocalRecvCount() const;
345 
346  //- Maximum receive size, excluding the specified processor rank
347  //- Must call finishedSends() or other finished.. method first!
348  label maxNonLocalRecvCount(const label excludeProci) const;
349 
350  //- Number of unconsumed receive bytes for the specified processor.
351  //- Must call finishedSends() or other finished.. method first!
352  // The method is only useful in limited situations, such as when
353  // PstreamBuffers has been used to fill contiguous data
354  // (eg, using OPstream::write).
355  const UList<char> peekRecvData(const label proci) const;
356 
357 
358  // Edit
359 
360  //- Clear all send/recv buffers and reset states.
361  // Does not remove the buffer storage.
362  void clear();
364  //- Clear all send buffers (does not remove buffer storage)
365  void clearSends();
366 
367  //- Clear all recv buffer and positions (does not remove buffer storage)
368  void clearRecvs();
369 
370  //- Clear an individual send buffer (eg, data not required)
371  void clearSend(const label proci);
373  //- Clear an individual receive buffer (eg, data not required)
374  // Does not remove the buffer storage.
375  void clearRecv(const label proci);
376 
377  //- Clear storage for all send/recv buffers and reset states.
378  void clearStorage();
379 
380  //- Change allowClearRecv, return previous value
381  bool allowClearRecv(bool on) noexcept;
382 
383 
384  // Regular Functions
385 
386  //- Mark sends as done
387  //
388  // Non-blocking mode: populates receive buffers (all-to-all).
389  // \param wait wait for requests to complete (in nonBlocking mode)
390  void finishedSends(const bool wait = true);
391 
392  //- Mark sends as done.
393  //- Recovers the sizes (bytes) received.
394  //
395  // Non-blocking mode: populates receive buffers (all-to-all).
396  // \param[out] recvSizes the sizes (bytes) received
397  // \param wait wait for requests to complete (in nonBlocking mode)
398  //
399  // \warning currently only valid for nonBlocking comms.
400  void finishedSends(labelList& recvSizes, const bool wait = true);
401 
402 
403  // Functions with restricted neighbours
404 
405  //- Mark sends as done using subset of send/recv ranks
406  //- and recover the sizes (bytes) received.
407  //
408  // Non-blocking mode: populates receive buffers.
409  //
410  // \param neighProcs ranks used for sends/recvs
411  // \param wait wait for requests to complete (in nonBlocking mode)
412  //
413  // \warning currently only valid for nonBlocking comms.
414  // \note Same as finishedSends with identical sendProcs/recvProcs
416  (
417  const labelUList& neighProcs,
418  const bool wait = true
419  );
420 
421  //- Mark sends as done using subset of send/recv ranks
422  //- and recover the sizes (bytes) received.
423  //
424  // Non-blocking mode: it will populate receive buffers.
425  //
426  // \param neighProcs ranks used for sends/recvs
427  // \param[out] recvSizes the sizes (bytes) received
428  // \param wait wait for requests to complete (in nonBlocking mode)
429  //
430  // \warning currently only valid for nonBlocking mode.
432  (
433  const labelUList& neighProcs,
434  labelList& recvSizes,
435  const bool wait = true
436  );
437 
438  //- A caching version that uses a limited send/recv connectivity.
439  //
440  // Non-blocking mode: populates receive buffers.
441  // \param sendConnections on/off for sending ranks
442  // \param sendProcs ranks used for sends
443  // \param recvProcs ranks used for recvs
444  // \param wait wait for requests to complete (in nonBlocking mode)
445  //
446  // \return True if the send/recv connectivity changed
447  //
448  // \warning currently only valid for nonBlocking comms.
449  bool finishedSends
450  (
451  bitSet& sendConnections,
452  DynamicList<label>& sendProcs,
453  DynamicList<label>& recvProcs,
454  const bool wait = true
455  );
456 
457 
458  // Gather/scatter modes
459 
460  //- Mark all sends to master as done.
461  //
462  // Non-blocking mode: populates receive buffers.
463  // Can use recvDataCount, maxRecvCount etc to recover sizes received.
464  //
465  // \param wait wait for requests to complete (in nonBlocking mode)
466  //
467  // \warning currently only valid for nonBlocking comms.
468  void finishedGathers(const bool wait = true);
469 
470  //- Mark all sends to master as done.
471  //- Recovers the sizes (bytes) received.
472  //
473  // Non-blocking mode: populates receive buffers (all-to-one).
474  // \param[out] recvSizes the sizes (bytes) received
475  // \param wait wait for requests to complete (in nonBlocking mode)
476  //
477  // \warning currently only valid for nonBlocking comms.
478  void finishedGathers(labelList& recvSizes, const bool wait = true);
479 
480  //- Mark all sends to sub-procs as done.
481  //
482  // Non-blocking mode: populates receive buffers.
483  // Can use recvDataCount, maxRecvCount etc to recover sizes received.
484  //
485  // \param wait wait for requests to complete (in nonBlocking mode)
486  //
487  // \warning currently only valid for nonBlocking comms.
488  void finishedScatters(const bool wait = true);
489 
490  //- Mark all sends to sub-procs as done.
491  //- Recovers the sizes (bytes) received.
492  //
493  // Non-blocking mode: populates receive buffers (all-to-one).
494  // \param[out] recvSizes the sizes (bytes) received
495  // \param wait wait for requests to complete (in nonBlocking mode)
496  //
497  // \warning currently only valid for nonBlocking comms.
498  void finishedScatters(labelList& recvSizes, const bool wait = true);
499 };
500 
501 
502 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
503 
504 } // End namespace Foam
505 
506 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
507 
508 #endif
509 
510 // ************************************************************************* //
int tag() const noexcept
The transfer message tag.
commsTypes
Communications types.
Definition: UPstream.H:74
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
Base class for input inter-processor communications stream (ie, parallel streams). Not to be used directly, thus contructors are protected.
Definition: UIPstream.H:54
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
IntRange< int > rangeType
Int ranges are used for MPI ranks (processes)
Definition: UPstream.H:69
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
IOstreamOption::streamFormat format() const noexcept
The associated buffer format (ascii | binary)
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1184
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:411
label comm() const noexcept
The communicator index.
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required)
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark sends as done.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
UPstream::rangeType subProcs() const noexcept
Range of sub-processes indices associated with PstreamBuffers.
const direction noexcept
Definition: Scalar.H:258
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark sends as done using subset of send/recv ranks and recover the sizes (bytes) received.
Buffers for inter-processor communications streams (UOPstream, UIPstream).
UPstream::commsTypes commsType() const noexcept
The communications type of the stream.
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage)
void clear()
Clear all send/recv buffers and reset states.
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition: bitSet.H:59
void clearSends()
Clear all send buffers (does not remove buffer storage)
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), label communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
static int algorithm
Preferred exchange algorithm (may change or be removed in future)
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank Must call finishedSends() or other finished...
label nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
void clearStorage()
Clear storage for all send/recv buffers and reset states.
Base class for output inter-processor communications stream (ie, parallel streams). Not to be used directly, thus contructors are protected.
Definition: UOPstream.H:54
label maxRecvCount() const
Maximum receive size from any rocessor rank. Must call finishedSends() or other finished.. method first!
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
UPstream::rangeType allProcs() const noexcept
Range of ranks indices associated with PstreamBuffers.
Namespace for OpenFOAM.