OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2019-2022 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "OFstreamCollator.H"
30 #include "OFstream.H"
31 #include "decomposedBlockData.H"
32 #include "dictionary.H"
34 
35 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36 
// Instantiates, inside namespace Foam, the static type-name/debug data for
// OFstreamCollator via the defineTypeNameAndDebug macro.
// NOTE(review): the second argument (0) is presumably the initial value of
// the static 'debug' switch tested throughout this file - confirm.
37 namespace Foam
38 {
39  defineTypeNameAndDebug(OFstreamCollator, 0);
40 }
41 
42 
43 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
44 
// Write one collated file: the master's data plus the data of the other
// ranks of 'comm'.
//
//  comm          communicator used for the master test and passed on to the
//                block writer
//  objectType    type name passed to the file-header writer
//  fName         output file; its parent directory is created on the master
//  masterData    character data of the master rank
//  recvSizes     per-rank data sizes (summed for the debug report)
//  slaveData     optional, already gathered per-rank data
//  streamOpt, atomic, append
//                options used to open the OFstream on the master
//  headerEntries additional entries passed to the file-header writer
//
// Returns true. A stream that exists but is not good() after writing
// raises a FatalIOError.
45 bool Foam::OFstreamCollator::writeFile
46 (
47  const label comm,
48  const word& objectType,
49  const fileName& fName,
50  const string& masterData,
51  const labelUList& recvSizes,
52  const UPtrList<SubList<char>>& slaveData, // optional slave data
53  IOstreamOption streamOpt,
54  IOstreamOption::atomicType atomic,
55  IOstreamOption::appendType append,
56  const dictionary& headerEntries
57 )
58 {
59  if (debug)
60  {
61  Pout<< "OFstreamCollator : Writing master " << label(masterData.size())
62  << " bytes to " << fName << " using comm " << comm
63  << " and " << slaveData.size() << " sub-ranks" << endl;
64 
// Report the size of every slave slot that is actually populated
65  forAll(slaveData, proci)
66  {
67  if (slaveData.set(proci))
68  {
69  Pout<< " " << proci
70  << " size:" << slaveData[proci].size()
71  << endl;
72  }
73  }
74  }
75 
// The output stream is only created on the master of 'comm'; on all other
// ranks osPtr stays empty
76  autoPtr<OSstream> osPtr;
77  if (UPstream::master(comm))
78  {
79  Foam::mkDir(fName.path());
80  osPtr.reset(new OFstream(atomic, fName, streamOpt, append));
81  auto& os = *osPtr;
82 
// NOTE(review): listing line 83 is absent here. The brace pair below
// suggests a guarding condition was lost in extraction - confirm against
// the upstream file.
84  {
85  // No IOobject so cannot use IOobject::writeHeader
86 
87  // FoamFile
// NOTE(review): listing line 88 (the name of the called function) is
// absent. The argument list presumably belongs to a writeHeader helper
// taking (os, streamOpt, objectType, note, location, objectName,
// extraEntries) - confirm.
89  (
90  os,
91  streamOpt, // streamOpt for container
92  objectType,
93  "", // note
94  "", // location (left empty rather than risk an inaccurate value)
95  fName.name(), // object name
96  headerEntries
97  );
98  }
99  }
100 
101  // Assuming threaded writing hides any slowness so we
102  // can use scheduled communication to send the data to
103  // the master processor in order. However can be unstable
104  // for some mpi so default is non-blocking.
105  const UPstream::commsTypes myCommsType
106  (
107  (
108  fileOperations::masterUncollatedFileOperation::
109  maxMasterFileBufferSize == 0
110  )
// NOTE(review): listing lines 111-112 are absent. Presumably they held the
// two alternatives selected by the condition above (scheduled versus
// non-blocking, going by the comment) - confirm.
113  );
114 
115 
// Non-owning view of the master data; the const_cast only removes
// constness so that the string storage can be wrapped in a UList<char>
116  UList<char> slice
117  (
118  const_cast<char*>(masterData.data()),
119  label(masterData.size())
120  );
121 
122  List<std::streamoff> blockOffset;
// NOTE(review): listing line 123 (the name of the called function) is
// absent. The argument list presumably belongs to a writeBlocks helper
// taking (comm, osPtr, blockOffset, masterData, recvSizes, slaveData,
// commsType, syncReturnState) - confirm.
124  (
125  comm,
126  osPtr,
127  blockOffset,
128  slice,
129  recvSizes,
130  slaveData,
131  myCommsType,
132  false // do not reduce return state
133  );
134 
// Only ranks that own a stream can detect a write failure
135  if (osPtr && !osPtr->good())
136  {
137  FatalIOErrorInFunction(*osPtr)
138  << "Failed writing to " << fName << exit(FatalIOError);
139  }
140 
141  if (debug)
142  {
143  Pout<< "OFstreamCollator : Finished writing " << masterData.size()
144  << " bytes";
145  if (UPstream::master(comm))
146  {
// Accumulate in off_t rather than label
// NOTE(review): presumably because the overall size can exceed the label
// range - confirm
147  off_t sum = 0;
148  for (const label recv : recvSizes)
149  {
150  sum += recv;
151  }
152  // Use std::to_string to display long int
153  Pout<< " (overall " << std::to_string(sum) << ')';
154  }
155  Pout<< " to " << fName
156  << " using comm " << comm << endl;
157  }
158 
159  return true;
160 }
161 
162 
163 void* Foam::OFstreamCollator::writeAll(void *threadarg)
164 {
165  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
166 
167  // Consume stack
168  while (true)
169  {
170  writeData* ptr = nullptr;
171 
172  {
173  std::lock_guard<std::mutex> guard(handler.mutex_);
174  if (handler.objects_.size())
175  {
176  ptr = handler.objects_.pop();
177  }
178  }
179 
180  if (!ptr)
181  {
182  break;
183  }
184  else
185  {
186  // Convert storage to pointers
187  PtrList<SubList<char>> slaveData;
188  if (ptr->slaveData_.size())
189  {
190  slaveData.resize(ptr->slaveData_.size());
191  forAll(slaveData, proci)
192  {
193  if (ptr->slaveData_.set(proci))
194  {
195  slaveData.set
196  (
197  proci,
198  new SubList<char>
199  (
200  ptr->slaveData_[proci],
201  ptr->sizes_[proci]
202  )
203  );
204  }
205  }
206  }
207 
208  bool ok = writeFile
209  (
210  ptr->comm_,
211  ptr->objectType_,
212  ptr->pathName_,
213  ptr->data_,
214  ptr->sizes_,
215  slaveData,
216  ptr->streamOpt_,
217  ptr->atomic_,
218  ptr->append_,
219  ptr->headerEntries_
220  );
221  if (!ok)
222  {
223  FatalIOErrorInFunction(ptr->pathName_)
224  << "Failed writing " << ptr->pathName_
225  << exit(FatalIOError);
226  }
227 
228  delete ptr;
229  }
230  //sleep(1);
231  }
232 
233  if (debug)
234  {
235  Pout<< "OFstreamCollator : Exiting write thread " << endl;
236  }
237 
238  {
239  std::lock_guard<std::mutex> guard(handler.mutex_);
240  handler.threadRunning_ = false;
241  }
242 
243  return nullptr;
244 }
245 
246 
247 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
248 {
249  while (true)
250  {
251  // Count files to be written
252  off_t totalSize = 0;
253 
254  {
255  std::lock_guard<std::mutex> guard(mutex_);
256  forAllConstIters(objects_, iter)
257  {
258  totalSize += iter()->size();
259  }
260  }
261 
262  if
263  (
264  totalSize == 0
265  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
266  )
267  {
268  break;
269  }
270 
271  if (debug)
272  {
273  std::lock_guard<std::mutex> guard(mutex_);
274  Pout<< "OFstreamCollator : Waiting for buffer space."
275  << " Currently in use:" << totalSize
276  << " limit:" << maxBufferSize_
277  << " files:" << objects_.size()
278  << endl;
279  }
280 
281  sleep(5);
282  }
283 }
284 
285 
286 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
287 
288 Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
289 :
290  maxBufferSize_(maxBufferSize),
291  threadRunning_(false),
292  localComm_(UPstream::worldComm),
293  threadComm_
294  (
295  UPstream::allocateCommunicator
296  (
297  localComm_,
298  labelRange(UPstream::nProcs(localComm_))
299  )
300  )
301 {}
302 
303 
// Construct from the buffer size and the communicator to use as
// localComm_. The write thread is initially flagged as not running and a
// separate communicator (threadComm_) is allocated from localComm_.
// NOTE(review): labelRange(nProcs) presumably selects all ranks of
// localComm_ - confirm.
// NOTE(review): listing line 304 (the qualified constructor name) is
// absent here; presumably Foam::OFstreamCollator::OFstreamCollator -
// confirm against the upstream file.
305 (
306  const off_t maxBufferSize,
307  const label comm
308 )
309 :
310  maxBufferSize_(maxBufferSize),
311  threadRunning_(false),
312  localComm_(comm),
313  threadComm_
314  (
315  UPstream::allocateCommunicator
316  (
317  localComm_,
318  labelRange(UPstream::nProcs(localComm_))
319  )
320  )
321 {}
322 
323 
324 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
325 
// Destructor body: if a write thread object exists, wait for it to finish
// (join) and release it.
// NOTE(review): listing line 326 (the destructor signature) is absent
// here; presumably Foam::OFstreamCollator::~OFstreamCollator() - confirm
// against the upstream file.
327 {
328  if (thread_)
329  {
330  if (debug)
331  {
332  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
333  }
// Block until the write thread has returned before destroying it
334  thread_->join();
335  thread_.reset(nullptr);
336  }
337 
// NOTE(review): listing line 338 is absent here. Presumably it released
// the communicator allocated in the constructors (threadComm_) - confirm.
339 }
340 
341 
342 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
343 
// Write a file with the given contents, choosing one of three strategies
// from the gathered data sizes:
//
//  1. !useThread, maxBufferSize_ == 0, or the largest per-rank size
//     exceeds maxBufferSize_:
//     gather and write directly through writeFile() on localComm_
//     (blocking); returns the result of writeFile().
//  2. the total size fits in maxBufferSize_:
//     gather all data on localComm_ now, queue the job and let the write
//     thread do only the writing; returns true.
//  3. otherwise:
//     queue the job without slave data so that the write thread performs
//     both the communication and the writing; needs
//     UPstream::haveThreads(), else FatalError; returns true.
//
// In cases 2 and 3 the job is pushed onto objects_ under mutex_ and the
// write thread is (re)started if it is not flagged as running.
//
// NOTE(review): listing line 344 (return type and qualified function name)
// is absent here; presumably 'bool Foam::OFstreamCollator::write' -
// confirm against the upstream file.
345 (
346  const word& objectType,
347  const fileName& fName,
348  const string& data,
349  IOstreamOption streamOpt,
// NOTE(review): listing lines 350-351 are absent. The body uses 'atomic'
// and 'append', so presumably these were the IOstreamOption::atomicType
// and IOstreamOption::appendType parameters - confirm.
352  const bool useThread,
353  const dictionary& headerEntries
354 )
355 {
356  // Determine (on master) sizes to receive. Note: do NOT use thread
357  // communicator
358  labelList recvSizes;
359  decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
360 
// Total size is kept in off_t; the largest single contribution in label
361  off_t totalSize = 0;
362  label maxLocalSize = 0;
363  {
364  for (const label recvSize : recvSizes)
365  {
366  totalSize += recvSize;
367  maxLocalSize = max(maxLocalSize, recvSize);
368  }
// NOTE(review): presumably broadcast so that every rank of localComm_
// takes the same branch below - confirm
369  Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
370  }
371 
372  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
373  {
374  if (debug)
375  {
376  Pout<< "OFstreamCollator : non-thread gather and write of " << fName
377  << " using local comm " << localComm_ << endl;
378  }
379  // Direct collating and writing (so master blocks until all written!)
// An empty slave-data list is passed: nothing has been gathered beforehand
380  const PtrList<SubList<char>> dummySlaveData;
381  return writeFile
382  (
383  localComm_,
384  objectType,
385  fName,
386  data,
387  recvSizes,
388  dummySlaveData,
389  streamOpt,
390  atomic,
391  append,
392  headerEntries
393  );
394  }
395  else if (totalSize <= maxBufferSize_)
396  {
397  // Total size can be stored locally so receive all data now and only
398  // do the writing in the thread
399 
400  if (debug)
401  {
402  Pout<< "OFstreamCollator : non-thread gather; thread write of "
403  << fName << endl;
404  }
405 
// Only the master waits until the queued jobs leave room for this one
406  if (Pstream::master(localComm_))
407  {
408  waitForBufferSpace(totalSize);
409  }
410 
411 
412  // Receive in chunks of labelMax (2^31-1) since this is the maximum
413  // size that a List can be
414 
415  autoPtr<writeData> fileAndDataPtr
416  (
417  new writeData
418  (
419  threadComm_, // Note: comm not actually used anymore
420  objectType,
421  fName,
422  (
423  Pstream::master(localComm_)
424  ? data // Only used on master
425  : string::null
426  ),
427  recvSizes,
428  streamOpt,
429  atomic,
430  append,
431  headerEntries
432  )
433  );
434  writeData& fileAndData = fileAndDataPtr();
435 
436  PtrList<List<char>>& slaveData = fileAndData.slaveData_;
437 
// Non-owning view of the local data, used as the send buffer on
// non-master ranks
438  UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
439 
440  slaveData.setSize(recvSizes.size());
441 
442  // Gather all data onto master. Is done in local communicator since
443  // not in write thread. Note that we do not store in contiguous
444  // buffer since that would limit to 2G chars.
// Remember the request count so that only the requests started below are
// waited for
445  const label startOfRequests = UPstream::nRequests();
446  if (Pstream::master(localComm_))
447  {
// Slot 0 (the master itself) is left unset
448  for (label proci = 1; proci < slaveData.size(); proci++)
449  {
450  slaveData.set(proci, new List<char>(recvSizes[proci]));
// NOTE(review): listing lines 451, 453 and 457 are absent from this call.
// Presumably a read(commsType, fromProcNo, buf, bufSize, tag, comm) into
// the freshly allocated buffer - confirm.
452  (
454  proci,
455  slaveData[proci].data(),
456  slaveData[proci].size_bytes(),
458  localComm_
459  );
460  }
461  }
462  else
463  {
464  if
465  (
// NOTE(review): listing lines 466, 468 and 472 are absent from this call.
// Presumably a negated write(commsType, toProcNo, buf, bufSize, tag, comm)
// to rank 0, given the error message below - confirm.
467  (
469  0,
470  slice.cdata(),
471  slice.size_bytes(),
473  localComm_
474  )
475  )
476  {
// NOTE(review): listing lines 477 and 481 are absent. Presumably the
// start of the error stream and its terminating manipulator - confirm.
478  << "Cannot send outgoing message. "
479  << "to:" << 0 << " nBytes:"
480  << label(slice.size_bytes())
482  }
483  }
484  UPstream::waitRequests(startOfRequests);
485 
486  {
487  std::lock_guard<std::mutex> guard(mutex_);
488 
489  // Append to thread buffer
// ptr() hands over the raw pointer that is pushed onto the queue
// NOTE(review): presumably it also releases ownership from the autoPtr -
// confirm
490  objects_.push(fileAndDataPtr.ptr());
491 
492  // Start thread if not running
493  if (!threadRunning_)
494  {
// A thread object that is no longer flagged as running is joined before
// being replaced
495  if (thread_)
496  {
497  if (debug)
498  {
499  Pout<< "OFstreamCollator : Waiting for write thread"
500  << endl;
501  }
502  thread_->join();
503  }
504 
505  if (debug)
506  {
507  Pout<< "OFstreamCollator : Starting write thread"
508  << endl;
509  }
510  thread_.reset(new std::thread(writeAll, this));
511  threadRunning_ = true;
512  }
513  }
514 
515  return true;
516  }
517  else
518  {
519  if (debug)
520  {
521  Pout<< "OFstreamCollator : thread gather and write of " << fName
522  << " using communicator " << threadComm_ << endl;
523  }
524 
// Communication will happen inside the write thread, so thread support is
// mandatory for this branch
525  if (!UPstream::haveThreads())
526  {
// NOTE(review): listing line 527 is absent. Presumably the start of the
// FatalError stream terminated by exit(FatalError) below - confirm.
528  << "mpi does not seem to have thread support."
529  << " Make sure to set buffer size 'maxThreadFileBufferSize'"
530  << " to at least " << totalSize
531  << " to be able to do the collating before threading."
532  << exit(FatalError);
533  }
534 
// Only the local data is buffered here, hence the wait is for data.size()
// rather than totalSize
535  if (Pstream::master(localComm_))
536  {
537  waitForBufferSpace(data.size());
538  }
539 
540  {
541  std::lock_guard<std::mutex> guard(mutex_);
542 
543  // Push all file info on buffer. Note that no slave data provided
544  // so it will trigger communication inside the thread
545  objects_.push
546  (
547  new writeData
548  (
549  threadComm_,
550  objectType,
551  fName,
552  data,
553  recvSizes,
554  streamOpt,
555  atomic,
556  append,
557  headerEntries
558  )
559  );
560 
// Same (re)start logic as in the previous branch
561  if (!threadRunning_)
562  {
563  if (thread_)
564  {
565  if (debug)
566  {
567  Pout<< "OFstreamCollator : Waiting for write thread"
568  << endl;
569  }
570  thread_->join();
571  }
572 
573  if (debug)
574  {
575  Pout<< "OFstreamCollator : Starting write thread" << endl;
576  }
577  thread_.reset(new std::thread(writeAll, this));
578  threadRunning_ = true;
579  }
580  }
581 
582  return true;
583  }
584 }
585 
586 
// Wait for the queue of pending write jobs to drain. Only the master of
// localComm_ waits; all other ranks return immediately.
// NOTE(review): listing line 587 (the function signature) is absent here;
// presumably 'void Foam::OFstreamCollator::waitAll()' - confirm against
// the upstream file.
588 {
589  // Wait for all buffer space to be available i.e. wait for all jobs
590  // to finish
591  if (Pstream::master(localComm_))
592  {
593  if (debug)
594  {
595  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
596  << endl;
597  }
// A negative wanted size can never 'fit', so this only returns once
// nothing is queued any more
598  waitForBufferSpace(-1);
599  }
600 }
601 
602 
603 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &f1)
A class for handling file names.
Definition: fileName.H:71
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &masterData, const labelUList &recvSizes, const UPtrList< SubList< char >> &slaveData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
static void writeData(Ostream &os, const Type &val)
commsTypes
Communications types.
Definition: UPstream.H:74
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
A list of keyword definitions, which are a keyword followed by a number of values (eg...
Definition: dictionary.H:120
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition: UPstream.C:565
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:40
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
A range or interval of labels defined by a start and a size.
Definition: labelRange.H:51
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
static int & msgType() noexcept
Message tag of standard messages.
Definition: UPstream.H:1184
A simple container for options an IOstream can normally have.
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1536
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:414
atomicType
Atomic operations (output)
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:614
"scheduled" : (MPI_Send, MPI_Recv)
A class for handling words, derived from Foam::string.
Definition: word.H:63
void waitAll()
Wait for all thread actions to have finished.
errorManip< error > abort(error &err)
Definition: errorManip.H:139
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition: POSIX.C:1547
static const string null
An empty string.
Definition: string.H:202
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
Database for solution data, solver performance and other reduced data.
Definition: data.H:52
appendType
File appending (NON_APPEND | APPEND)
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:607
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1037
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:1009
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
Definition: UPstream.H:62
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...