PstreamBuffers.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2021-2022 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "PstreamBuffers.H"
30 #include "bitSet.H"
31 
32 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
33 
34 void Foam::PstreamBuffers::finalExchange
35 (
36  labelList& recvSizes,
37  const bool wait
38 )
39 {
40  // Could also check that it is not called twice
41  // but that is used for overlapping send/recv (eg, overset)
42  finishedSendsCalled_ = true;
43 
44  if (commsType_ == UPstream::commsTypes::nonBlocking)
45  {
46  // all-to-all
47  Pstream::exchangeSizes(sendBuf_, recvSizes, comm_);
48 
49  Pstream::exchange<DynamicList<char>, char>
50  (
51  sendBuf_,
52  recvSizes,
53  recvBuf_,
54  tag_,
55  comm_,
56  wait
57  );
58  }
59 }
60 
61 
62 void Foam::PstreamBuffers::finalExchange
63 (
64  const labelUList& sendProcs,
65  const labelUList& recvProcs,
66  labelList& recvSizes,
67  const bool wait
68 )
69 {
70  // Could also check that it is not called twice
71  // but that is used for overlapping send/recv (eg, overset)
72  finishedSendsCalled_ = true;
73 
74  if (commsType_ == UPstream::commsTypes::nonBlocking)
75  {
77  (
78  sendProcs,
79  recvProcs,
80  sendBuf_,
81  recvSizes,
82  tag_,
83  comm_
84  );
85 
86  Pstream::exchange<DynamicList<char>, char>
87  (
88  sendBuf_,
89  recvSizes,
90  recvBuf_,
91  tag_,
92  comm_,
93  wait
94  );
95  }
96 }
97 
98 
99 void Foam::PstreamBuffers::finalExchangeGatherScatter
100 (
101  const bool isGather,
102  const bool wait
103 )
104 {
105  // Could also check that it is not called twice
106  // but that is used for overlapping send/recv (eg, overset)
107  finishedSendsCalled_ = true;
108 
109  if (commsType_ == UPstream::commsTypes::nonBlocking)
110  {
111  labelList recvSizes;
112 
113  if (isGather)
114  {
115  // gather mode (all-to-one): master [0] <- everyone
116 
117  recvSizes = UPstream::listGatherValues(sendBuf_[0].size(), comm_);
118 
119  if (!UPstream::master(comm_))
120  {
121  recvSizes.resize_nocopy(recvBuf_.size());
122  recvSizes = Zero;
123  }
124  }
125  else
126  {
127  // scatter mode (one-to-all): master [0] -> everyone
128 
129  recvSizes.resize_nocopy(sendBuf_.size());
130 
131  if (UPstream::master(comm_))
132  {
133  forAll(sendBuf_, proci)
134  {
135  recvSizes[proci] = sendBuf_[proci].size();
136  }
137  }
138 
139  const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
140 
141  recvSizes = Zero;
142  recvSizes[0] = myRecv;
143  }
144 
145 
146  Pstream::exchange<DynamicList<char>, char>
147  (
148  sendBuf_,
149  recvSizes,
150  recvBuf_,
151  tag_,
152  comm_,
153  wait
154  );
155  }
156 }
157 
158 
159 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
160 
162 (
163  const UPstream::commsTypes commsType,
164  const int tag,
165  const label comm,
167 )
168 :
169  finishedSendsCalled_(false),
170  allowClearRecv_(true),
171  format_(fmt),
172  commsType_(commsType),
173  tag_(tag),
174  comm_(comm),
175  sendBuf_(UPstream::nProcs(comm_)),
176  recvBuf_(UPstream::nProcs(comm_)),
177  recvBufPos_(UPstream::nProcs(comm_), Zero)
178 {}
179 
180 
182 (
183  const label comm,
184  const UPstream::commsTypes commsType,
185  const int tag,
187 )
188 :
189  finishedSendsCalled_(false),
190  allowClearRecv_(true),
191  format_(fmt),
192  commsType_(commsType),
193  tag_(tag),
194  comm_(comm),
195  sendBuf_(UPstream::nProcs(comm_)),
196  recvBuf_(UPstream::nProcs(comm_)),
197  recvBufPos_(UPstream::nProcs(comm_), Zero)
198 {}
199 
200 
201 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
202 
204 {
205  // Check that all data has been consumed.
206  forAll(recvBufPos_, proci)
207  {
208  if (recvBufPos_[proci] < recvBuf_[proci].size())
209  {
211  << "Message from processor " << proci
212  << " Only consumed " << recvBufPos_[proci] << " of "
213  << recvBuf_[proci].size() << " bytes" << nl
215  }
216  }
217 }
218 
219 
220 // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
221 
223 {
224  for (DynamicList<char>& buf : sendBuf_)
225  {
226  buf.clear();
227  }
228  for (DynamicList<char>& buf : recvBuf_)
229  {
230  buf.clear();
231  }
232  recvBufPos_ = 0;
233 
234  finishedSendsCalled_ = false;
235 }
236 
237 
238 void Foam::PstreamBuffers::clearRecv(const label proci)
239 {
240  recvBuf_[proci].clear();
241  recvBufPos_[proci] = 0;
242 }
243 
244 
246 {
247  // Could also clear out entire sendBuf_, recvBuf_ and reallocate.
248  // Not sure if it makes much difference
249  for (DynamicList<char>& buf : sendBuf_)
250  {
251  buf.clearStorage();
252  }
253  for (DynamicList<char>& buf : recvBuf_)
254  {
255  buf.clearStorage();
256  }
257  recvBufPos_ = 0;
258 
259  finishedSendsCalled_ = false;
260 }
261 
262 
264 {
265  for (const DynamicList<char>& buf : sendBuf_)
266  {
267  if (!buf.empty())
268  {
269  return true;
270  }
271  }
272  return false;
273 }
274 
275 
277 {
278  if (finishedSendsCalled_)
279  {
280  forAll(recvBufPos_, proci)
281  {
282  if (recvBuf_[proci].size() > recvBufPos_[proci])
283  {
284  return true;
285  }
286  }
287  }
288  #ifdef FULLDEBUG
289  else
290  {
292  << "Call finishedSends first" << exit(FatalError);
293  }
294  #endif
295 
296  return false;
297 }
298 
300 Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
301 {
302  return sendBuf_[proci].size();
303 }
304 
305 
306 Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
307 {
308  if (finishedSendsCalled_)
309  {
310  const label len(recvBuf_[proci].size() > recvBufPos_[proci]);
311 
312  if (len > 0)
313  {
314  return len;
315  }
316  }
317  #ifdef FULLDEBUG
318  else
319  {
321  << "Call finishedSends first" << exit(FatalError);
322  }
323  #endif
324 
325  return 0;
326 }
327 
328 
330 {
331  labelList counts(recvBuf_.size(), Zero);
332 
333  if (finishedSendsCalled_)
334  {
335  forAll(recvBufPos_, proci)
336  {
337  const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
338 
339  if (len > 0)
340  {
341  counts[proci] = len;
342  }
343  }
344  }
345  #ifdef FULLDEBUG
346  else
347  {
349  << "Call finishedSends first" << exit(FatalError);
350  }
351  #endif
352 
353  return counts;
354 }
355 
356 
357 const Foam::UList<char>
358 Foam::PstreamBuffers::peekRecvData(const label proci) const
359 {
360  if (finishedSendsCalled_)
361  {
362  const label len(recvBuf_[proci].size() - recvBufPos_[proci]);
363 
364  if (len > 0)
365  {
366  return UList<char>
367  (
368  const_cast<char*>(&recvBuf_[proci][recvBufPos_[proci]]),
369  len
370  );
371  }
372  }
373  #ifdef FULLDEBUG
374  else
375  {
377  << "Call finishedSends first" << exit(FatalError);
378  }
379  #endif
380 
381  return UList<char>();
382 }
383 
384 
386 {
387  bool old(allowClearRecv_);
388  allowClearRecv_ = on;
389  return old;
390 }
391 
392 
393 void Foam::PstreamBuffers::finishedSends(const bool wait)
394 {
395  labelList recvSizes;
396  finalExchange(recvSizes, wait);
397 }
398 
399 
401 (
402  labelList& recvSizes,
403  const bool wait
404 )
405 {
406  finalExchange(recvSizes, wait);
407 
408  if (commsType_ != UPstream::commsTypes::nonBlocking)
409  {
411  << "Obtaining sizes not supported in "
412  << UPstream::commsTypeNames[commsType_] << endl
413  << " since transfers already in progress. Use non-blocking instead."
414  << exit(FatalError);
415 
416  // Note: maybe possible only if using different tag from write started
417  // by ~UOPstream. Needs some work.
418  }
419 }
420 
421 
423 (
424  const labelUList& sendProcs,
425  const labelUList& recvProcs,
426  const bool wait
427 )
428 {
429  labelList recvSizes;
430  finalExchange(sendProcs, recvProcs, recvSizes, wait);
431 }
432 
433 
435 (
436  const labelUList& sendProcs,
437  const labelUList& recvProcs,
438  labelList& recvSizes,
439  const bool wait
440 )
441 {
442  finalExchange(sendProcs, recvProcs, recvSizes, wait);
443 
444  if (commsType_ != UPstream::commsTypes::nonBlocking)
445  {
447  << "Obtaining sizes not supported in "
448  << UPstream::commsTypeNames[commsType_] << endl
449  << " since transfers already in progress. Use non-blocking instead."
450  << exit(FatalError);
451 
452  // Note: maybe possible only if using different tag from write started
453  // by ~UOPstream. Needs some work.
454  }
455 }
456 
457 
459 (
460  bitSet& sendConnections,
461  DynamicList<label>& sendProcs,
462  DynamicList<label>& recvProcs,
463  const bool wait
464 )
465 {
466  bool changed = (sendConnections.size() != nProcs());
467 
468  if (changed)
469  {
470  sendConnections.resize(nProcs());
471  }
472 
473  // Update send connections
474  // - reasonable to assume there are no self-sends on UPstream::myProcNo
475  forAll(sendBuf_, proci)
476  {
477  // ie, sendDataCount(proci) != 0
478  if (sendConnections.set(proci, !sendBuf_[proci].empty()))
479  {
480  // The state changed
481  changed = true;
482  }
483  }
484 
485  UPstream::reduceOr(changed);
486 
487  if (changed)
488  {
489  // Create send/recv topology
490 
491  // The send ranks
492  sendProcs.clear();
493  forAll(sendBuf_, proci)
494  {
495  // ie, sendDataCount(proci) != 0
496  if (!sendBuf_[proci].empty())
497  {
498  sendProcs.append(proci);
499  }
500  }
501 
502  finishedSends(wait); // All-to-all
503 
504  // The recv ranks
505  recvProcs.clear();
506  forAll(recvBuf_, proci)
507  {
508  // ie, recvDataCount(proci)
509  if (!recvBuf_[proci].empty())
510  {
511  recvProcs.append(proci);
512  }
513  }
514  }
515  else
516  {
517  // Use existing send/recv ranks
518 
519  finishedSends(sendProcs, recvProcs, wait);
520  }
521 
522  return changed;
523 }
524 
526 void Foam::PstreamBuffers::finishedGathers(const bool wait)
527 {
528  finalExchangeGatherScatter(true, wait);
529 }
530 
531 
533 {
534  finalExchangeGatherScatter(false, wait);
535 }
536 
537 
539 (
540  labelList& recvSizes,
541  const bool wait
542 )
543 {
544  finalExchangeGatherScatter(true, wait);
545 
546  if (commsType_ != UPstream::commsTypes::nonBlocking)
547  {
549  << "Obtaining sizes not supported in "
550  << UPstream::commsTypeNames[commsType_] << endl
551  << " since transfers already in progress. Use non-blocking instead."
552  << exit(FatalError);
553 
554  // Note: maybe possible only if using different tag from write started
555  // by ~UOPstream. Needs some work.
556  }
557 
558  // For nonBlocking mode, simply recover received sizes
559  // from the buffers themselves.
560 
561  recvSizes = recvDataCounts();
562 }
563 
564 
566 (
567  labelList& recvSizes,
568  const bool wait
569 )
570 {
571  finalExchangeGatherScatter(false, wait);
572 
573  if (commsType_ != UPstream::commsTypes::nonBlocking)
574  {
576  << "Obtaining sizes not supported in "
577  << UPstream::commsTypeNames[commsType_] << endl
578  << " since transfers already in progress. Use non-blocking instead."
579  << exit(FatalError);
580 
581  // Note: maybe possible only if using different tag from write started
582  // by ~UOPstream. Needs some work.
583  }
584 
585  // For nonBlocking mode, simply recover received sizes
586  // from the buffers themselves.
587 
588  recvSizes = recvDataCounts();
589 }
590 
591 
592 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:118
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:76
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Types of communications.
Definition: UPstream.H:66
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:49
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required)
static List< T > listGatherValues(const T &localValue, const label communicator=worldComm)
Gather individual values into list locations.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
PstreamBuffers(const UPstream::commsTypes commsType, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given comms type, message tag, communicator, IO format.
static void reduceOr(bool &value, const label communicator=worldComm)
Logical (or) reduction (cf. MPI AllReduce)
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished...
UList< label > labelUList
A UList of labels.
Definition: UList.H:80
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:413
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true) ...
bool hasSendData() const
True if any (local) send buffers have data.
void finishedSends(const bool wait=true)
Mark sends as done.
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:99
const direction noexcept
Definition: Scalar.H:258
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
void clear()
Clear individual buffers and reset states.
static bool master(const label communicator=worldComm)
Am I the master rank.
Definition: UPstream.H:672
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendData, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendData for specified set of send/receive processes.
void clearStorage()
Clear individual buffer storage and reset states.
List< label > labelList
A List of labels.
Definition: List.H:62
static T listScatterValues(const UList< T > &allValues, const label communicator=worldComm)
Scatter individual values from list locations.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished...
const UList< char > peekRecvData(const label proci) const
A view of the unconsumed receive data for the specified processor. Must call finishedSends() or other fi...
Inter-processor communications stream.
Definition: UPstream.H:54
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:157