collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2020-2022 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "collatedFileOperation.H"
31 #include "Pstream.H"
32 #include "Time.H"
34 #include "decomposedBlockData.H"
35 #include "registerSwitch.H"
36 #include "masterOFstream.H"
37 #include "OFstream.H"
38 #include "foamVersion.H"
39 
40 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
41 
42 namespace Foam
43 {
44 namespace fileOperations
45 {
46  defineTypeNameAndDebug(collatedFileOperation, 0);
48  (
49  fileOperation,
50  collatedFileOperation,
51  word
52  );
53 
55  (
56  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
57  );
59  (
60  "maxThreadFileBufferSize",
61  float,
63  );
64 
65  // Mark as needing threaded mpi
67  (
68  fileOperationInitialise,
69  collatedFileOperationInitialise,
70  word,
71  collated
72  );
73 }
74 }
75 
76 
77 // * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
78 
80 (
81  const bool printRanks
82 ) const
83 {
85  << "I/O : " << this->type();
86 
87  if (maxThreadFileBufferSize == 0)
88  {
90  << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl
91  << " Writing may be slow for large file sizes."
92  << endl;
93  }
94  else
95  {
97  << " [threaded] (maxThreadFileBufferSize = "
98  << maxThreadFileBufferSize << ")." << nl
99  << " Requires buffer large enough to collect all data"
100  " or thread support" << nl
101  << " enabled in MPI. If MPI thread support cannot be"
102  " enabled, deactivate" << nl
103  << " threading by setting maxThreadFileBufferSize"
104  " to 0 in" << nl
105  << " OpenFOAM etc/controlDict" << endl;
106  }
107 
108  if (printRanks)
109  {
110  // Information about the ranks
111  stringList hosts(Pstream::nProcs());
112  if (Pstream::master(comm_))
113  {
114  hosts[Pstream::myProcNo()] = hostName();
115  }
116  Pstream::gatherList(hosts);
117 
118  DynamicList<label> offsetMaster(Pstream::nProcs());
119 
120  forAll(hosts, ranki)
121  {
122  if (!hosts[ranki].empty())
123  {
124  offsetMaster.append(ranki);
125  }
126  }
127 
128  if (offsetMaster.size() > 1)
129  {
130  DetailInfo
131  << "IO nodes:" << nl << '(' << nl;
132 
133  offsetMaster.append(Pstream::nProcs());
134 
135  for (label group = 1; group < offsetMaster.size(); ++group)
136  {
137  const label beg = offsetMaster[group-1];
138  const label end = offsetMaster[group];
139 
140  DetailInfo
141  << " (" << hosts[beg].c_str() << ' '
142  << (end-beg) << ')' << nl;
143  }
144  DetailInfo
145  << ')' << nl;
146  }
147  }
148 
149  // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
150  // {
151  // WarningInFunction
152  // << "Resetting fileModificationChecking to timeStamp" << endl;
153  // }
154  // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
155  // {
156  // WarningInFunction
157  // << "Resetting fileModificationChecking to inotify" << endl;
158  // }
159 }
160 
161 
163 (
164  const label proci
165 )
166 const
167 {
168  if (Pstream::parRun())
169  {
170  return Pstream::master(comm_);
171  }
172  else if (ioRanks_.size())
173  {
174  // Found myself in IO rank
175  return ioRanks_.found(proci);
176  }
177  else
178  {
179  // Assume all in single communicator
180  return proci == 0;
181  }
182 }
183 
184 
186 (
187  const regIOobject& io,
188  const fileName& pathName,
189  IOstreamOption streamOpt
190 ) const
191 {
192  // Append to processorsNN/ file
193 
194  const label proci = detectProcessorPath(io.objectPath());
195 
196  if (debug)
197  {
198  Pout<< "collatedFileOperation::writeObject :"
199  << " For local object : " << io.name()
200  << " appending processor " << proci
201  << " data to " << pathName << endl;
202  }
203  if (proci == -1)
204  {
206  << "Invalid processor path: " << pathName
207  << exit(FatalError);
208  }
209 
210  const bool isMaster = isMasterRank(proci);
211 
212  // Update meta-data for current state
213  if (isMaster)
214  {
215  const_cast<regIOobject&>(io).updateMetaData();
216  }
217 
218  // Note: cannot do append + compression. This is a limitation
219  // of ogzstream (or rather most compressed formats)
220  //
221  // File should always be created as non-atomic
222  // (consistency between append/non-append)
223 
224  OFstream os
225  (
226  pathName,
227  // UNCOMPRESSED (binary only)
228  IOstreamOption(IOstreamOption::BINARY, streamOpt.version()),
229  // Append on sub-ranks
231  );
232 
233  if (!os.good())
234  {
236  << "Cannot open for appending"
237  << exit(FatalIOError);
238  }
239 
240  if (isMaster)
241  {
243  }
244 
245  std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
246  (
247  os,
248  streamOpt,
249  io,
250  proci,
251  // With FoamFile header on master?
252  isMaster
253  );
254 
255  return (blockOffset >= 0) && os.good();
256 }
257 
258 
259 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
260 
261 void Foam::fileOperations::collatedFileOperation::init(bool verbose)
262 {
263  verbose = (verbose && Foam::infoDetailLevel > 0);
264 
265  if (verbose)
266  {
267  this->printBanner(ioRanks_.size());
268  }
269 }
270 
271 
273 (
274  bool verbose
275 )
276 :
277  masterUncollatedFileOperation
278  (
279  (
280  ioRanks().size()
281  ? UPstream::allocateCommunicator
282  (
283  UPstream::worldComm,
284  subRanks(Pstream::nProcs())
285  )
286  : UPstream::worldComm
287  ),
288  false
289  ),
290  managedComm_(comm_),
291  writer_(mag(maxThreadFileBufferSize), comm_),
292  nProcs_(Pstream::nProcs()),
293  ioRanks_(ioRanks())
294 {
295  init(verbose);
296 }
297 
298 
300 (
301  const label comm,
302  const labelList& ioRanks,
303  const word& typeName,
304  bool verbose
305 )
306 :
307  masterUncollatedFileOperation(comm, false),
308  managedComm_(-1), // Externally managed
309  writer_(mag(maxThreadFileBufferSize), comm),
310  nProcs_(Pstream::nProcs()),
311  ioRanks_(ioRanks)
312 {
313  init(verbose);
314 }
315 
316 
317 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
318 
320 {
321  // Wait for any outstanding file operations
322  flush();
323 
324  if (UPstream::isUserComm(managedComm_))
325  {
326  UPstream::freeCommunicator(managedComm_);
327  }
328 }
329 
330 
331 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
332 
334 (
335  const IOobject& io,
336  const word& typeName
337 ) const
338 {
339  // Replacement for objectPath
340  if (io.time().processorCase())
341  {
343  (
344  io,
346  "dummy", // not used for processorsobject
347  io.instance()
348  );
349  }
350  else
351  {
353  (
354  io,
356  word::null,
357  io.instance()
358  );
359  }
360 }
361 
362 
364 (
365  const regIOobject& io,
366  IOstreamOption streamOpt,
367  const bool valid
368 ) const
369 {
370  const Time& tm = io.time();
371  const fileName& inst = io.instance();
372 
373  // Update meta-data for current state
374  const_cast<regIOobject&>(io).updateMetaData();
375 
376  if (inst.isAbsolute() || !tm.processorCase())
377  {
378  mkDir(io.path());
379  fileName pathName(io.objectPath());
380 
381  if (debug)
382  {
383  Pout<< "collatedFileOperation::writeObject :"
384  << " For object : " << io.name()
385  << " falling back to master-only output to " << io.path()
386  << endl;
387  }
388 
389  // Note: currently still NON_ATOMIC (Dec-2022)
390  masterOFstream os
391  (
392  pathName,
393  streamOpt,
395  valid
396  );
397 
398  // If any of these fail, return
399  // (leave error handling to Ostream class)
400 
401  const bool ok =
402  (
403  os.good()
404  && io.writeHeader(os)
405  && io.writeData(os)
406  );
407 
408  if (ok)
409  {
411  }
412 
413  return ok;
414  }
415  else
416  {
417  // Construct the equivalent processors/ directory
418  fileName path(processorsPath(io, inst, processorsDir(io)));
419 
420  mkDir(path);
421  fileName pathName(path/io.name());
422 
423  if (io.global())
424  {
425  if (debug)
426  {
427  Pout<< "collatedFileOperation::writeObject :"
428  << " For global object : " << io.name()
429  << " falling back to master-only output to " << pathName
430  << endl;
431  }
432 
433  // Note: currently still NON_ATOMIC (Dec-2022)
434  masterOFstream os
435  (
436  pathName,
437  streamOpt,
439  valid
440  );
441 
442  // If any of these fail, return
443  // (leave error handling to Ostream class)
444 
445  const bool ok =
446  (
447  os.good()
448  && io.writeHeader(os)
449  && io.writeData(os)
450  );
451 
452  if (ok)
453  {
455  }
456 
457  return ok;
458  }
459  else if (!Pstream::parRun())
460  {
461  // Special path for e.g. decomposePar. Append to
462  // processorsDDD/ file
463  if (debug)
464  {
465  Pout<< "collatedFileOperation::writeObject :"
466  << " For object : " << io.name()
467  << " appending to " << pathName << endl;
468  }
469 
470  return appendObject(io, pathName, streamOpt);
471  }
472  else
473  {
474  // Re-check static maxThreadFileBufferSize variable to see
475  // if needs to use threading
476  const bool useThread = (maxThreadFileBufferSize != 0);
477 
478  if (debug)
479  {
480  Pout<< "collatedFileOperation::writeObject :"
481  << " For object : " << io.name()
482  << " starting collating output to " << pathName
483  << " useThread:" << useThread << endl;
484  }
485 
486  if (!useThread)
487  {
488  writer_.waitAll();
489  }
490 
491  // Note: currently still NON_ATOMIC (Dec-2022)
492  threadedCollatedOFstream os
493  (
494  writer_,
495  pathName,
496  streamOpt,
497  useThread
498  );
499 
500  bool ok = os.good();
501 
502  if (Pstream::master(comm_))
503  {
504  // Suppress comment banner
505  const bool old = IOobject::bannerEnabled(false);
506 
507  ok = ok && io.writeHeader(os);
508 
510 
511  // Additional header content
512  dictionary dict;
514  (
515  dict,
516  streamOpt,
517  io
518  );
519  os.setHeaderEntries(dict);
520  }
521 
522  ok = ok && io.writeData(os);
523  // No end divider for collated output
524 
525  return ok;
526  }
527  }
528 }
529 
531 {
532  if (debug)
533  {
534  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
535  << endl;
536  }
538  // Wait for thread to finish (note: also removes thread)
539  writer_.waitAll();
540 }
541 
542 
544 (
545  const fileName& fName
546 ) const
547 {
548  if (Pstream::parRun())
549  {
550  const List<int>& procs(UPstream::procID(comm_));
551 
552  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
553 
554  if (procs.size() != Pstream::nProcs())
555  {
556  procDir +=
557  + "_"
558  + Foam::name(procs.first())
559  + "-"
560  + Foam::name(procs.last());
561  }
562  return procDir;
563  }
564  else
565  {
566  word procDir(processorsBaseDir+Foam::name(nProcs_));
567 
568  if (ioRanks_.size())
569  {
570  // Detect current processor number
571  label proci = detectProcessorPath(fName);
572 
573  if (proci != -1)
574  {
575  // Find lowest io rank
576  label minProc = 0;
577  label maxProc = nProcs_-1;
578  for (const label ranki : ioRanks_)
579  {
580  if (ranki >= nProcs_)
581  {
582  break;
583  }
584  else if (ranki <= proci)
585  {
586  minProc = ranki;
587  }
588  else
589  {
590  maxProc = ranki-1;
591  break;
592  }
593  }
594  procDir +=
595  + "_"
596  + Foam::name(minProc)
597  + "-"
598  + Foam::name(maxProc);
599  }
600  }
602  return procDir;
603  }
604 }
605 
606 
608 (
609  const IOobject& io
610 ) const
611 {
612  return processorsDir(io.objectPath());
613 }
614 
615 
617 {
618  nProcs_ = nProcs;
619 
620  if (debug)
621  {
622  Pout<< "collatedFileOperation::setNProcs :"
623  << " Setting number of processors to " << nProcs_ << endl;
624  }
625 }
626 
627 
628 // ************************************************************************* //
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
dictionary dict
A class for handling file names.
Definition: fileName.H:71
io.objectPath() exists
Definition: fileOperation.H:76
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
int infoDetailLevel
Global for selective suppression of Info output.
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:49
virtual bool writeObject(const regIOobject &, IOstreamOption streamOpt=IOstreamOption(), const bool valid=true) const
Writes a regIOobject (so header, contents and divider).
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:639
collatedFileOperation(bool verbose)
Default construct.
A simple container for options an IOstream can normally have.
static bool isAbsolute(const std::string &str)
Return true if filename starts with a &#39;/&#39; or &#39;\&#39; or (windows-only) with a filesystem-root.
Definition: fileNameI.H:129
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:688
bool appendObject(const regIOobject &io, const fileName &pathName, IOstreamOption streamOpt) const
Append to processorsNN/ file.
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:69
bool processorCase() const noexcept
Return true if this is a processor case.
Definition: TimePathsI.H:29
Macros for easy insertion into run-time selection tables.
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:413
constexpr const char *const group
Group name for atomic constants.
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator) is 1 for serial run.
Definition: UPstream.H:656
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
Gather data, but keep individual values separate. Uses the specified communication schedule...
static void writeExtraHeaderContent(dictionary &dict, IOstreamOption streamOptData, const IOobject &io)
Helper: generate additional entries for FoamFile header.
IOobject io("surfaceFilmProperties", mesh.time().constant(), mesh, IOobject::READ_IF_PRESENT, IOobject::NO_WRITE, false)
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:567
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
Definition: exprTraits.C:52
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A class for handling words, derived from Foam::string.
Definition: word.H:63
Inter-processor communications stream.
Definition: Pstream.H:56
static std::streamoff writeBlockEntry(OSstream &os, const label blocki, const UList< char > &charData)
Helper: write block of (binary) character data.
static const word null
An empty word.
Definition: word.H:84
objectPath exists in &#39;processorsNN_first-last&#39;
Definition: fileOperation.H:85
#define DetailInfo
Definition: evalEntry.C:30
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or master of the io ranks (non-parallel) ...
string hostName()
Return the system&#39;s host name, as per hostname(1)
Definition: POSIX.C:324
static bool isUserComm(label communicator) noexcept
True if communicator appears to be user-allocated.
Definition: UPstream.H:386
void printBanner(const bool printRanks=false) const
Print banner information, optionally with io ranks.
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
int debug
Static debugging option.
constexpr auto end(C &c) -> decltype(c.end())
Return iterator to the end of the container c.
Definition: stdFoam.H:193
OBJstream os(runTime.globalPath()/outputName)
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static bool bannerEnabled() noexcept
Status of output file banner.
Definition: IOobject.H:331
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
List< string > stringList
A List of strings.
Definition: stringList.H:54
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
Ostream & flush(Ostream &os)
Flush stream.
Definition: Ostream.H:477
float floatOptimisationSwitch(const char *name, const float deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:240
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in decomposePar.
static List< int > & procID(const label communicator)
Process IDs within a given communicator.
Definition: UPstream.H:704
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:607
static bool master(const label communicator=worldComm)
Am I the master rank.
Definition: UPstream.H:672
registerOptSwitch("maxThreadFileBufferSize", float, collatedFileOperation::maxThreadFileBufferSize)
versionNumber version() const noexcept
Get the stream version.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:69
const label comm_
Communicator to use.
virtual fileName::Type type(const fileName &, const bool followLink=true) const
Return the file type: DIRECTORY, FILE or SYMLINK.
defineTypeNameAndDebug(collatedFileOperation, 0)
fileOperations that performs all file operations on the master processor. Requires the calls to be pa...
Defines the attributes of an object for which implicit objectRegistry management is supported...
Definition: IOobject.H:166
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Inter-processor communications stream.
Definition: UPstream.H:54
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:239
Namespace for OpenFOAM.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL IO ERROR&#39; header text and ...