masterOFstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017 OpenFOAM Foundation
9  Copyright (C) 2020-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "masterOFstream.H"
30 #include "OFstream.H"
31 #include "OSspecific.H"
32 #include "PstreamBuffers.H"
34 
35 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
36 
37 void Foam::masterOFstream::checkWrite
38 (
39  const fileName& fName,
40  const char* str,
41  std::streamsize len
42 )
43 {
44  if (!len)
45  {
46  // Can probably skip all of this if there is nothing to write
47  return;
48  }
49 
50  Foam::mkDir(fName.path());
51 
52  OFstream os
53  (
54  atomic_,
55  fName,
57  append_
58  );
59  if (!os.good())
60  {
62  << "Could not open file " << fName << nl
63  << exit(FatalIOError);
64  }
65 
66  // Use writeRaw() instead of writeQuoted(string,false) to output
67  // characters directly.
68 
69  os.writeRaw(str, len);
70 
71  if (!os.good())
72  {
74  << "Failed writing to " << fName << nl
75  << exit(FatalIOError);
76  }
77 }
78 
79 
80 void Foam::masterOFstream::checkWrite
81 (
82  const fileName& fName,
83  const std::string& s
84 )
85 {
86  checkWrite(fName, s.data(), s.length());
87 }
88 
89 
90 void Foam::masterOFstream::commit()
91 {
92  if (UPstream::parRun())
93  {
94  List<fileName> filePaths(UPstream::nProcs(comm_));
95  filePaths[UPstream::myProcNo(comm_)] = pathName_;
96  Pstream::gatherList(filePaths, UPstream::msgType(), comm_);
97 
98  bool uniform =
99  (
100  UPstream::master(comm_)
101  && fileOperation::uniformFile(filePaths)
102  );
103 
104  Pstream::broadcast(uniform, comm_);
105 
106  if (uniform)
107  {
108  if (UPstream::master(comm_) && writeOnProc_)
109  {
110  checkWrite(pathName_, this->str());
111  }
112 
113  this->reset();
114  return;
115  }
116 
117  // Different files
118  PstreamBuffers pBufs(comm_, UPstream::commsTypes::nonBlocking);
119 
120  if (!UPstream::master(comm_))
121  {
122  if (writeOnProc_)
123  {
124  // Send buffer to master
125  string s(this->str());
126 
127  UOPstream os(UPstream::masterNo(), pBufs);
128  os.write(s.data(), s.length());
129  }
130  this->reset(); // Done with contents
131  }
132 
133  pBufs.finishedGathers();
134 
135 
136  if (UPstream::master(comm_))
137  {
138  if (writeOnProc_)
139  {
140  // Write master data
141  checkWrite(filePaths[UPstream::masterNo()], this->str());
142  }
143  this->reset(); // Done with contents
144 
145 
146  // Allocate large enough to read without resizing
147  List<char> buf(pBufs.maxRecvCount());
148 
149  for (const int proci : UPstream::subProcs(comm_))
150  {
151  const std::streamsize count(pBufs.recvDataCount(proci));
152 
153  if (count)
154  {
155  UIPstream is(proci, pBufs);
156 
157  is.read(buf.data(), count);
158  checkWrite(filePaths[proci], buf.cdata(), count);
159  }
160  }
161  }
162  }
163  else
164  {
165  checkWrite(pathName_, this->str());
166  this->reset();
167  }
168 
169  // This method is only called once (internally)
170  // so no need to clear/flush old buffered data
171 }
172 
173 
174 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
175 
177 (
179  const label comm,
180  const fileName& pathName,
181  IOstreamOption streamOpt,
183  const bool writeOnProc
184 )
185 :
186  OStringStream(streamOpt),
187  pathName_(pathName),
188  atomic_(atomic),
189  compression_(streamOpt.compression()),
190  append_(append),
191  writeOnProc_(writeOnProc),
192  comm_(comm)
193 {}
194 
195 
196 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
197 
199 {
200  commit();
201 }
202 
203 
204 // ************************************************************************* //
A class for handling file names.
Definition: fileName.H:72
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
virtual Ostream & write(const char c) override
Write character.
Definition: OBJstream.C:69
Foam::string str() const
Get the string - as Foam::string rather than std::string.
Definition: StringStream.H:96
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:50
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:1049
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1229
A simple container for options an IOstream can normally have.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1074
static void broadcast(Type &value, const label comm=UPstream::worldComm)
Broadcast content (contiguous or non-contiguous) to all communicator ranks. Does nothing in non-paral...
constexpr IOstreamOption(streamFormat fmt=streamFormat::ASCII, compressionType comp=compressionType::UNCOMPRESSED) noexcept
Default construct (ASCII, UNCOMPRESSED, currentVersion) or construct with format, compression...
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
atomicType
Atomic operations (output)
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1065
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
Gather data, but keep individual values separate. Uses the specified communication schedule...
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:614
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition: UPstream.H:1059
OBJstream os(runTime.globalPath()/outputName)
appendType
File appending (NON_APPEND | APPEND)
static bool uniformFile(const fileNameList &names)
True if the file names are identical. False on an empty list.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
~masterOFstream()
Destructor - commits buffered information to file.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:627
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1082
versionNumber version() const noexcept
Get the stream version.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static rangeType subProcs(const label communicator=worldComm)
Range of process indices for sub-processes.
Definition: UPstream.H:1185
gmvFile<< "tracers "<< particles.size()<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().x()<< " ";}gmvFile<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().y()<< " ";}gmvFile<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().z()<< " ";}gmvFile<< nl;forAll(lagrangianScalarNames, i){ word name=lagrangianScalarNames[i];IOField< scalar > s(IOobject(name, runTime.timeName(), cloud::prefix, mesh, IOobject::MUST_READ, IOobject::NO_WRITE))
Output to string buffer, using a OSstream. Always UNCOMPRESSED.
Definition: StringStream.H:256
masterOFstream(IOstreamOption::atomicType atomic, const label comm, const fileName &pathname, IOstreamOption streamOpt=IOstreamOption(), IOstreamOption::appendType append=IOstreamOption::NON_APPEND, const bool writeOnProc=true)
Construct with specified atomic behaviour and communicator from pathname, stream option, optional append.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...