UIPstream.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2013 OpenFOAM Foundation
9  Copyright (C) 2017-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::UIPstreamBase
29 
30 Description
31  Base class for input inter-processor communications stream
32  (ie, parallel streams).
33  Not to be used directly, thus constructors are protected.
34 
35 SourceFiles
36  UIPstreamBase.C
37 
38 \*---------------------------------------------------------------------------*/
39 
40 #include "Pstream.H"
41 
42 #ifndef Foam_UIPstream_H
43 #define Foam_UIPstream_H
44 
45 #include "UPstream.H"
46 #include "Istream.H"
47 #include "DynamicList.H"
48 #include "PstreamBuffers.H"
49 
50 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
51 
52 namespace Foam
53 {
54 
55 /*---------------------------------------------------------------------------*\
56  Class UIPstreamBase Declaration
57 \*---------------------------------------------------------------------------*/
58 
59 class UIPstreamBase
60 :
61  public UPstream,
62  public Istream
63 {
64  // Private Member Functions
65 
66  //- Check buffer position against messageSize_ for EOF
67  inline void checkEof();
68 
69  //- Prepare receive buffer by adjusting alignment
70  inline void prepareBuffer(const size_t align);
71 
72  //- Read a T from the receive buffer
73  template<class T>
74  inline void readFromBuffer(T& val);
75 
76  //- Read count bytes of data from the receive buffer.
77  // Prior data alignment is done by prepareBuffer
78  // Reading into a null pointer behaves like a forward seek
79  inline void readFromBuffer(void* data, const size_t count);
80 
81  //- Read string length and string content
82  inline Istream& readString(std::string& str);
83 
84 
85 protected:
86 
87  // Protected Data
88 
89  //- Source rank for the data
90  const int fromProcNo_;
91 
92  //- Message tag for communication
93  const int tag_;
94 
95  //- The communicator index
96  const int comm_;
97 
98  //- The message size, read on bufferIPCrecv or set directly
99  int messageSize_;
100 
101  //- Receive position in buffer data, if ony
102  //- If there is no external location for recvBufPos_
104 
105  //- Clear the receive buffer on termination (in the destructor)
106  const bool clearAtEnd_;
107 
108  //- Reference to the receive buffer data
110 
111  //- Reference to the receive position in buffer data
112  label& recvBufPos_;
114 
115  // Protected Constructors
116 
117  //- Construct given process index to read from using the given
118  //- attached receive buffer, optional communication characteristics
119  //- and IO format
121  (
123  const int fromProcNo,
124  DynamicList<char>& receiveBuf,
125  label& receiveBufPosition,
126  const int tag = UPstream::msgType(),
127  const label comm = UPstream::worldComm,
128  const bool clearAtEnd = false, // destroy receiveBuf if at end
130  );
131 
132  //- Construct given buffers
133  UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers);
135  //- Construct for an externally obtained buffer.
136  // The parameter is allowed to be const (since reading will not
137  // affect it), but must reference a concrete variable.
139  (
140  const DynamicList<char>& receiveBuf,
142  );
143 
144 public:
145 
146  //- Destructor. Optionally clears external receive buffer.
147  virtual ~UIPstreamBase();
148 
149 
150  // Member Functions
151 
152  // Inquiry
153 
154  //- Return flags of output stream
155  virtual ios_base::fmtflags flags() const
156  {
157  return ios_base::fmtflags(0);
158  }
159 
160 
161  // Read Functions
162 
163  //- Return next token from stream
164  Istream& read(token& t);
165 
166  //- Read a character
167  Istream& read(char& c);
168 
169  //- Read a word
170  Istream& read(word& str);
171 
172  // Read a string
173  Istream& read(string& str);
174 
175  //- Read a label
176  Istream& read(label& val);
177 
178  //- Read a float
179  Istream& read(float& val);
180 
181  //- Read a double
182  Istream& read(double& val);
183 
184  //- Read binary block with 8-byte alignment.
185  //- Reading into a null pointer behaves like a forward seek of
186  //- count characters.
187  Istream& read(char* data, std::streamsize count);
189  //- Low-level raw binary read.
190  //- Reading into a null pointer behaves like a forward seek of
191  //- count characters.
192  Istream& readRaw(char* data, std::streamsize count);
193 
194  //- Start of low-level raw binary read
195  bool beginRawRead();
196 
197  //- End of low-level raw binary read
198  bool endRawRead()
199  {
200  return true;
201  }
202 
203 
204  // Positioning
205 
206  //- Rewind the receive stream position so that it may be read again
207  virtual void rewind();
208 
209 
210  // Edit
211 
212  //- Set flags of stream
213  virtual ios_base::fmtflags flags(const ios_base::fmtflags)
214  {
215  return ios_base::fmtflags(0);
216  }
217 
218 
219  // Print
220 
221  //- Print stream description to Ostream
222  void print(Ostream& os) const;
223 };
224 
225 
226 /*---------------------------------------------------------------------------*\
227  Class UIPstream Declaration
228 \*---------------------------------------------------------------------------*/
229 
230 //- Input inter-processor communications stream
231 //- using MPI send/recv etc. - operating on external buffer.
232 class UIPstream
233 :
234  public UIPstreamBase
235 {
236  // Private Member Functions
237 
238  //- Initial buffer recv, called by constructor (blocking | scheduled)
239  void bufferIPCrecv();
240 
241 
242 public:
243 
244  // Constructors
245 
246  //- Construct given process index to read from using the given
247  //- attached receive buffer, optional communication characteristics
248  //- and IO format
249  UIPstream
250  (
252  const int fromProcNo,
253  DynamicList<char>& receiveBuf,
254  label& receiveBufPosition,
255  const int tag = UPstream::msgType(),
256  const label comm = UPstream::worldComm,
257  const bool clearAtEnd = false, // destroy receiveBuf if at end
259  );
260 
261  //- Construct given buffers
262  UIPstream(const int fromProcNo, PstreamBuffers& buffers);
263 
264  //- Construct for reading from a standalone buffer that has
265  //- been obtained externally by the caller.
266  // The parameter is allowed to be const (since reading will not
267  // affect it), but must reference a concrete variable.
268  explicit UIPstream
269  (
270  const DynamicList<char>& recvBuf,
272  );
273 
274 
275  //- Destructor
276  virtual ~UIPstream() = default;
277 
278 
279  // Member Functions
280 
281  //- Use all read methods from base
282  using UIPstreamBase::read;
283 
284 
285  // Static Functions
286 
287  //- Read buffer contents from given processor.
288  // \return the message size (bytes read)
289  static label read
290  (
292  const int fromProcNo,
293  char* buf,
294  const std::streamsize bufSize,
295  const int tag = UPstream::msgType(),
296  const label comm = UPstream::worldComm,
298  UPstream::Request* req = nullptr
299  );
300 
301  //- Read buffer contents (non-blocking) from given processor.
302  // \return the message size (bytes read)
303  inline static label read
304  (
306  UPstream::Request& req,
307  const int fromProcNo,
308  char* buf,
309  const std::streamsize bufSize,
310  const int tag = UPstream::msgType(),
311  const label comm = UPstream::worldComm
312  )
313  {
314  return UIPstream::read
315  (
317  fromProcNo,
318  buf,
319  bufSize,
320  tag,
321  comm,
322  &req
323  );
324  }
325 
326  //- Read into UList storage from given processor.
327  // Only valid for contiguous data types.
328  // \return the message size (bytes read). May change in the future
329  template<class Type>
330  inline static label read
331  (
333  const int fromProcNo,
334  UList<Type>& buffer,
335  const int tag = UPstream::msgType(),
336  const label comm = UPstream::worldComm,
338  UPstream::Request* req = nullptr
339  )
340  {
341  return UIPstream::read
342  (
343  commsType,
344  fromProcNo,
345  buffer.data_bytes(),
346  buffer.size_bytes(),
347  tag,
348  comm,
349  req
350  );
351  }
352 
353  //- Read into SubList storage from given processor.
354  // Only valid for contiguous data types.
355  // \return the message size (bytes read). May change in the future
356  template<class Type>
357  inline static label read
358  (
360  const int fromProcNo,
361  SubList<Type> buffer, // passed by shallow copy
362  const int tag = UPstream::msgType(),
363  const label comm = UPstream::worldComm,
365  UPstream::Request* req = nullptr
366  )
367  {
368  return UIPstream::read
369  (
370  commsType,
371  fromProcNo,
372  buffer.data_bytes(),
373  buffer.size_bytes(),
374  tag,
375  comm,
376  req
377  );
378  }
379 
380  //- Read into UList storage (non-blocking) from given processor.
381  // Only valid for contiguous data types.
382  // \return the message size (bytes read). May change in the future
383  template<class Type>
384  inline static label read
385  (
387  UPstream::Request& req,
388  const int fromProcNo,
389  UList<Type>& buffer,
390  const int tag = UPstream::msgType(),
391  const label comm = UPstream::worldComm
392  )
393  {
394  return UIPstream::read
395  (
397  fromProcNo,
398  buffer.data_bytes(),
399  buffer.size_bytes(),
400  tag,
401  comm,
402  &req
403  );
404  }
405 
406  //- Read into SubList storage (non-blocking) from given processor.
407  // Only valid for contiguous data types.
408  // \return the message size (bytes read). May change in the future
409  template<class Type>
410  inline static label read
411  (
413  UPstream::Request& req,
414  const int fromProcNo,
415  SubList<Type> buffer, // passed by shallow copy
416  const int tag = UPstream::msgType(),
417  const label comm = UPstream::worldComm
418  )
419  {
420  return UIPstream::read
421  (
423  fromProcNo,
424  buffer.data_bytes(),
425  buffer.size_bytes(),
426  tag,
427  comm,
428  &req
429  );
430  }
431 };
432 
433 
434 /*---------------------------------------------------------------------------*\
435  Class UIPBstream Declaration
436 \*---------------------------------------------------------------------------*/
437 
438 //- Input inter-processor communications stream
439 //- using MPI broadcast - operating on external buffer.
440 class UIPBstream
441 :
442  public UIPstreamBase
443 {
444  // Private Member Functions
445 
446  //- Initial buffer recv via broadcast, called by constructor
447  void bufferIPCrecv();
448 
449 
450 public:
451 
452  // Constructors
453 
454  //- Construct given process index to read from using the given
455  //- attached receive buffer, optional communication characteristics
456  //- and IO format
457  UIPBstream
458  (
459  const UPstream::commsTypes,
460  const int rootProcNo,
461  DynamicList<char>& receiveBuf,
462  label& receiveBufPosition,
463  const int tag = UPstream::msgType(),
464  const label comm = UPstream::worldComm,
465  const bool clearAtEnd = false,
467  );
468 
469 
470  //- Destructor
471  virtual ~UIPBstream() = default;
472 
473 
474  // Member Functions
475 
476  //- Use all read methods from base
477  using UIPstreamBase::read;
478 
479 
480  // Static Functions
481 
482  //- Wrapped version of UPstream::broadcast
483  // \return the message size (bytes read)
484  static label read
485  (
486  const int rootProcNo,
487  char* buf,
488  const std::streamsize bufSize,
489  const label comm = UPstream::worldComm
490  );
491 };
492 
493 
494 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
495 
496 } // End namespace Foam
497 
498 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
499 
500 #endif
501 
502 // ************************************************************************* //
UIPstream(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
Definition: IPstreams.C:28
UIPstreamBase(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
commsTypes
Communications types.
Definition: UPstream.H:74
static label read(const int rootProcNo, char *buf, const std::streamsize bufSize, const label comm=UPstream::worldComm)
Wrapped version of UPstream::broadcast.
Base class for input inter-processor communications stream (ie, parallel streams). Not to be used directly, thus constructors are protected.
Definition: UIPstream.H:54
bool beginRawRead()
Start of low-level raw binary read.
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
bool endRawRead()
End of low-level raw binary read.
Definition: UIPstream.H:251
A token holds an item read from Istream.
Definition: token.H:64
const int fromProcNo_
Source rank for the data.
Definition: UIPstream.H:98
label & recvBufPos_
Reference to the receive position in buffer data.
Definition: UIPstream.H:134
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1184
int messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
virtual ~UIPBstream()=default
Destructor.
virtual ~UIPstream()=default
Destructor.
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:411
char * data_bytes() noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:243
label storedRecvBufPos_
Receive position in buffer data, used if there is no external location for recvBufPos_.
Definition: UIPstream.H:119
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
Istream & read(token &t)
Return next token from stream.
virtual ios_base::fmtflags flags() const
Return flags of output stream.
Definition: UIPstream.H:188
An opaque wrapper for MPI_Request with a vendor-independent representation independent of any <mpi...
Definition: UPstream.H:1571
A class for handling words, derived from Foam::string.
Definition: word.H:63
UIPBstream(const UPstream::commsTypes, const int rootProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
Definition: IPBstreams.C:28
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1192
OBJstream os(runTime.globalPath()/outputName)
Database for solution data, solver performance and other reduced data.
Definition: data.H:52
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
const bool clearAtEnd_
Clear the receive buffer on termination (in the destructor)
Definition: UIPstream.H:124
DynamicList< char > & recvBuf_
Reference to the receive buffer data.
Definition: UIPstream.H:129
const int tag_
Message tag for communication.
Definition: UIPstream.H:103
const dimensionedScalar c
Speed of light in a vacuum.
streamFormat
Data format (ascii | binary)
"nonBlocking" : (MPI_Isend, MPI_Irecv)
Istream & readRaw(char *data, std::streamsize count)
Low-level raw binary read. Reading into a null pointer behaves like a forward seek of count character...
void print(Ostream &os) const
Print stream description to Ostream.
virtual void rewind()
Rewind the receive stream position so that it may be read again.
const int comm_
The communicator index.
Definition: UIPstream.H:108
Inter-processor communications stream.
Definition: UPstream.H:62
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:250
Namespace for OpenFOAM.