45 bool Foam::OFstreamCollator::writeFile
48 const word& objectType,
49 const fileName& fName,
50 const UList<char>& localData,
52 const UList<std::string_view>& procData,
53 IOstreamOption streamOpt,
54 IOstreamOption::atomicType atomic,
55 IOstreamOption::appendType
append,
56 const dictionary& headerEntries
61 Pout<<
"OFstreamCollator : Writing local " << localData.size()
62 <<
" bytes to " << fName <<
" using comm " << comm
63 <<
" and " << procData.size() <<
" sub-ranks" <<
endl;
67 Pout<<
" " << proci <<
" size:" 68 << label(procData[proci].size()) <<
nl;
72 autoPtr<OSstream> osPtr;
76 osPtr.reset(
new OFstream(atomic, fName, streamOpt,
append));
106 fileOperations::masterUncollatedFileOperation::
107 maxMasterFileBufferSize
114 List<std::streamoff> blockOffsets;
127 if (osPtr && !osPtr->good())
135 Pout<<
"OFstreamCollator : Finished writing " 136 << localData.size() <<
" bytes";
141 for (
const label recv : recvSizes)
146 Pout<<
" (overall " << std::to_string(total) <<
')';
148 Pout<<
" to " << fName
149 <<
" using comm " << comm <<
endl;
156 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
158 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
163 std::unique_ptr<writeData> ptr;
166 std::lock_guard<std::mutex> guard(handler.mutex_);
168 if (handler.objects_.size())
171 ptr.reset(handler.objects_.front());
172 handler.objects_.pop_front();
184 List<std::string_view> procData(obj.procData_.size());
187 procData[proci] = obj.procData_[proci].view();
207 <<
"Failed writing " << obj.pathName_
215 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
219 std::lock_guard<std::mutex> guard(handler.mutex_);
220 handler.threadRunning_ =
false;
227 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const 235 std::lock_guard<std::mutex> guard(mutex_);
245 || (wantedSize >= 0 && (
totalSize+wantedSize) <= maxBufferSize_)
253 std::lock_guard<std::mutex> guard(mutex_);
254 Pout<<
"OFstreamCollator : Waiting for buffer space." 256 <<
" limit:" << maxBufferSize_
257 <<
" files:" << objects_.size()
276 const off_t maxBufferSize,
280 maxBufferSize_(maxBufferSize),
281 threadRunning_(
false),
283 threadComm_(
UPstream::dupCommunicator(localComm_))
295 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
298 thread_.reset(
nullptr);
309 const word& objectType,
315 const bool useThread,
323 UPstream::listGatherValues<label>(localData.size(), localComm_)
327 label maxLocalSize = 0;
331 for (
const label recvSize : recvSizes)
334 maxLocalSize =
Foam::max(maxLocalSize, recvSize);
343 static_cast<int64_t>(maxLocalSize)
348 totalSize =
static_cast<off_t
>(sizes[0]);
349 maxLocalSize =
static_cast<label
>(sizes[1]);
355 enum class dispatchModes { DIRECT_WRITE, THREADED_WRITE, FULL_THREADED };
357 dispatchModes dispatch(dispatchModes::DIRECT_WRITE);
359 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
361 dispatch = dispatchModes::DIRECT_WRITE;
368 dispatch = dispatchModes::THREADED_WRITE;
374 dispatch = dispatchModes::FULL_THREADED;
379 <<
"MPI not initialized with thread support." <<
nl 380 <<
" maxThreadFileBufferSize = 0 to disable threading" <<
nl 381 <<
" or maxThreadFileBufferSize > " <<
totalSize 382 <<
" to collate before threaded writing." <<
nl <<
nl;
384 dispatch = dispatchModes::DIRECT_WRITE;
393 if (dispatch == dispatchModes::DIRECT_WRITE)
397 Pout<<
"OFstreamCollator : non-thread gather " 398 <<
"(local comm: " << localComm_
399 <<
"); non-thread write of " 418 else if (dispatch == dispatchModes::THREADED_WRITE)
422 Pout<<
"OFstreamCollator : non-thread gather " 423 <<
"(local comm: " << localComm_
424 <<
"); thread write of " 433 std::unique_ptr<writeData> fileAndDataPtr
447 auto& fileAndData = *fileAndDataPtr;
449 List<List<char>>& procData = fileAndData.procData_;
452 DynamicList<int> recvProcs;
457 fileAndData.transfer(localData);
464 recvProcs.reserve_exact(order.size());
469 const label proci = order[i];
474 recvProcs.push_back(proci);
494 for (
const int proci : recvProcs)
496 auto& slot = procData[proci];
497 slot.resize_nocopy(recvSizes[proci]);
519 localData.cdata_bytes(),
520 localData.size_bytes(),
527 <<
"Failure to send message (size: " 528 << localData.size() <<
") to master" <<
nl 536 localData.clearStorage();
541 std::lock_guard<std::mutex> guard(mutex_);
544 objects_.push_back(fileAndDataPtr.release());
553 Pout<<
"OFstreamCollator : Waiting for write thread" 561 Pout<<
"OFstreamCollator : Starting write thread" 564 thread_.reset(
new std::thread(writeAll,
this));
565 threadRunning_ =
true;
571 else if (dispatch == dispatchModes::FULL_THREADED)
575 Pout<<
"OFstreamCollator : thread gather; thread write " 576 <<
"(thread comm: " << threadComm_
577 <<
") of " << fName <<
endl;
582 waitForBufferSpace(localData.size());
585 std::unique_ptr<writeData> fileAndDataPtr
601 fileAndDataPtr->transfer(localData);
606 std::lock_guard<std::mutex> guard(mutex_);
609 objects_.push_back(fileAndDataPtr.release());
620 Pout<<
"OFstreamCollator : Waiting for write thread" 628 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
630 thread_.reset(
new std::thread(writeAll,
this));
631 threadRunning_ =
true;
639 <<
"Unknown dispatch mode: " << int(dispatch)
655 Pout<<
"OFstreamCollator : waiting for thread to have consumed all" 658 waitForBufferSpace(-1);
A class for handling file names.
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
commsTypes
Communications types.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
A list of keyword definitions, which are a keyword followed by a number of values (eg...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
labelList sortedOrder(const UList< T > &input)
Return the (stable) sort order for the list.
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
constexpr char nl
The newline '\n' character (0x0a)
label totalSize(const UList< T > &lists)
The total size of all sub-lists.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &localData, const labelUList &recvSizes, const UList< std::string_view > &procData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static int & msgType() noexcept
Message tag of standard messages.
A simple container for options an IOstream can normally have.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr)
Receive buffer contents (contiguous types) from given processor.
const bool writeData(pdfDictionary.get< bool >("writeData"))
static void waitRequests()
Wait for all requests to finish.
UList< label > labelUList
A UList of labels.
#define forAll(list, i)
Loop across all elements in list.
static bool is_subrank(label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
atomicType
Atomic operations (output)
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
bool write(const word &objectType, const fileName &fName, DynamicList< char > &&localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents, possibly taking ownership of the content.
A class for handling words, derived from Foam::string.
static bool broadcast(Type *buffer, std::streamsize count, const int communicator, const int root=UPstream::masterNo())
Broadcast buffer contents (contiguous types) to all ranks (default: from rank=0). The sizes must matc...
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
void waitAll()
Wait for all thread actions to have finished.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents (contiguous types only) to given processor.
errorManip< error > abort(error &err)
no append (truncates existing)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
int debug
Static debugging option.
defineTypeNameAndDebug(combustionModel, 0)
appendType
File appending (NO_APPEND | APPEND_APP | APPEND_ATE)
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size (0 = do not use thread) and with worldComm.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
static label nProcs(label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
decomposeUsingBbs false
Use bounding boxes (default) or unique decomposition of triangles (i.e. do not duplicate triangles) ...
#define WarningInFunction
Report a warning using Foam::Warning.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
static const UList< T > & null() noexcept
Return a null UList (reference to a nullObject). Behaves like an empty UList.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static bool master(label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static bool haveThreads() noexcept
Have support for threads.
#define forAllReverse(list, i)
Reverse loop across all elements in list.
List< label > labelList
A List of labels.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string ¬e, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header for the decomposedBlockData container.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...