UIPstream.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2013 OpenFOAM Foundation
9  Copyright (C) 2017-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::UIPstreamBase
29 
30 Description
31  Base class for input inter-processor communications stream
32  (ie, parallel streams).
33  Not to be used directly, thus contructors are protected.
34 
35 SourceFiles
36  UIPstreamBase.C
37 
38 \*---------------------------------------------------------------------------*/
39 
40 #include "Pstream.H"
41 
42 #ifndef Foam_UIPstream_H
43 #define Foam_UIPstream_H
44 
45 #include "UPstream.H"
46 #include "Istream.H"
47 #include "DynamicList.H"
48 #include "PstreamBuffers.H"
49 
50 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
51 
52 namespace Foam
53 {
54 
55 /*---------------------------------------------------------------------------*\
56  Class UIPstreamBase Declaration
57 \*---------------------------------------------------------------------------*/
58 
59 class UIPstreamBase
60 :
61  public UPstream,
62  public Istream
63 {
64  // Private Member Functions
65 
66  //- Check buffer position against messageSize_ for EOF
67  inline void checkEof();
68 
69  //- Prepare receive buffer by adjusting alignment
70  inline void prepareBuffer(const size_t align);
71 
72  //- Read a T from the receive buffer
73  template<class T>
74  inline void readFromBuffer(T& val);
75 
76  //- Read count bytes of data from the receive buffer.
77  // Prior data alignment is done by prepareBuffer
78  // Reading into a null pointer behaves like a forward seek
79  inline void readFromBuffer(void* data, const size_t count);
80 
81  //- Read string length and string content
82  inline Istream& readString(std::string& str);
83 
84 
85 protected:
86 
87  // Protected Data
88 
89  //- Source rank for the data
90  const int fromProcNo_;
91 
92  //- Message tag for communication
93  const int tag_;
94 
95  //- The communicator index
96  const int comm_;
97 
98  //- The message size, read on bufferIPCrecv or set directly
99  int messageSize_;
100 
101  //- Receive position in buffer data, if ony
102  //- If there is no external location for recvBufPos_
104 
105  //- Clear the receive buffer on termination (in the destructor)
106  const bool clearAtEnd_;
107 
108  //- Reference to the receive buffer data
110 
111  //- Reference to the receive position in buffer data
112  label& recvBufPos_;
114 
115  // Protected Constructors
116 
117  //- Construct given process index to read from using the given
118  //- attached receive buffer, optional communication characteristics
119  //- and IO format
121  (
123  const int fromProcNo,
124  DynamicList<char>& receiveBuf,
125  label& receiveBufPosition,
126  const int tag = UPstream::msgType(),
127  const label comm = UPstream::worldComm,
128  const bool clearAtEnd = false, // destroy receiveBuf if at end
130  );
131 
132  //- Construct given buffers
133  UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers);
135  //- Construct for an externally obtained buffer.
136  // The parameter is allowed to be const (since reading will not
137  // affect it), but must reference a concrete variable.
139  (
140  const DynamicList<char>& receiveBuf,
142  );
143 
144 public:
145 
146  //- Destructor. Optionally clears external receive buffer.
147  virtual ~UIPstreamBase();
148 
149 
150  // Member Functions
151 
152  // Stream State Functions
153 
154  //- Return stream flags
155  virtual ios_base::fmtflags flags() const override
156  {
157  return ios_base::fmtflags(0);
158  }
159 
160  //- Set flags of stream flags
161  virtual ios_base::fmtflags flags(const ios_base::fmtflags) override
162  {
163  return ios_base::fmtflags(0);
164  }
165 
166 
167  // Read Functions
168 
169  //- Return next token from stream
170  virtual Istream& read(token&) override;
171 
172  //- Read a character
173  virtual Istream& read(char& c) override;
174 
175  //- Read a word
176  virtual Istream& read(word& str) override;
177 
178  // Read a string
179  virtual Istream& read(string& str) override;
180 
181  //- Read a label
182  virtual Istream& read(label& val) override;
183 
184  //- Read a float
185  virtual Istream& read(float& val) override;
186 
187  //- Read a double
188  virtual Istream& read(double& val) override;
189 
190  //- Read binary block with 8-byte alignment.
191  //- Reading into a null pointer behaves like a forward seek of
192  //- count characters.
193  virtual Istream& read(char* data, std::streamsize count) override;
194 
195  //- Low-level raw binary read.
196  //- Reading into a null pointer behaves like a forward seek of
197  //- count characters.
198  virtual Istream& readRaw(char* data, std::streamsize count) override;
199 
200  //- Start of low-level raw binary read
201  virtual bool beginRawRead() override;
202 
203  //- End of low-level raw binary read
204  virtual bool endRawRead() override { return true; }
205 
206 
207  // Positioning
208 
209  //- Rewind the receive stream position so that it may be read again
210  virtual void rewind() override;
211 
212 
213  // Print
214 
215  //- Print stream description to Ostream
216  void print(Ostream& os) const override;
217 };
218 
219 
220 /*---------------------------------------------------------------------------*\
221  Class UIPstream Declaration
222 \*---------------------------------------------------------------------------*/
223 
224 //- Input inter-processor communications stream
225 //- using MPI send/recv etc. - operating on external buffer.
226 class UIPstream
227 :
228  public UIPstreamBase
229 {
230  // Private Member Functions
231 
232  //- Initial buffer recv, called by constructor (blocking | scheduled)
233  void bufferIPCrecv();
234 
235 
236 public:
237 
238  // Constructors
239 
240  //- Construct given process index to read from using the given
241  //- attached receive buffer, optional communication characteristics
242  //- and IO format
243  UIPstream
244  (
246  const int fromProcNo,
247  DynamicList<char>& receiveBuf,
248  label& receiveBufPosition,
249  const int tag = UPstream::msgType(),
250  const label comm = UPstream::worldComm,
251  const bool clearAtEnd = false, // destroy receiveBuf if at end
253  );
254 
255  //- Construct given buffers
256  UIPstream(const int fromProcNo, PstreamBuffers& buffers);
257 
258  //- Construct for reading from a standalone buffer that has
259  //- been obtained externally by the caller.
260  // The parameter is allowed to be const (since reading will not
261  // affect it), but must reference a concrete variable.
262  explicit UIPstream
263  (
264  const DynamicList<char>& recvBuf,
266  );
267 
268 
269  //- Destructor
270  virtual ~UIPstream() = default;
271 
272 
273  // Member Functions
274 
275  //- Use all read methods from base
276  using UIPstreamBase::read;
277 
278 
279  // Static Functions
280 
281  //- Read buffer contents from given processor.
282  // \return the message size (bytes read)
283  static label read
284  (
286  const int fromProcNo,
287  char* buf,
288  const std::streamsize bufSize,
289  const int tag = UPstream::msgType(),
290  const label comm = UPstream::worldComm,
292  UPstream::Request* req = nullptr
293  );
294 
295  //- Read buffer contents (non-blocking) from given processor.
296  // \return the message size (bytes read)
297  inline static label read
298  (
300  UPstream::Request& req,
301  const int fromProcNo,
302  char* buf,
303  const std::streamsize bufSize,
304  const int tag = UPstream::msgType(),
305  const label comm = UPstream::worldComm
306  )
307  {
308  return UIPstream::read
309  (
311  fromProcNo,
312  buf,
313  bufSize,
314  tag,
315  comm,
316  &req
317  );
318  }
319 
320  //- Read into UList storage from given processor.
321  // Only valid for contiguous data types.
322  // \return the message size (bytes read). May change in the future
323  template<class Type>
324  inline static label read
325  (
327  const int fromProcNo,
328  UList<Type>& buffer,
329  const int tag = UPstream::msgType(),
330  const label comm = UPstream::worldComm,
332  UPstream::Request* req = nullptr
333  )
334  {
335  return UIPstream::read
336  (
337  commsType,
338  fromProcNo,
339  buffer.data_bytes(),
340  buffer.size_bytes(),
341  tag,
342  comm,
343  req
344  );
345  }
346 
347  //- Read into SubList storage from given processor.
348  // Only valid for contiguous data types.
349  // \return the message size (bytes read). May change in the future
350  template<class Type>
351  inline static label read
352  (
354  const int fromProcNo,
355  SubList<Type> buffer, // passed by shallow copy
356  const int tag = UPstream::msgType(),
357  const label comm = UPstream::worldComm,
359  UPstream::Request* req = nullptr
360  )
361  {
362  return UIPstream::read
363  (
364  commsType,
365  fromProcNo,
366  buffer.data_bytes(),
367  buffer.size_bytes(),
368  tag,
369  comm,
370  req
371  );
372  }
373 
374  //- Read into UList storage (non-blocking) from given processor.
375  // Only valid for contiguous data types.
376  // \return the message size (bytes read). May change in the future
377  template<class Type>
378  inline static label read
379  (
381  UPstream::Request& req,
382  const int fromProcNo,
383  UList<Type>& buffer,
384  const int tag = UPstream::msgType(),
385  const label comm = UPstream::worldComm
386  )
387  {
388  return UIPstream::read
389  (
391  fromProcNo,
392  buffer.data_bytes(),
393  buffer.size_bytes(),
394  tag,
395  comm,
396  &req
397  );
398  }
399 
400  //- Read into SubList storage (non-blocking) from given processor.
401  // Only valid for contiguous data types.
402  // \return the message size (bytes read). May change in the future
403  template<class Type>
404  inline static label read
405  (
407  UPstream::Request& req,
408  const int fromProcNo,
409  SubList<Type> buffer, // passed by shallow copy
410  const int tag = UPstream::msgType(),
411  const label comm = UPstream::worldComm
412  )
413  {
414  return UIPstream::read
415  (
417  fromProcNo,
418  buffer.data_bytes(),
419  buffer.size_bytes(),
420  tag,
421  comm,
422  &req
423  );
424  }
425 };
426 
427 
428 /*---------------------------------------------------------------------------*\
429  Class UIPBstream Declaration
430 \*---------------------------------------------------------------------------*/
431 
432 //- Input inter-processor communications stream
433 //- using MPI broadcast - operating on external buffer.
434 class UIPBstream
435 :
436  public UIPstreamBase
437 {
438  // Private Member Functions
439 
440  //- Initial buffer recv via broadcast, called by constructor
441  void bufferIPCrecv();
442 
443 
444 public:
445 
446  // Constructors
447 
448  //- Construct given process index to read from using the given
449  //- attached receive buffer, optional communication characteristics
450  //- and IO format
451  UIPBstream
452  (
453  const UPstream::commsTypes,
454  const int rootProcNo,
455  DynamicList<char>& receiveBuf,
456  label& receiveBufPosition,
457  const int tag = UPstream::msgType(),
458  const label comm = UPstream::worldComm,
459  const bool clearAtEnd = false,
461  );
462 
463 
464  //- Destructor
465  virtual ~UIPBstream() = default;
466 
467 
468  // Member Functions
469 
470  //- Use all read methods from base
471  using UIPstreamBase::read;
472 
473 
474  // Static Functions
475 
476  //- Wrapped version of UPstream::broadcast
477  // \return the message size (bytes read)
478  static label read
479  (
480  const int rootProcNo,
481  char* buf,
482  const std::streamsize bufSize,
483  const label comm = UPstream::worldComm
484  );
485 };
486 
487 
488 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
489 
490 } // End namespace Foam
491 
492 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
493 
494 #endif
495 
496 // ************************************************************************* //
virtual bool endRawRead() override
End of low-level raw binary read.
Definition: UIPstream.H:259
UIPstream(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
Definition: IPstreams.C:28
virtual ios_base::fmtflags flags() const override
Return stream flags.
Definition: UIPstream.H:188
UIPstreamBase(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
commsTypes
Communications types.
Definition: UPstream.H:72
static label read(const int rootProcNo, char *buf, const std::streamsize bufSize, const label comm=UPstream::worldComm)
Wrapped version of UPstream::broadcast.
Base class for input inter-processor communications stream (ie, parallel streams). Not to be used directly, thus contructors are protected.
Definition: UIPstream.H:54
virtual bool beginRawRead() override
Start of low-level raw binary read.
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
const int fromProcNo_
Source rank for the data.
Definition: UIPstream.H:98
label & recvBufPos_
Reference to the receive position in buffer data.
Definition: UIPstream.H:134
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1229
int messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
virtual ~UIPBstream()=default
Destructor.
virtual ~UIPstream()=default
Destructor.
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:409
char * data_bytes() noexcept
Return pointer to the underlying array serving as data storage,.
Definition: UListI.H:286
void print(Ostream &os) const override
Print stream description to Ostream.
label storedRecvBufPos_
Receive position in buffer data, if ony If there is no external location for recvBufPos_.
Definition: UIPstream.H:119
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of &#39;true&#39; entries.
Definition: BitOps.H:73
An opaque wrapper for MPI_Request with a vendor-independent representation independent of any <mpi...
Definition: UPstream.H:1573
virtual Istream & readRaw(char *data, std::streamsize count) override
Low-level raw binary read. Reading into a null pointer behaves like a forward seek of count character...
UIPBstream(const UPstream::commsTypes, const int rootProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
Definition: IPBstreams.C:28
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1261
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:56
OBJstream os(runTime.globalPath()/outputName)
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
const bool clearAtEnd_
Clear the receive buffer on termination (in the destructor)
Definition: UIPstream.H:124
DynamicList< char > & recvBuf_
Reference to the receive buffer data.
Definition: UIPstream.H:129
virtual void rewind() override
Rewind the receive stream position so that it may be read again.
const int tag_
Message tag for communication.
Definition: UIPstream.H:103
const dimensionedScalar c
Speed of light in a vacuum.
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
const int comm_
The communicator index.
Definition: UIPstream.H:108
Inter-processor communications stream.
Definition: UPstream.H:60
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:293
virtual Istream & read(token &) override
Return next token from stream.
Namespace for OpenFOAM.
Istream(const Istream &)=default
Copy construct.