PstreamGather.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019-2024 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Description
28  Gather data from all processors onto single processor according to some
29  communication schedule (usually tree-to-master).
30  The gathered data will be a single value constructed from the values
31  on individual processors using a user-specified operator.
32 
33 \*---------------------------------------------------------------------------*/
34 
35 #include "IPstream.H"
36 #include "OPstream.H"
37 #include "contiguous.H"
38 
39 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
40 
41 template<class T, class BinaryOp>
43 (
44  T& value,
45  const BinaryOp& bop,
46  const int tag,
47  const label comm
48 )
49 {
50  if (UPstream::is_parallel(comm))
51  {
52  // Communication order
53  const auto& comms = UPstream::whichCommunication(comm);
54  // if (comms.empty()) return; // extra safety?
55  const auto& myComm = comms[UPstream::myProcNo(comm)];
56 
57  // Receive from my downstairs neighbours
58  for (const label belowID : myComm.below())
59  {
60  T received;
61 
63  {
65  (
67  belowID,
68  reinterpret_cast<char*>(&received),
69  sizeof(T),
70  tag,
71  comm
72  );
73  }
74  else
75  {
76  IPstream::recv(received, belowID, tag, comm);
77  }
78 
79  value = bop(value, received);
80  }
81 
82  // Send up value
83  if (myComm.above() >= 0)
84  {
86  {
88  (
90  myComm.above(),
91  reinterpret_cast<const char*>(&value),
92  sizeof(T),
93  tag,
94  comm
95  );
96  }
97  else
98  {
99  OPstream::send(value, myComm.above(), tag, comm);
100  }
101  }
102  }
103 }
104 
105 
106 template<class T>
108 (
109  const T& localValue,
110  const label comm,
111  const int tag
112 )
113 {
114  // OR
115  // if (is_contiguous<T>::value)
116  // {
117  // return UPstream::listGatherValues(localValue, comm);
118  // }
119 
120  List<T> allValues;
121 
122  if (UPstream::is_parallel(comm))
123  {
124  const label numProc = UPstream::nProcs(comm);
125 
126  if (UPstream::master(comm))
127  {
128  allValues.resize(numProc);
129  }
130 
131  if (is_contiguous<T>::value)
132  {
134  (
135  reinterpret_cast<const char*>(&localValue),
136  allValues.data_bytes(),
137  sizeof(T), // The send/recv size per rank
138  comm
139  );
140  }
141  else
142  {
143  if (UPstream::master(comm))
144  {
145  // Non-trivial to manage non-blocking gather without a
146  // PEX/NBX approach (eg, PstreamBuffers) but leave with
147  // with simple exchange for now
148 
149  allValues[0] = localValue;
150 
151  for (int proci = 1; proci < numProc; ++proci)
152  {
153  IPstream::recv(allValues[proci], proci, tag, comm);
154  }
155  }
156  else if (UPstream::is_rank(comm))
157  {
158  OPstream::send(localValue, UPstream::masterNo(), tag, comm);
159  }
160  }
161  }
162  else
163  {
164  // non-parallel: return own value
165  // TBD: only when UPstream::is_rank(comm) as well?
166  allValues.resize(1);
167  allValues[0] = localValue;
168  }
169 
170  return allValues;
171 }
172 
173 
174 template<class T>
176 (
177  const UList<T>& allValues,
178  const label comm,
179  const int tag
180 )
181 {
182  // OR
183  // if (is_contiguous<T>::value)
184  // {
185  // return UPstream::listScatterValues(allValues, comm);
186  // }
187 
188  T localValue{};
189 
190  if (UPstream::is_parallel(comm))
191  {
192  const label numProc = UPstream::nProcs(comm);
193 
194  if (UPstream::master(comm) && allValues.size() < numProc)
195  {
197  << "Attempting to send " << allValues.size()
198  << " values to " << numProc << " processors" << endl
200  }
201 
202  if (is_contiguous<T>::value)
203  {
205  (
206  allValues.cdata_bytes(),
207  reinterpret_cast<char*>(&localValue),
208  sizeof(T), // The send/recv size per rank
209  comm
210  );
211  }
212  else
213  {
214  if (UPstream::master(comm))
215  {
216  const label startOfRequests = UPstream::nRequests();
217 
218  List<DynamicList<char>> sendBuffers(numProc);
219 
220  for (int proci = 1; proci < numProc; ++proci)
221  {
222  UOPstream toProc
223  (
225  proci,
226  sendBuffers[proci],
227  tag,
228  comm
229  );
230  toProc << allValues[proci];
231  }
232 
233  // Wait for outstanding requests
234  UPstream::waitRequests(startOfRequests);
235 
236  return allValues[0];
237  }
238  else if (UPstream::is_rank(comm))
239  {
240  IPstream::recv(localValue, UPstream::masterNo(), tag, comm);
241  }
242  }
243  }
244  else
245  {
246  // non-parallel: return first value
247  // TBD: only when UPstream::is_rank(comm) as well?
248 
249  if (!allValues.empty())
250  {
251  return allValues[0];
252  }
253  }
254 
255  return localValue;
256 }
257 
258 
259 
260 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
static T listScatterValues(const UList< T > &allValues, const label comm=UPstream::worldComm, const int tag=UPstream::msgType())
Scatter individual values from list locations.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:608
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
bool empty() const noexcept
True if List is empty (ie, size() is zero)
Definition: UList.H:675
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static const List< commsStruct > & whichCommunication(const label communicator=worldComm)
Communication schedule for all-to-master (proc 0) as linear/tree/none with switching based on UPstrea...
Definition: UPstream.H:1227
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1086
static List< T > listGatherValues(const T &localValue, const label comm=UPstream::worldComm, const int tag=UPstream::msgType())
Gather individual values into list locations.
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1561
const char * cdata_bytes() const noexcept
Return pointer to the underlying array serving as data storage,.
Definition: UListI.H:272
static void gather(T &value, const BinaryOp &bop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Gather (reduce) data, applying bop to combine value from different processors. The basis for Foam::re...
Definition: PstreamGather.C:37
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1077
static void recv(Type &value, const int fromProcNo, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Receive and deserialize a value. Uses operator>> for de-serialization.
Definition: IPstream.H:81
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
Definition: UPstream.H:1103
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition: UPstream.H:1071
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition: UPstream.H:1123
static void mpiGather(const char *sendData, char *recvData, int count, const label communicator=worldComm)
Receive identically-sized char data from all ranks.
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
const volScalarField & T
static void mpiScatter(const char *sendData, char *recvData, int count, const label communicator=worldComm)
Send identically-sized char data to all ranks.
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1094
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
bool send()
Send buffer contents now and not in destructor [advanced usage]. Returns true on success.
Definition: OPstreams.C:84
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35