UIPstreamBase.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2015 OpenFOAM Foundation
9  Copyright (C) 2017-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "error.H"
30 #include "UIPstream.H"
31 #include "int.H"
32 #include "token.H"
33 #include <cctype>
34 
35 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
39 
40 // Convert a single character to a word with length 1
41 inline static Foam::word charToWord(char c)
42 {
43  return Foam::word(std::string(1, c), false);
44 }
45 
46 
47 // Adjust stream format based on the flagMask
48 inline static void processFlags(Istream& is, int flagMask)
49 {
50  if ((flagMask & token::ASCII))
51  {
52  is.format(IOstreamOption::ASCII);
53  }
54  else if ((flagMask & token::BINARY))
55  {
56  is.format(IOstreamOption::BINARY);
57  }
58 }
59 
60 
61 // Return the position with word boundary alignment
62 inline static label byteAlign(const label pos, const size_t align)
63 {
64  return
65  (
66  (align > 1)
67  ? (align + ((pos - 1) & ~(align - 1)))
68  : pos
69  );
70 }
71 
72 
73 // Read into compound token (assumed to be a known type)
74 inline static bool readCompoundToken
75 (
76  token& tok,
77  const word& compoundType,
78  Istream& is
79 )
80 {
81  // The isCompound() check is not needed (already checked by caller)
82  return tok.readCompoundToken(compoundType, is);
83 }
84 
85 } // End namespace Foam
86 
87 
88 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
89 
90 inline void Foam::UIPstreamBase::checkEof()
91 {
93  {
94  setEof();
95  }
96 }
97 
98 
99 inline void Foam::UIPstreamBase::prepareBuffer(const size_t align)
100 {
101  recvBufPos_ = byteAlign(recvBufPos_, align);
102 }
103 
104 
105 template<class T>
106 inline void Foam::UIPstreamBase::readFromBuffer(T& val)
107 {
108  prepareBuffer(sizeof(T));
109 
110  val = reinterpret_cast<T&>(recvBuf_[recvBufPos_]);
111  recvBufPos_ += sizeof(T);
112  checkEof();
113 }
114 
115 
116 inline void Foam::UIPstreamBase::readFromBuffer
117 (
118  void* data,
119  const size_t count
120 )
121 {
122  if (data)
123  {
124  const char* const __restrict__ buf = &recvBuf_[recvBufPos_];
125  char* const __restrict__ output = reinterpret_cast<char*>(data);
126 
127  for (size_t i = 0; i < count; ++i)
128  {
129  output[i] = buf[i];
130  }
131  }
132 
133  recvBufPos_ += count;
134  checkEof();
135 }
136 
137 
138 inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str)
139 {
140  // Use std::string::assign() to copy content, including embedded nul chars.
141  // Stripping (when desired) is the responsibility of the sending side.
142 
143  size_t len;
144  readFromBuffer(len);
145 
146  if (len)
147  {
148  str.assign(&recvBuf_[recvBufPos_], len);
149  recvBufPos_ += len;
150  checkEof();
151  }
152  else
153  {
154  str.clear();
155  }
156 
157  return *this;
158 }
159 
160 
161 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
162 
164 (
165  const UPstream::commsTypes commsType,
166  const int fromProcNo,
167  DynamicList<char>& receiveBuf,
168  label& receiveBufPosition,
169  const int tag,
170  const label comm,
171  const bool clearAtEnd,
173 )
174 :
175  UPstream(commsType),
176  Istream(fmt),
177  fromProcNo_(fromProcNo),
178  tag_(tag),
179  comm_(comm),
180  messageSize_(0),
181  storedRecvBufPos_(0),
182  clearAtEnd_(clearAtEnd),
183  recvBuf_(receiveBuf),
184  recvBufPos_(receiveBufPosition)
185 {
186  setOpened();
187  setGood();
188 }
189 
190 
192 (
193  const int fromProcNo,
194  PstreamBuffers& buffers
195 )
196 :
197  UPstream(buffers.commsType()),
198  Istream(buffers.format()),
199  fromProcNo_(fromProcNo),
200  tag_(buffers.tag()),
201  comm_(buffers.comm()),
202  messageSize_(0),
203  storedRecvBufPos_(0),
204  clearAtEnd_(buffers.allowClearRecv()),
205  recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
206  recvBufPos_(buffers.accessRecvPosition(fromProcNo))
207 {
208  if
209  (
211  && !buffers.finished()
212  )
213  {
215  << "PstreamBuffers::finishedSends() never called." << endl
216  << "Please call PstreamBuffers::finishedSends() after doing"
217  << " all your sends (using UOPstream) and before doing any"
218  << " receives (using UIPstream)" << Foam::exit(FatalError);
219  }
221  setOpened();
222  setGood();
223 }
224 
225 
227 (
228  const DynamicList<char>& receiveBuf,
230 )
231 :
232  UPstream(UPstream::commsTypes::nonBlocking), // placeholder
233  Istream(fmt),
234  fromProcNo_(UPstream::masterNo()), // placeholder
235  tag_(UPstream::msgType()), // placeholder
236  comm_(UPstream::commSelf()), // placeholder
237  messageSize_(receiveBuf.size()), // Message == buffer
238  storedRecvBufPos_(0),
239  clearAtEnd_(false), // Do not clear recvBuf if at end!!
240  recvBuf_
241  (
242  // The receive buffer is never modified with this code path
243  const_cast<DynamicList<char>&>(receiveBuf)
244  ),
245  recvBufPos_(storedRecvBufPos_) // Internal reference
246 {
248  setGood();
249 }
250 
251 
252 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
253 
255 {
256  if (clearAtEnd_ && eof())
257  {
258  if (debug)
259  {
260  Pout<< "UIPstreamBase Destructor : tag:" << tag_
261  << " fromProcNo:" << fromProcNo_
262  << " clearing receive buffer of size "
263  << recvBuf_.size()
264  << " messageSize_:" << messageSize_ << endl;
265  }
266  recvBuf_.clearStorage();
267  }
268 }
269 
270 
271 // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
272 
274 {
275  // Return the put back token if it exists
276  // - with additional handling for special stream flags
277  if (Istream::getBack(t))
278  {
279  if (t.isFlag())
280  {
281  processFlags(*this, t.flagToken());
282  }
283  else
284  {
285  return *this;
286  }
287  }
288 
289 
290  // Reset token, adjust its line number according to the stream
291  t.reset();
292  t.lineNumber(this->lineNumber());
293 
294 
295  // Read character, return on error
296  // - with additional handling for special stream flags
297 
298  char c;
299  do
300  {
301  if (!read(c))
302  {
303  t.setBad(); // Error
304  return *this;
305  }
306 
307  if (c == token::FLAG)
308  {
309  char flagVal;
310 
311  if (read(flagVal))
312  {
313  processFlags(*this, flagVal);
314  }
315  else
316  {
317  t.setBad(); // Error
318  return *this;
319  }
320  }
321  }
322  while (c == token::FLAG);
323 
324 
325  // Analyse input starting with this character.
326  switch (c)
327  {
328  // Punctuation
329  case token::END_STATEMENT :
330  case token::BEGIN_LIST :
331  case token::END_LIST :
332  case token::BEGIN_SQR :
333  case token::END_SQR :
334  case token::BEGIN_BLOCK :
335  case token::END_BLOCK :
336  case token::COLON :
337  case token::COMMA :
338  case token::ASSIGN :
339  case token::PLUS :
340  case token::MINUS :
341  case token::MULTIPLY :
342  case token::DIVIDE :
343  {
345  return *this;
346  }
347 
348  // The word-variants
349  case token::tokenType::WORD :
350  case token::tokenType::DIRECTIVE :
351  {
352  word val;
353  if (readString(val))
354  {
355  if
356  (
358  || !readCompoundToken(t, val, *this)
359  )
360  {
361  t = std::move(val);
362  t.setType(token::tokenType(c));
363  }
364  }
365  else
366  {
367  t.setBad();
368  }
369  return *this;
370  }
371 
372  // The string-variants
373  case token::tokenType::STRING :
374  case token::tokenType::EXPRESSION :
375  case token::tokenType::VARIABLE :
376  case token::tokenType::VERBATIM :
377  case token::tokenType::CHAR_DATA :
378  {
379  string val;
380  if (readString(val))
381  {
382  t = std::move(val);
383  t.setType(token::tokenType(c));
384  }
385  else
386  {
387  t.setBad();
388  }
389  return *this;
390  }
391 
392  // Label
393  case token::tokenType::LABEL :
394  {
395  label val;
396  if (read(val))
397  {
398  t = val;
399  }
400  else
401  {
402  t.setBad();
403  }
404  return *this;
405  }
406 
407  // Float
408  case token::tokenType::FLOAT :
409  {
410  float val;
411  if (read(val))
412  {
413  t = val;
414  }
415  else
416  {
417  t.setBad();
418  }
419  return *this;
420  }
421 
422  // Double
423  case token::tokenType::DOUBLE :
424  {
425  double val;
426  if (read(val))
427  {
428  t = val;
429  }
430  else
431  {
432  t.setBad();
433  }
434  return *this;
435  }
436 
437  // Character (returned as a single character word) or error
438  default:
439  {
440  if (isalpha(c))
441  {
442  t = charToWord(c);
443  return *this;
444  }
445 
446  setBad();
447  t.setBad();
449  return *this;
450  }
451  }
452 }
453 
454 
456 {
457  c = recvBuf_[recvBufPos_];
458  ++recvBufPos_;
459  checkEof();
460  return *this;
461 }
462 
465 {
466  return readString(str);
467 }
468 
471 {
472  return readString(str);
473 }
474 
475 
477 {
478  readFromBuffer(val);
479  return *this;
480 }
481 
482 
484 {
485  readFromBuffer(val);
486  return *this;
487 }
488 
489 
491 {
492  readFromBuffer(val);
493  return *this;
494 }
495 
496 
497 Foam::Istream& Foam::UIPstreamBase::read(char* data, std::streamsize count)
498 {
499  if (count)
500  {
501  // For count == 0, a no-op
502  // - see UOPstream::write(const char*, streamsize)
503  beginRawRead();
504  readRaw(data, count);
505  endRawRead();
506  }
507 
508  return *this;
509 }
510 
511 
512 Foam::Istream& Foam::UIPstreamBase::readRaw(char* data, std::streamsize count)
513 {
514  // No check for IOstreamOption::BINARY since this is either done in the
515  // beginRawRead() method, or the caller knows what they are doing.
517  // Any alignment must have been done prior to this call
518  readFromBuffer(data, count);
519  return *this;
520 }
521 
522 
524 {
526  {
528  << "stream format not binary"
530  }
531 
532  // Align on word boundary (64-bit)
533  // - as per read(const char*, streamsize)
534  // The check for zero-size will have been done by the caller
535  prepareBuffer(8);
536 
537  return true;
538 }
539 
540 
541 // Not needed yet
550 
551 
553 {
554  recvBufPos_ = 0; // Assume the entire buffer is for us to read from
555  setOpened();
556  setGood();
557  if (recvBuf_.empty() || !messageSize_)
558  {
559  setEof();
560  }
561 }
562 
563 
564 void Foam::UIPstreamBase::print(Ostream& os) const
565 {
566  os << "Reading from processor " << fromProcNo_
567  << " using communicator " << comm_
568  << " and tag " << tag_ << Foam::endl;
569 }
570 
571 
572 // ************************************************************************* //
static void processFlags(Istream &is, int flagMask)
Definition: UIPstreamBase.C:41
Subtract or start of negative number.
Definition: token.H:145
static bool isCompound(const word &compoundType)
True if a known (registered) compound type.
Definition: token.C:104
Begin block [isseparator].
Definition: token.H:165
static Foam::word charToWord(char c)
Definition: UIPstreamBase.C:34
static label byteAlign(const label pos, const size_t align)
Definition: UIPstreamBase.C:55
bool getBack(token &tok)
Retrieve the put-back token if there is one.
Definition: Istream.C:115
BINARY-mode stream.
Definition: token.H:119
System signed integer.
UIPstreamBase(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer, optional communication characteristics and IO format.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
commsTypes
Communications types.
Definition: UPstream.H:72
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
stream flag (1-byte bitmask)
Definition: token.H:82
virtual bool beginRawRead() override
Start of low-level raw binary read.
Begin dimensions [isseparator].
Definition: token.H:163
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
"ascii" (normal default)
tokenType
Enumeration defining the types of token.
Definition: token.H:76
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
label & recvBufPos_
Reference to the receive position in buffer data.
Definition: UIPstream.H:134
Begin list [isseparator].
Definition: token.H:161
Assignment/equals [isseparator].
Definition: token.H:143
int messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition: UIPstream.H:113
End entry [isseparator].
Definition: token.H:160
End dimensions [isseparator].
Definition: token.H:164
bool read(const char *buf, int32_t &val)
Same as readInt32.
Definition: int32.H:127
void print(Ostream &os) const override
Print stream description to Ostream.
dimensionedScalar pos(const dimensionedScalar &ds)
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
Addition [isseparator].
Definition: token.H:144
Comma [isseparator].
Definition: token.H:135
"scheduled" : (MPI_Send, MPI_Recv)
A class for handling words, derived from Foam::string.
Definition: word.H:63
virtual Istream & readRaw(char *data, std::streamsize count) override
Low-level raw binary read. Reading into a null pointer behaves like a forward seek of count character...
punctuationToken
Standard punctuation tokens (a character)
Definition: token.H:126
static bool readCompoundToken(token &tok, const word &compoundType, Istream &is)
Definition: UIPstreamBase.C:68
End list [isseparator].
Definition: token.H:162
errorManip< error > abort(error &err)
Definition: errorManip.H:139
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition: UPstream.H:1261
Divide [isseparator].
Definition: token.H:147
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
word format(conversionProperties.get< word >("format"))
ASCII-mode stream.
Definition: token.H:118
virtual void rewind() override
Rewind the receive stream position so that it may be read again.
const dimensionedScalar c
Speed of light in a vacuum.
streamFormat
Data format (ascii | binary)
static Ostream & output(Ostream &os, const IntRange< T > &range)
Definition: IntRanges.C:44
void setOpened() noexcept
Set stream opened.
Definition: IOstream.H:150
void setEof() noexcept
Set stream state as reached 'eof'.
Definition: IOstream.H:443
void setGood() noexcept
Set stream state to be good.
Definition: IOstream.H:174
End block [isseparator].
Definition: token.H:166
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
Definition: UPstream.H:60
Multiply [isseparator].
Definition: token.H:146
virtual Istream & read(token &) override
Return next token from stream.
Namespace for OpenFOAM.
Colon [isseparator].
Definition: token.H:133