PstreamCombineGather.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019-2024 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Description
28  Variant of gather.
29  Normal gather uses:
30  - default construct and read (>>) from Istream
31  - binary operator and assignment operator to combine values
32 
33  combineGather uses:
34  - construct from Istream
35  - modify operator which modifies its lhs
36 
37 \*---------------------------------------------------------------------------*/
38 
39 #include "IPstream.H"
40 #include "OPstream.H"
41 #include "IOstreams.H"
42 #include "contiguous.H"
43 
44 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
45 
46 template<class T, class CombineOp>
48 (
49  T& value,
50  const CombineOp& cop,
51  const int tag,
52  const label comm
53 )
54 {
55  if (UPstream::is_parallel(comm))
56  {
57  // Communication order
58  const auto& comms = UPstream::whichCommunication(comm);
59  // if (comms.empty()) return; // extra safety?
60  const auto& myComm = comms[UPstream::myProcNo(comm)];
61 
62  // Receive from my downstairs neighbours
63  for (const label belowID : myComm.below())
64  {
66  {
67  T received;
68 
70  (
72  belowID,
73  reinterpret_cast<char*>(&received),
74  sizeof(T),
75  tag,
76  comm
77  );
78 
79  if (debug & 2)
80  {
81  Perr<< " received from "
82  << belowID << " data:" << received << endl;
83  }
84 
85  cop(value, received);
86  }
87  else
88  {
89  IPstream fromBelow
90  (
92  belowID,
93  0, // bufsize
94  tag,
95  comm
96  );
97  T received(fromBelow);
98 
99  if (debug & 2)
100  {
101  Perr<< " received from "
102  << belowID << " data:" << received << endl;
103  }
104 
105  cop(value, received);
106  }
107  }
108 
109  // Send up value
110  if (myComm.above() >= 0)
111  {
112  if (debug & 2)
113  {
114  Perr<< " sending to " << myComm.above()
115  << " data:" << value << endl;
116  }
117 
119  {
121  (
123  myComm.above(),
124  reinterpret_cast<const char*>(&value),
125  sizeof(T),
126  tag,
127  comm
128  );
129  }
130  else
131  {
132  OPstream::send(value, myComm.above(), tag, comm);
133  }
134  }
135  }
136 }
137 
138 
139 template<class T, class CombineOp>
141 (
142  T& value,
143  const CombineOp& cop,
144  const int tag,
145  const label comm
146 )
147 {
148  if (UPstream::is_parallel(comm))
149  {
150  Pstream::combineGather(value, cop, tag, comm);
151  Pstream::broadcast(value, comm);
152  }
153 }
155 
156 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
157 
158 template<class T, class CombineOp>
160 (
161  UList<T>& values,
162  const CombineOp& cop,
163  const int tag,
164  const label comm
165 )
166 {
167  if (UPstream::is_parallel(comm))
168  {
169  // Communication order
170  const auto& comms = UPstream::whichCommunication(comm);
171  // if (comms.empty()) return; // extra safety?
172  const auto& myComm = comms[UPstream::myProcNo(comm)];
173 
174  // Receive from my downstairs neighbours
175  for (const label belowID : myComm.below())
176  {
178  {
179  List<T> received(values.size());
180 
182  (
184  belowID,
185  received.data_bytes(),
186  received.size_bytes(),
187  tag,
188  comm
189  );
190 
191  if (debug & 2)
192  {
193  Perr<< " received from "
194  << belowID << " data:" << received << endl;
195  }
196 
197  forAll(values, i)
198  {
199  cop(values[i], received[i]);
200  }
201  }
202  else
203  {
204  IPstream fromBelow
205  (
207  belowID,
208  0, // bufsize
209  tag,
210  comm
211  );
212  List<T> received(fromBelow);
213 
214  if (debug & 2)
215  {
216  Perr<< " received from "
217  << belowID << " data:" << received << endl;
218  }
219 
220  forAll(values, i)
221  {
222  cop(values[i], received[i]);
223  }
224  }
225  }
226 
227  // Send up values
228  if (myComm.above() >= 0)
229  {
230  if (debug & 2)
231  {
232  Perr<< " sending to " << myComm.above()
233  << " data:" << values << endl;
234  }
235 
237  {
239  (
241  myComm.above(),
242  values.cdata_bytes(),
243  values.size_bytes(),
244  tag,
245  comm
246  );
247  }
248  else
249  {
250  OPstream::send(values, myComm.above(), tag, comm);
251  }
252  }
253  }
254 }
255 
256 
257 template<class T, class CombineOp>
259 (
260  List<T>& values,
261  const CombineOp& cop,
262  const int tag,
263  const label comm
264 )
265 {
266  if (UPstream::is_parallel(comm))
267  {
268  Pstream::listCombineGather(values, cop, tag, comm);
269  Pstream::broadcast(values, comm);
270  }
271 }
273 
274 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
275 
276 template<class Container, class CombineOp>
278 (
279  Container& values,
280  const CombineOp& cop,
281  const int tag,
282  const label comm
283 )
284 {
285  if (UPstream::is_parallel(comm))
286  {
287  // Communication order
288  const auto& comms = UPstream::whichCommunication(comm);
289  // if (comms.empty()) return; // extra safety?
290  const auto& myComm = comms[UPstream::myProcNo(comm)];
291 
292  // Receive from my downstairs neighbours
293  for (const label belowID : myComm.below())
294  {
295  // Map/HashTable: non-contiguous
296 
297  IPstream fromBelow
298  (
300  belowID,
301  0, // bufsize
302  tag,
303  comm
304  );
305  Container received(fromBelow);
306 
307  if (debug & 2)
308  {
309  Perr<< " received from "
310  << belowID << " data:" << received << endl;
311  }
312 
313  for
314  (
315  auto recvIter = received.cbegin();
316  recvIter != received.cend();
317  ++recvIter
318  )
319  {
320  auto masterIter = values.find(recvIter.key());
321 
322  if (masterIter.good())
323  {
324  // Combine with existing
325  cop(masterIter.val(), recvIter.val());
326  }
327  else
328  {
329  // Insert new key/value
330  values.insert(recvIter.key(), recvIter.val());
331  }
332  }
333  }
334 
335  // Send up values
336  if (myComm.above() >= 0)
337  {
338  if (debug & 2)
339  {
340  Perr<< " sending to " << myComm.above()
341  << " data:" << values << endl;
342  }
343 
344  OPstream::send(values, myComm.above(), tag, comm);
345  }
346  }
347 }
348 
349 
350 template<class Container, class CombineOp>
352 (
353  Container& values,
354  const CombineOp& cop,
355  const int tag,
356  const label comm
357 )
358 {
359  if (UPstream::is_parallel(comm))
360  {
361  Pstream::mapCombineGather(values, cop, tag, comm);
362  Pstream::broadcast(values, comm);
363  }
364 }
365 
366 
367 // ************************************************************************* //
static void listCombineGather(UList< T > &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Combines List elements.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
static void mapCombineGather(Container &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Combine Map elements.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static const List< commsStruct > & whichCommunication(const label communicator=worldComm)
Communication schedule for all-to-master (proc 0) as linear/tree/none with switching based on UPstrea...
Definition: UPstream.H:1227
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1086
static void broadcast(Type &value, const label comm=UPstream::worldComm)
Broadcast content (contiguous or non-contiguous) to all communicator ranks. Does nothing in non-paral...
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
Input inter-processor communications stream.
Definition: IPstream.H:49
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:164
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
static void combineReduce(T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine value from different processors...
static void combineGather(T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Gather data, applying cop to inplace combine value from different processors.
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1123
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
int debug
Static debugging option.
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
const volScalarField & T
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
static void mapCombineReduce(Container &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine map values from different processo...
bool send()
Send buffer contents now and not in destructor [advanced usage]. Returns true on success.
Definition: OPstreams.C:84
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
static void listCombineReduce(List< T > &values, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Combines List elements. After completion all processors have the same data.