OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2019-2025 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "OFstreamCollator.H"
30 #include "OFstream.H"
31 #include "decomposedBlockData.H"
32 #include "dictionary.H"
34 
35 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
39  defineTypeNameAndDebug(OFstreamCollator, 0);
40 }
41 
42 
43 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
44 
45 bool Foam::OFstreamCollator::writeFile
46 (
47  const label comm,
48  const word& objectType,
49  const fileName& fName,
50  const UList<char>& localData,
51  const labelUList& recvSizes,
52  const UList<std::string_view>& procData, // optional proc data
53  IOstreamOption streamOpt,
54  IOstreamOption::atomicType atomic,
55  IOstreamOption::appendType append,
56  const dictionary& headerEntries
57 )
58 {
59  if (debug)
60  {
61  Pout<< "OFstreamCollator : Writing local " << localData.size()
62  << " bytes to " << fName << " using comm " << comm
63  << " and " << procData.size() << " sub-ranks" << endl;
64 
65  forAll(procData, proci)
66  {
67  Pout<< " " << proci << " size:"
68  << label(procData[proci].size()) << nl;
69  }
70  }
71 
72  autoPtr<OSstream> osPtr;
73  if (UPstream::master(comm))
74  {
75  Foam::mkDir(fName.path());
76  osPtr.reset(new OFstream(atomic, fName, streamOpt, append));
77  auto& os = *osPtr;
78 
80  {
81  // No IOobject so cannot use IOobject::writeHeader
82 
83  // FoamFile
85  (
86  os,
87  streamOpt, // streamOpt for container
88  objectType,
89  "", // note
90  "", // location (leave empty, otherwise inaccurate)
91  fName.name(), // object name
92  headerEntries
93  );
94  }
95  }
96 
97  // Assuming threaded writing hides any slowness so we
98  // can use scheduled communication to send the data to
99  // the master processor in order. However can be unstable
100  // for some mpi so default is non-blocking.
101  const UPstream::commsTypes myCommsType
102  (
103  // Blocking when buffer size is 0
104  Foam::mag
105  (
106  fileOperations::masterUncollatedFileOperation::
107  maxMasterFileBufferSize
108  ) < 1
111  );
112 
113 
114  List<std::streamoff> blockOffsets; // Optional
116  (
117  comm,
118  osPtr,
119  blockOffsets, // or List<std::streamoff>::null()
120  localData,
121  recvSizes,
122  procData,
123  myCommsType,
124  false // do not sync return state
125  );
126 
127  if (osPtr && !osPtr->good())
128  {
129  FatalIOErrorInFunction(*osPtr)
130  << "Failed writing to " << fName << exit(FatalIOError);
131  }
132 
133  if (debug)
134  {
135  Pout<< "OFstreamCollator : Finished writing "
136  << localData.size() << " bytes";
137 
138  if (UPstream::master(comm))
139  {
140  off_t total = 0;
141  for (const label recv : recvSizes)
142  {
143  total += recv;
144  }
145  // Use std::to_string to display long int
146  Pout<< " (overall " << std::to_string(total) << ')';
147  }
148  Pout<< " to " << fName
149  << " using comm " << comm << endl;
150  }
151 
152  return true;
153 }
154 
155 
156 void* Foam::OFstreamCollator::writeAll(void *threadarg)
157 {
158  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
159 
160  // Consume stack
161  while (true)
162  {
163  std::unique_ptr<writeData> ptr;
164 
165  {
166  std::lock_guard<std::mutex> guard(handler.mutex_);
167 
168  if (handler.objects_.size())
169  {
170  // FIFO
171  ptr.reset(handler.objects_.front());
172  handler.objects_.pop_front();
173  }
174  }
175 
176  if (!ptr)
177  {
178  break;
179  }
180 
181  writeData& obj = *ptr;
182 
183  // Obtain views from storage
184  List<std::string_view> procData(obj.procData_.size());
185  forAll(procData, proci)
186  {
187  procData[proci] = obj.procData_[proci].view();
188  }
189 
190  bool ok = writeFile
191  (
192  obj.comm_,
193  obj.objectType_,
194  obj.pathName_,
195  obj.localData_,
196  obj.sizes_,
197  procData,
198  obj.streamOpt_,
199  obj.atomic_,
200  obj.append_,
201  obj.headerEntries_
202  );
203 
204  if (!ok)
205  {
206  FatalIOErrorInFunction(obj.pathName_)
207  << "Failed writing " << obj.pathName_
208  << exit(FatalIOError);
209  }
210  //sleep(1);
211  }
212 
213  if (debug)
214  {
215  Pout<< "OFstreamCollator : Exiting write thread " << endl;
216  }
217 
218  {
219  std::lock_guard<std::mutex> guard(handler.mutex_);
220  handler.threadRunning_ = false;
221  }
222 
223  return nullptr;
224 }
225 
226 
227 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
228 {
229  while (true)
230  {
231  // The pending output size(s)
232  off_t totalSize = 0;
233 
234  {
235  std::lock_guard<std::mutex> guard(mutex_);
236  for (const writeData* ptr : objects_)
237  {
238  if (ptr) totalSize += ptr->size();
239  }
240  }
241 
242  if
243  (
244  totalSize == 0
245  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
246  )
247  {
248  break;
249  }
250 
251  if (debug)
252  {
253  std::lock_guard<std::mutex> guard(mutex_);
254  Pout<< "OFstreamCollator : Waiting for buffer space."
255  << " Currently in use:" << totalSize
256  << " limit:" << maxBufferSize_
257  << " files:" << objects_.size()
258  << endl;
259  }
260 
261  sleep(5);
262  }
263 }
264 
265 
266 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
267 
269 :
270  OFstreamCollator(maxBufferSize, UPstream::worldComm)
271 {}
272 
273 
275 (
276  const off_t maxBufferSize,
277  const label comm
278 )
279 :
280  maxBufferSize_(maxBufferSize),
281  threadRunning_(false),
282  localComm_(comm),
283  threadComm_(UPstream::dupCommunicator(localComm_))
284 {}
285 
286 
287 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
288 
290 {
291  if (thread_)
292  {
293  if (debug)
294  {
295  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
296  }
297  thread_->join();
298  thread_.reset(nullptr);
299  }
300 
302 }
303 
304 
305 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
306 
308 (
309  const word& objectType,
310  const fileName& fName,
311  DynamicList<char>&& localData,
312  IOstreamOption streamOpt,
315  const bool useThread,
316  const dictionary& headerEntries
317 )
318 {
319  // Determine (on master) sizes to receive. Note: do NOT use thread
320  // communicator
321  const labelList recvSizes
322  (
323  UPstream::listGatherValues<label>(localData.size(), localComm_)
324  );
325 
326  off_t totalSize = 0;
327  label maxLocalSize = 0;
328 
329  if (UPstream::master(localComm_))
330  {
331  for (const label recvSize : recvSizes)
332  {
333  totalSize += recvSize;
334  maxLocalSize = Foam::max(maxLocalSize, recvSize);
335  }
336  }
337 
338  // Broadcast the information to everyone
339  {
340  int64_t sizes[2] =
341  {
342  static_cast<int64_t>(totalSize),
343  static_cast<int64_t>(maxLocalSize)
344  };
345 
346  UPstream::broadcast(sizes, 2, localComm_);
347 
348  totalSize = static_cast<off_t>(sizes[0]);
349  maxLocalSize = static_cast<label>(sizes[1]);
350  }
351 
352 
353  // Determine how things will be gathered and written...
354 
355  enum class dispatchModes { DIRECT_WRITE, THREADED_WRITE, FULL_THREADED };
356 
357  dispatchModes dispatch(dispatchModes::DIRECT_WRITE);
358 
359  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
360  {
361  dispatch = dispatchModes::DIRECT_WRITE;
362  }
363  else if (totalSize <= maxBufferSize_)
364  {
365  // Total size can be stored locally
366  // - gather all data now and only do the writing in the thread
367 
368  dispatch = dispatchModes::THREADED_WRITE;
369  }
370  else
371  {
372  // Gather data and write in the thread
373 
374  dispatch = dispatchModes::FULL_THREADED;
375 
376  if (!UPstream::haveThreads())
377  {
379  << "MPI not initialized with thread support." << nl
380  << " maxThreadFileBufferSize = 0 to disable threading" << nl
381  << " or maxThreadFileBufferSize > " << totalSize
382  << " to collate before threaded writing." << nl << nl;
383 
384  dispatch = dispatchModes::DIRECT_WRITE;
385  }
386  }
387 
388 
389  // -----------
390  // Dispatching
391  // -----------
392 
393  if (dispatch == dispatchModes::DIRECT_WRITE)
394  {
395  if (debug)
396  {
397  Pout<< "OFstreamCollator : non-thread gather "
398  << "(local comm: " << localComm_
399  << "); non-thread write of "
400  << fName << endl;
401  }
402 
403  // Direct collating and writing (so master blocks until all written!)
404  return writeFile
405  (
406  localComm_,
407  objectType,
408  fName,
409  localData,
410  recvSizes,
411  UList<std::string_view>::null(), // dummy proc data
412  streamOpt,
413  atomic,
414  append,
415  headerEntries
416  );
417  }
418  else if (dispatch == dispatchModes::THREADED_WRITE)
419  {
420  if (debug)
421  {
422  Pout<< "OFstreamCollator : non-thread gather "
423  << "(local comm: " << localComm_
424  << "); thread write of "
425  << fName << endl;
426  }
427 
428  if (UPstream::master(localComm_))
429  {
430  waitForBufferSpace(totalSize);
431  }
432 
433  std::unique_ptr<writeData> fileAndDataPtr
434  (
435  new writeData
436  (
437  threadComm_,
438  objectType,
439  fName,
440  recvSizes,
441  streamOpt,
442  atomic,
443  append,
444  headerEntries
445  )
446  );
447  auto& fileAndData = *fileAndDataPtr;
448 
449  List<List<char>>& procData = fileAndData.procData_;
450 
451  // Receive from these procs
452  DynamicList<int> recvProcs;
453 
454  if (UPstream::master(localComm_))
455  {
456  // Move in local data (master only!)
457  fileAndData.transfer(localData);
458 
459  // Storage for receive data
460  procData.resize_nocopy(UPstream::nProcs(localComm_));
461 
462  // Sorted by message size
463  labelList order(Foam::sortedOrder(recvSizes));
464  recvProcs.reserve_exact(order.size());
465 
466  // Want to receive large messages first. Ignore empty slots
467  forAllReverse(order, i)
468  {
469  const label proci = order[i];
470 
471  // Ignore empty slots and don't try to receive from self
472  if (recvSizes[proci] > 0 && proci != UPstream::masterNo())
473  {
474  recvProcs.push_back(proci);
475  }
476  }
477  }
478  else if (UPstream::is_subrank(localComm_))
479  {
480  // Requires a size for decomposedBlockData::writeBlocks() logic
481  procData.resize_nocopy(UPstream::nProcs(localComm_));
482  }
483 
484 
485  // Gather all data onto master. Is done in local communicator since
486  // not in write thread.
487  const label startOfRequests = UPstream::nRequests();
488 
489  const int messageTag = (UPstream::msgType() + 256);
490 
491  if (UPstream::master(localComm_))
492  {
493  // Receive from these procs (non-empty slots)
494  for (const int proci : recvProcs)
495  {
496  auto& slot = procData[proci];
497  slot.resize_nocopy(recvSizes[proci]);
498 
500  (
502  proci,
503  slot.data_bytes(),
504  slot.size_bytes(),
505  messageTag,
506  localComm_
507  );
508  }
509  }
510  else if (UPstream::is_subrank(localComm_) && !localData.empty())
511  {
512  // Send to content to master
513  if
514  (
516  (
519  localData.cdata_bytes(),
520  localData.size_bytes(),
521  messageTag,
522  localComm_
523  )
524  )
525  {
527  << "Failure to send message (size: "
528  << localData.size() << ") to master" << nl
530  }
531  }
532 
533  UPstream::waitRequests(startOfRequests);
534 
535  // The localData has been moved (master) or communicated
536  localData.clearStorage();
537 
538 
539  // Queue up for threading
540  {
541  std::lock_guard<std::mutex> guard(mutex_);
542 
543  // Add to thread buffer (as FIFO), take ownership
544  objects_.push_back(fileAndDataPtr.release());
545 
546  // Start thread if not running
547  if (!threadRunning_)
548  {
549  if (thread_)
550  {
551  if (debug)
552  {
553  Pout<< "OFstreamCollator : Waiting for write thread"
554  << endl;
555  }
556  thread_->join();
557  }
558 
559  if (debug)
560  {
561  Pout<< "OFstreamCollator : Starting write thread"
562  << endl;
563  }
564  thread_.reset(new std::thread(writeAll, this));
565  threadRunning_ = true;
566  }
567  }
568 
569  return true;
570  }
571  else if (dispatch == dispatchModes::FULL_THREADED)
572  {
573  if (debug)
574  {
575  Pout<< "OFstreamCollator : thread gather; thread write "
576  << "(thread comm: " << threadComm_
577  << ") of " << fName << endl;
578  }
579 
580  if (UPstream::master(localComm_))
581  {
582  waitForBufferSpace(localData.size());
583  }
584 
585  std::unique_ptr<writeData> fileAndDataPtr
586  (
587  new writeData
588  (
589  threadComm_,
590  objectType,
591  fName,
592  recvSizes,
593  streamOpt,
594  atomic,
595  append,
596  headerEntries
597  )
598  );
599 
600  // Move in local data (all procs)
601  fileAndDataPtr->transfer(localData);
602 
603 
604  // Queue up for threading
605  {
606  std::lock_guard<std::mutex> guard(mutex_);
607 
608  // Add to thread buffer (as FIFO), take ownership
609  objects_.push_back(fileAndDataPtr.release());
610 
611  // Note: no proc data provided
612  // so it will trigger communication inside the thread!!!
613 
614  if (!threadRunning_)
615  {
616  if (thread_)
617  {
618  if (debug)
619  {
620  Pout<< "OFstreamCollator : Waiting for write thread"
621  << endl;
622  }
623  thread_->join();
624  }
625 
626  if (debug)
627  {
628  Pout<< "OFstreamCollator : Starting write thread" << endl;
629  }
630  thread_.reset(new std::thread(writeAll, this));
631  threadRunning_ = true;
632  }
633  }
634 
635  return true;
636  }
637 
639  << "Unknown dispatch mode: " << int(dispatch)
640  << " - programming error?" << abort(FatalError);
641 
642  return false;
643 }
644 
645 
647 {
648  // Wait for all buffer space to be available
649  // - ie, wait for all jobs to finish
650 
651  if (UPstream::master(localComm_))
652  {
653  if (debug)
654  {
655  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
656  << endl;
657  }
658  waitForBufferSpace(-1);
659  }
660 }
661 
662 
663 // ************************************************************************* //
Threaded file writer.
A class for handling file names.
Definition: fileName.H:72
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
commsTypes
Communications types.
Definition: UPstream.H:81
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
A list of keyword definitions, which are a keyword followed by a number of values (eg...
Definition: dictionary.H:130
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:652
labelList sortedOrder(const UList< T > &input)
Return the (stable) sort order for the list.
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition: UPstream.C:622
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:40
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:50
label totalSize(const UList< T > &lists)
The total size of all sub-lists.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &localData, const labelUList &recvSizes, const UList< std::string_view > &procData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:518
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:2019
A simple container for options an IOstream can normally have.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr)
Receive buffer contents (contiguous types) from given processor.
const bool writeData(pdfDictionary.get< bool >("writeData"))
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:2619
UList< label > labelUList
A UList of labels.
Definition: UList.H:76
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:400
static bool is_subrank(label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
Definition: UPstream.H:1824
atomicType
Atomic operations (output)
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:620
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
bool write(const word &objectType, const fileName &fName, DynamicList< char > &&localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents, possibly taking ownership of the content.
A class for handling words, derived from Foam::string.
Definition: word.H:63
static bool broadcast(Type *buffer, std::streamsize count, const int communicator, const int root=UPstream::masterNo())
Broadcast buffer contents (contiguous types) to all ranks (default: from rank=0). The sizes must matc...
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition: UPstream.H:1784
void waitAll()
Wait for all thread actions to have finished.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents (contiguous types only) to given processor.
errorManip< error > abort(error &err)
Definition: errorManip.H:139
no append (truncates existing)
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition: POSIX.C:1553
int debug
Static debugging option.
defineTypeNameAndDebug(combustionModel, 0)
appendType
File appending (NO_APPEND | APPEND_APP | APPEND_ATE)
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size (0 = do not use thread) and with worldComm.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
static label nProcs(label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1790
decomposeUsingBbs false
Use bounding boxes (default) or unique decomposition of triangles (i.e. do not duplicate triangles) ...
#define WarningInFunction
Report a warning using Foam::Warning.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:681
static const UList< T > & null() noexcept
Return a null UList (reference to a nullObject). Behaves like an empty UList.
Definition: UList.H:226
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static bool master(label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1807
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:1779
#define forAllReverse(list, i)
Reverse loop across all elements in list.
Definition: stdFoam.H:416
List< label > labelList
A List of labels.
Definition: List.H:61
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
Definition: UPstream.H:69
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header for the decomposedBlockData container.
Namespace for OpenFOAM.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL IO ERROR&#39; header text and ...