OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2019-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "OFstreamCollator.H"
30 #include "OFstream.H"
31 #include "decomposedBlockData.H"
32 #include "dictionary.H"
34 
35 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36 
37 namespace Foam
38 {
39  defineTypeNameAndDebug(OFstreamCollator, 0);
40 }
41 
42 
43 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
44 
45 bool Foam::OFstreamCollator::writeFile
46 (
47  const label comm,
48  const word& objectType,
49  const fileName& fName,
50  const string& masterData,
51  const labelUList& recvSizes,
52  const UPtrList<SubList<char>>& slaveData, // optional slave data
53  IOstreamOption streamOpt,
54  IOstreamOption::atomicType atomic,
55  IOstreamOption::appendType append,
56  const dictionary& headerEntries
57 )
58 {
59  if (debug)
60  {
61  Pout<< "OFstreamCollator : Writing master " << label(masterData.size())
62  << " bytes to " << fName << " using comm " << comm
63  << " and " << slaveData.size() << " sub-ranks" << endl;
64 
65  forAll(slaveData, proci)
66  {
67  if (slaveData.set(proci))
68  {
69  Pout<< " " << proci
70  << " size:" << slaveData[proci].size()
71  << endl;
72  }
73  }
74  }
75 
76  autoPtr<OSstream> osPtr;
77  if (UPstream::master(comm))
78  {
79  Foam::mkDir(fName.path());
80  osPtr.reset(new OFstream(atomic, fName, streamOpt, append));
81  auto& os = *osPtr;
82 
84  {
85  // No IOobject so cannot use IOobject::writeHeader
86 
87  // FoamFile
89  (
90  os,
91  streamOpt, // streamOpt for container
92  objectType,
93  "", // note
94  "", // location (leave empty instead inaccurate)
95  fName.name(), // object name
96  headerEntries
97  );
98  }
99  }
100 
101  // Assuming threaded writing hides any slowness so we
102  // can use scheduled communication to send the data to
103  // the master processor in order. However can be unstable
104  // for some mpi so default is non-blocking.
105  const UPstream::commsTypes myCommsType
106  (
107  (
108  fileOperations::masterUncollatedFileOperation::
109  maxMasterFileBufferSize == 0
110  )
113  );
114 
115 
116  UList<char> slice
117  (
118  const_cast<char*>(masterData.data()),
119  label(masterData.size())
120  );
121 
122  List<std::streamoff> blockOffset;
124  (
125  comm,
126  osPtr,
127  blockOffset,
128  slice,
129  recvSizes,
130  slaveData,
131  myCommsType,
132  false // do not reduce return state
133  );
134 
135  if (osPtr && !osPtr->good())
136  {
137  FatalIOErrorInFunction(*osPtr)
138  << "Failed writing to " << fName << exit(FatalIOError);
139  }
140 
141  if (debug)
142  {
143  Pout<< "OFstreamCollator : Finished writing " << masterData.size()
144  << " bytes";
145  if (UPstream::master(comm))
146  {
147  off_t sum = 0;
148  for (const label recv : recvSizes)
149  {
150  sum += recv;
151  }
152  // Use std::to_string to display long int
153  Pout<< " (overall " << std::to_string(sum) << ')';
154  }
155  Pout<< " to " << fName
156  << " using comm " << comm << endl;
157  }
158 
159  return true;
160 }
161 
162 
163 void* Foam::OFstreamCollator::writeAll(void *threadarg)
164 {
165  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
166 
167  // Consume stack
168  while (true)
169  {
170  writeData* ptr = nullptr;
171 
172  {
173  std::lock_guard<std::mutex> guard(handler.mutex_);
174  if (handler.objects_.size())
175  {
176  ptr = handler.objects_.pop();
177  }
178  }
179 
180  if (!ptr)
181  {
182  break;
183  }
184  else
185  {
186  // Convert storage to pointers
187  PtrList<SubList<char>> slaveData;
188  if (ptr->slaveData_.size())
189  {
190  slaveData.resize(ptr->slaveData_.size());
191  forAll(slaveData, proci)
192  {
193  if (ptr->slaveData_.set(proci))
194  {
195  slaveData.set
196  (
197  proci,
198  new SubList<char>
199  (
200  ptr->slaveData_[proci],
201  ptr->sizes_[proci]
202  )
203  );
204  }
205  }
206  }
207 
208  bool ok = writeFile
209  (
210  ptr->comm_,
211  ptr->objectType_,
212  ptr->pathName_,
213  ptr->data_,
214  ptr->sizes_,
215  slaveData,
216  ptr->streamOpt_,
217  ptr->atomic_,
218  ptr->append_,
219  ptr->headerEntries_
220  );
221  if (!ok)
222  {
223  FatalIOErrorInFunction(ptr->pathName_)
224  << "Failed writing " << ptr->pathName_
225  << exit(FatalIOError);
226  }
227 
228  delete ptr;
229  }
230  //sleep(1);
231  }
232 
233  if (debug)
234  {
235  Pout<< "OFstreamCollator : Exiting write thread " << endl;
236  }
237 
238  {
239  std::lock_guard<std::mutex> guard(handler.mutex_);
240  handler.threadRunning_ = false;
241  }
242 
243  return nullptr;
244 }
245 
246 
247 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
248 {
249  while (true)
250  {
251  // Count files to be written
252  off_t totalSize = 0;
253 
254  {
255  std::lock_guard<std::mutex> guard(mutex_);
256  forAllConstIters(objects_, iter)
257  {
258  totalSize += iter()->size();
259  }
260  }
261 
262  if
263  (
264  totalSize == 0
265  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
266  )
267  {
268  break;
269  }
270 
271  if (debug)
272  {
273  std::lock_guard<std::mutex> guard(mutex_);
274  Pout<< "OFstreamCollator : Waiting for buffer space."
275  << " Currently in use:" << totalSize
276  << " limit:" << maxBufferSize_
277  << " files:" << objects_.size()
278  << endl;
279  }
280 
281  sleep(5);
282  }
283 }
284 
285 
286 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
287 
288 Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
289 :
290  maxBufferSize_(maxBufferSize),
291  threadRunning_(false),
292  localComm_(UPstream::worldComm),
293  threadComm_
294  (
295  UPstream::allocateCommunicator
296  (
297  localComm_,
298  labelRange(UPstream::nProcs(localComm_))
299  )
300  )
301 {}
302 
303 
305 (
306  const off_t maxBufferSize,
307  const label comm
308 )
309 :
310  maxBufferSize_(maxBufferSize),
311  threadRunning_(false),
312  localComm_(comm),
313  threadComm_
314  (
315  UPstream::allocateCommunicator
316  (
317  localComm_,
318  labelRange(UPstream::nProcs(localComm_))
319  )
320  )
321 {}
322 
323 
324 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
325 
327 {
328  if (thread_)
329  {
330  if (debug)
331  {
332  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
333  }
334  thread_->join();
335  thread_.reset(nullptr);
336  }
337 
339 }
340 
341 
342 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
343 
345 (
346  const word& objectType,
347  const fileName& fName,
348  const string& data,
349  IOstreamOption streamOpt,
352  const bool useThread,
353  const dictionary& headerEntries
354 )
355 {
356  // Determine (on master) sizes to receive. Note: do NOT use thread
357  // communicator
358  labelList recvSizes;
359  decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
360 
361  off_t totalSize = 0;
362  label maxLocalSize = 0;
363  {
364  if (UPstream::master(localComm_))
365  {
366  for (const label recvSize : recvSizes)
367  {
368  totalSize += recvSize;
369  maxLocalSize = max(maxLocalSize, recvSize);
370  }
371  }
372  Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
373  }
374 
375  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
376  {
377  if (debug)
378  {
379  Pout<< "OFstreamCollator : non-thread gather and write of " << fName
380  << " using local comm " << localComm_ << endl;
381  }
382  // Direct collating and writing (so master blocks until all written!)
383  const PtrList<SubList<char>> dummySlaveData;
384  return writeFile
385  (
386  localComm_,
387  objectType,
388  fName,
389  data,
390  recvSizes,
391  dummySlaveData,
392  streamOpt,
393  atomic,
394  append,
395  headerEntries
396  );
397  }
398  else if (totalSize <= maxBufferSize_)
399  {
400  // Total size can be stored locally so receive all data now and only
401  // do the writing in the thread
402 
403  if (debug)
404  {
405  Pout<< "OFstreamCollator : non-thread gather; thread write of "
406  << fName << endl;
407  }
408 
409  if (Pstream::master(localComm_))
410  {
411  waitForBufferSpace(totalSize);
412  }
413 
414 
415  // Receive in chunks of labelMax (2^31-1) since this is the maximum
416  // size that a List can be
417 
418  autoPtr<writeData> fileAndDataPtr
419  (
420  new writeData
421  (
422  threadComm_, // Note: comm not actually used anymore
423  objectType,
424  fName,
425  (
426  Pstream::master(localComm_)
427  ? data // Only used on master
428  : string::null
429  ),
430  recvSizes,
431  streamOpt,
432  atomic,
433  append,
434  headerEntries
435  )
436  );
437  writeData& fileAndData = fileAndDataPtr();
438 
439  PtrList<List<char>>& slaveData = fileAndData.slaveData_;
440 
441  UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
442 
443  slaveData.setSize(recvSizes.size());
444 
445  // Gather all data onto master. Is done in local communicator since
446  // not in write thread. Note that we do not store in contiguous
447  // buffer since that would limit to 2G chars.
448  const label startOfRequests = UPstream::nRequests();
449  if (Pstream::master(localComm_))
450  {
451  for (label proci = 1; proci < slaveData.size(); proci++)
452  {
453  slaveData.set(proci, new List<char>(recvSizes[proci]));
455  (
457  proci,
458  slaveData[proci].data(),
459  slaveData[proci].size_bytes(),
461  localComm_
462  );
463  }
464  }
465  else
466  {
467  if
468  (
470  (
472  0,
473  slice.cdata(),
474  slice.size_bytes(),
476  localComm_
477  )
478  )
479  {
481  << "Cannot send outgoing message. "
482  << "to:" << 0 << " nBytes:"
483  << label(slice.size_bytes())
485  }
486  }
487  UPstream::waitRequests(startOfRequests);
488 
489  {
490  std::lock_guard<std::mutex> guard(mutex_);
491 
492  // Append to thread buffer
493  objects_.push(fileAndDataPtr.ptr());
494 
495  // Start thread if not running
496  if (!threadRunning_)
497  {
498  if (thread_)
499  {
500  if (debug)
501  {
502  Pout<< "OFstreamCollator : Waiting for write thread"
503  << endl;
504  }
505  thread_->join();
506  }
507 
508  if (debug)
509  {
510  Pout<< "OFstreamCollator : Starting write thread"
511  << endl;
512  }
513  thread_.reset(new std::thread(writeAll, this));
514  threadRunning_ = true;
515  }
516  }
517 
518  return true;
519  }
520  else
521  {
522  if (debug)
523  {
524  Pout<< "OFstreamCollator : thread gather and write of " << fName
525  << " using communicator " << threadComm_ << endl;
526  }
527 
528  if (!UPstream::haveThreads())
529  {
531  << "mpi does not seem to have thread support."
532  << " Make sure to set buffer size 'maxThreadFileBufferSize'"
533  << " to at least " << totalSize
534  << " to be able to do the collating before threading."
535  << exit(FatalError);
536  }
537 
538  if (Pstream::master(localComm_))
539  {
540  waitForBufferSpace(data.size());
541  }
542 
543  {
544  std::lock_guard<std::mutex> guard(mutex_);
545 
546  // Push all file info on buffer. Note that no slave data provided
547  // so it will trigger communication inside the thread
548  objects_.push
549  (
550  new writeData
551  (
552  threadComm_,
553  objectType,
554  fName,
555  data,
556  recvSizes,
557  streamOpt,
558  atomic,
559  append,
560  headerEntries
561  )
562  );
563 
564  if (!threadRunning_)
565  {
566  if (thread_)
567  {
568  if (debug)
569  {
570  Pout<< "OFstreamCollator : Waiting for write thread"
571  << endl;
572  }
573  thread_->join();
574  }
575 
576  if (debug)
577  {
578  Pout<< "OFstreamCollator : Starting write thread" << endl;
579  }
580  thread_.reset(new std::thread(writeAll, this));
581  threadRunning_ = true;
582  }
583  }
584 
585  return true;
586  }
587 }
588 
589 
591 {
592  // Wait for all buffer space to be available i.e. wait for all jobs
593  // to finish
594  if (Pstream::master(localComm_))
595  {
596  if (debug)
597  {
598  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
599  << endl;
600  }
601  waitForBufferSpace(-1);
602  }
603 }
604 
605 
606 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &f1)
A class for handling file names.
Definition: fileName.H:72
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const UPtrList< SubList< char >> &slaveData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
static void writeData(Ostream &os, const Type &val)
commsTypes
Communications types.
Definition: UPstream.H:72
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
A list of keyword definitions, which are a keyword followed by a number of values (eg...
Definition: dictionary.H:129
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition: UPstream.C:565
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:40
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
A range or interval of labels defined by a start and a size.
Definition: labelRange.H:52
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1229
A simple container for options an IOstream can normally have.
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1538
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
atomicType
Atomic operations (output)
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:614
"scheduled" : (MPI_Send, MPI_Recv)
A class for handling words, derived from Foam::string.
Definition: word.H:63
void waitAll()
Wait for all thread actions to have finished.
errorManip< error > abort(error &err)
Definition: errorManip.H:139
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition: POSIX.C:1547
static const string null
An empty string.
Definition: string.H:202
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
appendType
File appending (NON_APPEND | APPEND)
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all communicator ranks. Does nothing in non-parallel. ...
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:627
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1082
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:1054
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
Definition: UPstream.H:60
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL IO ERROR&#39; header text and ...