OFstreamCollator.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2017-2018 OpenFOAM Foundation
9  Copyright (C) 2021-2022 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Class
28  Foam::OFstreamCollator
29 
30 Description
31  Threaded file writer.
32 
33  Collects the data from all processors and writes it as a single
34  'decomposedBlockData' file. The mode of operation is determined by the
35  buffer size (maxThreadFileBufferSize setting):
36  - local size of data is larger than buffer: receive and write processor
37  by processor (i.e. 'scheduled'). Does not use a thread, no file size
38  limit.
39  - total size of data is larger than buffer (but local is not):
40  thread does all the collecting and writing of the processors. No file
41  size limit.
42  - total size of data is less than buffer:
43  collecting is done locally; the thread only does the writing
44  (since the data has already been collected)
45 
46 SourceFiles
47  OFstreamCollator.C
48 
49 \*---------------------------------------------------------------------------*/
50 
51 #ifndef Foam_OFstreamCollator_H
52 #define Foam_OFstreamCollator_H
53 
54 #include <thread>
55 #include <mutex>
56 #include "IOstream.H"
57 #include "labelList.H"
58 #include "FIFOStack.H"
59 #include "SubList.H"
60 #include "dictionary.H"
61 
62 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
63 
64 namespace Foam
65 {
66 
67 /*---------------------------------------------------------------------------*\
68  Class OFstreamCollator Declaration
69 \*---------------------------------------------------------------------------*/
70 
71 class OFstreamCollator
72 {
73  // Private Class
74 
75  struct writeData
76  {
77  const label comm_;
78  const word objectType_;
79  const fileName pathName_;
80  const string data_;
81  const labelList sizes_;
82  PtrList<List<char>> slaveData_;
83  const IOstreamOption streamOpt_;
86  const dictionary headerEntries_;
87 
88  writeData
89  (
90  const label comm,
91  const word& objectType,
92  const fileName& pathName,
93  const string& data,
94  const labelList& sizes,
95  IOstreamOption streamOpt,
98  const dictionary& headerEntries
99  )
100  :
101  comm_(comm),
102  objectType_(objectType),
103  pathName_(pathName),
104  data_(data),
105  sizes_(sizes),
106  slaveData_(),
107  streamOpt_(streamOpt),
108  atomic_(atomic),
109  append_(append),
110  headerEntries_(headerEntries)
111  {}
112 
113  //- The (approximate) size of master + any optional slave data
114  off_t size() const
115  {
116  off_t totalSize = data_.size();
117  forAll(slaveData_, i)
118  {
119  if (slaveData_.set(i))
120  {
121  totalSize += slaveData_[i].size();
122  }
123  }
124  return totalSize;
125  }
126  };
127 
128 
129  // Private Data
130 
131  //- Total amount of storage to use for object stack below
132  const off_t maxBufferSize_;
133 
134  mutable std::mutex mutex_;
135 
136  std::unique_ptr<std::thread> thread_;
137 
138  //- Stack of files to write + contents
139  FIFOStack<writeData*> objects_;
140 
141  //- Whether thread is running (and not exited)
142  bool threadRunning_;
143 
144  //- Communicator to use for all parallel ops (in simulation thread)
145  label localComm_;
146 
147  //- Communicator to use for all parallel ops (in write thread)
148  label threadComm_;
149 
150 
151  // Private Member Functions
152 
153  //- Write actual file
154  static bool writeFile
155  (
156  const label comm,
157  const word& objectType,
158  const fileName& fName,
159  const string& masterData,
160  const labelUList& recvSizes,
161  const UPtrList<SubList<char>>& slaveData,
162  IOstreamOption streamOpt,
165  const dictionary& headerEntries
166  );
167 
168  //- Write all files in stack
169  static void* writeAll(void *threadarg);
170 
171  //- Wait for total size of objects_ (master + optional slave data)
172  // to be wantedSize less than overall maxBufferSize.
173  void waitForBufferSpace(const off_t wantedSize) const;
174 
175 
176 public:
177 
178  // Declare name of the class and its debug switch
179  TypeName("OFstreamCollator");
180 
181 
182  // Constructors
183 
184  //- Construct from buffer size. 0 = do not use thread
185  explicit OFstreamCollator(const off_t maxBufferSize);
186 
187  //- Construct from buffer size (0 = do not use thread)
188  //- and specified communicator
189  OFstreamCollator(const off_t maxBufferSize, const label comm);
190 
191 
192  //- Destructor
193  virtual ~OFstreamCollator();
194 
195 
196  // Member Functions
197 
198  //- Write file with contents.
199  // Blocks until writethread has space available
200  // (total file sizes < maxBufferSize)
201  bool write
202  (
203  const word& objectType,
204  const fileName&,
205  const string& data,
206  IOstreamOption streamOpt,
209  const bool useThread = true,
210  const dictionary& headerEntries = dictionary::null
211  );
212 
213  //- Wait for all thread actions to have finished
214  void waitAll();
215 };
216 
217 
218 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
219 
220 } // End namespace Foam
221 
222 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
223 
224 #endif
225 
226 // ************************************************************************* //
A FIFO stack based on a singly-linked list.
Definition: FIFOStack.H:45
Threaded file writer.
A class for handling file names.
Definition: fileName.H:72
virtual ~OFstreamCollator()
Destructor.
static void writeData(Ostream &os, const Type &val)
A list of keyword definitions, which are a keyword followed by a number of values (eg...
Definition: dictionary.H:129
A simple container for options an IOstream can normally have.
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
atomicType
Atomic operations (output)
A List obtained as a section of another List.
Definition: SubList.H:50
bool write(const word &objectType, const fileName &, const string &data, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents.
A class for handling words, derived from Foam::string.
Definition: word.H:63
label size() const noexcept
The number of entries in the list.
Definition: UPtrListI.H:106
void waitAll()
Wait for all thread actions to have finished.
static const dictionary null
An empty dictionary, which is also the parent for all dictionaries.
Definition: dictionary.H:474
A list of pointers to objects of type <T>, without allocation/deallocation management of the pointers...
Definition: HashTable.H:106
const T * set(const label i) const
Return const pointer to element (can be nullptr), or nullptr for out-of-range access (ie...
Definition: PtrList.H:159
appendType
File appending (NON_APPEND | APPEND)
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers...
Definition: List.H:55
TypeName("OFstreamCollator")
Namespace for OpenFOAM.