PstreamGatherList.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2011-2017 OpenFOAM Foundation
    Copyright (C) 2015-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.

Description
    Gather data from all processors onto a single processor, according to a
    communication schedule (usually linear-to-master or tree-to-master).
    The gathered data will be a list with element procID holding the data
    from processor procID. Before calling, every processor should insert
    its own value into values[UPstream::myProcNo(comm)].

    Note: after the gather, every processor only knows its own data and that
    of the processors below it. Only the 'master' of the communication
    schedule holds a fully filled List. Use scatter to distribute the data.

\*---------------------------------------------------------------------------*/
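
// Editorial usage sketch (not part of the original sources): the calling
// pattern implied by the Description above, assuming the default world
// communicator and message tag. "someLocalValue" is a hypothetical per-rank
// quantity; the sizing and seeding contract is the point here.
//
//     labelList allValues(UPstream::nProcs(), Zero);
//     allValues[UPstream::myProcNo()] = someLocalValue;
//
//     Pstream::gatherList(allValues);   // only the master now holds all entries
//     Pstream::scatterList(allValues);  // redistribute so every rank holds them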

#include "IPstream.H"
#include "OPstream.H"
#include "contiguous.H"

// * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //

template<class T>
void Foam::Pstream::gatherList
(
    const List<UPstream::commsStruct>& comms,
    List<T>& values,
    const int tag,
    const label comm
)
{
    if (UPstream::is_parallel(comm))
    {
        if (values.size() < UPstream::nProcs(comm))
        {
            FatalErrorInFunction
                << "List of values is too small:" << values.size()
                << " vs numProcs:" << UPstream::nProcs(comm) << nl
                << Foam::abort(FatalError);
        }

        // My communication order
        const commsStruct& myComm = comms[UPstream::myProcNo(comm)];

        // Receive from my downstairs neighbours
        for (const label belowID : myComm.below())
        {
            const labelList& belowLeaves = comms[belowID].allBelow();

            if (is_contiguous<T>::value)
            {
                List<T> received(belowLeaves.size() + 1);

                UIPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    received.data_bytes(),
                    received.size_bytes(),
                    tag,
                    comm
                );

                values[belowID] = received[0];

                forAll(belowLeaves, leafI)
                {
                    values[belowLeaves[leafI]] = received[leafI + 1];
                }
            }
            else
            {
                IPstream fromBelow
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    0,
                    tag,
                    comm
                );
                fromBelow >> values[belowID];

                if (debug & 2)
                {
                    Pout<< " received through "
                        << belowID << " data from:" << belowID
                        << " data:" << values[belowID] << endl;
                }

                // Receive from all other processors below belowID
                for (const label leafID : belowLeaves)
                {
                    fromBelow >> values[leafID];

                    if (debug & 2)
                    {
                        Pout<< " received through "
                            << belowID << " data from:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                }
            }
        }

        // Send up from values:
        // - my own value first
        // - all belowLeaves next
        if (myComm.above() != -1)
        {
            const labelList& belowLeaves = myComm.allBelow();

            if (debug & 2)
            {
                Pout<< " sending to " << myComm.above()
                    << " data from me:" << UPstream::myProcNo(comm)
                    << " data:" << values[UPstream::myProcNo(comm)] << endl;
            }

            if (is_contiguous<T>::value)
            {
                List<T> sending(belowLeaves.size() + 1);
                sending[0] = values[UPstream::myProcNo(comm)];

                forAll(belowLeaves, leafI)
                {
                    sending[leafI + 1] = values[belowLeaves[leafI]];
                }

                UOPstream::write
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    sending.cdata_bytes(),
                    sending.size_bytes(),
                    tag,
                    comm
                );
            }
            else
            {
                OPstream toAbove
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    0,
                    tag,
                    comm
                );
                toAbove << values[UPstream::myProcNo(comm)];

                for (const label leafID : belowLeaves)
                {
                    if (debug & 2)
                    {
                        Pout<< " sending to "
                            << myComm.above() << " data from:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                    toAbove << values[leafID];
                }
            }
        }
    }
}

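// Editorial sketch (not part of the original sources): invoking the
// schedule-explicit overload above directly. The schedule, tag and
// communicator shown are the values the List-only wrappers further down in
// this file pass through; "localValue" is a hypothetical per-rank quantity.
//
//     const label comm = UPstream::worldComm;
//     const auto& comms = UPstream::whichCommunication(comm);
//
//     List<scalar> data(UPstream::nProcs(comm), Zero);
//     data[UPstream::myProcNo(comm)] = localValue;
//
//     Pstream::gatherList(comms, data, UPstream::msgType(), comm);
//     // After this call only the master rank holds every element of 'data'.
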
template<class T>
void Foam::Pstream::scatterList
(
    const List<UPstream::commsStruct>& comms,
    List<T>& values,
    const int tag,
    const label comm
)
{
    // Apart from the additional size check, the only difference
    // between scatterList() and using broadcast(List<T>&) or a regular
    // scatter(List<T>&) is that processor-local data is skipped.

    if (UPstream::is_parallel(comm))
    {
        if (values.size() < UPstream::nProcs(comm))
        {
            FatalErrorInFunction
                << "List of values is too small:" << values.size()
                << " vs numProcs:" << UPstream::nProcs(comm) << nl
                << Foam::abort(FatalError);
        }

        // My communication order
        const commsStruct& myComm = comms[UPstream::myProcNo(comm)];

        // Receive from up
        if (myComm.above() != -1)
        {
            const labelList& notBelowLeaves = myComm.allNotBelow();

            if (is_contiguous<T>::value)
            {
                List<T> received(notBelowLeaves.size());

                UIPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    received.data_bytes(),
                    received.size_bytes(),
                    tag,
                    comm
                );

                forAll(notBelowLeaves, leafI)
                {
                    values[notBelowLeaves[leafI]] = received[leafI];
                }
            }
            else
            {
                IPstream fromAbove
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    0,
                    tag,
                    comm
                );

                for (const label leafID : notBelowLeaves)
                {
                    fromAbove >> values[leafID];

                    if (debug & 2)
                    {
                        Pout<< " received through "
                            << myComm.above() << " data for:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                }
            }
        }

        // Send to my downstairs neighbours
        forAllReverse(myComm.below(), belowI)
        {
            const label belowID = myComm.below()[belowI];
            const labelList& notBelowLeaves = comms[belowID].allNotBelow();

            if (is_contiguous<T>::value)
            {
                List<T> sending(notBelowLeaves.size());

                forAll(notBelowLeaves, leafI)
                {
                    sending[leafI] = values[notBelowLeaves[leafI]];
                }

                UOPstream::write
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    sending.cdata_bytes(),
                    sending.size_bytes(),
                    tag,
                    comm
                );
            }
            else
            {
                OPstream toBelow
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    0,
                    tag,
                    comm
                );

                // Send data destined for all other processors below belowID
                for (const label leafID : notBelowLeaves)
                {
                    toBelow << values[leafID];

                    if (debug & 2)
                    {
                        Pout<< " sent through "
                            << belowID << " data for:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                }
            }
        }
    }
}

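// Editorial sketch (not part of the original sources): for a non-contiguous
// type, the gather/scatter pair above serialises each element through
// OPstream/IPstream rather than raw byte buffers. A plausible round trip,
// assuming the default communicator and tag:
//
//     wordList names(UPstream::nProcs());
//     names[UPstream::myProcNo()] = Foam::name(UPstream::myProcNo());
//
//     Pstream::gatherList(names);    // master assembles the full list
//     Pstream::scatterList(names);   // every rank now holds identical names
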
template<class T>
void Foam::Pstream::gatherList
(
    List<T>& values,
    const int tag,
    const label comm
)
{
    Pstream::gatherList(UPstream::whichCommunication(comm), values, tag, comm);
}

// Unused - slate for removal? (MAY-2023)
template<class T>
void Foam::Pstream::scatterList
(
    List<T>& values,
    const int tag,
    const label comm
)
{
    Pstream::scatterList(UPstream::whichCommunication(comm), values, tag, comm);
}

template<class T>
void Foam::Pstream::allGatherList
(
    List<T>& values,
    const int tag,
    const label comm
)
{
    if (UPstream::is_parallel(comm))
    {
        if (is_contiguous<T>::value)
        {
            if (values.size() < UPstream::nProcs(comm))
            {
                FatalErrorInFunction
                    << "List of values is too small:" << values.size()
                    << " vs numProcs:" << UPstream::nProcs(comm) << nl
                    << Foam::abort(FatalError);
            }

            UPstream::mpiAllGather(values.data_bytes(), sizeof(T), comm);
            return;
        }

        const auto& comms = UPstream::whichCommunication(comm);

        Pstream::gatherList(comms, values, tag, comm);
        Pstream::scatterList(comms, values, tag, comm);
    }
}
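
// Editorial sketch (not part of the original sources): typical use of
// allGatherList() with a contiguous type, which takes the single
// mpiAllGather() fast path above. "nLocalCells" is a hypothetical local count,
// and the list must already be sized to nProcs before the call.
//
//     labelList counts(UPstream::nProcs(), Zero);
//     counts[UPstream::myProcNo()] = nLocalCells;
//
//     Pstream::allGatherList(counts);   // every rank now holds all counts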


// ************************************************************************* //