PstreamGatherList.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2011-2017 OpenFOAM Foundation
    Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.

Description
    Gather data from all processors onto a single processor according to some
    communication schedule (usually tree-to-master).
    The gathered data will be a list with element procID holding the data
    from processor procID. Before calling, every processor should insert its
    own value into values[UPstream::myProcNo(comm)].
    Note: after the gather, every processor only knows its own data and that
    of the processors below it. Only the 'master' of the communication
    schedule holds a fully filled list. Use scatterList to distribute the data.

\*---------------------------------------------------------------------------*/
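
// Usage sketch (illustrative only, not part of the original source).
// 'nLocalCells' stands for some per-rank quantity and is hypothetical.
// Every rank first writes its own slot, gatherList() fills the master's
// list via the communication schedule, and scatterList() redistributes
// the complete list to all ranks.
//
//     labelList cellCounts(UPstream::nProcs(comm));
//     cellCounts[UPstream::myProcNo(comm)] = nLocalCells;
//
//     Pstream::gatherList(cellCounts, UPstream::msgType(), comm);
//     Pstream::scatterList(cellCounts, UPstream::msgType(), comm);
//     // Now all ranks hold the per-rank cell counts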

#include "IPstream.H"
#include "OPstream.H"
#include "contiguous.H"

// * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //

template<class T>
void Foam::Pstream::gatherList
(
    const UList<UPstream::commsStruct>& comms,
    UList<T>& values,
    const int tag,
    const label comm
)
{
    if (!comms.empty() && UPstream::is_parallel(comm))
    {
        const label myProci = UPstream::myProcNo(comm);
        const label numProc = UPstream::nProcs(comm);

        if (values.size() < numProc)
        {
            FatalErrorInFunction
                << "List of values:" << values.size()
                << " < numProcs:" << numProc << nl
                << Foam::abort(FatalError);
        }

        // My communication order
        const auto& myComm = comms[myProci];

        // Receive from my downstairs neighbours
        for (const label belowID : myComm.below())
        {
            const labelList& belowLeaves = comms[belowID].allBelow();

            if (is_contiguous<T>::value)
            {
                // Contiguous data: receive the whole sub-tree in one buffer
                List<T> received(belowLeaves.size() + 1);

                UIPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    received.data_bytes(),
                    received.size_bytes(),
                    tag,
                    comm
                );

                values[belowID] = received[0];

                forAll(belowLeaves, leafI)
                {
                    values[belowLeaves[leafI]] = received[leafI + 1];
                }
            }
            else
            {
                // Non-contiguous data: deserialise value by value
                IPstream fromBelow
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    0,  // bufsize
                    tag,
                    comm
                );
                fromBelow >> values[belowID];

                if (debug & 2)
                {
                    Perr<< " received through "
                        << belowID << " data from:" << belowID
                        << " data:" << values[belowID] << endl;
                }

                // Receive from all other processors below belowID
                for (const label leafID : belowLeaves)
                {
                    fromBelow >> values[leafID];

                    if (debug & 2)
                    {
                        Perr<< " received through "
                            << belowID << " data from:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                }
            }
        }

        // Send up from values:
        // - my own value first
        // - all belowLeaves next
        if (myComm.above() >= 0)
        {
            const labelList& belowLeaves = myComm.allBelow();

            if (debug & 2)
            {
                Perr<< " sending to " << myComm.above()
                    << " data from me:" << myProci
                    << " data:" << values[myProci] << endl;
            }

            if (is_contiguous<T>::value)
            {
                List<T> sending(belowLeaves.size() + 1);
                sending[0] = values[myProci];

                forAll(belowLeaves, leafI)
                {
                    sending[leafI + 1] = values[belowLeaves[leafI]];
                }

                UOPstream::write
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    sending.cdata_bytes(),
                    sending.size_bytes(),
                    tag,
                    comm
                );
            }
            else
            {
                OPstream toAbove
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    0,  // bufsize
                    tag,
                    comm
                );
                toAbove << values[myProci];

                for (const label leafID : belowLeaves)
                {
                    if (debug & 2)
                    {
                        Perr<< " sending to "
                            << myComm.above() << " data from:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                    toAbove << values[leafID];
                }
            }
        }
    }
}


template<class T>
void Foam::Pstream::scatterList
(
    const UList<UPstream::commsStruct>& comms,
    UList<T>& values,
    const int tag,
    const label comm
)
{
    // Apart from the additional size check, the only difference
    // between scatterList() and using broadcast(List<T>&) or a regular
    // scatter(List<T>&) is that processor-local data is skipped.

    if (!comms.empty() && UPstream::is_parallel(comm))
    {
        const label myProci = UPstream::myProcNo(comm);
        const label numProc = UPstream::nProcs(comm);

        if (values.size() < numProc)
        {
            FatalErrorInFunction
                << "List of values:" << values.size()
                << " < numProcs:" << numProc << nl
                << Foam::abort(FatalError);
        }

        // My communication order
        const auto& myComm = comms[myProci];

        // Receive from up
        if (myComm.above() >= 0)
        {
            const labelList& notBelowLeaves = myComm.allNotBelow();

            if (is_contiguous<T>::value)
            {
                // Contiguous data: receive everything in one buffer
                List<T> received(notBelowLeaves.size());

                UIPstream::read
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    received.data_bytes(),
                    received.size_bytes(),
                    tag,
                    comm
                );

                forAll(notBelowLeaves, leafI)
                {
                    values[notBelowLeaves[leafI]] = received[leafI];
                }
            }
            else
            {
                IPstream fromAbove
                (
                    UPstream::commsTypes::scheduled,
                    myComm.above(),
                    0,  // bufsize
                    tag,
                    comm
                );

                for (const label leafID : notBelowLeaves)
                {
                    fromAbove >> values[leafID];

                    if (debug & 2)
                    {
                        Perr<< " received through "
                            << myComm.above() << " data for:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                }
            }
        }

        // Send to my downstairs neighbours
        forAllReverse(myComm.below(), belowI)
        {
            const label belowID = myComm.below()[belowI];
            const labelList& notBelowLeaves = comms[belowID].allNotBelow();

            if (is_contiguous<T>::value)
            {
                List<T> sending(notBelowLeaves.size());

                forAll(notBelowLeaves, leafI)
                {
                    sending[leafI] = values[notBelowLeaves[leafI]];
                }

                UOPstream::write
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    sending.cdata_bytes(),
                    sending.size_bytes(),
                    tag,
                    comm
                );
            }
            else
            {
                OPstream toBelow
                (
                    UPstream::commsTypes::scheduled,
                    belowID,
                    0,  // bufsize
                    tag,
                    comm
                );

                // Send data destined for all other processors below belowID
                for (const label leafID : notBelowLeaves)
                {
                    toBelow << values[leafID];

                    if (debug & 2)
                    {
                        Perr<< " sent through "
                            << belowID << " data for:" << leafID
                            << " data:" << values[leafID] << endl;
                    }
                }
            }
        }
    }
}
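
// For comparison (illustrative sketch, not part of the original source):
// broadcasting the gathered list from the master reaches the same end state,
// but also re-sends each rank the element it already owns, which
// scatterList() above deliberately skips. Assumes the generic serialising
// Pstream::broadcast(Type&, comm).
//
//     Pstream::gatherList(values, tag, comm);   // master now holds all slots
//     Pstream::broadcast(values, comm);         // everyone receives the list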


template<class T>
void Foam::Pstream::gatherList
(
    UList<T>& values,
    const int tag,
    const label comm
)
{
    Pstream::gatherList
    (
        UPstream::whichCommunication(comm),
        values,
        tag,
        comm
    );
}


// Unused - slated for removal? (MAY-2023)
template<class T>
void Foam::Pstream::scatterList
(
    UList<T>& values,
    const int tag,
    const label comm
)
{
    Pstream::scatterList
    (
        UPstream::whichCommunication(comm),
        values,
        tag,
        comm
    );
}


template<class T>
void Foam::Pstream::allGatherList
(
    UList<T>& values,
    const int tag,
    const label comm
)
{
    if (UPstream::is_parallel(comm))
    {
        if (is_contiguous<T>::value)
        {
            if (values.size() < UPstream::nProcs(comm))
            {
                FatalErrorInFunction
                    << "List of values is too small:" << values.size()
                    << " vs numProcs:" << UPstream::nProcs(comm) << nl
                    << Foam::abort(FatalError);
            }

            // Contiguous data: single MPI all-gather of the raw bytes
            UPstream::mpiAllGather(values.data_bytes(), sizeof(T), comm);
            return;
        }

        // Non-contiguous data: gather to master, then scatter back out
        const auto& comms = UPstream::whichCommunication(comm);

        Pstream::gatherList(comms, values, tag, comm);
        Pstream::scatterList(comms, values, tag, comm);
    }
}
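
// Usage sketch (illustrative only, not part of the original source).
// 'localMax' stands for some per-rank quantity and is hypothetical. After
// the call every rank holds the values from all ranks, whichever of the
// two paths above (MPI all-gather or gatherList/scatterList) was taken.
//
//     List<scalar> maxima(UPstream::nProcs(comm), Zero);
//     maxima[UPstream::myProcNo(comm)] = localMax;
//     Pstream::allGatherList(maxima, UPstream::msgType(), comm);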


// ************************************************************************* //