PstreamExchange.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2016 OpenFOAM Foundation
9  Copyright (C) 2016-2024 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Note
28  The send/recv windows for chunk-wise transfers:
29 
30  iter data window
31  ---- -----------
32  0 [0, 1*chunk]
33  1 [1*chunk, 2*chunk]
34  2 [2*chunk, 3*chunk]
35  ...
36 
37  In older versions (v2312 and earlier) the number of chunks was
38  determined by the sender sizes and used an extra MPI_Allreduce.
 39  However, we instead rely on the send/recv buffers having been consistently
40  sized, which avoids the additional reduction.
41 
42 \*---------------------------------------------------------------------------*/
43 
44 #include "Pstream.H"
45 #include "contiguous.H"
46 #include "PstreamReduceOps.H"
47 
48 // * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //
49 
50 namespace Foam
51 {
52 namespace PstreamDetail
53 {
54 
//- Number of elements corresponding to max byte transfer.
//  Normal upper limit is INT_MAX since MPI sizes are limited to <int>.
template<class Type>
inline std::size_t maxTransferCount
(
    const std::size_t max_bytes = std::size_t(0)
) noexcept
{
    if (max_bytes == 0)
    {
        // Unlimited (ie, no chunking)
        return std::size_t(0);
    }

    if (max_bytes > std::size_t(INT_MAX))
    {
        // MPI message sizes are limited to <int>
        return (std::size_t(INT_MAX) / sizeof(Type));
    }

    if (max_bytes > sizeof(Type))
    {
        // An integral number of elements
        return (max_bytes / sizeof(Type));
    }

    // Always transfer at least one element
    return std::size_t(1);
}
74 
75 
//- Upper limit on number of transfer bytes.
//  Max bytes is normally INT_MAX since MPI sizes are limited to <int>.
//  Negative values indicate a subtraction from INT_MAX.
inline std::size_t maxTransferBytes(const int64_t max_bytes) noexcept
{
    if (max_bytes < 0)
    {
        // Negative: this many bytes fewer than INT_MAX
        return std::size_t(INT_MAX + max_bytes);
    }

    return std::size_t(max_bytes);
}
88 
89 
90 //- Exchange of \em contiguous data, with or without chunking.
91 //- Setup sends and receives, each specified as [rank, span] tuple.
92 // The serial list of tuples can be populated from other lists, from
93 // maps of data or subsets of lists/maps etc.
94 //
95 // Any waiting for requests is done outside by the caller
96 template<class Type>
97 void exchangeBuffers
98 (
99  const UList<std::pair<int, stdFoam::span<const Type>>>& sends,
100  const UList<std::pair<int, stdFoam::span<Type>>>& recvs,
101  const int tag,
102  const label comm,
103  const int64_t maxComms_bytes = UPstream::maxCommsSize
104 )
105 {
106  typedef stdFoam::span<const Type> sendType;
107  typedef stdFoam::span<Type> recvType;
108 
109  // Caller already checked for parRun
110 
111  if (sends.empty() && recvs.empty())
112  {
113  // Nothing to do
114  return;
115  }
116 
117  const int myProci = UPstream::myProcNo(comm);
118 
119 
120  // The chunk size (number of elements) corresponding to max byte transfer.
121  // Is zero for non-chunked exchanges.
122  const std::size_t chunkSize
123  (
124  PstreamDetail::maxTransferCount<Type>
125  (
126  PstreamDetail::maxTransferBytes(maxComms_bytes)
127  )
128  );
129 
130 
131  // Set up receives
132  // ~~~~~~~~~~~~~~~
133 
134  for (auto& slot : recvs)
135  {
136  // [rank, span]
137  const auto proci = slot.first;
138  auto& payload = slot.second;
139 
140  // No self-communication or zero-size payload
141  if (proci == myProci || payload.empty())
142  {
143  continue;
144  }
145  else if (!chunkSize)
146  {
147  // Dispatch without chunks
149  (
151  proci,
152  payload.data_bytes(),
153  payload.size_bytes(),
154  tag,
155  comm
156  );
157  continue;
158  }
159 
160  // Dispatch chunk-wise until there is nothing left
161  for (int iter = 0; /*true*/; ++iter)
162  {
163  // The begin/end for the data window
164  const std::size_t beg = (std::size_t(iter)*chunkSize);
165  const std::size_t end = (std::size_t(iter+1)*chunkSize);
166 
167  // The message tag augmented by the iteration number
168  // - avoids message collisions between different chunks
169  const int msgTagChunk = (tag + iter);
170 
171  if (payload.size() <= beg)
172  {
173  // No more data windows
174  break;
175  }
176 
177  recvType window
178  (
179  (end < payload.size())
180  ? payload.subspan(beg, end - beg)
181  : payload.subspan(beg)
182  );
183 
185  (
187  proci,
188  window.data_bytes(),
189  window.size_bytes(),
190  msgTagChunk,
191  comm
192  );
193  }
194  }
195 
196 
197  // Set up sends
198  // ~~~~~~~~~~~~
199 
200  for (const auto& slot : sends)
201  {
202  // [rank, span]
203  const auto proci = slot.first;
204  const auto& payload = slot.second;
205 
206  // No self-communication or zero-size payload
207  if (proci == myProci || payload.empty())
208  {
209  continue;
210  }
211  else if (!chunkSize)
212  {
213  // Dispatch without chunks
214  bool ok = UOPstream::write
215  (
217  proci,
218  payload.cdata_bytes(),
219  payload.size_bytes(),
220  tag,
221  comm
222  );
223 
224  if (!ok)
225  {
227  << "Failure sending message to:" << proci
228  << " nBytes:" << label(payload.size_bytes()) << nl
230  }
231  continue;
232  }
233 
234  // Dispatch chunk-wise until there is nothing left
235  for (int iter = 0; /*true*/; ++iter)
236  {
237  // The begin/end for the data window
238  const std::size_t beg = (std::size_t(iter)*chunkSize);
239  const std::size_t end = (std::size_t(iter+1)*chunkSize);
240 
241  // The message tag augmented by the iteration number
242  // - avoids message collisions between different chunks
243  const int msgTagChunk = (tag + iter);
244 
245  if (payload.size() <= beg)
246  {
247  // No more data windows
248  break;
249  }
250 
251  sendType window
252  (
253  (end < payload.size())
254  ? payload.subspan(beg, end - beg)
255  : payload.subspan(beg)
256  );
257 
258  bool ok = UOPstream::write
259  (
261  proci,
262  window.cdata_bytes(),
263  window.size_bytes(),
264  msgTagChunk,
265  comm
266  );
267 
268  if (!ok)
269  {
271  << "Failure sending message to:" << proci
272  << " nBytes:" << label(window.size_bytes()) << nl
274  }
275  }
276  }
277 }
278 
279 
280 //- Exchange \em contiguous data using point-to-point communication.
281 //- Sends sendBufs, receives into recvBufs.
282 // Data provided and received as container all of which have been
283 // properly sized before calling
284 //
285 // No internal guards or resizing.
286 template<class Container, class Type>
288 (
289  const UList<Container>& sendBufs,
290  UList<Container>& recvBufs,
291  const int tag,
292  const label comm,
293  const bool wait,
294  const int64_t maxComms_bytes = UPstream::maxCommsSize
295 )
296 {
297  typedef stdFoam::span<const Type> sendType;
298  typedef stdFoam::span<Type> recvType;
299 
300  // Caller already checked for parRun
301 
302  if (sendBufs.empty() && recvBufs.empty())
303  {
304  // Nothing to do
305  return;
306  }
307 
308  const label startOfRequests = UPstream::nRequests();
309  const int myProci = UPstream::myProcNo(comm);
310 
311  // The chunk size (number of elements) corresponding to max byte transfer
312  const std::size_t chunkSize
313  (
314  PstreamDetail::maxTransferCount<Type>
315  (
316  PstreamDetail::maxTransferBytes(maxComms_bytes)
317  )
318  );
319 
320 
321  // Set up receives
322  // ~~~~~~~~~~~~~~~
323 
324  forAll(recvBufs, proci)
325  {
326  // [rank, span]
327  recvType payload
328  (
329  recvBufs[proci].data(),
330  std::size_t(recvBufs[proci].size())
331  );
332 
333  // No self-communication or zero-size payload
334  if (proci == myProci || payload.empty())
335  {
336  continue;
337  }
338  else if (!chunkSize)
339  {
340  // Dispatch without chunks
342  (
344  proci,
345  payload.data_bytes(),
346  payload.size_bytes(),
347  tag,
348  comm
349  );
350  continue;
351  }
352 
353  // Dispatch chunk-wise until there is nothing left
354  for (int iter = 0; /*true*/; ++iter)
355  {
356  // The begin/end for the data window
357  const std::size_t beg = (std::size_t(iter)*chunkSize);
358  const std::size_t end = (std::size_t(iter+1)*chunkSize);
359 
360  // The message tag augmented by the iteration number
361  // - avoids message collisions between different chunks
362  const int msgTagChunk = (tag + iter);
363 
364  if (payload.size() <= beg)
365  {
366  // No more data windows
367  break;
368  }
369 
370  recvType window
371  (
372  (end < payload.size())
373  ? payload.subspan(beg, end - beg)
374  : payload.subspan(beg)
375  );
376 
378  (
380  proci,
381  window.data_bytes(),
382  window.size_bytes(),
383  msgTagChunk,
384  comm
385  );
386  }
387  }
388 
389 
390  // Set up sends
391  // ~~~~~~~~~~~~
392 
393  forAll(sendBufs, proci)
394  {
395  // [rank, span]
396  sendType payload
397  (
398  sendBufs[proci].cdata(),
399  std::size_t(sendBufs[proci].size())
400  );
401 
402  // No self-communication or zero-size payload
403  if (proci == myProci || payload.empty())
404  {
405  continue;
406  }
407  else if (!chunkSize)
408  {
409  // Dispatch without chunks
410  bool ok = UOPstream::write
411  (
413  proci,
414  payload.cdata_bytes(),
415  payload.size_bytes(),
416  tag,
417  comm
418  );
419 
420  if (!ok)
421  {
423  << "Fallure sending message to:" << proci
424  << " nBytes:" << label(payload.size_bytes()) << nl
426  }
427  continue;
428  }
429 
430  // Dispatch chunk-wise until there is nothing left
431  for (int iter = 0; /*true*/; ++iter)
432  {
433  // The begin/end for the data window
434  const std::size_t beg = (std::size_t(iter)*chunkSize);
435  const std::size_t end = (std::size_t(iter+1)*chunkSize);
436 
437  // The message tag augmented by the iteration number
438  // - avoids message collisions between different chunks
439  const int msgTagChunk = (tag + iter);
440 
441  if (payload.size() <= beg)
442  {
443  // No more data windows
444  break;
445  }
446 
447  sendType window
448  (
449  (end < payload.size())
450  ? payload.subspan(beg, end - beg)
451  : payload.subspan(beg)
452  );
453 
454  bool ok = UOPstream::write
455  (
457  proci,
458  window.cdata_bytes(),
459  window.size_bytes(),
460  msgTagChunk,
461  comm
462  );
463 
464  if (!ok)
465  {
467  << "Failure sending message to:" << proci
468  << " nBytes:" << label(window.size_bytes()) << nl
470  }
471  }
472  }
473 
474 
475  // Wait for all to finish
476  // ~~~~~~~~~~~~~~~~~~~~~~
477 
478  if (UPstream::debug)
479  {
480  Perr<< "Pstream::exchange with "
481  << (UPstream::nRequests() - startOfRequests)
482  << " requests" << nl;
483  }
484 
485  if (wait)
486  {
487  UPstream::waitRequests(startOfRequests);
488  }
489 }
490 
491 
492 //- Exchange \em contiguous data using point-to-point communication.
493 //- Sends sendBufs, receives into recvBufs.
494 // Data provided and received as container all of which have been
495 // properly sized before calling
496 //
497 // No internal guards or resizing.
498 template<class Container, class Type>
500 (
501  const Map<Container>& sendBufs,
502  Map<Container>& recvBufs,
503  const int tag,
504  const label comm,
505  const bool wait,
506  const int64_t maxComms_bytes = UPstream::maxCommsSize
507 )
508 {
509  typedef stdFoam::span<const Type> sendType;
510  typedef stdFoam::span<Type> recvType;
511 
512  typedef std::pair<int, sendType> sendTuple;
513  typedef std::pair<int, recvType> recvTuple;
514 
515  const label startOfRequests = UPstream::nRequests();
516  const label myProci = UPstream::myProcNo(comm);
517 
518  // Serialize recv sequences
519  DynamicList<recvTuple> recvs(recvBufs.size());
520  {
521  forAllIters(recvBufs, iter)
522  {
523  const auto proci = iter.key();
524  auto& recvData = recvBufs[proci];
525 
526  recvType payload
527  (
528  recvData.data(),
529  std::size_t(recvData.size())
530  );
531 
532  // No self-communication or zero-size payload
533  if (proci != myProci && !payload.empty())
534  {
535  recvs.emplace_back(proci, payload);
536  }
537  }
538 
539  std::sort
540  (
541  recvs.begin(),
542  recvs.end(),
543  [=](const recvTuple& a, const recvTuple& b)
544  {
545  // Sorted processor order
546  return (a.first < b.first);
547 
548  // OR: // Shorter messages first
549  // return (a.second.size() < b.second.size());
550  }
551  );
552  }
553 
554  // Serialize send sequences
555  DynamicList<sendTuple> sends(sendBufs.size());
556  {
557  forAllConstIters(sendBufs, iter)
558  {
559  const auto proci = iter.key();
560  const auto& sendData = iter.val();
561 
562  sendType payload
563  (
564  sendData.cdata(),
565  std::size_t(sendData.size())
566  );
567 
568  // No self-communication or zero-size payload
569  if (proci != myProci && !payload.empty())
570  {
571  sends.emplace_back(proci, payload);
572  }
573  }
574 
575  std::sort
576  (
577  sends.begin(),
578  sends.end(),
579  [=](const sendTuple& a, const sendTuple& b)
580  {
581  // Sorted processor order
582  return (a.first < b.first);
583 
584  // OR: // Shorter messages first
585  // return (a.second.size() < b.second.size());
586  }
587  );
588  }
589 
590  // Exchange buffers in chunk-wise transfers
591  PstreamDetail::exchangeBuffers<Type>
592  (
593  sends,
594  recvs,
595  tag,
596  comm,
597  maxComms_bytes
598  );
599 
600  // Wait for all to finish
601  // ~~~~~~~~~~~~~~~~~~~~~~
602 
603  if (UPstream::debug)
604  {
605  Perr<< "Pstream::exchange with "
606  << (UPstream::nRequests() - startOfRequests)
607  << " requests" << nl;
608  }
609 
610  if (wait)
611  {
612  UPstream::waitRequests(startOfRequests);
613  }
614 }
615 
616 } // namespace PstreamDetail
617 } // namespace Foam
618 
620 
621 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
622 
623 template<class Container, class Type>
625 (
626  const UList<Container>& sendBufs,
627  const labelUList& recvSizes,
628  List<Container>& recvBufs,
629  const int tag,
630  const label comm,
631  const bool wait
632 )
633 {
634  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
635 
636  if (!UPstream::is_rank(comm))
637  {
638  return; // Process not in communicator
639  }
640 
641  const label myProci = UPstream::myProcNo(comm);
642  const label numProc = UPstream::nProcs(comm);
643 
644  if (sendBufs.size() != numProc)
645  {
647  << "List size " << sendBufs.size()
648  << " != number of ranks " << numProc << nl
650  }
651 
652  recvBufs.resize_nocopy(numProc);
653 
654  if (UPstream::is_parallel(comm))
655  {
656  // Presize all receive buffers
657  forAll(recvSizes, proci)
658  {
659  const label count = recvSizes[proci];
660 
661  if (proci != myProci && count > 0)
662  {
663  recvBufs[proci].resize_nocopy(count);
664  }
665  else
666  {
667  recvBufs[proci].clear();
668  }
669  }
670 
671  PstreamDetail::exchangeContainer<Container, Type>
672  (
673  sendBufs,
674  recvBufs,
675  tag,
676  comm,
677  wait
678  // (default: UPstream::maxCommsSize)
679  );
680  }
681 
682  // Do myself. Already checked if in communicator
683  recvBufs[myProci] = sendBufs[myProci];
684 }
685 
686 
687 template<class Container, class Type>
689 (
690  const Map<Container>& sendBufs,
691  const Map<label>& recvSizes,
692  Map<Container>& recvBufs,
693  const int tag,
694  const label comm,
695  const bool wait
696 )
697 {
698  static_assert(is_contiguous<Type>::value, "Contiguous data only!");
699 
700  const int myProci = UPstream::myProcNo(comm);
701 
702  // Initial: clear out receive 'slots'
703  // Preferrable to clear out the map entries instead of the map itself
704  // since this can potentially preserve allocated space
705  // (eg DynamicList entries) between calls
706 
707  forAllIters(recvBufs, iter)
708  {
709  iter.val().clear();
710  }
711 
712  if (UPstream::is_parallel(comm))
713  {
714  // Presize all receive buffers
715  forAllIters(recvSizes, iter)
716  {
717  const label proci = iter.key();
718  const label count = iter.val();
719 
720  if (proci != myProci && count > 0)
721  {
722  recvBufs(proci).resize_nocopy(count);
723  }
724  }
725 
726  PstreamDetail::exchangeContainer<Container, Type>
727  (
728  sendBufs,
729  recvBufs,
730  tag,
731  comm,
732  wait
733  // (default: UPstream::maxCommsSize)
734  );
735  }
736 
737  // Do myself (if actually in the communicator)
738  if (UPstream::is_rank(comm))
739  {
740  const auto iter = sendBufs.find(myProci);
741 
742  bool needsCopy = iter.good();
743 
744  if (needsCopy)
745  {
746  const auto& sendData = iter.val();
747 
748  needsCopy = !sendData.empty();
749  if (needsCopy)
750  {
751  // insert_or_assign
752  recvBufs(myProci) = sendData;
753  }
754  }
755 
756  if (!needsCopy)
757  {
758  recvBufs.erase(myProci);
759  }
760  }
761 }
762 
763 
764 template<class Container>
766 (
767  const labelUList& sendProcs,
768  const labelUList& recvProcs,
769  const Container& sendBufs,
770  labelList& recvSizes,
771  const label tag,
772  const label comm
773 )
774 {
775  if (!UPstream::is_rank(comm))
776  {
777  recvSizes.clear();
778  return; // Process not in communicator
779  }
780 
781  const label myProci = UPstream::myProcNo(comm);
782  const label numProc = UPstream::nProcs(comm);
783 
784  if (sendBufs.size() != numProc)
785  {
787  << "Container size " << sendBufs.size()
788  << " != number of ranks " << numProc << nl
790  }
791 
792  labelList sendSizes(numProc);
793  for (label proci = 0; proci < numProc; ++proci)
794  {
795  sendSizes[proci] = sendBufs[proci].size();
796  }
797 
798  recvSizes.resize_nocopy(numProc);
799  recvSizes = 0; // Ensure non-received entries are properly zeroed
800 
801  // Preserve self-send, even if not described by neighbourhood
802  recvSizes[myProci] = sendSizes[myProci];
803 
804  const label startOfRequests = UPstream::nRequests();
805 
806  for (const label proci : recvProcs)
807  {
808  if (proci != myProci)
809  {
811  (
813  proci,
814  reinterpret_cast<char*>(&recvSizes[proci]),
815  sizeof(label),
816  tag,
817  comm
818  );
819  }
820  }
821 
822  for (const label proci : sendProcs)
823  {
824  if (proci != myProci)
825  {
827  (
829  proci,
830  reinterpret_cast<char*>(&sendSizes[proci]),
831  sizeof(label),
832  tag,
833  comm
834  );
835  }
836  }
837 
838  UPstream::waitRequests(startOfRequests);
839 }
840 
841 
842 template<class Container>
844 (
845  const labelUList& neighProcs,
846  const Container& sendBufs,
847  labelList& recvSizes,
848  const label tag,
849  const label comm
850 )
851 {
852  if (!UPstream::is_rank(comm))
853  {
854  recvSizes.clear();
855  return; // Process not in communicator
856  }
857 
858  Pstream::exchangeSizes<Container>
859  (
860  neighProcs, // send
861  neighProcs, // recv
862  sendBufs,
863  recvSizes,
864  tag,
865  comm
866  );
867 }
868 
869 
870 // Sparse sending
871 template<class Container>
873 (
874  const Map<Container>& sendBufs,
875  Map<label>& recvSizes,
876  const label tag,
877  const label comm
878 )
879 {
880  recvSizes.clear(); // Done in allToAllConsensus too, but be explicit here
882  if (!UPstream::is_rank(comm))
883  {
884  return; // Process not in communicator
885  }
886 
887  Map<label> sendSizes;
888  sendSizes.reserve(sendBufs.size());
889 
890  forAllConstIters(sendBufs, iter)
891  {
892  const label proci = iter.key();
893  const label count = iter.val().size();
894 
895  if (count)
896  {
897  sendSizes.emplace(proci, count);
898  }
899  }
900 
902  (
903  sendSizes,
904  recvSizes,
905  (tag + 314159), // some unique tag?
906  comm
907  );
908 }
909 
910 
911 template<class Container>
913 (
914  const Container& sendBufs,
915  labelList& recvSizes,
916  const label comm
917 )
918 {
919  if (!UPstream::is_rank(comm))
920  {
921  recvSizes.clear();
922  return; // Process not in communicator
923  }
924 
925  const label numProc = UPstream::nProcs(comm);
926 
927  if (sendBufs.size() != numProc)
928  {
930  << "Container size " << sendBufs.size()
931  << " != number of ranks " << numProc << nl
933  }
934 
935  labelList sendSizes(numProc);
936  forAll(sendBufs, proci)
937  {
938  sendSizes[proci] = sendBufs[proci].size();
939  }
940  recvSizes.resize_nocopy(sendSizes.size());
941 
942  if
943  (
946  )
947  {
948  // Use algorithm NBX: Nonblocking Consensus Exchange
949 
951  (
952  sendSizes,
953  recvSizes,
954  (UPstream::msgType() + 314159), // some unique tag?
955  comm
956  );
957  return;
958  }
959 
960  UPstream::allToAll(sendSizes, recvSizes, comm);
961 }
962 
963 
964 template<class Container, class Type>
966 (
967  const UList<Container>& sendBufs,
968  List<Container>& recvBufs,
969  const int tag,
970  const label comm,
971  const bool wait
972 )
973 {
974  // Algorithm PEX: Personalized Exchange
975  // - Step 1: each process writes the data sizes to each peer and
976  // redistributes the vector (eg, MPI_Alltoall or non-blocking consensus)
977  // - Step 2: size receive buffers and setup receives for all
978  // non-zero sendcounts. Post all sends and wait.
979 
980  labelList recvSizes;
981  Pstream::exchangeSizes(sendBufs, recvSizes, comm);
982 
983  Pstream::exchange<Container, Type>
984  (
985  sendBufs,
986  recvSizes,
987  recvBufs,
988  tag,
989  comm,
990  wait
991  );
992 }
993 
994 
995 template<class Container, class Type>
997 (
998  const Map<Container>& sendBufs,
999  Map<Container>& recvBufs,
1000  const int tag,
1001  const label comm,
1002  const bool wait
1003 )
1004 {
1005  // Algorithm PEX: Personalized Exchange
1006  // but using nonblocking consensus exchange for the sizes
1007 
1008  Map<label> recvSizes;
1009  Pstream::exchangeSizes(sendBufs, recvSizes, tag, comm);
1010 
1011  Pstream::exchange<Container, Type>
1012  (
1013  sendBufs,
1014  recvSizes,
1015  recvBufs,
1016  tag,
1017  comm,
1018  wait
1019  );
1020 }
1021 
1022 
1023 // ************************************************************************* //
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
Inter-processor communication reduction functions.
bool emplace(const Key &key, Args &&... args)
Emplace insert a new entry, not overwriting existing entries.
Definition: HashTableI.H:129
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:608
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:50
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:402
bool empty() const noexcept
True if List is empty (ie, size() is zero)
Definition: UList.H:675
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily retaining the old contents.
Definition: ListI.H:168
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1252
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1086
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const label communicator=worldComm)
Exchange non-zero int32_t data between ranks [NBX].
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1561
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
static int nProcsNonblockingExchange
Number of processors to change to nonBlocking consensual exchange (NBX). Ignored for zero or negative...
Definition: UPstream.H:387
label size() const noexcept
The number of elements in table.
Definition: HashTable.H:358
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1077
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
Definition: UPstream.H:1103
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
Definition: DynamicList.H:51
const dimensionedScalar b
Wien displacement law constant: default SI units: [m.K].
Definition: createFields.H:27
void clear()
Clear the list, i.e. set size to zero.
Definition: ListI.H:130
iterator find(const label &key)
Find and return an iterator set at the hashed entry.
Definition: HashTableI.H:86
std::size_t maxTransferCount(const std::size_t max_bytes=std::size_t(0)) noexcept
Number of elements corresponding to max byte transfer.
void sort(UList< T > &list)
Sort the list.
Definition: UList.C:296
#define forAllIters(container, iter)
Iterate across all elements in the container object.
Definition: stdFoam.H:336
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1123
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
const direction noexcept
Definition: Scalar.H:258
static void exchange(const UList< Container > &sendBufs, const labelUList &recvSizes, List< Container > &recvBufs, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool wait=true)
Helper: exchange contiguous data. Sends sendBufs, receives into recvBufs using predetermined receive ...
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const label communicator=worldComm)
Exchange int32_t data with all ranks in communicator.
int debug
Static debugging option.
constexpr auto end(C &c) -> decltype(c.end())
Return iterator to the end of the container c.
Definition: stdFoam.H:201
void exchangeContainer(const UList< Container > &sendBufs, UList< Container > &recvBufs, const int tag, const label comm, const bool wait, const int64_t maxComms_bytes=UPstream::maxCommsSize)
Exchange contiguous data using point-to-point communication. Sends sendBufs, receives into recvBufs...
bool erase(const iterator &iter)
Erase an entry specified by given iterator.
Definition: HashTable.C:496
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
void reserve(label numEntries)
Reserve space for at least the specified number of elements (not the number of buckets) and regenerat...
Definition: HashTable.C:736
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Rudimentary functionality similar to std::span for holding memory view.
Definition: stdFoam.H:500
std::size_t maxTransferBytes(const int64_t max_bytes) noexcept
Upper limit on number of transfer bytes.
List< label > labelList
A List of labels.
Definition: List.H:62
void exchangeBuffers(const UList< std::pair< int, stdFoam::span< const Type >>> &sends, const UList< std::pair< int, stdFoam::span< Type >>> &recvs, const int tag, const label comm, const int64_t maxComms_bytes=UPstream::maxCommsSize)
Exchange of contiguous data, with or without chunking. Setup sends and receives, each specified as [r...
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
A HashTable to objects of type <T> with a label key.