45 bool Foam::OFstreamCollator::writeFile
48 const word& objectType,
49 const fileName& fName,
50 const string& masterData,
52 const PtrList<SubList<char>>& slaveData,
53 IOstreamOption streamOpt,
54 IOstreamOption::atomicType atomic,
55 IOstreamOption::appendType
append,
56 const dictionary& headerEntries
61 Pout<<
"OFstreamCollator : Writing master " << label(masterData.size())
62 <<
" bytes to " << fName <<
" using comm " << comm
63 <<
" and " << slaveData.size() <<
" sub-ranks" <<
endl;
67 if (slaveData.set(proci))
70 <<
" size:" << slaveData[proci].size()
76 autoPtr<OSstream> osPtr;
80 osPtr.reset(
new OFstream(atomic, fName, streamOpt,
append));
108 fileOperations::masterUncollatedFileOperation::
109 maxMasterFileBufferSize == 0
118 const_cast<char*>(masterData.data()),
119 label(masterData.size())
122 List<std::streamoff> blockOffset;
135 if (osPtr && !osPtr->good())
143 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
148 for (
const label recv : recvSizes)
153 Pout<<
" (overall " << std::to_string(
sum) <<
')';
155 Pout<<
" to " << fName
156 <<
" using comm " << comm <<
endl;
163 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
165 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
173 std::lock_guard<std::mutex> guard(handler.mutex_);
174 if (handler.objects_.size())
176 ptr = handler.objects_.pop();
187 PtrList<SubList<char>> slaveData;
188 if (ptr->slaveData_.size())
190 slaveData.resize(ptr->slaveData_.size());
193 if (ptr->slaveData_.set(proci))
200 ptr->slaveData_[proci],
224 <<
"Failed writing " << ptr->pathName_
235 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
239 std::lock_guard<std::mutex> guard(handler.mutex_);
240 handler.threadRunning_ =
false;
247 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const 255 std::lock_guard<std::mutex> guard(mutex_);
258 totalSize += iter()->size();
265 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
273 std::lock_guard<std::mutex> guard(mutex_);
274 Pout<<
"OFstreamCollator : Waiting for buffer space." 275 <<
" Currently in use:" << totalSize
276 <<
" limit:" << maxBufferSize_
277 <<
" files:" << objects_.size()
290 maxBufferSize_(maxBufferSize),
291 threadRunning_(false),
292 localComm_(UPstream::worldComm),
295 UPstream::allocateCommunicator
306 const off_t maxBufferSize,
310 maxBufferSize_(maxBufferSize),
311 threadRunning_(false),
332 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
335 thread_.reset(
nullptr);
338 if (threadComm_ != -1)
349 const word& objectType,
350 const fileName& fName,
352 IOstreamOption streamOpt,
355 const bool useThread,
356 const dictionary& headerEntries
365 label maxLocalSize = 0;
367 for (
const label recvSize : recvSizes)
369 totalSize += recvSize;
370 maxLocalSize =
max(maxLocalSize, recvSize);
375 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
379 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
380 <<
" using local comm " << localComm_ <<
endl;
383 const PtrList<SubList<char>> dummySlaveData;
398 else if (totalSize <= maxBufferSize_)
405 Pout<<
"OFstreamCollator : non-thread gather; thread write of " 411 waitForBufferSpace(totalSize);
418 autoPtr<writeData> fileAndDataPtr
437 writeData& fileAndData = fileAndDataPtr();
439 PtrList<List<char>>& slaveData = fileAndData.slaveData_;
441 UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
443 slaveData.setSize(recvSizes.size());
451 for (label proci = 1; proci < slaveData.size(); proci++)
453 slaveData.set(proci,
new List<char>(recvSizes[proci]));
458 slaveData[proci].data(),
459 slaveData[proci].size_bytes(),
481 <<
"Cannot send outgoing message. " 482 <<
"to:" << 0 <<
" nBytes:" 483 << label(slice.size_bytes())
490 std::lock_guard<std::mutex> guard(mutex_);
493 objects_.push(fileAndDataPtr.ptr());
502 Pout<<
"OFstreamCollator : Waiting for write thread" 510 Pout<<
"OFstreamCollator : Starting write thread" 513 thread_.reset(
new std::thread(writeAll,
this));
514 threadRunning_ =
true;
524 Pout<<
"OFstreamCollator : thread gather and write of " << fName
525 <<
" using communicator " << threadComm_ <<
endl;
531 <<
"mpi does not seem to have thread support." 532 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'" 533 <<
" to at least " << totalSize
534 <<
" to be able to do the collating before threading." 540 waitForBufferSpace(data.size());
544 std::lock_guard<std::mutex> guard(mutex_);
570 Pout<<
"OFstreamCollator : Waiting for write thread" 578 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
580 thread_.reset(
new std::thread(writeAll,
this));
581 threadRunning_ =
true;
598 Pout<<
"OFstreamCollator : waiting for thread to have consumed all" 601 waitForBufferSpace(-1);
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Write buffer contents to given processor.
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
static void writeData(Ostream &os, const Type &val)
commsTypes
Types of communications.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
static label nRequests() noexcept
Number of outstanding requests.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static int & msgType() noexcept
Message tag of standard messages.
UList< label > labelUList
A UList of labels.
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
#define forAll(list, i)
Loop across all elements in list.
atomicType
Atomic operations (output)
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
"scheduled" : (MPI_Send, MPI_Recv)
labelList identity(const label len, label start=0)
Return an identity map of the given length with (map[i] == i)
void waitAll()
Wait for all thread actions to have finished.
errorManip< error > abort(error &err)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
static const string null
An empty string.
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
appendType
File appending (NON_APPEND | APPEND)
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
static bool master(const label communicator=worldComm)
Am I the master rank.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static bool haveThreads() noexcept
Have support for threads.
List< label > labelList
A List of labels.
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Read buffer contents from given processor.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string ¬e, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
forAllConstIters(mixture.phases(), phase)
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...