45 bool Foam::OFstreamCollator::writeFile
48 const word& objectType,
49 const fileName& fName,
50 const string& masterData,
52 const UPtrList<SubList<char>>& slaveData,
53 IOstreamOption streamOpt,
54 IOstreamOption::atomicType atomic,
55 IOstreamOption::appendType
append,
56 const dictionary& headerEntries
61 Pout<<
"OFstreamCollator : Writing master " << label(masterData.size())
62 <<
" bytes to " << fName <<
" using comm " << comm
63 <<
" and " << slaveData.size() <<
" sub-ranks" <<
endl;
67 if (slaveData.set(proci))
70 <<
" size:" << slaveData[proci].size()
76 autoPtr<OSstream> osPtr;
80 osPtr.reset(
new OFstream(atomic, fName, streamOpt,
append));
108 fileOperations::masterUncollatedFileOperation::
109 maxMasterFileBufferSize == 0
118 const_cast<char*>(masterData.data()),
119 label(masterData.size())
122 List<std::streamoff> blockOffset;
135 if (osPtr && !osPtr->good())
143 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
148 for (
const label recv : recvSizes)
153 Pout<<
" (overall " << std::to_string(
sum) <<
')';
155 Pout<<
" to " << fName
156 <<
" using comm " << comm <<
endl;
163 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
165 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
173 std::lock_guard<std::mutex> guard(handler.mutex_);
174 if (handler.objects_.size())
176 ptr = handler.objects_.pop();
187 PtrList<SubList<char>> slaveData;
188 if (ptr->slaveData_.size())
190 slaveData.resize(ptr->slaveData_.size());
193 if (ptr->slaveData_.set(proci))
200 ptr->slaveData_[proci],
224 <<
"Failed writing " << ptr->pathName_
235 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
239 std::lock_guard<std::mutex> guard(handler.mutex_);
240 handler.threadRunning_ =
false;
247 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const 255 std::lock_guard<std::mutex> guard(mutex_);
258 totalSize += iter()->size();
265 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
273 std::lock_guard<std::mutex> guard(mutex_);
274 Pout<<
"OFstreamCollator : Waiting for buffer space." 275 <<
" Currently in use:" << totalSize
276 <<
" limit:" << maxBufferSize_
277 <<
" files:" << objects_.size()
290 maxBufferSize_(maxBufferSize),
291 threadRunning_(false),
292 localComm_(UPstream::worldComm),
295 UPstream::allocateCommunicator
306 const off_t maxBufferSize,
310 maxBufferSize_(maxBufferSize),
311 threadRunning_(false),
332 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
335 thread_.reset(
nullptr);
346 const word& objectType,
352 const bool useThread,
362 label maxLocalSize = 0;
364 for (
const label recvSize : recvSizes)
366 totalSize += recvSize;
367 maxLocalSize =
max(maxLocalSize, recvSize);
372 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
376 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
377 <<
" using local comm " << localComm_ <<
endl;
380 const PtrList<SubList<char>> dummySlaveData;
395 else if (totalSize <= maxBufferSize_)
402 Pout<<
"OFstreamCollator : non-thread gather; thread write of " 408 waitForBufferSpace(totalSize);
415 autoPtr<writeData> fileAndDataPtr
434 writeData& fileAndData = fileAndDataPtr();
436 PtrList<List<char>>& slaveData = fileAndData.slaveData_;
438 UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
440 slaveData.setSize(recvSizes.
size());
448 for (label proci = 1; proci < slaveData.size(); proci++)
450 slaveData.set(proci,
new List<char>(recvSizes[proci]));
455 slaveData[proci].data(),
456 slaveData[proci].size_bytes(),
478 <<
"Cannot send outgoing message. " 479 <<
"to:" << 0 <<
" nBytes:" 480 << label(slice.size_bytes())
487 std::lock_guard<std::mutex> guard(mutex_);
490 objects_.push(fileAndDataPtr.ptr());
499 Pout<<
"OFstreamCollator : Waiting for write thread" 507 Pout<<
"OFstreamCollator : Starting write thread" 510 thread_.reset(
new std::thread(writeAll,
this));
511 threadRunning_ =
true;
521 Pout<<
"OFstreamCollator : thread gather and write of " << fName
522 <<
" using communicator " << threadComm_ <<
endl;
528 <<
"mpi does not seem to have thread support." 529 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'" 530 <<
" to at least " << totalSize
531 <<
" to be able to do the collating before threading." 537 waitForBufferSpace(data.size());
541 std::lock_guard<std::mutex> guard(mutex_);
567 Pout<<
"OFstreamCollator : Waiting for write thread" 575 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
577 thread_.reset(
new std::thread(writeAll,
this));
578 threadRunning_ =
true;
595 Pout<<
"OFstreamCollator : waiting for thread to have consumed all" 598 waitForBufferSpace(-1);
void size(const label n)
Older name for setAddressableSize.
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &f1)
A class for handling file names.
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const UPtrList< SubList< char >> &slaveData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
static void writeData(Ostream &os, const Type &val)
commsTypes
Communications types.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and stack trace.
A list of keyword definitions, which are a keyword followed by a number of values (eg, words and numbers) or by a sub-dictionary.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
A range or interval of labels defined by a start and a size.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static int & msgType() noexcept
Message tag of standard messages.
A simple container for options an IOstream can normally have.
static void waitRequests()
Wait for all requests to finish.
UList< label > labelUList
A UList of labels.
#define forAll(list, i)
Loop across all elements in list.
atomicType
Atomic operations (output)
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
"scheduled" : (MPI_Send, MPI_Recv)
A class for handling words, derived from Foam::string.
void waitAll()
Wait for all thread actions to have finished.
errorManip< error > abort(error &err)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
static const string null
An empty string.
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
Database for solution data, solver performance and other reduced data.
appendType
File appending (NON_APPEND | APPEND)
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static bool haveThreads() noexcept
Have support for threads.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
forAllConstIters(mixture.phases(), phase)
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and stack trace.