PstreamCombineGather.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Description
28  Variant of gather, scatter.
29  Normal gather uses:
30  - default construct and read (>>) from Istream
31  - binary operator and assignment operator to combine values
32 
33  combineGather uses:
34  - construct from Istream
35  - modify operator which modifies its lhs
36 
37 \*---------------------------------------------------------------------------*/
38 
39 #include "OPstream.H"
40 #include "IPstream.H"
41 #include "IOstreams.H"
42 #include "contiguous.H"
43 
44 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
45 
46 template<class T, class CombineOp>
48 (
49  const List<UPstream::commsStruct>& comms,
50  T& value,
51  const CombineOp& cop,
52  const int tag,
53  const label comm
54 )
55 {
56  if (UPstream::is_parallel(comm))
57  {
58  // My communication order
59  const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
60 
61  // Receive from my downstairs neighbours
62  for (const label belowID : myComm.below())
63  {
65  {
66  T received;
67 
69  (
71  belowID,
72  reinterpret_cast<char*>(&received),
73  sizeof(T),
74  tag,
75  comm
76  );
77 
78  if (debug & 2)
79  {
80  Pout<< " received from "
81  << belowID << " data:" << received << endl;
82  }
83 
84  cop(value, received);
85  }
86  else
87  {
88  IPstream fromBelow
89  (
91  belowID,
92  0,
93  tag,
94  comm
95  );
96  T received(fromBelow);
97 
98  if (debug & 2)
99  {
100  Pout<< " received from "
101  << belowID << " data:" << received << endl;
102  }
103 
104  cop(value, received);
105  }
106  }
107 
108  // Send up value
109  if (myComm.above() != -1)
110  {
111  if (debug & 2)
112  {
113  Pout<< " sending to " << myComm.above()
114  << " data:" << value << endl;
115  }
116 
118  {
120  (
122  myComm.above(),
123  reinterpret_cast<const char*>(&value),
124  sizeof(T),
125  tag,
126  comm
127  );
128  }
129  else
130  {
131  OPstream toAbove
132  (
134  myComm.above(),
135  0,
136  tag,
137  comm
138  );
139  toAbove << value;
140  }
141  }
142  }
143 }
144 
145 
146 template<class T>
148 (
149  const List<UPstream::commsStruct>& comms,
150  T& value,
151  const int tag,
152  const label comm
153 )
154 {
155  #ifndef Foam_Pstream_scatter_nobroadcast
156  Pstream::broadcast(value, comm);
157  #else
158  if (UPstream::is_parallel(comm))
159  {
160  // My communication order
161  const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
162 
163  // Receive from up
164  if (myComm.above() != -1)
165  {
166  if (is_contiguous<T>::value)
167  {
169  (
171  myComm.above(),
172  reinterpret_cast<char*>(&value),
173  sizeof(T),
174  tag,
175  comm
176  );
177  }
178  else
179  {
180  IPstream fromAbove
181  (
183  myComm.above(),
184  0,
185  tag,
186  comm
187  );
188  value = T(fromAbove);
189  }
190  }
191 
192  // Send to my downstairs neighbours
193  forAllReverse(myComm.below(), belowI)
194  {
195  const label belowID = myComm.below()[belowI];
196 
197  if (is_contiguous<T>::value)
198  {
200  (
202  belowID,
203  reinterpret_cast<const char*>(&value),
204  sizeof(T),
205  tag,
206  comm
207  );
208  }
209  else
210  {
211  OPstream toBelow
212  (
214  belowID,
215  0,
216  tag,
217  comm
218  );
219  toBelow << value;
220  }
221  }
222  }
223  #endif
224 }
225 
226 
227 template<class T, class CombineOp>
229 (
230  T& value,
231  const CombineOp& cop,
232  const int tag,
233  const label comm
234 )
235 {
237  (
239  value,
240  cop,
241  tag,
242  comm
243  );
244 }
245 
246 
247 template<class T>
249 (
250  T& value,
251  const int tag,
252  const label comm
253 )
254 {
255  #ifndef Foam_Pstream_scatter_nobroadcast
256  Pstream::broadcast(value, comm);
257  #else
259  (
261  value,
262  tag,
263  comm
264  );
265  #endif
266 }
267 
268 
269 template<class T, class CombineOp>
271 (
272  const List<UPstream::commsStruct>& comms,
273  T& value,
274  const CombineOp& cop,
275  const int tag,
276  const label comm
277 )
278 {
279  Pstream::combineGather(comms, value, cop, tag, comm);
280  Pstream::broadcast(value, comm);
281 }
282 
283 
284 template<class T, class CombineOp>
286 (
287  T& value,
288  const CombineOp& cop,
289  const int tag,
290  const label comm
291 )
292 {
293  if (UPstream::is_parallel(comm))
294  {
295  const auto& comms = UPstream::whichCommunication(comm);
296 
297  Pstream::combineGather(comms, value, cop, tag, comm);
298  Pstream::broadcast(value, comm);
299  }
300 }
302 
303 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
304 
305 template<class T, class CombineOp>
307 (
308  const List<UPstream::commsStruct>& comms,
309  List<T>& values,
310  const CombineOp& cop,
311  const int tag,
312  const label comm
313 )
314 {
315  if (UPstream::is_parallel(comm))
316  {
317  // My communication order
318  const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
319 
320  // Receive from my downstairs neighbours
321  for (const label belowID : myComm.below())
322  {
324  {
325  List<T> received(values.size());
326 
328  (
330  belowID,
331  received.data_bytes(),
332  received.size_bytes(),
333  tag,
334  comm
335  );
336 
337  if (debug & 2)
338  {
339  Pout<< " received from "
340  << belowID << " data:" << received << endl;
341  }
342 
343  forAll(values, i)
344  {
345  cop(values[i], received[i]);
346  }
347  }
348  else
349  {
350  IPstream fromBelow
351  (
353  belowID,
354  0,
355  tag,
356  comm
357  );
358  List<T> received(fromBelow);
359 
360  if (debug & 2)
361  {
362  Pout<< " received from "
363  << belowID << " data:" << received << endl;
364  }
365 
366  forAll(values, i)
367  {
368  cop(values[i], received[i]);
369  }
370  }
371  }
372 
373  // Send up values
374  if (myComm.above() != -1)
375  {
376  if (debug & 2)
377  {
378  Pout<< " sending to " << myComm.above()
379  << " data:" << values << endl;
380  }
381 
383  {
385  (
387  myComm.above(),
388  values.cdata_bytes(),
389  values.size_bytes(),
390  tag,
391  comm
392  );
393  }
394  else
395  {
396  OPstream toAbove
397  (
399  myComm.above(),
400  0,
401  tag,
402  comm
403  );
404  toAbove << values;
405  }
406  }
407  }
408 }
409 
410 
411 template<class T>
413 (
414  const List<UPstream::commsStruct>& comms,
415  List<T>& values,
416  const int tag,
417  const label comm
418 )
419 {
420  #ifndef Foam_Pstream_scatter_nobroadcast
421  Pstream::broadcast(values, comm);
422  #else
423  if (UPstream::is_parallel(comm))
424  {
425  // My communication order
426  const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
427 
428  // Receive from up
429  if (myComm.above() != -1)
430  {
431  if (is_contiguous<T>::value)
432  {
434  (
436  myComm.above(),
437  values.data_bytes(),
438  values.size_bytes(),
439  tag,
440  comm
441  );
442  }
443  else
444  {
445  IPstream fromAbove
446  (
448  myComm.above(),
449  0,
450  tag,
451  comm
452  );
453  fromAbove >> values;
454  }
455  }
456 
457  // Send to my downstairs neighbours
458  forAllReverse(myComm.below(), belowI)
459  {
460  const label belowID = myComm.below()[belowI];
461 
462  if (is_contiguous<T>::value)
463  {
465  (
467  belowID,
468  values.cdata_bytes(),
469  values.size_bytes(),
470  tag,
471  comm
472  );
473  }
474  else
475  {
476  OPstream toBelow
477  (
479  belowID,
480  0,
481  tag,
482  comm
483  );
484  toBelow << values;
485  }
486  }
487  }
488  #endif
489 }
490 
491 
492 template<class T, class CombineOp>
494 (
495  List<T>& values,
496  const CombineOp& cop,
497  const int tag,
498  const label comm
499 )
500 {
502  (
504  values,
505  cop,
506  tag,
507  comm
508  );
509 }
510 
511 
512 template<class T>
514 (
515  List<T>& values,
516  const int tag,
517  const label comm
518 )
519 {
520  #ifndef Foam_Pstream_scatter_nobroadcast
521  Pstream::broadcast(values, comm);
522  #else
524  (
526  values,
527  tag,
528  comm
529  );
530  #endif
531 }
532 
533 
534 template<class T, class CombineOp>
536 (
537  List<T>& values,
538  const CombineOp& cop,
539  const int tag,
540  const label comm
541 )
542 {
543  if (UPstream::is_parallel(comm))
544  {
545  const auto& comms = UPstream::whichCommunication(comm);
546 
547  Pstream::listCombineGather(comms, values, cop, tag, comm);
548  Pstream::broadcast(values, comm);
549  }
550 }
552 
553 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
554 
555 template<class Container, class CombineOp>
557 (
558  const List<UPstream::commsStruct>& comms,
559  Container& values,
560  const CombineOp& cop,
561  const int tag,
562  const label comm
563 )
564 {
565  if (UPstream::is_parallel(comm))
566  {
567  // My communication order
568  const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
569 
570  // Receive from my downstairs neighbours
571  for (const label belowID : myComm.below())
572  {
573  // Map/HashTable: non-contiguous
574 
575  IPstream fromBelow
576  (
578  belowID,
579  0,
580  tag,
581  comm
582  );
583  Container received(fromBelow);
584 
585  if (debug & 2)
586  {
587  Pout<< " received from "
588  << belowID << " data:" << received << endl;
589  }
590 
591  for
592  (
593  auto recvIter = received.cbegin();
594  recvIter != received.cend();
595  ++recvIter
596  )
597  {
598  auto masterIter = values.find(recvIter.key());
599 
600  if (masterIter.good())
601  {
602  // Combine with existing
603  cop(masterIter.val(), recvIter.val());
604  }
605  else
606  {
607  // Insert new key/value
608  values.insert(recvIter.key(), recvIter.val());
609  }
610  }
611  }
612 
613  // Send up values
614  if (myComm.above() != -1)
615  {
616  if (debug & 2)
617  {
618  Pout<< " sending to " << myComm.above()
619  << " data:" << values << endl;
620  }
621 
622  OPstream toAbove
623  (
625  myComm.above(),
626  0,
627  tag,
628  comm
629  );
630  toAbove << values;
631  }
632  }
633 }
634 
635 
636 template<class Container>
638 (
639  const List<UPstream::commsStruct>& comms,
640  Container& values,
641  const int tag,
642  const label comm
643 )
644 {
645  #ifndef Foam_Pstream_scatter_nobroadcast
646  Pstream::broadcast(values, comm);
647  #else
648  if (UPstream::is_parallel(comm))
649  {
650  // My communication order
651  const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
652 
653  // Receive from up
654  if (myComm.above() != -1)
655  {
656  IPstream fromAbove
657  (
659  myComm.above(),
660  0,
661  tag,
662  comm
663  );
664  fromAbove >> values;
665 
666  if (debug & 2)
667  {
668  Pout<< " received from "
669  << myComm.above() << " data:" << values << endl;
670  }
671  }
672 
673  // Send to my downstairs neighbours
674  forAllReverse(myComm.below(), belowI)
675  {
676  const label belowID = myComm.below()[belowI];
677 
678  if (debug & 2)
679  {
680  Pout<< " sending to " << belowID << " data:" << values << endl;
681  }
682 
683  OPstream toBelow
684  (
686  belowID,
687  0,
688  tag,
689  comm
690  );
691  toBelow << values;
692  }
693  }
694  #endif
695 }
696 
697 
698 template<class Container, class CombineOp>
700 (
701  Container& values,
702  const CombineOp& cop,
703  const int tag,
704  const label comm
705 )
706 {
708  (
710  values,
711  cop,
712  tag,
713  comm
714  );
715 }
716 
717 
718 template<class Container>
720 (
721  Container& values,
722  const int tag,
723  const label comm
724 )
725 {
726  #ifndef Foam_Pstream_scatter_nobroadcast
727  Pstream::broadcast(values, comm);
728  #else
730  (
732  values,
733  tag,
734  comm
735  );
736  #endif
737 }
738 
739 
740 template<class Container, class CombineOp>
742 (
743  Container& values,
744  const CombineOp& cop,
745  const int tag,
746  const label comm
747 )
748 {
749  if (UPstream::is_parallel(comm))
750  {
751  const auto& comms = UPstream::whichCommunication(comm);
752 
753  Pstream::mapCombineGather(comms, values, cop, tag, comm);
754  Pstream::broadcast(values, comm);
755  }
756 }
757 
758 
759 // ************************************************************************* //
const labelList & below() const noexcept
The procIDs of the processors directly below.
Definition: UPstream.H:193
static void mapCombineGather(const List< commsStruct > &comms, Container &values, const CombineOp &cop, const int tag, const label comm)
static void combineScatter(const List< commsStruct > &comms, T &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
static void mapCombineScatter(const List< commsStruct > &comms, Container &values, const int tag, const label comm)
Broadcast data: Distribute without modification.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static const List< commsStruct > & whichCommunication(const label communicator=worldComm)
Communication schedule for linear/tree all-to-master (proc 0). Chooses based on the value of UPstream...
Definition: UPstream.H:1213
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1074
label above() const noexcept
The procID of the processor directly above.
Definition: UPstream.H:188
static void broadcast(Type &value, const label comm=UPstream::worldComm)
Broadcast content (contiguous or non-contiguous) to all communicator ranks. Does nothing in non-paral...
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
Input inter-processor communications stream.
Definition: IPstream.H:49
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:164
static void combineGather(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag, const label comm)
Gather data, applying cop to inplace combine value from different processors.
"scheduled" : (MPI_Send, MPI_Recv)
static void listCombineScatter(const List< commsStruct > &comms, List< T > &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1111
Structure for communicating between processors.
Definition: UPstream.H:104
static void combineReduce(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine value from different processors...
int debug
Static debugging option.
Output inter-processor communications stream.
Definition: OPstream.H:49
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
const volScalarField & T
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
static void listCombineGather(const List< commsStruct > &comms, List< T > &values, const CombineOp &cop, const int tag, const label comm)
#define forAllReverse(list, i)
Reverse loop across all elements in list.
Definition: stdFoam.H:437
static void mapCombineReduce(Container &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine map values from different processo...
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void listCombineReduce(List< T > &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
After completion all processors have the same data.