collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2020-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "collatedFileOperation.H"
31 #include "Pstream.H"
32 #include "Time.H"
34 #include "decomposedBlockData.H"
35 #include "registerSwitch.H"
36 #include "masterOFstream.H"
37 #include "OFstream.H"
38 #include "foamVersion.H"
39 
40 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
41 
42 namespace Foam
43 {
44 namespace fileOperations
45 {
46  defineTypeNameAndDebug(collatedFileOperation, 0);
48  (
49  fileOperation,
50  collatedFileOperation,
51  word
52  );
54  (
55  fileOperation,
56  collatedFileOperation,
57  comm
58  );
59 
61  (
62  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
63  );
65  (
66  "maxThreadFileBufferSize",
67  float,
69  );
70 
71  // Threaded MPI: depending on buffering
73  (
74  fileOperationInitialise,
75  fileOperationInitialise_collated,
76  word,
77  collated
78  );
79 }
80 }
81 
82 
83 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
84 
86 (
87  const bool withRanks
88 ) const
89 {
91  << "I/O : " << this->type();
92 
93  if (maxThreadFileBufferSize == 0)
94  {
96  << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl
97  << " Writing may be slow for large file sizes."
98  << endl;
99  }
100  else
101  {
102  DetailInfo
103  << " [threaded] (maxThreadFileBufferSize = "
104  << maxThreadFileBufferSize << ")." << nl
105  << " Requires buffer large enough to collect all data"
106  " or thread support" << nl
107  << " enabled in MPI. If MPI thread support cannot be"
108  " enabled, deactivate" << nl
109  << " threading by setting maxThreadFileBufferSize"
110  " to 0 in" << nl
111  << " OpenFOAM etc/controlDict" << endl;
112  }
113 
114  if (withRanks)
115  {
117  }
118 
119  //- fileModificationChecking already set by base class (masterUncollated)
120  // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
121  // {
122  // WarningInFunction
123  // << "Resetting fileModificationChecking to timeStamp" << endl;
124  // }
125  // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
126  // {
127  // WarningInFunction
128  // << "Resetting fileModificationChecking to inotify" << endl;
129  // }
130 }
131 
132 
134 (
135  const regIOobject& io,
136  const fileName& pathName,
137  IOstreamOption streamOpt
138 ) const
139 {
140  // Append to processorsNN/ file
141 
142  const label proci = detectProcessorPath(io.objectPath());
143 
144  if (debug)
145  {
146  Pout<< "collatedFileOperation::writeObject :"
147  << " For local object : " << io.name()
148  << " appending processor " << proci
149  << " data to " << pathName << endl;
150  }
151  if (proci == -1)
152  {
154  << "Invalid processor path: " << pathName
155  << exit(FatalError);
156  }
157 
158  const bool isIOmaster = fileOperation::isIOrank(proci);
159 
160  // Update meta-data for current state
161  if (isIOmaster)
162  {
163  const_cast<regIOobject&>(io).updateMetaData();
164  }
165 
166  // Note: cannot do append + compression. This is a limitation
167  // of ogzstream (or rather most compressed formats)
168  //
169  // File should always be created as non-atomic
170  // (consistency between append/non-append)
171 
172  OFstream os
173  (
174  pathName,
175  // UNCOMPRESSED (binary only)
176  IOstreamOption(IOstreamOption::BINARY, streamOpt.version()),
177  // Append on sub-ranks
179  );
180 
181  if (!os.good())
182  {
184  << "Cannot open for appending"
185  << exit(FatalIOError);
186  }
187 
188  if (isIOmaster)
189  {
191  }
192 
193  std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
194  (
195  os,
196  streamOpt,
197  io,
198  proci,
199  isIOmaster // With FoamFile header on master
200  );
201 
202  return (blockOffset >= 0) && os.good();
203 }
204 
205 
206 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
207 
208 namespace Foam
209 {
210 
211 // Construction helper: self/world/local communicator and IO ranks
213 {
214  // Default is COMM_WORLD (single master)
215  Tuple2<label, labelList> commAndIORanks
216  (
219  );
220 
221  if (UPstream::parRun() && commAndIORanks.second().size() > 1)
222  {
223  // Multiple masters: ranks for my IO range
224  commAndIORanks.first() = UPstream::allocateCommunicator
225  (
227  fileOperation::subRanks(commAndIORanks.second())
228  );
229  }
230 
231  return commAndIORanks;
232 }
233 
234 } // End namespace Foam
235 
236 
237 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
238 
239 void Foam::fileOperations::collatedFileOperation::init(bool verbose)
240 {
241  verbose = (verbose && Foam::infoDetailLevel > 0);
242 
243  if (verbose)
244  {
245  this->printBanner(ioRanks_.size());
246  }
247 }
248 
249 
251 (
252  bool verbose
253 )
254 :
256  (
257  getCommPattern(),
258  false, // distributedRoots
259  false // verbose
260  ),
261  managedComm_(getManagedComm(comm_)), // Possibly locally allocated
262  writer_(mag(maxThreadFileBufferSize), comm_)
263 {
264  init(verbose);
265 }
266 
267 
269 (
270  const Tuple2<label, labelList>& commAndIORanks,
271  const bool distributedRoots,
272  bool verbose
273 )
274 :
276  (
277  commAndIORanks,
278  distributedRoots,
279  false // verbose
280  ),
281  managedComm_(-1), // Externally managed
282  writer_(mag(maxThreadFileBufferSize), comm_)
283 {
284  init(verbose);
285 }
286 
287 
289 {
290  // From externally -> locally managed
291  managedComm_ = getManagedComm(comm_);
292 }
294 
295 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
296 
298 {
299  // Wait for any outstanding file operations
300  flush();
301 
302  UPstream::freeCommunicator(managedComm_);
303 }
304 
306 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
307 
309 (
310  const IOobject& io,
311  const word& typeName
312 ) const
313 {
314  // Replacement for objectPath
315  if (io.time().processorCase())
316  {
318  (
319  io,
321  "dummy", // not used for processorsobject
322  io.instance()
323  );
324  }
325  else
326  {
328  (
329  io,
331  word::null,
332  io.instance()
333  );
334  }
335 }
336 
337 
339 (
340  const regIOobject& io,
341  IOstreamOption streamOpt,
342  const bool writeOnProc
343 ) const
344 {
345  const Time& tm = io.time();
346  const fileName& inst = io.instance();
347 
348  // Update meta-data for current state
349  const_cast<regIOobject&>(io).updateMetaData();
350 
351  if (inst.isAbsolute() || !tm.processorCase())
352  {
353  // Note: delay mkdir to masterOFstream so it does not get created
354  // if not needed (e.g. when running distributed)
355 
356  const fileName pathName(io.objectPath());
357 
358  if (debug)
359  {
360  Pout<< "collatedFileOperation::writeObject :"
361  << " For object : " << io.name()
362  << " falling back to master-only output to " << io.path()
363  << endl;
364  }
365 
366  // Note: currently still NON_ATOMIC (Dec-2022)
368  (
369  comm_,
370  pathName,
371  streamOpt,
373  writeOnProc
374  );
375 
376  // If any of these fail, return
377  // (leave error handling to Ostream class)
378 
379  const bool ok =
380  (
381  os.good()
382  && io.writeHeader(os)
383  && io.writeData(os)
384  );
385 
386  if (ok)
387  {
389  }
390 
391  return ok;
392  }
393  else
394  {
395  // Construct the equivalent processors/ directory
396  const fileName path(processorsPath(io, inst, processorsDir(io)));
397 
398  // Note: delay mkdir to masterOFstream so it does not get created
399  // if not needed (e.g. when running distributed)
400 
401  const fileName pathName(path/io.name());
402 
403  if (io.global() || io.globalObject())
404  {
405  if (debug)
406  {
407  Pout<< "collatedFileOperation::writeObject :"
408  << " For global object : " << io.name()
409  << " falling back to master-only output to " << pathName
410  << endl;
411  }
412 
413  // Note: currently still NON_ATOMIC (Dec-2022)
414  masterOFstream os
415  (
416  comm_,
417  pathName,
418  streamOpt,
420  writeOnProc
421  );
422 
423  // If any of these fail, return
424  // (leave error handling to Ostream class)
425 
426  const bool ok =
427  (
428  os.good()
429  && io.writeHeader(os)
430  && io.writeData(os)
431  );
432 
433  if (ok)
434  {
436  }
437 
438  return ok;
439  }
440  else if (!UPstream::parRun())
441  {
442  // Special path for e.g. decomposePar. Append to
443  // processorsDDD/ file
444  if (debug)
445  {
446  Pout<< "collatedFileOperation::writeObject :"
447  << " For object : " << io.name()
448  << " appending to " << pathName << endl;
449  }
450 
451  mkDir(path);
452  return appendObject(io, pathName, streamOpt);
453  }
454  else
455  {
456  // Re-check static maxThreadFileBufferSize variable to see
457  // if needs to use threading
458  const bool useThread = (maxThreadFileBufferSize != 0);
459 
460  if (debug)
461  {
462  Pout<< "collatedFileOperation::writeObject :"
463  << " For object : " << io.name()
464  << " starting collating output to " << pathName
465  << " useThread:" << useThread << endl;
466  }
467 
468  if (!useThread)
469  {
470  writer_.waitAll();
471  }
472 
473  // Note: currently still NON_ATOMIC (Dec-2022)
474  threadedCollatedOFstream os
475  (
476  writer_,
477  pathName,
478  streamOpt,
479  useThread
480  );
481 
482  bool ok = os.good();
483 
484  if (UPstream::master(comm_))
485  {
486  // Suppress comment banner
487  const bool old = IOobject::bannerEnabled(false);
488 
489  ok = ok && io.writeHeader(os);
490 
492 
493  // Additional header content
496  (
497  dict,
498  streamOpt,
499  io
500  );
501  os.setHeaderEntries(dict);
502  }
503 
504  ok = ok && io.writeData(os);
505  // No end divider for collated output
506 
507  return ok;
508  }
509  }
510 }
511 
513 {
514  if (debug)
515  {
516  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
517  << endl;
518  }
520  // Wait for thread to finish (note: also removes thread)
521  writer_.waitAll();
522 }
523 
524 
526 (
527  const fileName& fName
528 ) const
529 {
530  if (UPstream::parRun())
531  {
532  const List<int>& procs(UPstream::procID(comm_));
533 
534  word procDir(processorsBaseDir+Foam::name(nProcs_));
535 
536  if (procs.size() != nProcs_)
537  {
538  procDir +=
539  + "_"
540  + Foam::name(procs.first())
541  + "-"
542  + Foam::name(procs.last());
543  }
544  return procDir;
545  }
546  else
547  {
548  word procDir(processorsBaseDir+Foam::name(nProcs_));
549 
550  if (ioRanks_.size())
551  {
552  // Detect current processor number
553  label proci = detectProcessorPath(fName);
554 
555  if (proci != -1)
556  {
557  // Find lowest io rank
558  label minProc = 0;
559  label maxProc = nProcs_-1;
560  for (const label ranki : ioRanks_)
561  {
562  if (ranki >= nProcs_)
563  {
564  break;
565  }
566  else if (ranki <= proci)
567  {
568  minProc = ranki;
569  }
570  else
571  {
572  maxProc = ranki-1;
573  break;
574  }
575  }
576 
577  // Add range if not all processors
578  if (maxProc-minProc+1 != nProcs_)
579  {
580  procDir +=
581  + "_"
582  + Foam::name(minProc)
583  + "-"
584  + Foam::name(maxProc);
585  }
586  }
587  }
588 
589  return procDir;
590  }
591 }
592 
593 
595 (
596  const IOobject& io
597 ) const
598 {
599  return processorsDir(io.objectPath());
600 }
601 
602 
603 // ************************************************************************* //
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
dictionary dict
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
A class for handling file names.
Definition: fileName.H:72
io.objectPath() exists
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
A 2-tuple for storing two objects of dissimilar types. The container is similar in purpose to std::pa...
Definition: stringOps.H:54
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition: UPstream.C:565
int infoDetailLevel
Global for selective suppression of Info output.
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:50
T & first()
Access first element of the list, position [0].
Definition: UList.H:853
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:1049
A simple container for options an IOstream can normally have.
static bool isAbsolute(const std::string &str)
Return true if filename starts with a &#39;/&#39; or &#39;\&#39; or (windows-only) with a filesystem-root.
Definition: fileNameI.H:129
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:409
bool appendObject(const regIOobject &io, const fileName &pathName, IOstreamOption streamOpt) const
Append to processorsNN/ file.
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:69
bool processorCase() const noexcept
True if this is a processor case.
Definition: TimePathsI.H:52
Macros for easy insertion into run-time selection tables.
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
static std::streamoff writeBlockEntry(OSstream &os, const label blocki, const char *str, const size_t len)
Helper: write block of (binary) character data.
static void writeExtraHeaderContent(dictionary &dict, IOstreamOption streamOptData, const IOobject &io)
Helper: generate additional entries for FoamFile header.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:614
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for expressions::valueTypeCode::INVALID.
Definition: exprTraits.C:127
collatedFileOperation(bool verbose=false)
Default construct.
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A class for handling words, derived from Foam::string.
Definition: word.H:63
Master-only drop-in replacement for OFstream.
static const word null
An empty word.
Definition: word.H:84
void printBanner(const bool withRanks=false) const
Print banner information, optionally with io ranks.
addNamedToRunTimeSelectionTable(fileOperationInitialise, fileOperationInitialise_collated, word, collated)
objectPath exists in &#39;processorsNN_first-last&#39;
#define DetailInfo
Definition: evalEntry.C:30
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
virtual void storeComm() const
Transfer ownership of communicator to this fileOperation. Use with caution.
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static Tuple2< label, labelList > getCommPattern()
virtual bool writeObject(const regIOobject &, IOstreamOption streamOpt=IOstreamOption(), const bool writeOnProc=true) const
Writes a regIOobject (so header, contents and divider).
static bool bannerEnabled() noexcept
Status of output file banner.
Definition: IOobject.H:361
T & last()
Access last element of the list, position [size()-1].
Definition: UList.H:867
static labelRange subRanks(const labelUList &mainIOranks)
Get (contiguous) range/bounds of ranks addressed within the given main io-ranks.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
Ostream & flush(Ostream &os)
Flush stream.
Definition: Ostream.H:521
float floatOptimisationSwitch(const char *name, const float deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:240
void printRanks() const
Helper: output which ranks are IO.
static List< int > & procID(const label communicator)
The list of ranks within a given communicator.
Definition: UPstream.H:1130
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:627
const T2 & second() const noexcept
Access the second element.
Definition: Tuple2.H:142
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1082
registerOptSwitch("maxThreadFileBufferSize", float, collatedFileOperation::maxThreadFileBufferSize)
versionNumber version() const noexcept
Get the stream version.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:66
virtual fileName::Type type(const fileName &, const bool followLink=true) const
Return the file type: DIRECTORY, FILE or SYMLINK.
bool isIOrank(const label proci) const
Is proci a master rank in the communicator (in parallel) or a master rank in the IO ranks (non-parall...
static labelList getGlobalIORanks()
Get list of global IO ranks from FOAM_IORANKS env variable. If set, these correspond to the IO master...
defineTypeNameAndDebug(collatedFileOperation, 0)
IOobject io("surfaceFilmProperties", mesh.time().constant(), mesh, IOobject::READ_IF_PRESENT, IOobject::NO_WRITE, IOobject::NO_REGISTER)
const T1 & first() const noexcept
Access the first element.
Definition: Tuple2.H:132
fileOperations that performs all file operations on the master processor. Requires the calls to be pa...
Defines the attributes of an object for which implicit objectRegistry management is supported...
Definition: IOobject.H:172
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static label allocateCommunicator(const label parent, const labelRange &subRanks, const bool withComponents=true)
Allocate new communicator with contiguous sub-ranks on the parent communicator.
Definition: UPstream.C:258
Namespace for OpenFOAM.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL IO ERROR&#39; header text and ...