UIPstreamBase.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2015 OpenFOAM Foundation
9  Copyright (C) 2017-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "error.H"
30 #include "UIPstream.H"
31 #include "int.H"
32 #include "token.H"
33 #include <cctype>
34 
35 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
39 
40 // Convert a single character to a word with length 1
41 inline static Foam::word charToWord(char c)
42 {
43  return Foam::word(std::string(1, c), false);
44 }
45 
46 
47 // Adjust stream format based on the flagMask
48 inline static void processFlags(Istream& is, int flagMask)
49 {
50  if ((flagMask & token::ASCII))
51  {
52  is.format(IOstreamOption::ASCII);
53  }
54  else if ((flagMask & token::BINARY))
55  {
56  is.format(IOstreamOption::BINARY);
57  }
58 }
59 
60 
61 // Return the position with word boundary alignment
62 inline static label byteAlign(const label pos, const size_t align)
63 {
64  return
65  (
66  (align > 1)
67  ? (align + ((pos - 1) & ~(align - 1)))
68  : pos
69  );
70 }
71 
72 } // End namespace Foam
73 
74 
75 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
76 
77 inline void Foam::UIPstreamBase::checkEof()
78 {
80  {
81  setEof();
82  }
83 }
84 
85 
86 inline void Foam::UIPstreamBase::prepareBuffer(const size_t align)
87 {
88  recvBufPos_ = byteAlign(recvBufPos_, align);
89 }
90 
91 
92 template<class T>
93 inline void Foam::UIPstreamBase::readFromBuffer(T& val)
94 {
95  prepareBuffer(sizeof(T));
96 
97  val = reinterpret_cast<T&>(recvBuf_[recvBufPos_]);
98  recvBufPos_ += sizeof(T);
99  checkEof();
100 }
101 
102 
103 inline void Foam::UIPstreamBase::readFromBuffer
104 (
105  void* data,
106  const size_t count
107 )
108 {
109  if (data)
110  {
111  const char* const __restrict__ buf = &recvBuf_[recvBufPos_];
112  char* const __restrict__ output = reinterpret_cast<char*>(data);
113 
114  for (size_t i = 0; i < count; ++i)
115  {
116  output[i] = buf[i];
117  }
118  }
119 
120  recvBufPos_ += count;
121  checkEof();
122 }
123 
124 
125 inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str)
126 {
127  // Use std::string::assign() to copy content, including '\0'.
128  // Stripping (when desired) is the responsibility of the sending side.
129 
130  size_t len;
131  readFromBuffer(len);
132 
133  if (len)
134  {
135  str.assign(&recvBuf_[recvBufPos_], len);
136  recvBufPos_ += len;
137  checkEof();
138  }
139  else
140  {
141  str.clear();
142  }
143 
144  return *this;
145 }
146 
147 
148 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
149 
151 (
152  const UPstream::commsTypes commsType,
153  const int fromProcNo,
154  DynamicList<char>& receiveBuf,
155  label& receiveBufPosition,
156  const int tag,
157  const label comm,
158  const bool clearAtEnd,
160 )
161 :
162  UPstream(commsType),
163  Istream(fmt),
164  fromProcNo_(fromProcNo),
165  tag_(tag),
166  comm_(comm),
167  messageSize_(0),
168  storedRecvBufPos_(0),
169  clearAtEnd_(clearAtEnd),
170  recvBuf_(receiveBuf),
171  recvBufPos_(receiveBufPosition)
172 {
173  setOpened();
174  setGood();
175 }
176 
177 
179 (
180  const int fromProcNo,
181  PstreamBuffers& buffers
182 )
183 :
184  UPstream(buffers.commsType()),
185  Istream(buffers.format()),
186  fromProcNo_(fromProcNo),
187  tag_(buffers.tag()),
188  comm_(buffers.comm()),
189  messageSize_(0),
190  storedRecvBufPos_(0),
191  clearAtEnd_(buffers.allowClearRecv()),
192  recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
193  recvBufPos_(buffers.accessRecvPosition(fromProcNo))
194 {
195  if
196  (
198  && !buffers.finished()
199  )
200  {
202  << "PstreamBuffers::finishedSends() never called." << endl
203  << "Please call PstreamBuffers::finishedSends() after doing"
204  << " all your sends (using UOPstream) and before doing any"
205  << " receives (using UIPstream)" << Foam::exit(FatalError);
206  }
208  setOpened();
209  setGood();
210 }
211 
212 
214 (
215  const DynamicList<char>& receiveBuf,
217 )
218 :
219  UPstream(UPstream::commsTypes::nonBlocking), // placeholder
220  Istream(fmt),
221  fromProcNo_(UPstream::masterNo()), // placeholder
222  tag_(UPstream::msgType()), // placeholder
223  comm_(UPstream::commSelf()), // placeholder
224  messageSize_(receiveBuf.size()), // Message == buffer
225  storedRecvBufPos_(0),
226  clearAtEnd_(false), // Do not clear recvBuf if at end!!
227  recvBuf_
228  (
229  // The receive buffer is never modified with this code path
230  const_cast<DynamicList<char>&>(receiveBuf)
231  ),
232  recvBufPos_(storedRecvBufPos_) // Internal reference
233 {
235  setGood();
236 }
237 
238 
239 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
240 
242 {
243  if (clearAtEnd_ && eof())
244  {
245  if (debug)
246  {
247  Pout<< "UIPstreamBase Destructor : tag:" << tag_
248  << " fromProcNo:" << fromProcNo_
249  << " clearing receive buffer of size "
250  << recvBuf_.size()
251  << " messageSize_:" << messageSize_ << endl;
252  }
253  recvBuf_.clearStorage();
254  }
255 }
256 
257 
258 // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
259 
261 {
262  // Return the put back token if it exists
263  // - with additional handling for special stream flags
264  if (Istream::getBack(t))
265  {
266  if (t.isFlag())
267  {
268  processFlags(*this, t.flagToken());
269  }
270  else
271  {
272  return *this;
273  }
274  }
275 
276  // Read character, return on error
277  // - with additional handling for special stream flags
278 
279  char c;
280  do
281  {
282  if (!read(c))
283  {
284  t.setBad(); // Error
285  return *this;
286  }
287 
288  if (c == token::FLAG)
289  {
290  char flagVal;
291 
292  if (read(flagVal))
293  {
294  processFlags(*this, flagVal);
295  }
296  else
297  {
298  t.setBad(); // Error
299  return *this;
300  }
301  }
302  }
303  while (c == token::FLAG);
304 
305 
306  // Set the line number of this token to the current stream line number
307  t.lineNumber(this->lineNumber());
308 
309  // Analyse input starting with this character.
310  switch (c)
311  {
312  // Punctuation
313  case token::END_STATEMENT :
314  case token::BEGIN_LIST :
315  case token::END_LIST :
316  case token::BEGIN_SQR :
317  case token::END_SQR :
318  case token::BEGIN_BLOCK :
319  case token::END_BLOCK :
320  case token::COLON :
321  case token::COMMA :
322  case token::ASSIGN :
323  case token::PLUS :
324  case token::MINUS :
325  case token::MULTIPLY :
326  case token::DIVIDE :
327  {
329  return *this;
330  }
331 
332  // The word-variants
333  case token::tokenType::WORD :
334  case token::tokenType::DIRECTIVE :
335  {
336  word val;
337  if (readString(val))
338  {
340  {
341  t = token::compound::New(val, *this).ptr();
342  }
343  else
344  {
345  t = std::move(val);
346  t.setType(token::tokenType(c));
347  }
348  }
349  else
350  {
351  t.setBad();
352  }
353  return *this;
354  }
355 
356  // The string-variants
357  case token::tokenType::STRING :
358  case token::tokenType::EXPRESSION :
359  case token::tokenType::VARIABLE :
360  case token::tokenType::VERBATIM :
361  {
362  string val;
363  if (readString(val))
364  {
365  t = std::move(val);
366  t.setType(token::tokenType(c));
367  }
368  else
369  {
370  t.setBad();
371  }
372  return *this;
373  }
374 
375  // Label
376  case token::tokenType::LABEL :
377  {
378  label val;
379  if (read(val))
380  {
381  t = val;
382  }
383  else
384  {
385  t.setBad();
386  }
387  return *this;
388  }
389 
390  // Float
391  case token::tokenType::FLOAT :
392  {
393  float val;
394  if (read(val))
395  {
396  t = val;
397  }
398  else
399  {
400  t.setBad();
401  }
402  return *this;
403  }
404 
405  // Double
406  case token::tokenType::DOUBLE :
407  {
408  double val;
409  if (read(val))
410  {
411  t = val;
412  }
413  else
414  {
415  t.setBad();
416  }
417  return *this;
418  }
419 
420  // Character (returned as a single character word) or error
421  default:
422  {
423  if (isalpha(c))
424  {
425  t = charToWord(c);
426  return *this;
427  }
428 
429  setBad();
430  t.setBad();
432  return *this;
433  }
434  }
435 }
436 
437 
439 {
440  c = recvBuf_[recvBufPos_];
441  ++recvBufPos_;
442  checkEof();
443  return *this;
444 }
445 
448 {
449  return readString(str);
450 }
451 
454 {
455  return readString(str);
456 }
457 
458 
460 {
461  readFromBuffer(val);
462  return *this;
463 }
464 
465 
467 {
468  readFromBuffer(val);
469  return *this;
470 }
471 
472 
474 {
475  readFromBuffer(val);
476  return *this;
477 }
478 
479 
480 Foam::Istream& Foam::UIPstreamBase::read(char* data, std::streamsize count)
481 {
482  if (count)
483  {
484  // For count == 0, a no-op
485  // - see UOPstream::write(const char*, streamsize)
486  beginRawRead();
487  readRaw(data, count);
488  endRawRead();
489  }
490 
491  return *this;
492 }
493 
494 
495 Foam::Istream& Foam::UIPstreamBase::readRaw(char* data, std::streamsize count)
496 {
497  // No check for format() == BINARY since this is either done in the
498  // beginRawRead() method, or the caller knows what they are doing.
500  // Any alignment must have been done prior to this call
501  readFromBuffer(data, count);
502  return *this;
503 }
504 
505 
507 {
508  if (format() != BINARY)
509  {
511  << "stream format not binary"
513  }
514 
515  // Align on word boundary (64-bit)
516  // - as per read(const char*, streamsize)
517  // The check for zero-size will have been done by the caller
518  prepareBuffer(8);
519 
520  return true;
521 }
522 
523 
524 // Not needed yet
533 
534 
536 {
537  recvBufPos_ = 0; // Assume the entire buffer is for us to read from
538  setOpened();
539  setGood();
540  if (recvBuf_.empty() || !messageSize_)
541  {
542  setEof();
543  }
544 }
545 
546 
547 void Foam::UIPstreamBase::print(Ostream& os) const
548 {
549  os << "Reading from processor " << fromProcNo_
550  << " using communicator " << comm_
551  << " and tag " << tag_ << Foam::endl;
552 }
553 
554 
555 // ************************************************************************* //
static void processFlags(Istream &is, int flagMask)
Definition: UIPstreamBase.C:41
Subtract or start of negative number.
Definition: token.H:142
Begin block [isseparator].
Definition: token.H:162
static Foam::word charToWord(char c)
Definition: UIPstreamBase.C:34
static label byteAlign(const label pos, const size_t align)
Definition: UIPstreamBase.C:55
bool getBack(token &tok)
Get the put-back token if there is one.
Definition: Istream.C:85
BINARY-mode stream.
Definition: token.H:116
System signed integer.
UIPstreamBase(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
commsTypes
Communications types.
Definition: UPstream.H:74
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
stream flag (1-byte bitmask)
Definition: token.H:81
Begin dimensions [isseparator].
Definition: token.H:160
bool beginRawRead()
Start of low-level raw binary read.
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
"ascii" (normal default)
tokenType
Enumeration defining the types of token.
Definition: token.H:75
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
label & recvBufPos_
Reference to the receive position in buffer data.
Definition: UIPstream.H:134
Begin list [isseparator].
Definition: token.H:158
Assignment/equals [isseparator].
Definition: token.H:140
int messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
End entry [isseparator].
Definition: token.H:157
End dimensions [isseparator].
Definition: token.H:161
bool read(const char *buf, int32_t &val)
Same as readInt32.
Definition: int32.H:127
dimensionedScalar pos(const dimensionedScalar &ds)
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
Addition [isseparator].
Definition: token.H:141
Comma [isseparator].
Definition: token.H:132
Istream & read(token &t)
Return next token from stream.
"scheduled" : (MPI_Send, MPI_Recv)
A class for handling words, derived from Foam::string.
Definition: word.H:63
punctuationToken
Standard punctuation tokens (a character)
Definition: token.H:123
End list [isseparator].
Definition: token.H:159
static autoPtr< compound > New(const word &type, Istream &is)
Construct compound from Istream.
Definition: token.C:52
errorManip< error > abort(error &err)
Definition: errorManip.H:139
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1192
Divide [isseparator].
Definition: token.H:144
int debug
Static debugging option.
static bool isCompound(const word &name)
Test if name is a known (registered) compound type.
Definition: token.C:76
OBJstream os(runTime.globalPath()/outputName)
Database for solution data, solver performance and other reduced data.
Definition: data.H:52
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
word format(conversionProperties.get< word >("format"))
ASCII-mode stream.
Definition: token.H:115
const dimensionedScalar c
Speed of light in a vacuum.
streamFormat
Data format (ascii | binary)
Istream & readRaw(char *data, std::streamsize count)
Low-level raw binary read. Reading into a null pointer behaves like a forward seek of count character...
static Ostream & output(Ostream &os, const IntRange< T > &range)
Definition: IntRanges.C:58
void setOpened() noexcept
Set stream opened.
Definition: IOstream.H:150
void setEof() noexcept
Set stream state as reached 'eof'.
Definition: IOstream.H:443
void print(Ostream &os) const
Print stream description to Ostream.
void setGood() noexcept
Set stream state to be good.
Definition: IOstream.H:174
virtual void rewind()
Rewind the receive stream position so that it may be read again.
End block [isseparator].
Definition: token.H:163
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
Definition: UPstream.H:62
Multiply [isseparator].
Definition: token.H:143
Namespace for OpenFOAM.
Colon [isseparator].
Definition: token.H:130