UIPstreamRead.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019-2024 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UIPstream.H"
30 #include "PstreamGlobals.H"
31 #include "profilingPstream.H"
32 #include "IOstreams.H"
33 
34 // FUTURE? probe and receive message
35 // - as of 2023-06 appears to be broken with INTELMPI + PMI-2 (slurm)
36 // and perhaps other places so currently avoid
37 
38 #undef Pstream_use_MPI_Get_count
39 
40 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
41 
42 // General blocking/non-blocking MPI receive
43 static std::streamsize UPstream_mpi_receive
44 (
45  const Foam::UPstream::commsTypes commsType,
46  char* buf,
47  const std::streamsize bufSize,
48  const int fromProcNo,
49  const int tag,
50  const Foam::label communicator,
52 )
53 {
54  using namespace Foam;
55 
57 
58  // TODO: some corrective action, at least when not nonBlocking
59  #if 0
60  // No warnings here, just on the sender side.
61  if (bufSize > std::streamsize(INT_MAX))
62  {
63  Perr<< "UIPstream::read() : from rank " << fromProcNo
64  << " exceeds INT_MAX bytes" << Foam::endl;
66  }
67  #endif
68 
69  if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm)
70  {
71  Perr<< "UIPstream::read : starting read from:" << fromProcNo
72  << " size:" << label(bufSize)
73  << " tag:" << tag << " comm:" << communicator
74  << " commsType:" << UPstream::commsTypeNames[commsType]
75  << " warnComm:" << UPstream::warnComm
76  << Foam::endl;
78  }
79  else if (UPstream::debug)
80  {
81  Perr<< "UIPstream::read : starting read from:" << fromProcNo
82  << " size:" << label(bufSize)
83  << " tag:" << tag << " comm:" << communicator
84  << " commsType:" << UPstream::commsTypeNames[commsType]
85  << Foam::endl;
86  }
87 
88  int returnCode = MPI_ERR_UNKNOWN;
89 
91 
92  if
93  (
95  || commsType == UPstream::commsTypes::scheduled
96  )
97  {
98  // Not UPstream::commsTypes::nonBlocking
99 
100  MPI_Status status;
101 
102  {
103  returnCode = MPI_Recv
104  (
105  buf,
106  bufSize,
107  MPI_BYTE,
108  fromProcNo,
109  tag,
110  PstreamGlobals::MPICommunicators_[communicator],
111  &status
112  );
113  }
114 
116 
117  if (returnCode != MPI_SUCCESS)
118  {
120  << "MPI_Recv cannot receive incoming message"
122  return 0;
123  }
124  else if (UPstream::debug)
125  {
126  Perr<< "UIPstream::read : finished recv from:"
127  << fromProcNo
128  << " size:" << label(bufSize) << " tag:" << tag
129  << Foam::endl;
130  }
131 
132  // Check size of message read
133  #ifdef Pstream_use_MPI_Get_count
134  int count(0);
135  MPI_Get_count(&status, MPI_BYTE, &count);
136  #else
137  MPI_Count count(0);
138  MPI_Get_elements_x(&status, MPI_BYTE, &count);
139  #endif
140 
141  // Errors
142  if (count == MPI_UNDEFINED || int64_t(count) < 0)
143  {
145  << "MPI_Get_count() or MPI_Get_elements_x() : "
146  "returned undefined or negative value"
148  }
149  else if (int64_t(count) > int64_t(UList<char>::max_size()))
150  {
152  << "MPI_Get_count() or MPI_Get_elements_x() : "
153  "count is larger than UList<char>::max_size() bytes"
155  }
156 
157 
158  if (bufSize < std::streamsize(count))
159  {
161  << "buffer (" << label(bufSize)
162  << ") not large enough for incoming message ("
163  << label(count) << ')'
165  }
166 
167  return std::streamsize(count);
168  }
169  else if (commsType == UPstream::commsTypes::nonBlocking)
170  {
171  MPI_Request request;
172 
173  {
174  returnCode = MPI_Irecv
175  (
176  buf,
177  bufSize,
178  MPI_BYTE,
179  fromProcNo,
180  tag,
181  PstreamGlobals::MPICommunicators_[communicator],
182  &request
183  );
184  }
185 
186  if (returnCode != MPI_SUCCESS)
187  {
189  << "MPI_Irecv cannot start non-blocking receive"
191 
192  return 0;
193  }
194 
195  PstreamGlobals::push_request(request, req);
197 
198 
199  if (UPstream::debug)
200  {
201  Perr<< "UIPstream::read : started non-blocking recv from:"
202  << fromProcNo
203  << " size:" << label(bufSize) << " tag:" << tag
204  << " request:" <<
205  (req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
206  << Foam::endl;
207  }
208 
209  // Assume the message will be completely received.
210  return bufSize;
211  }
212 
214  << "Unsupported communications type " << int(commsType)
216 
217  return 0;
218 }
219 
220 
221 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
222 
223 void Foam::UIPstream::bufferIPCrecv()
224 {
225  // Called by constructor
226  if (UPstream::debug)
227  {
228  Perr<< "UIPstream IPC read buffer :"
229  << " from:" << fromProcNo_
230  << " tag:" << tag_ << " comm:" << comm_
231  << " wanted size:" << recvBuf_.capacity()
232  << Foam::endl;
233  }
234 
235  // Fallback value
237 
239  {
240  // Non-blocking
241  // ~~~~~~~~~~~~
242  // No chance of probing for size nor relying on the returned message
243  // size (since it returns immediately without any further checks)
244  //
245  // Fortunately there are not many (any?) places that are using
246  // a non-blocking IPstream with streaming anyhow.
247 
249  }
250  else if (!recvBuf_.capacity())
251  {
252  // No buffer size allocated/specified - probe size of incoming message
254 
255  MPI_Status status;
256 
257  MPI_Probe
258  (
259  fromProcNo_,
260  tag_,
262  &status
263  );
264 
266 
267 
268  #ifdef Pstream_use_MPI_Get_count
269  int count(0);
270  MPI_Get_count(&status, MPI_BYTE, &count);
271  #else
272  MPI_Count count(0);
273  MPI_Get_elements_x(&status, MPI_BYTE, &count);
274  #endif
275 
276  // Errors
277  if (count == MPI_UNDEFINED || int64_t(count) < 0)
278  {
280  << "MPI_Get_count() or MPI_Get_elements_x() : "
281  "returned undefined or negative value"
283  }
284  else if (int64_t(count) > int64_t(UList<char>::max_size()))
285  {
287  << "MPI_Get_count() or MPI_Get_elements_x() : "
288  "count is larger than UList<char>::max_size() bytes"
290  }
291 
292  if (UPstream::debug)
293  {
294  Perr<< "UIPstream::UIPstream : probed size:"
295  << label(count) << Foam::endl;
296  }
297 
298  recvBuf_.resize(label(count));
299  messageSize_ = label(count);
300  }
301 
302  std::streamsize count = UPstream_mpi_receive
303  (
304  commsType(),
305  recvBuf_.data(),
306  messageSize_, // The expected size
307  fromProcNo_,
308  tag_,
309  comm_,
310  nullptr // UPstream::Request
311  );
312 
313  if (count < 0)
314  {
316  << "MPI_recv() with negative size??"
318  }
319  else if (int64_t(count) > int64_t(UList<char>::max_size()))
320  {
322  << "MPI_recv() larger than "
323  "UList<char>::max_size() bytes"
325  }
326 
327  // Set addressed size. Leave actual allocated memory intact.
328  recvBuf_.resize(label(count));
329  messageSize_ = label(count);
330 
331  if (recvBuf_.empty())
332  {
333  setEof();
334  }
335 }
336 
337 
338 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
339 
340 std::streamsize Foam::UIPstream::read
341 (
342  const UPstream::commsTypes commsType,
343  const int fromProcNo,
344  char* buf,
345  const std::streamsize bufSize,
346  const int tag,
347  const label communicator,
348  UPstream::Request* req
349 )
350 {
351  return UPstream_mpi_receive
352  (
353  commsType,
354  buf,
355  bufSize,
356  fromProcNo,
357  tag,
358  communicator,
359  req
360  );
361 }
362 
363 
364 // ************************************************************************* //
static void addProbeTime()
Add time increment to probe time.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:89
commsTypes
Communications types.
Definition: UPstream.H:77
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:608
bool empty() const noexcept
True if List is empty (ie, size() is zero)
Definition: UList.H:675
T * data() noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:265
void resize(const label len)
Alter addressable list size, allocating new space if required while recovering old content...
Definition: DynamicListI.H:353
DynamicList< MPI_Comm > MPICommunicators_
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
const int fromProcNo_
Source rank for the data.
Definition: UIPstream.H:98
void reset_request(UPstream::Request *requestPtr, label *requestIdx=nullptr)
Reset UPstream::Request to null and/or the index of the outstanding request to -1.
label capacity() const noexcept
Size of the underlying storage.
Definition: DynamicList.H:225
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
label messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of &#39;true&#39; entries.
Definition: BitOps.H:73
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
static std::streamsize UPstream_mpi_receive(const Foam::UPstream::commsTypes commsType, char *buf, const std::streamsize bufSize, const int fromProcNo, const int tag, const Foam::label communicator, Foam::UPstream::Request *req)
Definition: UIPstreamRead.C:37
An opaque wrapper for MPI_Request with a vendor-independent representation without any <mpi...
Definition: UPstream.H:1741
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:426
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1284
static void beginTiming()
Update timer prior to measurement.
int debug
Static debugging option.
static void addRequestTime()
Add time increment to request time.
void push_request(MPI_Request request, UPstream::Request *requestPtr=nullptr, label *requestIdx=nullptr)
Transcribe MPI_Request to UPstream::Request (does not affect the stack of outstanding requests) or el...
DynamicList< char > & recvBuf_
Reference to the receive buffer data.
Definition: UIPstream.H:129
const int tag_
Message tag for communication.
Definition: UIPstream.H:103
static void addGatherTime()
Add time increment to gather time.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static void printStack(Ostream &os, int size=-1)
Helper function to print a stack, with optional upper limit.
void setEof() noexcept
Set stream state as reached &#39;eof&#39;.
Definition: IOstream.H:453
"buffered" : (MPI_Bsend, MPI_Recv)
const int comm_
The communicator index.
Definition: UIPstream.H:108
Namespace for OpenFOAM.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35