fileOperationBroadcast.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2022-2023 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 \*---------------------------------------------------------------------------*/
27 
28 #include "fileOperation.H"
29 #include "Pstream.H"
30 #include "OSspecific.H"
31 #include <fstream>
32 #include <cinttypes>
33 
34 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
35 
36 namespace Foam
37 {
38 
39 // Implementation for broadcasting an individual file
40 static void broadcastFile_single
41 (
42  const label comm,
43  const bool writeOnProc,
44  const fileName& srcName,
45  const fileName& dstName,
46  DynamicList<char>& buffer
47 )
48 {
50  {
52  << "Reading file " << srcName
53  << " on master processor and copying to " << dstName
54  << endl;
55  }
56 
57  // Read file on master, broadcast to all but write on IOranks only.
58  // This is a lot easier / possibly quicker?
59  // than -allocating our own IO communicator
60  // -send to all IO subProcs -releasing communicator
61 
62  // The file length uses uint64_t instead of off_t
63  // since MINGW has something odd for off_t:
64  //
65  // ... error: call of overloaded 'min(off_t&, off_t&)' is ambiguous
66 
67  // Tuple2<off_t, mode_t> lengthAndMode(0, 0);
68  Tuple2<uint64_t, mode_t> lengthAndMode(0, 0);
69 
70  std::unique_ptr<std::ifstream> srcStream;
71  std::unique_ptr<std::ofstream> dstStream;
72 
73  if (UPstream::master(comm))
74  {
75  // Read (see newIFstream)
76  auto fileLen = Foam::fileSize(srcName);
77  if (fileLen > 0)
78  {
79  lengthAndMode.first() = uint64_t(fileLen);
80  }
81  lengthAndMode.second() = Foam::mode(srcName);
82 
83  srcStream.reset
84  (
85  new std::ifstream
86  (
87  srcName,
88  std::ios_base::in | std::ios_base::binary
89  )
90  );
91  if (!srcStream->good())
92  {
93  FatalIOErrorInFunction(srcName)
94  << "Could not open file for reading!"
95  << exit(FatalIOError);
96  }
97  }
98 
99  if
100  (
101  writeOnProc
102  &&
103  (
104  UPstream::master(comm)
105  ? (srcName != dstName)
106  : UPstream::is_subrank(comm)
107  )
108  )
109  {
110  // Make sure the destination directory exists.
111  // - will fail itself if not possible
112  // - no-op if directory already exists
113  Foam::mkDir(dstName.path());
114 
115  dstStream.reset
116  (
117  new std::ofstream
118  (
119  dstName,
120  std::ios_base::out | std::ios_base::binary
121  )
122  );
123 
124  if (!dstStream->good())
125  {
126  // Fail noisily or silently?
127  if (!dstStream->good())
128  {
129  dstStream.reset(nullptr);
130  }
131  }
132 
133  // Adjust mode?
134  // Foam::chMode(dstName, fileMode);
135  }
136 
137  // Broadcast size and mode (contiguous data)
139  (
140  reinterpret_cast<char*>(&lengthAndMode),
141  sizeof(lengthAndMode),
142  comm
143  );
144 
145  uint64_t fileLength = lengthAndMode.first();
146 
147  const uint64_t maxChunkSize =
148  (
150  ? uint64_t(UPstream::maxCommsSize)
151  : (UPstream::maxCommsSize < 0) // (numBytes fewer than INT_MAX)
152  ? uint64_t(INT_MAX + UPstream::maxCommsSize)
153  : uint64_t(INT_MAX) // MPI limit is <int>
154  );
155 
156 
157  while (fileLength > 0)
158  {
159  const uint64_t sendSize = std::min(fileLength, maxChunkSize);
160  fileLength -= sendSize;
161 
162  // Read file contents into a character buffer
163  buffer.resize_nocopy(static_cast<label>(sendSize));
164 
165  if (srcStream)
166  {
167  srcStream->read(buffer.data_bytes(), buffer.size_bytes());
168  }
169 
170  UPstream::broadcast(buffer.data_bytes(), buffer.size_bytes(), comm);
171 
172  if (dstStream)
173  {
174  dstStream->write(buffer.data_bytes(), buffer.size_bytes());
175  }
176  }
177 }
178 
179 
180 // Implementation for broadcasting directory contents or an individual file
181 static bool broadcastFile_recursive
182 (
183  const label comm,
184  const bool writeOnProc,
185  const fileName& src,
186  const fileName& dst,
187  // const bool followLink
188  DynamicList<char>& buffer
189 )
190 {
191  // Read file on master, broadcast to all but write on procs with doWrite
192  // only. This is a lot easier / possibly quicker?
193  // than -allocating our own IO communicator
194  // -send to all IO subProcs -releasing communicator
195 
197  {
199  << "Reading " << src
200  << " on master processor and writing a copy to " << dst
201  << endl;
202  }
203 
204  fileName::Type srcType;
205  if (UPstream::master(comm))
206  {
207  srcType = src.type(false); // followLink = false, gzip = false
208  }
209 
211  (
212  reinterpret_cast<char*>(&srcType), // contiguous data
213  sizeof(srcType),
214  comm
215  );
216 
217  // Check type of source file.
218  if (srcType == fileName::FILE)
219  {
220  broadcastFile_single(comm, writeOnProc, src, dst, buffer);
221  }
222  else if (srcType == fileName::SYMLINK)
223  {
224  WarningInFunction<< "Copying symbolic links not supported" << endl;
225 
226  // Read the link target
227  fileName linkTarget;
228  if (UPstream::master(comm))
229  {
230  linkTarget = Foam::readLink(src);
231  }
232  Pstream::broadcast(linkTarget, comm);
233 
234  if
235  (
236  writeOnProc
237  &&
238  (
239  UPstream::master(comm)
240  ? (src != dst)
241  : UPstream::is_subrank(comm)
242  )
243  )
244  {
245  // Recreate softlink on remote processor
246  return Foam::ln(linkTarget, dst);
247  }
248  }
249  else if (srcType == fileName::DIRECTORY)
250  {
251  // Copy files
252  {
253  fileNameList files;
254  if (UPstream::master(comm))
255  {
256  files = Foam::readDir
257  (
258  src,
260  false // Never trim '.gz' ending
261  //followLink
262  );
263  }
264  Pstream::broadcast(files, comm);
265 
266  for (const fileName& item : files)
267  {
268  // File to file
270  (
271  comm,
272  writeOnProc,
273  src/item,
274  dst/item,
275  //followLink
276  buffer
277  );
278  }
279  }
280 
281  // Copy softlinks
282  {
283  fileNameList files;
284  if (UPstream::master(comm))
285  {
286  files = Foam::readDir
287  (
288  src,
290  false // Never trim '.gz' ending
291  //followLink
292  );
293  }
294  Pstream::broadcast(files, comm);
295 
296  for (const fileName& item : files)
297  {
298  // Softlink to softlink
300  (
301  comm,
302  writeOnProc,
303  src/item,
304  dst/item,
305  //followLink
306  buffer
307  );
308  }
309  }
310 
311 
312  // Copy sub directories
313  {
314  fileNameList dirs;
315  if (UPstream::master(comm))
316  {
317  dirs = Foam::readDir
318  (
319  src,
321  false // Never trim '.gz' ending
322  //followLink
323  );
324  }
325  Pstream::broadcast(dirs, comm);
326 
327  for (const fileName& item : dirs)
328  {
329  // Dir to Dir
331  (
332  comm,
333  writeOnProc,
334  src/item,
335  dst/item,
336  //followLink
337  buffer
338  );
339  }
340  }
341  }
342  else if (srcType == fileName::UNDEFINED)
343  {
344  WarningInFunction<< "No known file type: " << src << endl;
345  return false;
346  }
347  else
348  {
349  return false;
350  }
351 
352  return true;
353 }
355 } // End namespace Foam
356 
357 
358 // * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
359 
361 (
362  const label comm,
363  const bool writeOnProc,
364  const fileName& srcPath,
365  const fileName& dstPath
366  //const bool followLink
367 ) const
368 {
369  DynamicList<char> fileContents;
370 
371 
372  // FUTURE:
373  // Handling with broadcast of file -> directory etc
374  // as per Foam::cp()
375  //
376  // However, this adds extra communicattion and logic etc...
377 
378 #if 0
379  const fileName::Type srcType = src.type(followLink);
380 
381  // Check type of source file.
382  if (srcType == fileName::FILE)
383  {
384  // If dest is a directory, create the destination file name.
385  if (destFile.type() == fileName::DIRECTORY)
386  {
387  destFile = destFile/src.name();
388  }
389  }
390  else if (srcType == fileName::DIRECTORY)
391  {
392  if (destFile.type() == fileName::DIRECTORY)
393  {
394  // Both are directories. Could mean copy contents or copy
395  // recursively. Don't actually know what the user wants,
396  // but assume that if names are identical == copy contents.
397  //
398  // So: "path1/foo" "path2/foo" copy contents
399  // So: "path1/foo" "path2/bar" copy directory
400 
401  const word srcDirName = src.name();
402  if (destFile.name() != srcDirName)
403  {
404  destFile /= srcDirName;
405  }
406  }
407  }
408 #endif
409 
411  (
412  comm,
413  writeOnProc,
414  srcPath,
415  (dstPath.empty() ? srcPath : dstPath),
416  fileContents
417  );
418 }
419 
420 
421 // ************************************************************************* //
A symbolic link.
Definition: fileName.H:86
A class for handling file names.
Definition: fileName.H:72
off_t fileSize(const fileName &name, const bool followLink=true)
Return size of file or -1 on failure (normally follows symbolic links).
Definition: POSIX.C:905
fileName readLink(const fileName &link)
Return the contents (target) of a symlink.
Definition: POSIX.C:1290
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
A 2-tuple for storing two objects of dissimilar types. The container is similar in purpose to std::pa...
Definition: stringOps.H:54
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:402
Undefined type.
Definition: fileName.H:83
static bool broadcastFile_recursive(const label comm, const bool writeOnProc, const fileName &src, const fileName &dst, DynamicList< char > &buffer)
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static std::string path(const std::string &str)
Return directory path name (part before last /)
Definition: fileNameI.H:169
char * data_bytes() noexcept
Return pointer to the underlying array serving as data storage,.
Definition: UListI.H:279
static void broadcast(Type &value, const label comm=UPstream::worldComm)
Broadcast content (contiguous or non-contiguous) to all communicator ranks. Does nothing in non-paral...
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:614
virtual bool broadcastCopy(const label comm, const bool writeOnProc, const fileName &src, const fileName &dst) const
Read dir/file (recursively if necessary) on master of the communicator, send and write contents to al...
A regular file.
Definition: fileName.H:84
label min(const labelHashSet &set, label minValue=labelMax)
Find the min value in labelHashSet, optionally limited by second argument.
Definition: hashSets.C:26
bool ln(const fileName &src, const fileName &dst)
Create a softlink. dst should not exist. Returns true if successful.
Definition: POSIX.C:1237
int debug
Static debugging option.
static bool is_subrank(const label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
Definition: UPstream.H:1111
static void broadcastFile_single(const label comm, const bool writeOnProc, const fileName &srcName, const fileName &dstName, DynamicList< char > &buffer)
#define WarningInFunction
Report a warning using Foam::Warning.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:637
static bool broadcast(char *buf, const std::streamsize bufSize, const label communicator, const int rootProcNo=masterNo())
Broadcast buffer contents to all processes in given communicator. The sizes must match on all process...
label comm() const noexcept
Communicator to use.
const T2 & second() const noexcept
Access the second element.
Definition: Tuple2.H:142
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition: UPstream.H:1094
void resize_nocopy(const label len)
Alter addressable list size, allocating new space if required without necessarily recovering old cont...
Definition: DynamicListI.H:375
const T1 & first() const noexcept
Access the first element.
Definition: Tuple2.H:132
List< fileName > fileNameList
List of fileName.
Definition: fileNameList.H:32
fileNameList readDir(const fileName &directory, const fileName::Type type=fileName::Type::FILE, const bool filtergz=true, const bool followLink=true)
Read a directory and return the entries as a fileName List.
Definition: POSIX.C:963
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Definition: POSIX.C:773
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition: UListI.H:286
Type
Enumerations to handle directory entry types.
Definition: fileName.H:81
Namespace for OpenFOAM.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL IO ERROR&#39; header text and ...
#define InfoInFunction
Report an information message using Foam::Info.