UIPstream.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2013 OpenFOAM Foundation
9  Copyright (C) 2017-2024 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::UIPstreamBase
29 
30 Description
31  Base class for input inter-processor communications stream
32  (ie, parallel streams).
33  Not to be used directly, thus contructors are protected.
34 
35 SourceFiles
36  UIPstreamBase.C
37 
38 \*---------------------------------------------------------------------------*/
39 
40 #include "Pstream.H"
41 
42 #ifndef Foam_UIPstream_H
43 #define Foam_UIPstream_H
44 
45 #include "UPstream.H"
46 #include "Istream.H"
47 #include "DynamicList.H"
48 #include "PstreamBuffers.H"
49 
50 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
51 
52 namespace Foam
53 {
54 
55 /*---------------------------------------------------------------------------*\
56  Class UIPstreamBase Declaration
57 \*---------------------------------------------------------------------------*/
58 
59 class UIPstreamBase
60 :
61  public UPstream,
62  public Istream
63 {
64  // Private Member Functions
65 
66  //- Check buffer position against messageSize_ for EOF
67  inline void checkEof();
68 
69  //- Prepare receive buffer by adjusting alignment
70  inline void prepareBuffer(const size_t align);
71 
72  //- Read a T from the receive buffer
73  template<class T>
74  inline void readFromBuffer(T& val);
75 
76  //- Read count bytes of data from the receive buffer.
77  // Prior data alignment is done by prepareBuffer
78  // Reading into a null pointer behaves like a forward seek
79  inline void readFromBuffer(void* data, const size_t count);
80 
81  //- Read string length and string content
82  inline Istream& readString(std::string& str);
83 
84 
85 protected:
86 
87  // Protected Data
88 
89  //- Source rank for the data
90  const int fromProcNo_;
91 
92  //- Message tag for communication
93  const int tag_;
94 
95  //- The communicator index
96  const int comm_;
97 
98  //- The message size, read on bufferIPCrecv or set directly
99  label messageSize_;
100 
101  //- Receive position in buffer data, if ony
102  //- If there is no external location for recvBufPos_
104 
105  //- Clear the receive buffer on termination (in the destructor)
106  const bool clearAtEnd_;
107 
108  //- Reference to the receive buffer data
110 
111  //- Reference to the receive position in buffer data
112  label& recvBufPos_;
114 
115  // Protected Constructors
116 
117  //- Construct given process index to read from using the given
118  //- attached receive buffer, optional communication characteristics
119  //- and IO format
121  (
123  const int fromProcNo,
124  DynamicList<char>& receiveBuf,
125  label& receiveBufPosition,
126  const int tag = UPstream::msgType(),
127  const label comm = UPstream::worldComm,
128  const bool clearAtEnd = false, // destroy receiveBuf if at end
130  );
131 
132  //- Construct given buffers
133  UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers);
135  //- Construct for an externally obtained buffer.
136  // The parameter is allowed to be const (since reading will not
137  // affect it), but must reference a concrete variable.
139  (
140  const DynamicList<char>& receiveBuf,
142  );
143 
144 public:
145 
146  //- Destructor. Optionally clears external receive buffer.
147  virtual ~UIPstreamBase();
148 
149 
150  // Member Functions
151 
152  // Stream State Functions
153 
154  //- Return current stream flags.
155  //- Dummy for parallel stream, returns 0.
156  virtual std::ios_base::fmtflags flags() const override
157  {
158  return std::ios_base::fmtflags(0);
159  }
160 
161  //- Set stream flags, return old stream flags.
162  //- Dummy for parallel stream, returns 0.
163  virtual std::ios_base::fmtflags flags(std::ios_base::fmtflags) override
164  {
165  return std::ios_base::fmtflags(0);
166  }
167 
168 
169  // Read Functions
170 
171  //- Return next token from stream
172  virtual Istream& read(token&) override;
173 
174  //- Read a character
175  virtual Istream& read(char& c) override;
176 
177  //- Read a word
178  virtual Istream& read(word& str) override;
179 
180  // Read a string
181  virtual Istream& read(string& str) override;
182 
183  //- Read a label
184  virtual Istream& read(label& val) override;
185 
186  //- Read a float
187  virtual Istream& read(float& val) override;
188 
189  //- Read a double
190  virtual Istream& read(double& val) override;
191 
192  //- Read binary block with 8-byte alignment.
193  //- Reading into a null pointer behaves like a forward seek of
194  //- count characters.
195  virtual Istream& read(char* data, std::streamsize count) override;
196 
197  //- Low-level raw binary read.
198  //- Reading into a null pointer behaves like a forward seek of
199  //- count characters.
200  virtual Istream& readRaw(char* data, std::streamsize count) override;
201 
202  //- Start of low-level raw binary read
203  virtual bool beginRawRead() override;
204 
205  //- End of low-level raw binary read
206  virtual bool endRawRead() override { return true; }
207 
208 
209  // Positioning
210 
211  //- Rewind the receive stream position so that it may be read again
212  virtual void rewind() override;
213 
214 
215  // Print
216 
217  //- Print stream description to Ostream
218  void print(Ostream& os) const override;
219 };
220 
221 
222 /*---------------------------------------------------------------------------*\
223  Class UIPstream Declaration
224 \*---------------------------------------------------------------------------*/
225 
226 //- Input inter-processor communications stream
227 //- using MPI send/recv etc. - operating on external buffer.
228 class UIPstream
229 :
230  public UIPstreamBase
231 {
232  // Private Member Functions
233 
234  //- Initial buffer recv, called by constructor (blocking | scheduled)
235  void bufferIPCrecv();
236 
237 
238 public:
239 
240  // Constructors
241 
242  //- Construct given process index to read from using the given
243  //- attached receive buffer, optional communication characteristics
244  //- and IO format
245  UIPstream
246  (
248  const int fromProcNo,
249  DynamicList<char>& receiveBuf,
250  label& receiveBufPosition,
251  const int tag = UPstream::msgType(),
252  const label comm = UPstream::worldComm,
253  const bool clearAtEnd = false, // destroy receiveBuf if at end
255  );
256 
257  //- Construct given buffers
258  UIPstream(const int fromProcNo, PstreamBuffers& buffers);
259 
260  //- Construct for reading from a standalone buffer that has
261  //- been obtained externally by the caller.
262  // The parameter is allowed to be const (since reading will not
263  // affect it), but must reference a concrete variable.
264  explicit UIPstream
265  (
266  const DynamicList<char>& recvBuf,
268  );
269 
270 
271  //- Destructor
272  virtual ~UIPstream() = default;
273 
274 
275  // Member Functions
276 
277  //- Use all read methods from base
278  using UIPstreamBase::read;
279 
280 
281  // Static Functions
282 
283  //- Read buffer contents from given processor.
284  // \return the message size (bytes read). May change in the future
285  static std::streamsize read
286  (
288  const int fromProcNo,
289  char* buf,
290  const std::streamsize bufSize,
291  const int tag = UPstream::msgType(),
292  const label comm = UPstream::worldComm,
294  UPstream::Request* req = nullptr
295  );
296 
297  //- Read buffer contents (non-blocking) from given processor.
298  // \return the message size (bytes read). May change in the future
299  inline static std::streamsize read
300  (
302  UPstream::Request& req,
303  const int fromProcNo,
304  char* buf,
305  const std::streamsize bufSize,
306  const int tag = UPstream::msgType(),
307  const label comm = UPstream::worldComm
308  )
309  {
310  return UIPstream::read
311  (
313  fromProcNo,
314  buf,
315  bufSize,
316  tag,
317  comm,
318  &req
319  );
320  }
321 
322  //- Read into UList storage from given processor.
323  // Only valid for contiguous data types.
324  // \return the message size (bytes read). May change in the future
325  template<class Type>
326  inline static std::streamsize read
327  (
329  const int fromProcNo,
330  UList<Type>& buffer,
331  const int tag = UPstream::msgType(),
332  const label comm = UPstream::worldComm,
334  UPstream::Request* req = nullptr
335  )
336  {
337  return UIPstream::read
338  (
339  commsType,
340  fromProcNo,
341  buffer.data_bytes(),
342  buffer.size_bytes(),
343  tag,
344  comm,
345  req
346  );
347  }
348 
349  //- Read into SubList storage from given processor.
350  // Only valid for contiguous data types.
351  // \return the message size (bytes read). May change in the future
352  template<class Type>
353  inline static std::streamsize read
354  (
356  const int fromProcNo,
357  SubList<Type> buffer, // passed by shallow copy
358  const int tag = UPstream::msgType(),
359  const label comm = UPstream::worldComm,
361  UPstream::Request* req = nullptr
362  )
363  {
364  return UIPstream::read
365  (
366  commsType,
367  fromProcNo,
368  buffer.data_bytes(),
369  buffer.size_bytes(),
370  tag,
371  comm,
372  req
373  );
374  }
375 
376  //- Read into UList storage (non-blocking) from given processor.
377  // Only valid for contiguous data types.
378  // \return the message size (bytes read). May change in the future
379  template<class Type>
380  inline static std::streamsize read
381  (
383  UPstream::Request& req,
384  const int fromProcNo,
385  UList<Type>& buffer,
386  const int tag = UPstream::msgType(),
387  const label comm = UPstream::worldComm
388  )
389  {
390  return UIPstream::read
391  (
393  fromProcNo,
394  buffer.data_bytes(),
395  buffer.size_bytes(),
396  tag,
397  comm,
398  &req
399  );
400  }
401 
402  //- Read into SubList storage (non-blocking) from given processor.
403  // Only valid for contiguous data types.
404  // \return the message size (bytes read). May change in the future
405  template<class Type>
406  inline static std::streamsize read
407  (
409  UPstream::Request& req,
410  const int fromProcNo,
411  SubList<Type> buffer, // passed by shallow copy
412  const int tag = UPstream::msgType(),
413  const label comm = UPstream::worldComm
414  )
415  {
416  return UIPstream::read
417  (
419  fromProcNo,
420  buffer.data_bytes(),
421  buffer.size_bytes(),
422  tag,
423  comm,
424  &req
425  );
426  }
427 };
428 
429 
430 /*---------------------------------------------------------------------------*\
431  Class UIPBstream Declaration
432 \*---------------------------------------------------------------------------*/
433 
434 //- Input inter-processor communications stream
435 //- using MPI broadcast - operating on external buffer.
436 class UIPBstream
437 :
438  public UIPstreamBase
439 {
440  // Private Member Functions
441 
442  //- Initial buffer recv via broadcast, called by constructor
443  void bufferIPCrecv();
444 
445 
446 public:
447 
448  // Constructors
449 
450  //- Construct given process index to read from using the given
451  //- attached receive buffer, optional communication characteristics
452  //- and IO format
453  UIPBstream
454  (
455  const UPstream::commsTypes,
456  const int rootProcNo,
457  DynamicList<char>& receiveBuf,
458  label& receiveBufPosition,
459  const int tag = UPstream::msgType(),
460  const label comm = UPstream::worldComm,
461  const bool clearAtEnd = false,
463  );
464 
465 
466  //- Destructor
467  virtual ~UIPBstream() = default;
468 
469 
470  // Member Functions
471 
472  //- Use all read methods from base
473  using UIPstreamBase::read;
474 
475 
476  // Static Functions
477 
478  //- Wrapped version of UPstream::broadcast
479  // \return the message size (bytes read). May change in the future
480  static std::streamsize read
481  (
482  const int rootProcNo,
483  char* buf,
484  const std::streamsize bufSize,
485  const label comm = UPstream::worldComm
486  );
487 };
488 
489 
490 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
491 
492 } // End namespace Foam
493 
494 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
495 
496 #endif
497 
498 // ************************************************************************* //
virtual bool endRawRead() override
End of low-level raw binary read.
Definition: UIPstream.H:261
UIPstream(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
Definition: IPstreams.C:28
UIPstreamBase(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
commsTypes
Communications types.
Definition: UPstream.H:77
Base class for input inter-processor communications stream (ie, parallel streams). Not to be used directly, thus contructors are protected.
Definition: UIPstream.H:54
virtual bool beginRawRead() override
Start of low-level raw binary read.
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
const int fromProcNo_
Source rank for the data.
Definition: UIPstream.H:98
label & recvBufPos_
Reference to the receive position in buffer data.
Definition: UIPstream.H:134
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1252
virtual ~UIPBstream()=default
Destructor.
virtual ~UIPstream()=default
Destructor.
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:421
char * data_bytes() noexcept
Return pointer to the underlying array serving as data storage,.
Definition: UListI.H:279
void print(Ostream &os) const override
Print stream description to Ostream.
label messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
label storedRecvBufPos_
Receive position in buffer data, if ony If there is no external location for recvBufPos_.
Definition: UIPstream.H:119
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of &#39;true&#39; entries.
Definition: BitOps.H:73
An opaque wrapper for MPI_Request with a vendor-independent representation without any <mpi...
Definition: UPstream.H:1741
virtual Istream & readRaw(char *data, std::streamsize count) override
Low-level raw binary read. Reading into a null pointer behaves like a forward seek of count character...
static std::streamsize read(const int rootProcNo, char *buf, const std::streamsize bufSize, const label comm=UPstream::worldComm)
Wrapped version of UPstream::broadcast.
UIPBstream(const UPstream::commsTypes, const int rootProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
Definition: IPBstreams.C:28
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1284
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:56
OBJstream os(runTime.globalPath()/outputName)
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
virtual std::ios_base::fmtflags flags() const override
Return current stream flags. Dummy for parallel stream, returns 0.
Definition: UIPstream.H:189
const bool clearAtEnd_
Clear the receive buffer on termination (in the destructor)
Definition: UIPstream.H:124
DynamicList< char > & recvBuf_
Reference to the receive buffer data.
Definition: UIPstream.H:129
virtual void rewind() override
Rewind the receive stream position so that it may be read again.
const int tag_
Message tag for communication.
Definition: UIPstream.H:103
const dimensionedScalar c
Speed of light in a vacuum.
streamFormat
Data format (ascii | binary)
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
const int comm_
The communicator index.
Definition: UIPstream.H:108
Inter-processor communications stream.
Definition: UPstream.H:65
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:286
virtual Istream & read(token &) override
Return next token from stream.
Namespace for OpenFOAM.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
Istream(const Istream &)=default
Copy construct.