UIPstreamRead.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2019-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UIPstream.H"
30 #include "PstreamGlobals.H"
31 #include "profilingPstream.H"
32 #include "IOstreams.H"
33 
34 // FUTURE? probe and receive message
35 // - as of 2023-06 appears to be broken with INTELMPI + PMI-2 (slurm)
36 // and perhaps other places so currently avoid
37 
38 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
39 
40 // General blocking/non-blocking MPI receive, optionally with probed
41 // message information.
42 static Foam::label UPstream_mpi_receive
43 (
44  const Foam::UPstream::commsTypes commsType,
45  char* buf,
46  const std::streamsize bufSize,
47  const int fromProcNo,
48  const int tag,
49  const Foam::label communicator,
51 )
52 {
53  using namespace Foam;
54 
56 
57  if (UPstream::debug)
58  {
59  Pout<< "UIPstream::read : starting read from:" << fromProcNo
60  << " tag:" << tag << " comm:" << communicator
61  << " wanted size:" << label(bufSize)
62  << " commsType:" << UPstream::commsTypeNames[commsType]
63  << Foam::endl;
64  }
65  if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm)
66  {
67  Pout<< "UIPstream::read : starting read from:" << fromProcNo
68  << " tag:" << tag << " comm:" << communicator
69  << " wanted size:" << label(bufSize)
70  << " commsType:" << UPstream::commsTypeNames[commsType]
71  << " warnComm:" << UPstream::warnComm
72  << Foam::endl;
74  }
75 
77 
78  if
79  (
81  || commsType == UPstream::commsTypes::scheduled
82  )
83  {
84  int returnCode = 0;
85  MPI_Status status;
86 
87  {
88  returnCode = MPI_Recv
89  (
90  buf,
91  bufSize,
92  MPI_BYTE,
93  fromProcNo,
94  tag,
96  &status
97  );
98  }
99 
100  if (returnCode != MPI_SUCCESS)
101  {
103  << "MPI_Recv cannot receive incoming message"
105  return 0;
106  }
107 
109 
110  // Check size of message read
111 
112  int messageSize;
113  MPI_Get_count(&status, MPI_BYTE, &messageSize);
114 
115  if (UPstream::debug)
116  {
117  Pout<< "UIPstream::read : finished read from:" << fromProcNo
118  << " tag:" << tag << " read size:" << label(bufSize)
119  << " commsType:" << UPstream::commsTypeNames[commsType]
120  << Foam::endl;
121  }
122 
123  if (messageSize > bufSize)
124  {
126  << "buffer (" << label(bufSize)
127  << ") not large enough for incoming message ("
128  << messageSize << ')'
130  }
131 
132  return messageSize;
133  }
134  else if (commsType == UPstream::commsTypes::nonBlocking)
135  {
136  int returnCode = 0;
137  MPI_Request request;
138 
139  {
140  returnCode = MPI_Irecv
141  (
142  buf,
143  bufSize,
144  MPI_BYTE,
145  fromProcNo,
146  tag,
147  PstreamGlobals::MPICommunicators_[communicator],
148  &request
149  );
150  }
151 
152  if (returnCode != MPI_SUCCESS)
153  {
155  << "MPI_Irecv cannot start non-blocking receive"
157 
158  return 0;
159  }
160 
161  if (UPstream::debug)
162  {
163  Pout<< "UIPstream::read : started read from:" << fromProcNo
164  << " tag:" << tag << " read size:" << label(bufSize)
165  << " commsType:" << UPstream::commsTypeNames[commsType]
166  << " request:" <<
167  (req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
168  << Foam::endl;
169  }
170 
171  PstreamGlobals::push_request(request, req);
173 
174  // Assume the message will be completely received.
175  return bufSize;
176  }
177 
179  << "Unsupported communications type " << int(commsType)
181 
182  return 0;
183 }
184 
185 
186 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
187 
188 void Foam::UIPstream::bufferIPCrecv()
189 {
190  // Called by constructor
191  if (UPstream::debug)
192  {
193  Pout<< "UIPstream IPC read buffer :"
194  << " from:" << fromProcNo_
195  << " tag:" << tag_ << " comm:" << comm_
196  << " wanted size:" << recvBuf_.capacity()
197  << Foam::endl;
198  }
199 
200  // No buffer size allocated/specified - probe size of incoming message
201  if (!recvBuf_.capacity())
202  {
204 
205  MPI_Status status;
206 
207  MPI_Probe
208  (
209  fromProcNo_,
210  tag_,
212  &status
213  );
214 
215  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
216 
218 
220 
221  if (UPstream::debug)
222  {
223  Pout<< "UIPstream::UIPstream : probed size:"
224  << messageSize_ << Foam::endl;
225  }
226  }
227 
229  (
230  commsType(),
231  recvBuf_.data(),
232  recvBuf_.capacity(),
233  fromProcNo_,
234  tag_,
235  comm_,
236  nullptr // UPstream::Request
237  );
238 
239  // Set addressed size. Leave actual allocated memory intact.
241 
242  if (!messageSize_)
243  {
244  setEof();
245  }
246 }
247 
248 
249 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
250 
251 Foam::label Foam::UIPstream::read
252 (
253  const UPstream::commsTypes commsType,
254  const int fromProcNo,
255  char* buf,
256  const std::streamsize bufSize,
257  const int tag,
258  const label communicator,
259  UPstream::Request* req
260 )
261 {
262  return UPstream_mpi_receive
263  (
264  commsType,
265  buf,
266  bufSize,
267  fromProcNo,
268  tag,
269  communicator,
270  req
271  );
272 }
273 
274 
275 // ************************************************************************* //
static void addProbeTime()
Add time increment to probe time.
"blocking" : (MPI_Bsend, MPI_Recv)
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:82
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
commsTypes
Communications types.
Definition: UPstream.H:72
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
T * data() noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:272
void resize(const label len)
Alter addressable list size, allocating new space if required while recovering old content...
Definition: DynamicListI.H:353
DynamicList< MPI_Comm > MPICommunicators_
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
const int fromProcNo_
Source rank for the data.
Definition: UIPstream.H:98
int messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
void reset_request(UPstream::Request *requestPtr, label *requestIdx=nullptr)
Reset UPstream::Request to null and/or the index of the outstanding request to -1.
label capacity() const noexcept
Size of the underlying storage.
Definition: DynamicList.H:225
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
"scheduled" : (MPI_Send, MPI_Recv)
An opaque wrapper for MPI_Request with a vendor-independent representation independent of any <mpi...
Definition: UPstream.H:1573
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:414
errorManip< error > abort(error &err)
Definition: errorManip.H:139
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1261
static void beginTiming()
Update timer prior to measurement.
int debug
Static debugging option.
static void addRequestTime()
Add time increment to request time.
void push_request(MPI_Request request, UPstream::Request *requestPtr=nullptr, label *requestIdx=nullptr)
Transcribe MPI_Request to UPstream::Request (does not affect the stack of outstanding requests) or el...
static Foam::label UPstream_mpi_receive(const Foam::UPstream::commsTypes commsType, char *buf, const std::streamsize bufSize, const int fromProcNo, const int tag, const Foam::label communicator, Foam::UPstream::Request *req)
Definition: UIPstreamRead.C:36
DynamicList< char > & recvBuf_
Reference to the receive buffer data.
Definition: UIPstream.H:129
const int tag_
Message tag for communication.
Definition: UIPstream.H:103
static void addGatherTime()
Add time increment to gather time.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static void printStack(Ostream &os, int size=-1)
Helper function to print a stack, with optional upper limit.
void setEof() noexcept
Set stream state as reached &#39;eof&#39;.
Definition: IOstream.H:443
const int comm_
The communicator index.
Definition: UIPstream.H:108
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Namespace for OpenFOAM.