// PstreamExchangeConsensus.C -- source listing (extracted from Doxygen page)
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2023 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 Note
27  The algorithm NBX (Nonblocking consensus exchange) is described by
28 
29  "Scalable Communication Protocols for Dynamic Sparse Data Exchange",
30  Hoeffler, Siebert, Lumsdaine
31  May 2010 ACM SIGPLAN Notices 45(5):159-168
32  https://doi.org/10.1145/1837853.1693476
33 
34  http://unixer.de/publications/img/hoefler-dsde-protocols.pdf
35 
36 \*---------------------------------------------------------------------------*/
37 
38 #include "Pstream.H"
39 #include "contiguous.H"
40 #include "PstreamReduceOps.H"
41 
42 // * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //
43 
44 namespace Foam
45 {
46 namespace PstreamDetail
47 {
48 
49 //- Exchange \em contiguous data using non-blocking consensus exchange
50 //- with optional tracking of the receive sizes.
51 //
52 // No internal guards or resizing - data containers are all properly
53 // sized before calling.
54 //
55 // \param[in] sendBufs The send buffers list (size: numProcs)
56 // \param[out] recvBufs The recv buffers list (size: numProcs)
57 // \param[out] recvSizes The recv sizes (size: 0 or numProcs).
58 // This parameter can be an empty list, in which case the receive sizes
59 // are not returned.
60 // \param tag The message tag
61 // \param comm The communicator
62 // \param wait Wait for non-blocking receives to complete
63 // \param recvCommType If blocking or (default) non-blocking
64 
65 template<class Container, class Type>
67 (
68  const UList<Container>& sendBufs,
69  UList<Container>& recvBufs,
70  labelUList& recvSizes,
71  const int tag,
72  const label comm,
73  const bool wait = true,
75 )
76 {
77  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
78 
79  const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
80 
81  const label startOfRequests = UPstream::nRequests();
82  const label myProci = UPstream::myProcNo(comm);
83  const label numProc = UPstream::nProcs(comm);
84 
85  // Initial: clear all receive information
86  for (auto& buf : recvBufs)
87  {
88  buf.clear();
89  }
90  recvSizes = Zero;
91 
92  if (!UPstream::is_rank(comm))
93  {
94  return; // Process not in communicator
95  }
96 
97  // #ifdef FULLDEBUG
98  if (sendBufs.size() > numProc)
99  {
101  << "Send buffers:" << sendBufs.size() << " > numProcs:" << numProc
103  }
104  if (recvBufs.size() < numProc)
105  {
107  << "Recv buffers:" << recvBufs.size() << " < numProcs:" << numProc
109  }
110  // #endif
111 
112  if (!UPstream::is_parallel(comm))
113  {
114  // Do myself
115  recvBufs[myProci] = sendBufs[myProci];
116  if (myProci < recvSizes.size())
117  {
118  recvSizes[myProci] = recvBufs.size();
119  }
120  return;
121  }
122 
123  // An initial barrier may help to avoid synchronisation problems
124  // caused elsewhere
125  if (initialBarrier)
126  {
127  UPstream::barrier(comm);
128  }
129 
130  // Algorithm NBX: Nonblocking consensus with List containers
131 
132  DynamicList<UPstream::Request> sendRequests(sendBufs.size());
133 
134  // Start nonblocking synchronous send to processor dest
135  for (label proci = 0; proci < numProc; ++proci)
136  {
137  const auto& sendData = sendBufs[proci];
138 
139  if (sendData.empty())
140  {
141  // Do not send/recv empty data
142  }
143  else if (proci == myProci)
144  {
145  // Do myself
146  recvBufs[proci] = sendData;
147  if (proci < recvSizes.size())
148  {
149  recvSizes[proci] = sendData.size();
150  }
151  }
152  else
153  {
154  // Has data to send.
155  // The MPI send requests are tracked on a local list
157  (
158  sendRequests.emplace_back(),
159  proci,
160  sendData.cdata_bytes(),
161  sendData.size_bytes(),
162  tag,
163  comm,
165  );
166  }
167  }
168 
169 
170  // Probe and receive
171 
172  UPstream::Request barrierRequest;
173 
174  for (bool barrier_active = false, done = false; !done; /*nil*/)
175  {
176  std::pair<int, int> probed =
178  (
180  -1, // ANY_SOURCE
181  tag,
182  comm
183  );
184 
185  if (probed.second > 0)
186  {
187  // Message found and had size.
188  // - receive into dest buffer location
189 
190  const label proci = probed.first;
191  const label count = (probed.second / sizeof(Type));
192 
193  auto& recvData = recvBufs[proci];
194  recvData.resize_nocopy(count);
195 
196  if (proci < recvSizes.size())
197  {
198  recvSizes[proci] = count;
199  }
200 
201  // Any non-blocking MPI recv requests are tracked on internal stack
203  (
204  recvCommType,
205  proci,
206  recvData.data_bytes(),
207  recvData.size_bytes(),
208  tag,
209  comm
210  );
211  }
212 
213  if (barrier_active)
214  {
215  // Test barrier for completion
216  // - all received, or nothing to receive
217  if (UPstream::finishedRequest(barrierRequest))
218  {
219  done = true;
220  }
221  }
222  else
223  {
224  // Check if all sends have arrived
225  if (UPstream::finishedRequests(sendRequests))
226  {
227  UPstream::barrier(comm, &barrierRequest);
228  barrier_active = true;
229  }
230  }
231  }
232 
233  // Wait for non-blocking receives to finish
234  if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
235  {
236  UPstream::waitRequests(startOfRequests);
237  }
238 }
239 
240 
241 //- Exchange \em contiguous data using non-blocking consensus exchange.
242 //
243 // No internal guards - the sending Map corresponds to a segment of
244 // 0-numProcs.
245 //
246 // \param[in] sendBufs The send buffers map (addr: 0-numProcs)
247 // \param[out] recvBufs The recv buffers map
248 // \param tag The message tag
249 // \param comm The communicator
250 // \param wait Wait for non-blocking receives to complete
251 // \param recvCommType If blocking or (default) non-blocking
252 
253 template<class Container, class Type>
255 (
256  const Map<Container>& sendBufs,
257  Map<Container>& recvBufs,
258  const int tag,
259  const label comm,
260  const bool wait = true,
262 )
263 {
264  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
265 
266  const label startOfRequests = UPstream::nRequests();
267  const label myProci = UPstream::myProcNo(comm);
268 
269  // Initial: clear out receive 'slots'
270  // Preferrable to clear out the map entries instead of the map itself
271  // since this can potentially preserve allocated space
272  // (eg DynamicList entries) between calls
273 
274  forAllIters(recvBufs, iter)
275  {
276  iter.val().clear();
277  }
278 
279  if (!UPstream::is_parallel(comm))
280  {
281  // Do myself
282  const auto iter = sendBufs.find(myProci);
283  if (iter.good())
284  {
285  const auto& sendData = iter.val();
286 
287  if (!sendData.empty())
288  {
289  // Do myself: insert_or_assign
290  recvBufs(iter.key()) = sendData;
291  }
292  }
293  return;
294  }
295 
296 
297  // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
298 
299  DynamicList<UPstream::Request> sendRequests(sendBufs.size());
300 
301  // Start nonblocking synchronous send to process dest
302  forAllConstIters(sendBufs, iter)
303  {
304  const label proci = iter.key();
305  const auto& sendData = iter.val();
306 
307  #ifdef FULLDEBUG
308  if (proci >= UPstream::nProcs(comm))
309  {
311  << "Send buffer:" << proci << " >= numProcs:"
312  << UPstream::nProcs(comm)
314  }
315  #endif
316 
317  if (sendData.empty())
318  {
319  // Do not send/recv empty data
320  }
321  else if (proci == myProci)
322  {
323  // Do myself: insert_or_assign
324  recvBufs(proci) = sendData;
325  }
326  else
327  {
328  // Has data to send.
329  // The MPI send requests are tracked on a local list
331  (
332  sendRequests.emplace_back(),
333  proci,
334  sendData.cdata_bytes(),
335  sendData.size_bytes(),
336  tag,
337  comm,
339  );
340  }
341  }
342 
343 
344  // Probe and receive
345 
346  UPstream::Request barrierRequest;
347 
348  for (bool barrier_active = false, done = false; !done; /*nil*/)
349  {
350  std::pair<int, int> probed =
352  (
354  -1, // ANY_SOURCE
355  tag,
356  comm
357  );
358 
359  if (probed.second > 0)
360  {
361  // Message found and had size.
362  // - receive into dest buffer location
363 
364  const label proci = probed.first;
365  const label count = (probed.second / sizeof(Type));
366 
367  auto& recvData = recvBufs(proci);
368  recvData.resize_nocopy(count);
369 
370  // Any non-blocking MPI recv requests are tracked on internal stack
372  (
373  recvCommType,
374  proci,
375  recvData.data_bytes(),
376  recvData.size_bytes(),
377  tag,
378  comm
379  );
380  }
381 
382  if (barrier_active)
383  {
384  // Test barrier for completion
385  if (UPstream::finishedRequest(barrierRequest))
386  {
387  done = true;
388  }
389  }
390  else
391  {
392  // Check if all sends have arrived
393  if (UPstream::finishedRequests(sendRequests))
394  {
395  UPstream::barrier(comm, &barrierRequest);
396  barrier_active = true;
397  }
398  }
399  }
400 
401  // Wait for non-blocking receives to finish
402  if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
403  {
404  UPstream::waitRequests(startOfRequests);
405  }
406 }
407 
408 } // namespace PstreamDetail
409 } // namespace Foam
410 
411 
412 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
413 
414 template<class Container, class Type>
416 (
417  const UList<Container>& sendBufs,
418  List<Container>& recvBufs,
419  const int tag,
420  const label comm,
421  const bool wait
422 )
423 {
424  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
425 
426  if (sendBufs.size() != UPstream::nProcs(comm))
427  {
429  << "Send buffers size:" << sendBufs.size()
430  << " != numProcs:" << UPstream::nProcs(comm)
432  }
433 
434  // Resize receive buffers. Individual clearing is done internally
435  recvBufs.resize_nocopy(sendBufs.size());
436  labelList dummyRecvSizes;
437 
439  (
440  sendBufs,
441  recvBufs,
442  dummyRecvSizes,
443  tag,
444  comm,
445  wait
446  );
447 }
448 
449 
450 template<class Container, class Type>
452 (
453  const Map<Container>& sendBufs,
454  Map<Container>& recvBufs,
455  const int tag,
456  const label comm,
457  const bool wait
458 )
459 {
460  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
461 
463  (
464  sendBufs,
465  recvBufs,
466  tag,
467  comm,
468  wait
469  );
470 }
471 
472 
473 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
Inter-processor communication reduction functions.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
commsTypes
Communications types.
Definition: UPstream.H:74
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished? Corresponds to MPI_Test()
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:139
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1029
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1536
static int tuning_NBX_
Tuning parameters for non-blocking exchange (NBX)
Definition: UPstream.H:397
label size() const noexcept
The number of elements in table.
Definition: HashTable.H:331
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1020
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
Definition: UPstream.H:1046
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
Definition: DynamicList.H:51
An opaque wrapper for MPI_Request with a vendor-independent representation independent of any <mpi...
Definition: UPstream.H:1571
iterator find(const label &key)
Find and return an iterator set at the hashed entry.
Definition: HashTableI.H:86
#define forAllIters(container, iter)
Iterate across all elements in the container object.
Definition: stdFoam.H:329
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1066
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:99
(MPI_Ssend, MPI_Issend)
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static std::pair< int, int > probeMessage(const UPstream::commsTypes commsType, const int fromProcNo, const int tag=UPstream::msgType(), const label communicator=worldComm)
Probe for an incoming message.
Definition: UPstream.C:89
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static void exchangeConsensus(const UList< Container > &sendBufs, List< Container > &recvBufs, const int tag, const label comm, const bool wait=true)
Exchange contiguous data using non-blocking consensus Sends sendData, receives into recvData...
void exchangeConsensus(const UList< Container > &sendBufs, UList< Container > &recvBufs, labelUList &recvSizes, const int tag, const label comm, const bool wait=true, const UPstream::commsTypes recvCommType=UPstream::commsTypes::nonBlocking)
Exchange contiguous data using non-blocking consensus exchange with optional tracking of the receive ...
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
A HashTable to objects of type <T> with a label key.
static void barrier(const label communicator, UPstream::Request *req=nullptr)
Impose a synchronisation barrier (optionally non-blocking)
Definition: UPstream.C:83
static bool finishedRequests(const label pos, label len=-1)
Non-blocking comms: have all requests (from position onwards) finished? Corresponds to MPI_Testall() ...
static constexpr const zero Zero
Global zero (0)
Definition: zero.H:133