PstreamExchangeConsensus.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2023 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 Note
27  The algorithm NBX (Nonblocking consensus exchange) is described by
28 
29  "Scalable Communication Protocols for Dynamic Sparse Data Exchange",
30  Hoefler, Siebert, Lumsdaine
31  May 2010 ACM SIGPLAN Notices 45(5):159-168
32  https://doi.org/10.1145/1837853.1693476
33 
34  http://unixer.de/publications/img/hoefler-dsde-protocols.pdf
35 
36 \*---------------------------------------------------------------------------*/
37 
38 #include "Pstream.H"
39 #include "contiguous.H"
40 #include "PstreamReduceOps.H"
41 
42 // * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //
43 
44 namespace Foam
45 {
46 namespace PstreamDetail
47 {
48 
49 //- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
50 //- with optional tracking of the receive sizes.
51 //
52 // No internal guards or resizing - data containers are all properly
53 // sized before calling.
54 //
55 // \param[in] sendBufs The send buffers list (size: numProc)
56 // \param[out] recvBufs The recv buffers list (size: numProc)
57 // \param[out] recvSizes The recv sizes (size: 0 or numProc).
58 // This parameter can be an empty list, in which case the receive sizes
59 // are not returned.
60 // \param tag The message tag
61 // \param comm The communicator
62 
63 template<class Container, class Type>
65 (
66  const UList<Container>& sendBufs,
67  UList<Container>& recvBufs,
68  labelUList& recvSizes,
69  const int tag,
70  const label comm
71 )
72 {
73  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
74 
75  const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
76 
77  const label myProci = UPstream::myProcNo(comm);
78  const label numProc = UPstream::nProcs(comm);
79 
80  // Initial: clear all receive locations
81  for (auto& buf : recvBufs)
82  {
83  buf.clear();
84  }
85  recvSizes = Zero;
86 
87  if (!UPstream::is_rank(comm))
88  {
89  return; // Process not in communicator
90  }
91 
92  // #ifdef FULLDEBUG
93  if (sendBufs.size() > numProc)
94  {
96  << "Send buffers:" << sendBufs.size() << " > numProc:" << numProc
98  }
99  if (recvBufs.size() < numProc)
100  {
102  << "Recv buffers:" << recvBufs.size() << " < numProc:" << numProc
104  }
105  // #endif
106 
107  // Fake send/recv for myself - parallel or non-parallel
108  {
109  recvBufs[myProci] = sendBufs[myProci];
110  if (myProci < recvSizes.size())
111  {
112  recvSizes[myProci] = recvBufs.size();
113  }
114  }
115 
116  if (!UPstream::is_parallel(comm))
117  {
118  // Nothing left to do
119  return;
120  }
121 
122 
123  // ------------------------------------------------------------------------
124  // Setup sends
125  // ------------------------------------------------------------------------
126 
127  // An initial barrier may help to avoid synchronisation problems
128  // caused elsewhere
129  if (initialBarrier)
130  {
131  UPstream::barrier(comm);
132  }
133 
134 
135  // Algorithm NBX: Nonblocking consensus with List containers
136 
137  DynamicList<UPstream::Request> sendRequests(sendBufs.size());
138 
139  // Start nonblocking synchronous send to destination ranks
140  for (label proci = 0; proci < numProc; ++proci)
141  {
142  const auto& sendData = sendBufs[proci];
143 
144  if (sendData.empty())
145  {
146  // Do not send/recv empty data
147  }
148  else if (proci != myProci)
149  {
151  (
152  sendRequests.emplace_back(),
153  proci,
154  sendData.cdata_bytes(),
155  sendData.size_bytes(),
156  tag,
157  comm,
159  );
160  }
161  }
162 
163 
164  // ------------------------------------------------------------------------
165  // Probe and receive
166  // ------------------------------------------------------------------------
167  //
168  // When receiving can use resize() instead of resize_nocopy() since the
169  // slots were already initially cleared.
170  // The resize() also works fine with FixedList since it will
171  // corresponds to a no-op: send and recv sizes will always be
172  // identical to its fixed size() / max_size()
173 
174  UPstream::Request barrierRequest;
175 
176  for (bool barrier_active = false, done = false; !done; /*nil*/)
177  {
178  std::pair<int, int> probed =
180  (
182  -1, // ANY_SOURCE
183  tag,
184  comm
185  );
186 
187  if (probed.second > 0)
188  {
189  // Message found and had size.
190  // - receive into dest buffer location
191 
192  const label proci = probed.first;
193  const label count = (probed.second / sizeof(Type));
194 
195  auto& recvData = recvBufs[proci];
196  recvData.resize(count); // OK with resize() instead of _nocopy()
197 
198  if (proci < recvSizes.size())
199  {
200  recvSizes[proci] = count;
201  }
202 
204  (
206  proci,
207  recvData.data_bytes(),
208  recvData.size_bytes(),
209  tag,
210  comm
211  );
212  }
213 
214  if (barrier_active)
215  {
216  // Test barrier for completion
217  // - all received, or nothing to receive
218  if (UPstream::finishedRequest(barrierRequest))
219  {
220  done = true;
221  }
222  }
223  else
224  {
225  // Check if all sends have arrived
226  if (UPstream::finishedRequests(sendRequests))
227  {
228  UPstream::barrier(comm, &barrierRequest);
229  barrier_active = true;
230  }
231  }
232  }
233 }
234 
235 
236 //- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
237 //
238 // No internal guards - the sending Map corresponds to a segment of
239 // 0-numProc.
240 //
241 // \param[in] sendBufs The send buffers map (addr: 0-numProc)
242 // \param[out] recvBufs The recv buffers map
243 // \param tag The message tag
244 // \param comm The communicator
245 
246 template<class Container, class Type>
248 (
249  const Map<Container>& sendBufs,
250  Map<Container>& recvBufs,
251  const int tag,
252  const label comm
253 )
254 {
255  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
256 
257  // TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
258 
259  const label myProci = UPstream::myProcNo(comm);
260  const label numProc = UPstream::myProcNo(comm);
261 
262  // Initial: clear all receive locations
263  // Preferrable to clear out the map entries instead of the map itself
264  // since this can potentially preserve allocated space
265  // (eg DynamicList entries) between calls
266 
267  forAllIters(recvBufs, iter)
268  {
269  iter.val().clear();
270  }
271 
272  if (!UPstream::is_rank(comm))
273  {
274  return; // Process not in communicator
275  }
276 
277  // Fake send/recv for myself - parallel or non-parallel
278  {
279  const auto iter = sendBufs.find(myProci);
280  if (iter.good())
281  {
282  const auto& sendData = iter.val();
283 
284  if (!sendData.empty())
285  {
286  // Do myself: insert_or_assign
287  recvBufs(iter.key()) = sendData;
288  }
289  }
290  }
291 
292  if (!UPstream::is_parallel(comm))
293  {
294  // Nothing left to do
295  return;
296  }
297 
298 
299  // ------------------------------------------------------------------------
300  // Setup sends
301  // ------------------------------------------------------------------------
302 
303  // TDB: initialBarrier ...
304 
305 
306  // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
307 
308  DynamicList<UPstream::Request> sendRequests(sendBufs.size());
309 
310  // Start nonblocking synchronous send to destination ranks
311  forAllConstIters(sendBufs, iter)
312  {
313  const label proci = iter.key();
314  const auto& sendData = iter.val();
315 
316  if (sendData.empty() || proci < 0 || proci >= numProc)
317  {
318  // Do not send/recv empty data or invalid destinations
319  }
320  else if (proci != myProci)
321  {
323  (
324  sendRequests.emplace_back(),
325  proci,
326  sendData.cdata_bytes(),
327  sendData.size_bytes(),
328  tag,
329  comm,
331  );
332  }
333  }
334 
335 
336  // ------------------------------------------------------------------------
337  // Probe and receive
338  // ------------------------------------------------------------------------
339  //
340  // When receiving can use resize() instead of resize_nocopy() since the
341  // slots were already initially cleared.
342  // The resize() also works fine with FixedList since it will
343  // corresponds to a no-op: send and recv sizes will always be
344  // identical to its fixed size() / max_size()
345 
346  UPstream::Request barrierRequest;
347 
348  for (bool barrier_active = false, done = false; !done; /*nil*/)
349  {
350  std::pair<int, int> probed =
352  (
354  -1, // ANY_SOURCE
355  tag,
356  comm
357  );
358 
359  if (probed.second > 0)
360  {
361  // Message found and had size.
362  // - receive into dest buffer location
363 
364  const label proci = probed.first;
365  const label count = (probed.second / sizeof(Type));
366 
367  auto& recvData = recvBufs(proci);
368  recvData.resize(count); // OK with resize() instead of _nocopy()
369 
371  (
373  proci,
374  recvData.data_bytes(),
375  recvData.size_bytes(),
376  tag,
377  comm
378  );
379  }
380 
381  if (barrier_active)
382  {
383  // Test barrier for completion
384  if (UPstream::finishedRequest(barrierRequest))
385  {
386  done = true;
387  }
388  }
389  else
390  {
391  // Check if all sends have arrived
392  if (UPstream::finishedRequests(sendRequests))
393  {
394  UPstream::barrier(comm, &barrierRequest);
395  barrier_active = true;
396  }
397  }
398  }
399 }
400 
401 } // namespace PstreamDetail
402 } // namespace Foam
403 
404 
405 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
406 
407 template<class Container, class Type>
409 (
410  const UList<Container>& sendBufs,
411  List<Container>& recvBufs,
412  const int tag,
413  const label comm,
414  const bool /* wait (ignored) */
415 )
416 {
417  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
418 
419  if (sendBufs.size() != UPstream::nProcs(comm))
420  {
422  << "Send buffers size:" << sendBufs.size()
423  << " != numProc:" << UPstream::nProcs(comm)
425  }
426 
427  // Resize receive buffers. Individual clearing is done internally
428  recvBufs.resize_nocopy(sendBufs.size());
429  labelList dummyRecvSizes;
430 
431  PstreamDetail::exchangeConsensus<Container, Type>
432  (
433  sendBufs,
434  recvBufs,
435  dummyRecvSizes,
436  tag,
437  comm
438  );
439 }
440 
441 
442 template<class Container, class Type>
444 (
445  const Map<Container>& sendBufs,
446  Map<Container>& recvBufs,
447  const int tag,
448  const label comm,
449  const bool /* wait (ignored) */
450 )
451 {
452  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
453 
454  PstreamDetail::exchangeConsensus<Container, Type>
455  (
456  sendBufs,
457  recvBufs,
458  tag,
459  comm
460  );
461 }
462 
463 
464 template<class Container, class Type>
467 (
468  const Map<Container>& sendBufs,
469  const int tag,
470  const label comm,
471  const bool /* wait (ignored) */
472 )
473 {
474  Map<Container> recvBufs;
475 
476  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
477 
478  PstreamDetail::exchangeConsensus<Container, Type>
479  (
480  sendBufs,
481  recvBufs,
482  tag,
483  comm
484  );
485 
486  return recvBufs;
487 }
488 
489 
490 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
"blocking" : (MPI_Bsend, MPI_Recv)
Inter-processor communication reduction functions.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished? Corresponds to MPI_Test()
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:175
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1074
void exchangeConsensus(const UList< Container > &sendBufs, UList< Container > &recvBufs, labelUList &recvSizes, const int tag, const label comm)
Exchange contiguous data using non-blocking consensus exchange (NBX) with optional tracking of the re...
static int tuning_NBX_
Tuning parameters for non-blocking exchange (NBX)
Definition: UPstream.H:395
label size() const noexcept
The number of elements in table.
Definition: HashTable.H:342
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1065
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
Definition: UPstream.H:1091
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
Definition: DynamicList.H:51
An opaque wrapper for MPI_Request with a vendor-independent representation independent of any <mpi...
Definition: UPstream.H:1573
iterator find(const label &key)
Find and return an iterator set at the hashed entry.
Definition: HashTableI.H:86
#define forAllIters(container, iter)
Iterate across all elements in the container object.
Definition: stdFoam.H:336
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1111
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
(MPI_Ssend, MPI_Issend)
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static std::pair< int, int > probeMessage(const UPstream::commsTypes commsType, const int fromProcNo, const int tag=UPstream::msgType(), const label communicator=worldComm)
Probe for an incoming message.
Definition: UPstream.C:89
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static void exchangeConsensus(const UList< Container > &sendBufs, List< Container > &recvBufs, const int tag, const label comm, const bool wait=true)
Exchange contiguous data using non-blocking consensus (NBX) Sends sendData, receives into recvData...
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
A HashTable to objects of type <T> with a label key.
static void barrier(const label communicator, UPstream::Request *req=nullptr)
Impose a synchronisation barrier (optionally non-blocking)
Definition: UPstream.C:83
static bool finishedRequests(const label pos, label len=-1)
Non-blocking comms: have all requests (from position onwards) finished? Corresponds to MPI_Testall() ...
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:127