Pstream.H
Go to the documentation of this file.
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2011-2016 OpenFOAM Foundation
    Copyright (C) 2016-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.

Class
    Foam::Pstream

Description
    Inter-processor communications stream.

SourceFiles
    Pstream.C
    PstreamBroadcast.C
    PstreamGather.C
    PstreamCombineGather.C
    PstreamGatherList.C
    PstreamExchange.C

\*---------------------------------------------------------------------------*/
42 
43 #ifndef Foam_Pstream_H
44 #define Foam_Pstream_H
45 
46 #include "UPstream.H"
47 #include "DynamicList.H"
48 
49 // Legacy
50 // #define Foam_Pstream_scatter_nobroadcast
51 
52 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
53 
54 namespace Foam
55 {
56 
57 /*---------------------------------------------------------------------------*\
58  Class Pstream Declaration
59 \*---------------------------------------------------------------------------*/
60 
61 class Pstream
62 :
63  public UPstream
64 {
65  // Private Static Functions
66 
67  //- Exchange contiguous data. Sends sendBufs, receives into recvBufs.
68  // Data provided and received as container.
69  template<class Container, class T>
70  static void exchangeContainer
71  (
72  const UList<Container>& sendBufs,
73  const labelUList& recvSizes,
74  List<Container>& recvBufs,
75  const int tag,
76  const label comm,
77  const bool wait
78  );
79 
80  //- Exchange contiguous data. Sends sendBufs, receives into recvBufs.
81  // Data provided and received as pointers.
82  template<class T>
83  static void exchangeBuf
84  (
85  const labelUList& sendSizes,
86  const UList<const char*>& sendBufs,
87  const labelUList& recvSizes,
88  List<char*>& recvBufs,
89  const int tag,
90  const label comm,
91  const bool wait
92  );
93 
94 
95 protected:
96 
97  // Protected Data
98 
99  //- Allocated transfer buffer (can be used for send or receive)
101 
102 
103 public:
104 
105  // Declare name of the class and its debug switch
106  ClassName("Pstream");
107 
108 
109  // Constructors
110 
111  //- Construct for given commsTypes, with optional buffer size
112  explicit Pstream
113  (
115  const label bufSize = 0
116  )
117  :
119  {
120  if (bufSize > 0)
121  {
122  transferBuf_.setCapacity(bufSize + 2*sizeof(scalar) + 1);
123  }
124  }
125 
126 
127  // Static Functions
128 
129  // Broadcast
130 
131  //- Broadcast buffer content to all processes in communicator.
132  using UPstream::broadcast;
133 
134  //- Broadcast content (contiguous or non-contiguous)
135  //- to all processes in communicator.
136  template<class Type>
137  static void broadcast
138  (
139  Type& value,
140  const label comm = UPstream::worldComm
141  );
142 
143  //- Broadcast multiple items to all processes in communicator.
144  template<class Type, class... Args>
145  static void broadcasts(const label comm, Type& arg1, Args&&... args);
146 
147 
148  // Gather
149 
150  //- Gather (reduce) data, appyling \c bop to combine \c value
151  //- from different processors. The basis for Foam::reduce().
152  // Uses the specified communication schedule.
153  template<class T, class BinaryOp>
154  static void gather
155  (
156  const List<commsStruct>& comms,
157  T& value,
158  const BinaryOp& bop,
159  const int tag,
160  const label comm
161  );
162 
163  //- Gather (reduce) data, applying \c bop to combine \c value
164  //- from different processors. The basis for Foam::reduce().
165  // Uses linear/tree communication.
166  template<class T, class BinaryOp>
167  static void gather
168  (
169  T& value,
170  const BinaryOp& bop,
171  const int tag = UPstream::msgType(),
172  const label comm = UPstream::worldComm
173  );
174 
175 
176  // Gather/combine data
177  // Inplace combine values from processors.
178  // (Uses construct from Istream instead of <<)
179 
180  //- Gather data, applying \c cop to inplace combine \c value
181  //- from different processors.
182  // Uses the specified communication schedule.
183  template<class T, class CombineOp>
184  static void combineGather
185  (
186  const List<commsStruct>& comms,
187  T& value,
188  const CombineOp& cop,
189  const int tag,
190  const label comm
191  );
192 
193  //- Gather data, applying \c cop to inplace combine \c value
194  //- from different processors.
195  // Uses linear/tree communication.
196  template<class T, class CombineOp>
197  static void combineGather
198  (
199  T& value,
200  const CombineOp& cop,
201  const int tag = UPstream::msgType(),
202  const label comm = UPstream::worldComm
203  );
204 
205  //- Reduce inplace (cf. MPI Allreduce)
206  //- applying \c cop to inplace combine \c value
207  //- from different processors.
208  //- After completion all processors have the same data.
209  // Uses the specified communication schedule.
210  // Wraps combineGather/broadcast (may change in the future).
211  template<class T, class CombineOp>
212  static void combineReduce
213  (
214  const List<commsStruct>& comms,
215  T& value,
216  const CombineOp& cop,
217  const int tag = UPstream::msgType(),
218  const label comm = UPstream::worldComm
219  );
220 
221  //- Reduce inplace (cf. MPI Allreduce)
222  //- applying \c cop to inplace combine \c value
223  //- from different processors.
224  //- After completion all processors have the same data.
225  // Uses linear/tree communication.
226  // Wraps combineGather/broadcast (may change in the future).
227  template<class T, class CombineOp>
228  static void combineReduce
229  (
230  T& value,
231  const CombineOp& cop,
232  const int tag = UPstream::msgType(),
233  const label comm = UPstream::worldComm
234  );
235 
236  //- Same as Pstream::combineReduce
237  template<class T, class CombineOp>
238  static void combineAllGather
239  (
240  T& value,
241  const CombineOp& cop,
242  const int tag = UPstream::msgType(),
243  const label comm = UPstream::worldComm
244  )
245  {
246  Pstream::combineReduce(value, cop, tag, comm);
247  }
248 
249 
250  // Combine variants working on whole List at a time.
251 
252  template<class T, class CombineOp>
253  static void listCombineGather
254  (
255  const List<commsStruct>& comms,
256  List<T>& values,
257  const CombineOp& cop,
258  const int tag,
259  const label comm
260  );
261 
262  //- Like above but switches between linear/tree communication
263  template<class T, class CombineOp>
264  static void listCombineGather
265  (
266  List<T>& values,
267  const CombineOp& cop,
268  const int tag = UPstream::msgType(),
269  const label comm = UPstream::worldComm
270  );
271 
272  //- After completion all processors have the same data.
273  template<class T, class CombineOp>
274  static void listCombineReduce
275  (
276  List<T>& values,
277  const CombineOp& cop,
278  const int tag = UPstream::msgType(),
279  const label comm = UPstream::worldComm
280  );
281 
282  //- Same as Pstream::listCombineReduce
283  template<class T, class CombineOp>
284  static void listCombineAllGather
285  (
286  List<T>& values,
287  const CombineOp& cop,
288  const int tag = UPstream::msgType(),
289  const label comm = UPstream::worldComm
290  )
291  {
292  Pstream::listCombineReduce(values, cop, tag, comm);
293  }
294 
295 
296  // Combine variants working on whole map at a time.
297  // Container needs iterators, find() and insert methods defined.
298 
299  template<class Container, class CombineOp>
300  static void mapCombineGather
301  (
302  const List<commsStruct>& comms,
303  Container& values,
304  const CombineOp& cop,
305  const int tag,
306  const label comm
307  );
308 
309  //- Like above but switches between linear/tree communication
310  template<class Container, class CombineOp>
311  static void mapCombineGather
312  (
313  Container& values,
314  const CombineOp& cop,
315  const int tag = UPstream::msgType(),
316  const label comm = UPstream::worldComm
317  );
318 
319  //- Reduce inplace (cf. MPI Allreduce)
320  //- applying \c cop to inplace combine map \c values
321  //- from different processors.
322  //- After completion all processors have the same data.
323  // Uses the specified communication schedule.
324  // Wraps mapCombineGather/broadcast (may change in the future).
325  //- After completion all processors have the same data.
326  template<class Container, class CombineOp>
327  static void mapCombineReduce
328  (
329  Container& values,
330  const CombineOp& cop,
331  const int tag = UPstream::msgType(),
332  const label comm = UPstream::worldComm
333  );
334 
335  //- Same as Pstream::mapCombineReduce
336  template<class Container, class CombineOp>
337  static void mapCombineAllGather
338  (
339  Container& values,
340  const CombineOp& cop,
341  const int tag = UPstream::msgType(),
342  const label comm = UPstream::worldComm
343  )
344  {
345  Pstream::mapCombineReduce(values, cop, tag, comm);
346  }
347 
348 
349  // Gather/scatter keeping the individual processor data separate.
350  // The values is a List of size UPstream::nProcs() where
351  // values[UPstream::myProcNo()] is the data for the current processor.
352 
353  //- Gather data, but keep individual values separate.
354  //- Uses the specified communication schedule.
355  template<class T>
356  static void gatherList
357  (
358  const List<commsStruct>& comms,
359  List<T>& values,
360  const int tag,
361  const label comm
362  );
363 
364  //- Gather data, but keep individual values separate.
365  //- Uses linear/tree communication.
366  template<class T>
367  static void gatherList
368  (
369  List<T>& values,
370  const int tag = UPstream::msgType(),
371  const label comm = UPstream::worldComm
372  );
373 
374  //- Gather data, but keep individual values separate.
375  //- Uses linear/tree communication.
376  // After completion all processors have the same data.
377  // Wraps gatherList/scatterList (may change in the future).
378  template<class T>
379  static void allGatherList
380  (
381  List<T>& values,
382  const int tag = UPstream::msgType(),
383  const label comm = UPstream::worldComm
384  );
385 
386 
387  // Scatter
388 
389  //- Broadcast data: Distribute without modification.
390  // \note comms and tag parameters only used when
391  // Foam_Pstream_scatter_nobroadcast is defined
392  template<class T>
393  static void scatter
394  (
395  const List<commsStruct>& comms,
396  T& value,
397  const int tag,
398  const label comm
399  );
400 
401  //- Broadcast data: Distribute without modification.
402  // \note tag parameter only used when
403  // Foam_Pstream_scatter_nobroadcast is defined
404  template<class T>
405  static void scatter
406  (
407  T& value,
408  const int tag = UPstream::msgType(),
409  const label comm = UPstream::worldComm
410  );
411 
412  //- Broadcast data: Distribute without modification.
413  // \note tag parameter only used when
414  // Foam_Pstream_scatter_nobroadcast is defined
415  template<class T>
416  static void combineScatter
417  (
418  const List<commsStruct>& comms,
419  T& value,
420  const int tag,
421  const label comm
422  );
423 
424  //- Broadcast data: Distribute without modification.
425  // \note tag parameter only used when
426  // Foam_Pstream_scatter_nobroadcast is defined
427  template<class T>
428  static void combineScatter
429  (
430  T& value,
431  const int tag = UPstream::msgType(),
432  const label comm = UPstream::worldComm
433  );
434 
435  //- Broadcast data: Distribute without modification.
436  // \note comms and tag parameters only used when
437  // Foam_Pstream_scatter_nobroadcast is defined
438  template<class T>
439  static void listCombineScatter
440  (
441  const List<commsStruct>& comms,
442  List<T>& value,
443  const int tag,
444  const label comm
445  );
446 
447  //- Broadcast data: Distribute without modification.
448  // \note comms and tag parameters only used when
449  // Foam_Pstream_scatter_nobroadcast is defined
450  template<class T>
451  static void listCombineScatter
452  (
453  List<T>& value,
454  const int tag = UPstream::msgType(),
455  const label comm = UPstream::worldComm
456  );
457 
458  //- Broadcast data: Distribute without modification.
459  template<class Container>
460  static void mapCombineScatter
461  (
462  const List<commsStruct>& comms,
463  Container& values,
464  const int tag,
465  const label comm
466  );
467 
468  //- Like above but switches between linear/tree communication
469  template<class Container>
470  static void mapCombineScatter
471  (
472  Container& values,
473  const int tag = UPstream::msgType(),
474  const label comm = UPstream::worldComm
475  );
476 
477 
478  //- Scatter data. Reverse of gatherList
479  template<class T>
480  static void scatterList
481  (
482  const List<commsStruct>& comms,
483  List<T>& values,
484  const int tag,
485  const label comm
486  );
487 
488  //- Like above but switches between linear/tree communication
489  template<class T>
490  static void scatterList
491  (
492  List<T>& values,
493  const int tag = UPstream::msgType(),
494  const label comm = UPstream::worldComm
495  );
496 
497 
498  // Exchange
499 
500  //- Helper: exchange sizes of sendData for specified
501  //- set of send/receive processes.
502  template<class Container>
503  static void exchangeSizes
504  (
505  const labelUList& sendProcs,
506  const labelUList& recvProcs,
507  const Container& sendData,
508  labelList& sizes,
509  const label tag = UPstream::msgType(),
510  const label comm = UPstream::worldComm
511  );
512 
513  //- Helper: exchange sizes of sendData.
514  //- The sendData is the data per processor (in the communicator).
515  // Returns sizes of sendData on the sending processor.
516  template<class Container>
517  static void exchangeSizes
518  (
519  const Container& sendData,
520  labelList& sizes,
521  const label comm = UPstream::worldComm
522  );
523 
524 
525  //- Helper: exchange contiguous data.
526  //- Sends sendData, receives into recvData.
527  // If wait=true will wait for all transfers to finish.
528  template<class Container, class T>
529  static void exchange
530  (
531  const UList<Container>& sendData,
532  const labelUList& recvSizes,
533  List<Container>& recvData,
534  const int tag = UPstream::msgType(),
535  const label comm = UPstream::worldComm,
536  const bool wait = true
537  );
538 
539  //- Exchange contiguous data.
540  //- Sends sendData, receives into recvData.
541  //- Determines sizes to receive.
542  // If wait=true will wait for all transfers to finish.
543  template<class Container, class T>
544  static void exchange
545  (
546  const UList<Container>& sendData,
547  List<Container>& recvData,
548  const int tag = UPstream::msgType(),
549  const label comm = UPstream::worldComm,
550  const bool wait = true
551  );
552 };
553 
554 
555 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
556 
557 } // End namespace Foam
558 
559 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
560 
561 #ifdef NoRepository
562  #include "PstreamBroadcast.C"
563  #include "PstreamGather.C"
564  #include "PstreamCombineGather.C"
565  #include "PstreamGatherList.C"
566  #include "PstreamExchange.C"
567 #endif
568 
569 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
570 
571 #endif
572 
573 // ************************************************************************* //
static void mapCombineGather(const List< commsStruct > &comms, Container &values, const CombineOp &cop, const int tag, const label comm)
Gather data from all processors onto single processor according to some communication schedule (usual...
static void combineScatter(const List< commsStruct > &comms, T &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
static void exchange(const UList< Container > &sendData, const labelUList &recvSizes, List< Container > &recvData, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool wait=true)
Helper: exchange contiguous data. Sends sendData, receives into recvData.
static void mapCombineScatter(const List< commsStruct > &comms, Container &values, const int tag, const label comm)
Broadcast data: Distribute without modification.
static void mapCombineAllGather(Container &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Same as Pstream::mapCombineReduce.
Definition: Pstream.H:382
commsTypes
Types of communications.
Definition: UPstream.H:66
Pstream(const UPstream::commsTypes commsType, const label bufSize=0)
Construct for given commsTypes, with optional buffer size.
Definition: Pstream.H:118
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
void setCapacity(const label len)
Alter the size of the underlying storage.
Definition: DynamicListI.H:296
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:806
Exchange data.
Gather data from all processors onto single processor according to some communication schedule (usual...
static label worldComm
Default world communicator (all processors). May differ from globalComm if local worlds are in use...
Definition: UPstream.H:361
static void broadcast(Type &value, const label comm=UPstream::worldComm)
Broadcast content (contiguous or non-contiguous) to all processes in communicator.
static void gather(const List< commsStruct > &comms, T &value, const BinaryOp &bop, const int tag, const label comm)
Gather (reduce) data, appyling bop to combine value from different processors. The basis for Foam::re...
Definition: PstreamGather.C:37
static void allGatherList(List< T > &values, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Gather data, but keep individual values separate. Uses linear/tree communication. ...
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:164
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
Gather data, but keep individual values separate. Uses the specified communication schedule...
static void combineGather(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag, const label comm)
Gather data, applying cop to inplace combine value from different processors.
static void listCombineScatter(const List< commsStruct > &comms, List< T > &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
Inter-processor communications stream.
Definition: Pstream.H:56
Variant of gather, scatter. Normal gather uses:
static void listCombineAllGather(List< T > &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Same as Pstream::listCombineReduce.
Definition: Pstream.H:322
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:99
static void combineReduce(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine value from different processors...
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:814
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
ClassName("Pstream")
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
static void combineAllGather(T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Same as Pstream::combineReduce.
Definition: Pstream.H:270
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendData, labelList &sizes, const label tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Helper: exchange sizes of sendData for specified set of send/receive processes.
static void listCombineGather(const List< commsStruct > &comms, List< T > &values, const CombineOp &cop, const int tag, const label comm)
static bool broadcast(char *buf, const std::streamsize bufSize, const label communicator=worldComm, const int rootProcNo=masterNo())
Broadcast buffer contents to all processes in communicator. The sizes must match on all processes...
DynamicList< char > transferBuf_
Allocated transfer buffer (can be used for send or receive)
Definition: Pstream.H:103
static void mapCombineReduce(Container &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine map values from different processo...
Foam::argList args(argc, argv)
Inter-processor communications stream.
Definition: UPstream.H:54
Namespace for OpenFOAM.
static void scatter(const List< commsStruct > &comms, T &value, const int tag, const label comm)
Broadcast data: Distribute without modification.
static void listCombineReduce(List< T > &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
After completion all processors have the same data.
static void scatterList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
Scatter data. Reverse of gatherList.