UPstreamFile.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2025 Mark Olesen
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 \*---------------------------------------------------------------------------*/
27 
#include "fileName.H"
#include "UPstreamFile.H"
#include "PstreamGlobals.H"
#include "OSspecific.H"

#include <climits>
#include <cstdint>
#include <cstdio>
32 
33 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
34 
35 // Has _c() version?
36 #undef Foam_UPstream_largeCountFile
37 
38 #if (MPI_VERSION >= 4)
39 #define Foam_UPstream_largeCountFile
40 #endif
41 
42 // Macros for calling versions with or without '_c'
43 #ifdef Foam_UPstream_largeCountFile
44 #define Foam_mpiCall(Function) Function##_c
45 #else
46 #define Foam_mpiCall(Function) Function
47 #endif
48 
49 
50 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
51 
52 namespace
53 {
54 
55 inline bool checkCount(std::streamsize count, const char* what)
56 {
57  #ifndef Foam_UPstream_largeCountFile
58  if (FOAM_UNLIKELY(count > std::streamsize(INT_MAX)))
59  {
60  using namespace Foam;
62  << "Write size " << label(count)
63  << " exceeds INT_MAX bytes for '" << what << "'\n"
65  return false;
66  }
67  #endif
68 
69  return true;
70 }
71 
72 } // End anonymous namespace
73 
74 
75 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
76 
77 namespace Foam
78 {
79 
80 /*---------------------------------------------------------------------------*\
81  Class UPstream::File::Impl Declaration
82 \*---------------------------------------------------------------------------*/
83 
85 {
86  //- The file-handle
87  MPI_File fh_;
88 
89  //- Path of the open file
90  fileName name_;
91 
92  //- The current state (open|read|write|closed etc)
93  int state_;
94 
95  //- The associated rank when openned
96  int rank_;
97 
98 public:
99 
100  //- The file states
101  enum states : int { CLOSED = 0, READ, WRITE, ATOMIC_WRITE };
102 
103  // Constructors
105  //- Default construct
106  Impl()
107  :
108  fh_(MPI_FILE_NULL),
109  state_(CLOSED),
110  rank_(0)
111  {}
112 
113 
114  // Member Functions
115 
116  // The file handle
117  const MPI_File& handle() const noexcept { return fh_; }
118  MPI_File& handle() noexcept { return fh_; }
119 
120  // Path to the open file
121  const fileName& name() const noexcept { return name_; }
122  fileName& name() noexcept { return name_; }
124  // Change the file state, return the old value
125  int state(states val) noexcept
126  {
127  int old(state_);
128  state_ = val;
129  return old;
130  }
131 
132  //- Is rank 0 ? (master rank)
133  bool master() const noexcept { return (rank_ == 0); }
134 
135  //- Get the associated rank
136  int rank() const noexcept { return rank_; }
137 
138  //- Set the associated rank
139  void rank(int val) noexcept { rank_ = val; }
141 
142  // Checks
143 
144  // The file state
145  bool is_open() const noexcept { return state_; }
146 
147  // The file read state
148  bool is_read() const noexcept
149  {
150  return (states::READ == state_);
151  }
152 
153  // The file write atomic state
154  bool is_atomic() const noexcept
155  {
156  return (states::ATOMIC_WRITE == state_);
157  }
158 
159  // The file write state (atomic or non-atomic)
160  bool is_write() const noexcept
161  {
162  return (states::ATOMIC_WRITE == state_ || states::WRITE == state_);
163  }
164 
165  //- Assert is_read() or FatalError
166  inline bool checkReadable(const char* what) const
167  {
168  if (FOAM_UNLIKELY(!is_read()))
169  {
171  << "File handler not open for reading '" << what << "'\n"
172  << "name: " << name() << nl
174  return false;
175  }
176  return true;
177  }
178 
179  //- Assert is_write() or FatalError
180  inline bool checkWritable(const char* what) const
181  {
182  if (FOAM_UNLIKELY(!is_write()))
183  {
185  << "File handler not open for writing'" << what << "'\n"
186  << "name: " << name() << nl
188  return false;
189  }
190  return true;
191  }
192 };
193 
194 } // End namespace Foam
196 
197 // * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
198 
200 {
201  return true;
202 }
203 
204 
205 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
206 
208 :
209  file_(new UPstream::File::Impl)
210 {}
211 
212 
213 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
214 
216 {
217  if (FOAM_UNLIKELY(file_ && file_->is_open()))
218  {
220  << "Exited scope without close()" << nl
221  << " FIX YOUR CODE!!" << endl;
222  // Do not call close() since we don't know where that collective
223  // should have been called
224  }
225 }
226 
227 
228 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
229 
231 {
232  return (file_ ? file_->name() : fileName::null);
233 }
234 
235 
237 {
238  return bool(file_ && file_->is_open());
239 }
240 
241 
243 {
244  if (FOAM_UNLIKELY(!file_->is_open()))
245  {
247  << "Called without an open file handler !" << endl;
248  return false;
249  }
250 
251  MPI_File_close(&(file_->handle()));
252 
253  // Atomic rename of file (master only)
254  const fileName& pathname = file_->name();
255 
256  if (file_->master() && file_->is_atomic() && !pathname.empty())
257  {
258  std::rename
259  (
260  (pathname + "~tmp~").c_str(),
261  pathname.c_str()
262  );
263  }
264 
265  file_->state(Impl::CLOSED);
266  file_->name() = "";
267  file_->rank(0);
268 
269  return true;
270 }
271 
272 
273 // * * * * * * * * * * * * Member Functions (Reading) * * * * * * * * * * * //
274 
275 #if 0
276 bool Foam::UPstream::File::open_read
277 (
278  const int communicator,
279  const fileName& pathname
280 )
281 {
282  //Needed? PstreamGlobals::checkCommunicator(communicator, 0);
283 
284  if (FOAM_UNLIKELY(file_->is_open()))
285  {
287  << "Previous use of file handler did not call close()" << nl
288  << " FIX YOUR CODE!!" << endl;
289  // Do not call close() since we don't know where that collective
290  // should have been called
291  }
292  file_->state(Impl::CLOSED);
293  file_->name() = pathname; // <- set now for external error messages
294  file_->rank(0);
295 
296  int returnCode = MPI_File_open
297  (
298  PstreamGlobals::MPICommunicators_[communicator],
299  pathname.c_str(),
300  (MPI_MODE_RDONLY),
301  MPI_INFO_NULL,
302  &(file_->handle())
303  );
304 
305  if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
306  {
308  << "Error encounted in MPI_File_open() : "
309  << pathname << nl
311 
312  return false;
313  }
314 
315  file_->state(Impl::READ);
316  file_->name() = pathname;
317  file_->rank(UPstream::myProcNo(communicator));
318 
319  return true; // ie, is_read()
320 }
321 #endif
322 
323 
324 // * * * * * * * * * * * * Non-Collective Reading * * * * * * * * * * * * * //
325 
326 #if 0
327 bool Foam::UPstream::File::get_header(DynamicList<char>& content)
328 {
329  std::streamsize headerSize(4096);
330 
331  // constexpr const char* const func = "MPI_File_read_at";
332  file_->checkReadable("MPI_File_read_at");
333 
334  if (off_t fileLen = Foam::fileSize(this->name()); fileLen >= 0)
335  {
336  std::streamsize size = std::streamsize(fileLen);
337  if (headerSize > size)
338  {
339  headerSize = size;
340  }
341  }
342  else
343  {
344  content.clear();
345  return false;
346  }
347 
348  // Get the first header content:
349  content.resize_nocopy(headerSize);
350 
351  int returnCode = Foam_mpiCall(MPI_File_read_at)
352  (
353  file_->handle(),
354  0, // offset
355  content.data(),
356  content.size(),
357  MPI_BYTE,
358  MPI_STATUS_IGNORE
359  );
360 
361  // Wrap as ISpanStream headerStream(content);
362  if (MPI_SUCCESS == returnCode)
363  {
364  ISpanStream is(content);
365  dictionary headerDict;
366 
367  // Read the regular "FoamFile" header
368  bool ok = io.readHeader(headerDict, is);
369 
370  // Probably collated - extract class from "data.class"
371  if
372  (
374  && headerDict.readIfPresent("data.class", io.headerClassName())
375  )
376  {
377  return ok;
378  }
379  }
380 
381  return (MPI_SUCCESS == returnCode);
382 }
383 #endif
384 
385 
386 // * * * * * * * * * * * * Member Functions (Writing) * * * * * * * * * * * //
387 
389 (
390  const int communicator,
391  const fileName& pathname,
392  IOstreamOption::atomicType atomicType
393 )
394 {
395  //Needed? PstreamGlobals::checkCommunicator(communicator, 0);
396 
397  if (FOAM_UNLIKELY(file_->is_open()))
398  {
400  << "Previous use of file handler did not call close()" << nl
401  << " FIX YOUR CODE!!" << endl;
402  // Do not call close() since we don't know where that collective
403  // should have been called
404  }
405  file_->state(Impl::CLOSED);
406  file_->name() = pathname; // <- set now for external error messages
407  file_->rank(0);
408 
409  const bool atomic = (IOstreamOption::atomicType::ATOMIC == atomicType);
410 
411  // When opening new files, remove file variants out of the way.
412  // Eg, opening "file1"
413  // - remove old "file1.gz" (compressed)
414  // - also remove old "file1" if it is a symlink
415 
416  const fileName pathname_gz(pathname + ".gz");
417  const fileName pathname_tmp(pathname + "~tmp~");
418 
419  // File to open with MPI_File_open
420  const auto& target = (atomic ? pathname_tmp : pathname);
421 
422  // Remove old compressed version (if any)
423  if
424  (
425  auto fType = Foam::type(pathname_gz, false);
426  (fType == fileName::SYMLINK || fType == fileName::FILE)
427  )
428  {
429  Foam::rm(pathname_gz);
430  }
431 
432  // Avoid writing into symlinked files (non-append mode)
433  if
434  (
435  auto fType = Foam::type(target, false);
436  fType == fileName::SYMLINK
437  )
438  {
439  Foam::rm(target);
440  }
441 
442  int returnCode = MPI_File_open
443  (
444  PstreamGlobals::MPICommunicators_[communicator],
445  target.c_str(),
446  (MPI_MODE_CREATE | MPI_MODE_WRONLY),
447  MPI_INFO_NULL,
448  &(file_->handle())
449  );
450 
451  if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
452  {
454  << "Error encounted in MPI_File_open() : "
455  << target << nl
457 
458  return false;
459  }
460 
461  file_->state(atomic ? Impl::ATOMIC_WRITE : Impl::WRITE);
462  file_->name() = pathname;
463  file_->rank(UPstream::myProcNo(communicator));
464 
465  return true; // ie, is_write()
466 }
467 
468 
469 bool Foam::UPstream::File::set_size(std::streamsize num_bytes)
470 {
471  if (FOAM_UNLIKELY(!file_->is_open()))
472  {
474  << "Called without an open file handler !" << endl;
475  return false;
476  }
477 
478  int returnCode = MPI_File_set_size(file_->handle(), num_bytes);
479 
480  return (MPI_SUCCESS == returnCode);
481 }
482 
483 
484 // * * * * * * * * * * * * Non-Collective Writing * * * * * * * * * * * * * //
485 
487 (
488  const void* data,
489  std::streamsize count,
490  const UPstream::dataTypes dataTypeId
491 )
492 {
493  MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
494 
495  // constexpr const char* const func = "MPI_File_write";
496  file_->checkWritable("MPI_File_write");
497  checkCount(count, "MPI_File_write");
498 
499  int returnCode = Foam_mpiCall(MPI_File_write)
500  (
501  file_->handle(),
502  data,
503  count,
504  datatype,
505  MPI_STATUS_IGNORE
506  );
507 
508  return (MPI_SUCCESS == returnCode);
509 }
510 
511 
513 (
514  std::streamsize offset,
515  const void* data,
516  std::streamsize count,
517  const UPstream::dataTypes dataTypeId
518 )
519 {
520  MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
521 
522  // constexpr const char* const func = "MPI_File_write_at";
523  file_->checkWritable("MPI_File_write_at");
524  checkCount(count, "MPI_File_write_at");
525 
526  int returnCode = Foam_mpiCall(MPI_File_write_at)
527  (
528  file_->handle(),
529  offset,
530  data,
531  count,
532  datatype,
533  MPI_STATUS_IGNORE
534  );
535 
536  return (MPI_SUCCESS == returnCode);
537 }
538 
539 
540 // * * * * * * * * * * * * * Collective Writing * * * * * * * * * * * * * * //
541 
543 (
544  const void* data,
545  std::streamsize count,
546  const UPstream::dataTypes dataTypeId
547 )
548 {
549  MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
550 
551  // constexpr const char* const func = "MPI_File_write_all";
552  file_->checkWritable("MPI_File_write_all");
553  checkCount(count, "MPI_File_write_all");
554 
555  int returnCode = Foam_mpiCall(MPI_File_write_all)
556  (
557  file_->handle(),
558  data,
559  count,
560  datatype,
561  MPI_STATUS_IGNORE
562  );
563 
564  return (MPI_SUCCESS == returnCode);
565 }
566 
567 
569 (
570  std::streamsize offset,
571  const void* data,
572  std::streamsize count,
573  const UPstream::dataTypes dataTypeId
574 )
575 {
576  MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
577 
578  // constexpr const char* const func = "MPI_File_write_at_all";
579  file_->checkWritable("MPI_File_write_at_all");
580  checkCount(count, "MPI_File_write_at_all");
581 
582  int returnCode = Foam_mpiCall(MPI_File_write_at_all)
583  (
584  file_->handle(),
585  offset,
586  data,
587  count,
588  datatype,
589  MPI_STATUS_IGNORE
590  );
591 
592  return (MPI_SUCCESS == returnCode);
593 }
594 
595 
596 // bool Foam::UPstream::File::write_data_all_begin
597 // (
598 // const void* data,
599 // std::streamsize count,
600 // const UPstream::dataTypes dataTypeId
601 // )
602 // {
603 // MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
604 //
605 // // constexpr const char* const func = "MPI_File_write_all_begin";
606 // file_->checkWritable("MPI_File_write_all_begin");
607 // checkCount(count, "MPI_File_write_all_begin");
608 //
609 // int returnCode = Foam_mpiCall(MPI_File_write_all_begin)
610 // (
611 // file_->handle(),
612 // data,
613 // count,
614 // datatype,
615 // MPI_STATUS_IGNORE
616 // );
617 //
618 // return (MPI_SUCCESS == returnCode);
619 // }
620 
621 
622 // bool Foam::UPstream::File::write_data_all_end
623 // (
624 // const void* data
625 // )
626 // {
627 // file_->checkWritable("MPI_File_write_all_end");
628 // int returnCode = Foam_mpiCall(MPI_File_write_all_end)
629 // (
630 // file_->handle(),
631 // data,
632 // MPI_STATUS_IGNORE
633 // );
634 //
635 // return (MPI_SUCCESS == returnCode);
636 // }
637 
638 
639 // bool Foam::UPstream::File::write_data_at_all_begin
640 // (
641 // std::streamsize offset,
642 // const void* data,
643 // std::streamsize count,
644 // const UPstream::dataTypes dataTypeId
645 // )
646 // {
647 // MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
648 //
649 // // constexpr const char* const func = "MPI_File_write_at_all_begin";
650 // file_->checkWritable("MPI_File_write_at_all_begin");
651 // checkCount(count, "MPI_File_write_at_all_begin");
652 //
653 // int returnCode = Foam_mpiCall(MPI_File_write_at_all_begin)
654 // (
655 // file_->handle(),
656 // offset,
657 // data,
658 // count,
659 // datatype,
660 // MPI_STATUS_IGNORE
661 // );
662 //
663 // return (MPI_SUCCESS == returnCode);
664 // }
665 
666 
667 // bool Foam::UPstream::File::write_data_at_all_end
668 // (
669 // const void* data
670 // )
671 // {
672 // file_->checkWritable("MPI_File_write_at_all_end");
673 //
674 // int returnCode = Foam_mpiCall(MPI_File_write_at_all_end)
675 // (
676 // file_->handle(),
677 // data,
678 // MPI_STATUS_IGNORE
679 // );
680 //
681 // return (MPI_SUCCESS == returnCode);
682 // }
683 
684 
685 // ************************************************************************* //
bool checkReadable(const char *what) const
Assert is_read() or FatalError.
Definition: UPstreamFile.C:179
A symbolic link.
Definition: fileName.H:86
bool is_write() const noexcept
Definition: UPstreamFile.C:171
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:119
bool write_data(const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write [non-collective] : write data.
Definition: UPstreamFile.C:107
static bool supported()
True if MPI/IO appears to be supported.
Definition: UPstreamFile.C:40
A class for handling file names.
Definition: fileName.H:72
File()
Default construct.
Definition: UPstreamFile.C:48
off_t fileSize(const fileName &name, const bool followLink=true)
Return size of file or -1 on failure (normally follows symbolic links).
Definition: POSIX.C:911
bool set_size(std::streamsize num_bytes)
Set the (output) file size [collective].
Definition: UPstreamFile.C:156
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
An opaque wrapper for MPI_File methods without any <mpi.h> header dependency.
Definition: UPstreamFile.H:51
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
A list of keyword definitions, which are a keyword followed by a number of values (eg...
Definition: dictionary.H:130
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:652
bool is_open() const noexcept
Definition: UPstreamFile.C:156
bool is_atomic() const noexcept
Definition: UPstreamFile.C:165
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:50
bool write_data_all(const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write_all [collective] : write data.
Definition: UPstreamFile.C:132
static const fileName null
An empty fileName.
Definition: fileName.H:111
T * data() noexcept
Return pointer to the underlying array serving as data storage.
Definition: UListI.H:267
DynamicList< MPI_Comm > MPICommunicators_
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:518
static int myProcNo(label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a...
Definition: UPstream.H:1799
bool write_data_at_all(std::streamsize offset, const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write_at_all [collective] : write data at specified offset.
Definition: UPstreamFile.C:144
bool write_data_at(std::streamsize offset, const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write_at [non-collective] : write data at specified offset.
Definition: UPstreamFile.C:119
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
fileName::Type type(const fileName &name, const bool followLink=true)
Return the file type: DIRECTORY or FILE, normally following symbolic links.
Definition: POSIX.C:805
atomicType
Atomic operations (output)
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition: BitOps.H:73
const MPI_File & handle() const noexcept
Definition: UPstreamFile.C:122
const fileName & name() const
The name of the open stream.
Definition: UPstreamFile.C:59
int state(states val) noexcept
Definition: UPstreamFile.C:130
A regular file.
Definition: fileName.H:84
const auto & io
#define FOAM_UNLIKELY(cond)
Definition: stdFoam.H:65
Impl()
Default construct.
Definition: UPstreamFile.C:111
static bool isCollatedType(const word &objectType)
True if object type is a known collated type.
errorManip< error > abort(error &err)
Definition: errorManip.H:139
int rank() const noexcept
Get the associated rank.
Definition: UPstreamFile.C:145
const direction noexcept
Definition: scalarImpl.H:265
bool readIfPresent(const word &keyword, T &val, enum keyType::option matchOpt=keyType::REGEX) const
Find an entry if present, and assign to T val. FatalIOError if it is found and the number of tokens i...
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
Definition: DynamicListI.H:451
dataTypes
Mapping of some fundamental and aggregate types to MPI data types.
Definition: UPstream.H:107
bool checkWritable(const char *what) const
Assert is_write() or FatalError.
Definition: UPstreamFile.C:195
#define WarningInFunction
Report a warning using Foam::Warning.
states
The file states.
Definition: UPstreamFile.C:104
#define Foam_mpiCall(Function)
Definition: UPstreamFile.C:39
MPI_Datatype getDataType(UPstream::dataTypes id)
Lookup of dataTypes enumeration as an MPI_Datatype.
bool open_write(const int communicator, const fileName &pathname, IOstreamOption::atomicType=IOstreamOption::NON_ATOMIC)
MPI_File_open [collective] : open file in write-only mode, no-append.
Definition: UPstreamFile.C:95
~File()
Destructor. Non-default in header (incomplete types)
Definition: UPstreamFile.C:53
bool is_read() const noexcept
Definition: UPstreamFile.C:159
void resize_nocopy(const label len)
Alter addressable list size, allocating new space if required without necessarily recovering old cont...
Definition: DynamicListI.H:418
bool close()
MPI_File_close [collective].
Definition: UPstreamFile.C:71
bool master() const noexcept
Is rank 0 ? (master rank)
Definition: UPstreamFile.C:140
Similar to IStringStream but using an externally managed buffer for its input. This allows the input ...
Definition: ISpanStream.H:270
Inter-processor communications stream.
Definition: UPstream.H:69
Namespace for OpenFOAM.
const fileName & name() const noexcept
Definition: UPstreamFile.C:126
bool is_open() const
True if allocated and open has been called.
Definition: UPstreamFile.C:65
bool rm(const fileName &file)
Remove a file (or its gz equivalent), returning true if successful.
Definition: POSIX.C:1410