PstreamExchangeConsensus.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2023-2024 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 Note
27  The algorithm NBX (Nonblocking consensus exchange) is described by
28 
29  "Scalable Communication Protocols for Dynamic Sparse Data Exchange",
30  Hoefler, Siebert, Lumsdaine
31  May 2010 ACM SIGPLAN Notices 45(5):159-168
32  https://doi.org/10.1145/1837853.1693476
33 
34  http://unixer.de/publications/img/hoefler-dsde-protocols.pdf
35 
36 \*---------------------------------------------------------------------------*/
37 
38 #include "Pstream.H"
39 #include "contiguous.H"
40 #include "PstreamReduceOps.H"
41 
42 // * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //
43 
44 namespace Foam
45 {
46 namespace PstreamDetail
47 {
48 
49 //- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
50 //- with optional tracking of the receive sizes.
51 //
52 // No internal guards or resizing - data containers are all properly
53 // sized before calling.
54 //
55 // \param[in] sendBufs The send buffers list (size: numProc)
56 // \param[out] recvBufs The recv buffers list (size: numProc)
57 // \param[out] recvSizes The recv sizes (size: 0 or numProc).
58 // This parameter can be an empty list, in which case the receive sizes
59 // are not returned.
60 // \param tag The message tag
61 // \param comm The communicator
62 
63 template<class Container, class Type>
65 (
66  const UList<Container>& sendBufs,
67  UList<Container>& recvBufs,
68  labelUList& recvSizes,
69  const int tag,
70  const label comm
71 )
72 {
73  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
74 
75  const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
76 
77  const label myProci = UPstream::myProcNo(comm);
78  const label numProc = UPstream::nProcs(comm);
79 
80  // Initial: clear all receive locations
81  for (auto& buf : recvBufs)
82  {
83  buf.clear();
84  }
85  recvSizes = Foam::zero{};
86 
87  if (!UPstream::is_rank(comm))
88  {
89  return; // Process not in communicator
90  }
91 
92  // #ifdef FULLDEBUG
93  if (sendBufs.size() > numProc)
94  {
96  << "Send buffers:" << sendBufs.size() << " > numProc:" << numProc
98  }
99  if (recvBufs.size() < numProc)
100  {
102  << "Recv buffers:" << recvBufs.size() << " < numProc:" << numProc
104  }
105  // #endif
106 
107  // Fake send/recv for myself - parallel or non-parallel
108  {
109  recvBufs[myProci] = sendBufs[myProci];
110  if (myProci < recvSizes.size())
111  {
112  recvSizes[myProci] = recvBufs[myProci].size();
113  }
114  }
115 
116  if (!UPstream::is_parallel(comm))
117  {
118  // Nothing left to do
119  return;
120  }
121 
122 
123  // ------------------------------------------------------------------------
124  // Setup sends
125  // ------------------------------------------------------------------------
126 
127  // An initial barrier may help to avoid synchronisation problems
128  // caused elsewhere
129  if (initialBarrier)
130  {
131  UPstream::barrier(comm);
132  }
133 
134 
135  // Algorithm NBX: Nonblocking consensus with List containers
136 
137  DynamicList<UPstream::Request> sendRequests(sendBufs.size());
138 
139  // Start nonblocking synchronous send to destination ranks
140  for (label proci = 0; proci < numProc; ++proci)
141  {
142  const auto& sendData = sendBufs[proci];
143 
144  if (sendData.empty())
145  {
146  // Do not send/recv empty data
147  }
148  else if (proci != myProci)
149  {
151  (
152  sendRequests.emplace_back(),
153  proci,
154  sendData.cdata_bytes(),
155  sendData.size_bytes(),
156  tag,
157  comm,
159  );
160  }
161  }
162 
163 
164  // ------------------------------------------------------------------------
165  // Probe and receive
166  // ------------------------------------------------------------------------
167  //
168  // When receiving can use resize() instead of resize_nocopy() since the
169  // slots were already initially cleared.
170  // The resize() also works fine with FixedList since it will
171  // corresponds to a no-op: send and recv sizes will always be
172  // identical to its fixed size() / max_size()
173 
174  UPstream::Request barrierRequest;
175 
176  for (bool barrier_active = false, done = false; !done; /*nil*/)
177  {
178  std::pair<int, int64_t> probed =
180  (
182  -1, // ANY_SOURCE
183  tag,
184  comm
185  );
186 
187  if (probed.second > 0)
188  {
189  // Message found and had size.
190  // - receive into dest buffer location
191 
192  const label proci(probed.first);
193  const label count(probed.second / sizeof(Type));
194 
195  auto& recvData = recvBufs[proci];
196  recvData.resize(count); // OK with resize() instead of _nocopy()
197 
198  if (proci < recvSizes.size())
199  {
200  recvSizes[proci] = count;
201  }
202 
204  (
205  UPstream::commsTypes::scheduled, // ie, MPI_Recv()
206  proci,
207  recvData.data_bytes(),
208  recvData.size_bytes(),
209  tag,
210  comm
211  );
212  }
213 
214  if (barrier_active)
215  {
216  // Test barrier for completion
217  // - all received, or nothing to receive
218  if (UPstream::finishedRequest(barrierRequest))
219  {
220  done = true;
221  }
222  }
223  else
224  {
225  // Check if all sends have arrived
226  if (UPstream::finishedRequests(sendRequests))
227  {
228  UPstream::barrier(comm, &barrierRequest);
229  barrier_active = true;
230  }
231  }
232  }
233 }
234 
235 
236 //- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
237 //
238 // No internal guards - the sending Map corresponds to a segment of
239 // 0-numProc.
240 //
241 // \param[in] sendBufs The send buffers map (addr: 0-numProc)
242 // \param[out] recvBufs The recv buffers map
243 // \param tag The message tag
244 // \param comm The communicator
245 
246 template<class Container, class Type>
248 (
249  const Map<Container>& sendBufs,
250  Map<Container>& recvBufs,
251  const int tag,
252  const label comm
253 )
254 {
255  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
256 
257  const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
258 
259  const label myProci = UPstream::myProcNo(comm);
260  const label numProc = UPstream::nProcs(comm);
261 
262  // Initial: clear all receive locations
263  // Preferrable to clear out the map entries instead of the map itself
264  // since this can potentially preserve allocated space
265  // (eg DynamicList entries) between calls
266 
267  forAllIters(recvBufs, iter)
268  {
269  iter.val().clear();
270  }
271 
272  if (!UPstream::is_rank(comm))
273  {
274  return; // Process not in communicator
275  }
276 
277  // Fake send/recv for myself - parallel or non-parallel
278  {
279  const auto iter = sendBufs.find(myProci);
280  if (iter.good())
281  {
282  const auto& sendData = iter.val();
283 
284  if (!sendData.empty())
285  {
286  // Do myself: insert_or_assign
287  recvBufs(iter.key()) = sendData;
288  }
289  }
290  }
291 
292  if (!UPstream::is_parallel(comm))
293  {
294  // Nothing left to do
295  return;
296  }
297 
298 
299  // ------------------------------------------------------------------------
300  // Setup sends
301  // ------------------------------------------------------------------------
302 
303  // An initial barrier may help to avoid synchronisation problems
304  // caused elsewhere
305  if (initialBarrier)
306  {
307  UPstream::barrier(comm);
308  }
309 
310 
311  // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
312 
313  DynamicList<UPstream::Request> sendRequests(sendBufs.size());
314 
315  // Start nonblocking synchronous send to destination ranks
316  forAllConstIters(sendBufs, iter)
317  {
318  const label proci = iter.key();
319  const auto& sendData = iter.val();
320 
321  if (sendData.empty() || proci < 0 || proci >= numProc)
322  {
323  // Do not send/recv empty data or invalid destinations
324  }
325  else if (proci != myProci)
326  {
328  (
329  sendRequests.emplace_back(),
330  proci,
331  sendData.cdata_bytes(),
332  sendData.size_bytes(),
333  tag,
334  comm,
336  );
337  }
338  }
339 
340 
341  // ------------------------------------------------------------------------
342  // Probe and receive
343  // ------------------------------------------------------------------------
344  //
345  // When receiving can use resize() instead of resize_nocopy() since the
346  // slots were already initially cleared.
347  // The resize() also works fine with FixedList since it will
348  // corresponds to a no-op: send and recv sizes will always be
349  // identical to its fixed size() / max_size()
350 
351  UPstream::Request barrierRequest;
352 
353  for (bool barrier_active = false, done = false; !done; /*nil*/)
354  {
355  std::pair<int, int64_t> probed =
357  (
359  -1, // ANY_SOURCE
360  tag,
361  comm
362  );
363 
364  if (probed.second > 0)
365  {
366  // Message found and had size.
367  // - receive into dest buffer location
368 
369  const label proci(probed.first);
370  const label count(probed.second / sizeof(Type));
371 
372  auto& recvData = recvBufs(proci);
373  recvData.resize(count); // OK with resize() instead of _nocopy()
374 
376  (
377  UPstream::commsTypes::scheduled, // ie, MPI_Recv()
378  proci,
379  recvData.data_bytes(),
380  recvData.size_bytes(),
381  tag,
382  comm
383  );
384  }
385 
386  if (barrier_active)
387  {
388  // Test barrier for completion
389  if (UPstream::finishedRequest(barrierRequest))
390  {
391  done = true;
392  }
393  }
394  else
395  {
396  // Check if all sends have arrived
397  if (UPstream::finishedRequests(sendRequests))
398  {
399  UPstream::barrier(comm, &barrierRequest);
400  barrier_active = true;
401  }
402  }
403  }
404 }
405 
406 } // namespace PstreamDetail
407 } // namespace Foam
408 
409 
410 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
411 
412 template<class Container, class Type>
414 (
415  const UList<Container>& sendBufs,
416  List<Container>& recvBufs,
417  const int tag,
418  const label comm,
419  const bool /* wait (ignored) */
420 )
421 {
422  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
423 
424  if (sendBufs.size() != UPstream::nProcs(comm))
425  {
427  << "Send buffers size:" << sendBufs.size()
428  << " != numProc:" << UPstream::nProcs(comm)
430  }
431 
432  // Resize receive buffers. Individual clearing is done internally
433  recvBufs.resize_nocopy(sendBufs.size());
434  labelList dummyRecvSizes;
435 
436  PstreamDetail::exchangeConsensus<Container, Type>
437  (
438  sendBufs,
439  recvBufs,
440  dummyRecvSizes,
441  tag,
442  comm
443  );
444 }
445 
446 
447 template<class Container, class Type>
449 (
450  const Map<Container>& sendBufs,
451  Map<Container>& recvBufs,
452  const int tag,
453  const label comm,
454  const bool /* wait (ignored) */
455 )
456 {
457  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
458 
459  PstreamDetail::exchangeConsensus<Container, Type>
460  (
461  sendBufs,
462  recvBufs,
463  tag,
464  comm
465  );
466 }
467 
468 
469 template<class Container, class Type>
472 (
473  const Map<Container>& sendBufs,
474  const int tag,
475  const label comm,
476  const bool /* wait (ignored) */
477 )
478 {
479  Map<Container> recvBufs;
480 
481  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
482 
483  PstreamDetail::exchangeConsensus<Container, Type>
484  (
485  sendBufs,
486  recvBufs,
487  tag,
488  comm
489  );
490 
491  return recvBufs;
492 }
493 
494 
495 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
Inter-processor communication reduction functions.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:608
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished? Corresponds to MPI_Test()
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily retaining old content.
Definition: ListI.H:168
static std::pair< int, int64_t > probeMessage(const UPstream::commsTypes commsType, const int fromProcNo, const int tag=UPstream::msgType(), const label communicator=worldComm)
Probe for an incoming message.
Definition: UPstream.C:89
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1086
void exchangeConsensus(const UList< Container > &sendBufs, UList< Container > &recvBufs, labelUList &recvSizes, const int tag, const label comm)
Exchange contiguous data using non-blocking consensus exchange (NBX) with optional tracking of the re...
static int tuning_NBX_
Tuning parameters for non-blocking exchange (NBX)
Definition: UPstream.H:407
label size() const noexcept
The number of elements in table.
Definition: HashTable.H:358
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1077
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
Definition: UPstream.H:1103
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
Definition: DynamicList.H:51
An opaque wrapper for MPI_Request with a vendor-independent representation without any <mpi...
Definition: UPstream.H:1741
iterator find(const label &key)
Find and return an iterator set at the hashed entry.
Definition: HashTableI.H:86
#define forAllIters(container, iter)
Iterate across all elements in the container object.
Definition: stdFoam.H:336
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1123
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
(MPI_Ssend, MPI_Issend)
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
A class representing the concept of 0 (zero) that can be used to avoid manipulating objects known to ...
Definition: zero.H:57
static void exchangeConsensus(const UList< Container > &sendBufs, List< Container > &recvBufs, const int tag, const label comm, const bool wait=true)
Exchange contiguous data using non-blocking consensus (NBX) Sends sendData, receives into recvData...
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
A HashTable to objects of type <T> with a label key.
static void barrier(const label communicator, UPstream::Request *req=nullptr)
Impose a synchronisation barrier (optionally non-blocking)
Definition: UPstream.C:83
static bool finishedRequests(const label pos, label len=-1)
Non-blocking comms: have all requests (from position onwards) finished? Corresponds to MPI_Testall() ...