collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2020-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "collatedFileOperation.H"
31 #include "Pstream.H"
32 #include "Time.H"
34 #include "decomposedBlockData.H"
35 #include "registerSwitch.H"
36 #include "masterOFstream.H"
37 #include "OFstream.H"
38 #include "foamVersion.H"
39 
40 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
41 
42 namespace Foam
43 {
44 namespace fileOperations
45 {
46  defineTypeNameAndDebug(collatedFileOperation, 0);
48  (
49  fileOperation,
50  collatedFileOperation,
51  word
52  );
54  (
55  fileOperation,
56  collatedFileOperation,
57  comm
58  );
59 
61  (
62  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 0)
63  );
65  (
66  "maxThreadFileBufferSize",
67  float,
69  );
70 
71  // Threaded MPI: depending on buffering
73  (
74  fileOperationInitialise,
75  fileOperationInitialise_collated,
76  word,
77  collated
78  );
79 }
80 }
81 
82 
83 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
84 
86 (
87  const bool withRanks
88 ) const
89 {
91  << "I/O : " << this->type();
92 
94  {
95  // FUTURE: deprecate or remove threading?
97  << " [threaded] (maxThreadFileBufferSize = "
98  << maxThreadFileBufferSize << ")." << nl
99  << " Requires buffer large enough to collect all data"
100  " or MPI thread support." << nl
101  << " To avoid MPI threading [slow], set"
102  " (maxThreadFileBufferSize = 0) in" << nl
103  << " OpenFOAM etc/controlDict" << endl;
104  }
105  else
106  {
107  DetailInfo
108  << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl;
109 
110  if (mag(maxMasterFileBufferSize) < 1)
111  {
112  DetailInfo
113  << " With scheduled transfer" << nl;
114  }
115  else if (maxMasterFileBufferSize >= 1)
116  {
117  DetailInfo
118  << " With non-blocking transfer,"
119  " buffer-size = " << maxMasterFileBufferSize << nl;
120  }
121  else
122  {
123  DetailInfo
124  << " With non-blocking transfer,"
125  " minimal buffer size" << nl;
126  }
127  }
128 
129  if (withRanks)
130  {
132  }
133 
134  //- fileModificationChecking already set by base class (masterUncollated)
135  // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
136  // {
137  // WarningInFunction
138  // << "Resetting fileModificationChecking to timeStamp" << endl;
139  // }
140  // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
141  // {
142  // WarningInFunction
143  // << "Resetting fileModificationChecking to inotify" << endl;
144  // }
145 }
146 
147 
149 (
150  const regIOobject& io,
151  const fileName& pathName,
152  IOstreamOption streamOpt
153 ) const
154 {
155  // Append to processorsNN/ file
156 
157  const label proci = detectProcessorPath(io.objectPath());
158 
159  if (debug)
160  {
161  Pout<< "collatedFileOperation::writeObject :"
162  << " For local object : " << io.name()
163  << " appending processor " << proci
164  << " data to " << pathName << endl;
165  }
166  if (proci == -1)
167  {
169  << "Invalid processor path: " << pathName
170  << exit(FatalError);
171  }
172 
173  const bool isIOmaster = fileOperation::isIOrank(proci);
174 
175  // Update meta-data for current state
176  if (isIOmaster)
177  {
178  const_cast<regIOobject&>(io).updateMetaData();
179  }
180 
181  // Note: cannot do append + compression. This is a limitation
182  // of ogzstream (or rather most compressed formats)
183  //
184  // File should always be created as non-atomic
185  // (consistency between append/non-append)
186 
187  OFstream os
188  (
189  pathName,
190  // UNCOMPRESSED (binary only)
191  IOstreamOption(IOstreamOption::BINARY, streamOpt.version()),
192  // Append on sub-ranks
194  );
195 
196  if (!os.good())
197  {
199  << "Cannot open for appending"
200  << exit(FatalIOError);
201  }
202 
203  if (isIOmaster)
204  {
206  }
207 
208  std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
209  (
210  os,
211  streamOpt,
212  io,
213  proci,
214  isIOmaster // With FoamFile header on master
215  );
216 
217  return (blockOffset >= 0) && os.good();
218 }
219 
220 
221 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
222 
223 namespace Foam
224 {
225 
226 // Construction helper: self/world/local communicator and IO ranks
228 {
229  // Default is COMM_WORLD (single master)
230  Tuple2<label, labelList> commAndIORanks
231  (
234  );
235 
236  if (UPstream::parRun() && commAndIORanks.second().size() > 1)
237  {
238  // Multiple masters: ranks for my IO range
239  commAndIORanks.first() = UPstream::allocateCommunicator
240  (
242  fileOperation::subRanks(commAndIORanks.second())
243  );
244  }
245 
246  return commAndIORanks;
247 }
248 
249 } // End namespace Foam
250 
251 
252 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
253 
254 void Foam::fileOperations::collatedFileOperation::init(bool verbose)
255 {
256  verbose = (verbose && Foam::infoDetailLevel > 0);
257 
258  if (verbose)
259  {
260  this->printBanner(ioRanks_.size());
261  }
262 }
263 
264 
266 (
267  bool verbose
268 )
269 :
271  (
272  getCommPattern(),
273  false, // distributedRoots
274  false // verbose
275  ),
276  managedComm_(getManagedComm(comm_)), // Possibly locally allocated
277  writer_(mag(maxThreadFileBufferSize), comm_)
278 {
279  init(verbose);
280 }
281 
282 
284 (
285  const Tuple2<label, labelList>& commAndIORanks,
286  const bool distributedRoots,
287  bool verbose
288 )
289 :
291  (
292  commAndIORanks,
293  distributedRoots,
294  false // verbose
295  ),
296  managedComm_(-1), // Externally managed
297  writer_(mag(maxThreadFileBufferSize), comm_)
298 {
299  init(verbose);
300 }
301 
302 
304 {
305  // From externally -> locally managed
306  managedComm_ = getManagedComm(comm_);
307 }
309 
310 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
311 
313 {
314  // Wait for any outstanding file operations
315  flush();
316 
317  UPstream::freeCommunicator(managedComm_);
318 }
319 
321 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
322 
324 (
325  const IOobject& io,
326  const word& typeName
327 ) const
328 {
329  // Replacement for objectPath
330  if (io.time().processorCase())
331  {
333  (
334  io,
336  "dummy", // not used for processorsobject
337  io.instance()
338  );
339  }
340  else
341  {
343  (
344  io,
346  word::null,
347  io.instance()
348  );
349  }
350 }
351 
352 
354 (
355  const regIOobject& io,
356  IOstreamOption streamOpt,
357  const bool writeOnProc
358 ) const
359 {
360  const Time& tm = io.time();
361  const fileName& inst = io.instance();
362 
363  // Update meta-data for current state
364  const_cast<regIOobject&>(io).updateMetaData();
365 
366  if (inst.isAbsolute() || !tm.processorCase())
367  {
368  // Note: delay mkdir to masterOFstream so it does not get created
369  // if not needed (e.g. when running distributed)
370 
371  const fileName pathName(io.objectPath());
372 
373  if (debug)
374  {
375  Pout<< "collatedFileOperation::writeObject :"
376  << " For object : " << io.name()
377  << " falling back to master-only output to " << io.path()
378  << endl;
379  }
380 
381  // Note: currently still NON_ATOMIC (Dec-2022)
383  (
384  comm_,
385  pathName,
386  streamOpt,
388  writeOnProc
389  );
390 
391  // If any of these fail, return
392  // (leave error handling to Ostream class)
393 
394  const bool ok =
395  (
396  os.good()
397  && io.writeHeader(os)
398  && io.writeData(os)
399  );
400 
401  if (ok)
402  {
404  }
405 
406  return ok;
407  }
408  else
409  {
410  // Construct the equivalent processors/ directory
411  const fileName path(processorsPath(io, inst, processorsDir(io)));
412 
413  // Note: delay mkdir to masterOFstream so it does not get created
414  // if not needed (e.g. when running distributed)
415 
416  const fileName pathName(path/io.name());
417 
418  if (io.global() || io.globalObject())
419  {
420  if (debug)
421  {
422  Pout<< "collatedFileOperation::writeObject :"
423  << " For global object : " << io.name()
424  << " falling back to master-only output to " << pathName
425  << endl;
426  }
427 
428  // Note: currently still NON_ATOMIC (Dec-2022)
429  masterOFstream os
430  (
431  comm_,
432  pathName,
433  streamOpt,
435  writeOnProc
436  );
437 
438  // If any of these fail, return
439  // (leave error handling to Ostream class)
440 
441  const bool ok =
442  (
443  os.good()
444  && io.writeHeader(os)
445  && io.writeData(os)
446  );
447 
448  if (ok)
449  {
451  }
452 
453  return ok;
454  }
455  else if (!UPstream::parRun())
456  {
457  // Special path for e.g. decomposePar. Append to
458  // processorsDDD/ file
459  if (debug)
460  {
461  Pout<< "collatedFileOperation::writeObject :"
462  << " For object : " << io.name()
463  << " appending to " << pathName << endl;
464  }
465 
466  mkDir(path);
467  return appendObject(io, pathName, streamOpt);
468  }
469  else
470  {
471  // Re-check static maxThreadFileBufferSize variable to see
472  // if needs to use threading
473  const bool useThread = (mag(maxThreadFileBufferSize) > 1);
474 
475  if (debug)
476  {
477  Pout<< "collatedFileOperation::writeObject :"
478  << " For object : " << io.name()
479  << " starting collating output to " << pathName
480  << " useThread:" << useThread << endl;
481  }
482 
483  if (!useThread)
484  {
485  writer_.waitAll();
486  }
487 
488  // Note: currently still NON_ATOMIC (Dec-2022)
489  threadedCollatedOFstream os
490  (
491  writer_,
492  pathName,
493  streamOpt,
494  useThread
495  );
496 
497  bool ok = os.good();
498 
499  if (UPstream::master(comm_))
500  {
501  // Suppress comment banner
502  const bool old = IOobject::bannerEnabled(false);
503 
504  ok = ok && io.writeHeader(os);
505 
507 
508  // Additional header content
511  (
512  dict,
513  streamOpt,
514  io
515  );
516  os.setHeaderEntries(dict);
517  }
518 
519  ok = ok && io.writeData(os);
520  // No end divider for collated output
521 
522  return ok;
523  }
524  }
525 }
526 
528 {
529  if (debug)
530  {
531  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
532  << endl;
533  }
535  // Wait for thread to finish (note: also removes thread)
536  writer_.waitAll();
537 }
538 
539 
541 (
542  const fileName& fName
543 ) const
544 {
545  if (UPstream::parRun())
546  {
547  const List<int>& procs(UPstream::procID(comm_));
548 
549  word procDir(processorsBaseDir+Foam::name(nProcs_));
550 
551  if (procs.size() != nProcs_)
552  {
553  procDir +=
554  + "_"
555  + Foam::name(procs.first())
556  + "-"
557  + Foam::name(procs.last());
558  }
559  return procDir;
560  }
561  else
562  {
563  word procDir(processorsBaseDir+Foam::name(nProcs_));
564 
565  if (ioRanks_.size())
566  {
567  // Detect current processor number
568  label proci = detectProcessorPath(fName);
569 
570  if (proci != -1)
571  {
572  // Find lowest io rank
573  label minProc = 0;
574  label maxProc = nProcs_-1;
575  for (const label ranki : ioRanks_)
576  {
577  if (ranki >= nProcs_)
578  {
579  break;
580  }
581  else if (ranki <= proci)
582  {
583  minProc = ranki;
584  }
585  else
586  {
587  maxProc = ranki-1;
588  break;
589  }
590  }
591 
592  // Add range if not all processors
593  if (maxProc-minProc+1 != nProcs_)
594  {
595  procDir +=
596  + "_"
597  + Foam::name(minProc)
598  + "-"
599  + Foam::name(maxProc);
600  }
601  }
602  }
603 
604  return procDir;
605  }
606 }
607 
608 
610 (
611  const IOobject& io
612 ) const
613 {
614  return processorsDir(io.objectPath());
615 }
616 
617 
618 // ************************************************************************* //
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
dictionary dict
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
A class for handling file names.
Definition: fileName.H:72
io.objectPath() exists
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:608
A 2-tuple for storing two objects of dissimilar types. The container is similar in purpose to std::pa...
Definition: stringOps.H:54
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition: UPstream.C:567
int infoDetailLevel
Global for selective suppression of Info output.
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:50
T & first()
Access first element of the list, position [0].
Definition: UList.H:862
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static bool & parRun() noexcept
Test if this is a parallel run.
Definition: UPstream.H:1061
append (seek end each write)
A simple container for options an IOstream can normally have.
static bool isAbsolute(const std::string &str)
Return true if filename starts with a '/' or '\' or (windows-only) with a filesystem-root.
Definition: fileNameI.H:129
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:421
bool appendObject(const regIOobject &io, const fileName &pathName, IOstreamOption streamOpt) const
Append to processorsNN/ file.
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:69
bool processorCase() const noexcept
True if this is a processor case.
Definition: TimePathsI.H:52
Macros for easy insertion into run-time selection tables.
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
static std::streamoff writeBlockEntry(OSstream &os, const label blocki, const char *str, const size_t len)
Helper: write block of (binary) character data.
static void writeExtraHeaderContent(dictionary &dict, IOstreamOption streamOptData, const IOobject &io)
Helper: generate additional entries for FoamFile header.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:614
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for expressions::valueTypeCode::INVALID.
Definition: exprTraits.C:127
static float maxMasterFileBufferSize
Max size of parallel communications. Switches from non-blocking.
collatedFileOperation(bool verbose=false)
Default construct.
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A class for handling words, derived from Foam::string.
Definition: word.H:63
Master-only drop-in replacement for OFstream.
static const word null
An empty word.
Definition: word.H:84
void printBanner(const bool withRanks=false) const
Print banner information, optionally with io ranks.
addNamedToRunTimeSelectionTable(fileOperationInitialise, fileOperationInitialise_collated, word, collated)
objectPath exists in 'processorsNN_first-last'
#define DetailInfo
Definition: evalEntry.C:30
no append (truncates existing)
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
virtual void storeComm() const
Transfer ownership of communicator to this fileOperation. Use with caution.
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static Tuple2< label, labelList > getCommPattern()
virtual bool writeObject(const regIOobject &, IOstreamOption streamOpt=IOstreamOption(), const bool writeOnProc=true) const
Writes a regIOobject (so header, contents and divider).
static bool bannerEnabled() noexcept
Status of output file banner.
Definition: IOobject.H:369
T & last()
Access last element of the list, position [size()-1].
Definition: UList.H:876
static labelRange subRanks(const labelUList &mainIOranks)
Get (contiguous) range/bounds of ranks addressed within the given main io-ranks.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
Ostream & flush(Ostream &os)
Flush stream.
Definition: Ostream.H:521
float floatOptimisationSwitch(const char *name, const float deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:240
void printRanks() const
Helper: output which ranks are IO.
static List< int > & procID(const label communicator)
The list of ranks within a given communicator.
Definition: UPstream.H:1142
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:637
const T2 & second() const noexcept
Access the second element.
Definition: Tuple2.H:142
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1094
registerOptSwitch("maxThreadFileBufferSize", float, collatedFileOperation::maxThreadFileBufferSize)
versionNumber version() const noexcept
Get the stream version.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:68
virtual fileName::Type type(const fileName &, const bool followLink=true) const
Return the file type: DIRECTORY, FILE or SYMLINK.
bool isIOrank(const label proci) const
Is proci a master rank in the communicator (in parallel) or a master rank in the IO ranks (non-parall...
static labelList getGlobalIORanks()
Get list of global IO ranks from FOAM_IORANKS env variable. If set, these correspond to the IO master...
defineTypeNameAndDebug(collatedFileOperation, 0)
IOobject io("surfaceFilmProperties", mesh.time().constant(), mesh, IOobject::READ_IF_PRESENT, IOobject::NO_WRITE, IOobject::NO_REGISTER)
const T1 & first() const noexcept
Access the first element.
Definition: Tuple2.H:132
fileOperations that performs all file operations on the master processor. Requires the calls to be pa...
Defines the attributes of an object for which implicit objectRegistry management is supported...
Definition: IOobject.H:180
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static label allocateCommunicator(const label parent, const labelRange &subRanks, const bool withComponents=true)
Allocate new communicator with contiguous sub-ranks on the parent communicator.
Definition: UPstream.C:260
Namespace for OpenFOAM.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...