multiWorldConnectionsObject.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2021-2023 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 \*---------------------------------------------------------------------------*/
27 
29 #include "Pstream.H"
30 
31 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
32 
33 namespace Foam
34 {
35  defineTypeNameAndDebug(multiWorldConnections, 0);
36 }
37 
38 
39 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
40 
41 namespace Foam
42 {
43 
44 // Combine world-to-world connections.
45 // Forward connection = 1, Backward connection = 2, Both = 3
47 {
48  void operator()(EdgeMap<unsigned>& a, const EdgeMap<unsigned>& b) const
49  {
50  forAllConstIters(b, iter)
51  {
52  a(iter.key()) |= iter.val();
53  }
54  }
55 };
56 
57 
58 static void printDOT(Ostream& os, const EdgeMap<unsigned>& connections)
59 {
60  os << nl << "// Multiworld communication graph:" << nl;
61  os.beginBlock("graph");
62 
63  // Graph Nodes == worlds
64  label worldi = 0;
65  for (const word& worldName : UPstream::allWorlds())
66  {
67  os.indent();
68  os << worldi << " [xlabel=" << worldi
69  << ",label=\"" << worldName << "\"]" << nl;
70 
71  ++worldi;
72  }
73  os << nl;
74 
75  // Graph Edges == connections
76  for (const edge& connect : connections.sortedToc())
77  {
78  os.indent();
79  os << connect.first() << " -- " << connect.second();
80 
81  // Mismatched forward/backward connections?
82  if (connections.lookup(connect, 0u) != 3u)
83  {
84  os << " [style=dashed] // mismatched?";
85  }
86  os << nl;
87  }
88 
89  os.endBlock();
90 
91  os << "// end graph" << nl;
92 }
93 
94 } // End namespace Foam
95 
96 
97 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
98 
99 Foam::edge Foam::multiWorldConnections::worldPair(const label otherWorld)
100 {
101  if (otherWorld < 0 || !Pstream::parRun())
102  {
103  Perr<< "ignore: no world or non-parallel" << endl;
104  return edge(-1, -1);
105  }
106  else if (UPstream::allWorlds().size() <= otherWorld)
107  {
108  Perr<< "ignore: invalid world: " << otherWorld << endl;
109  return edge(-1, -1);
110  }
111 
112  const label thisWorldID = UPstream::myWorldID();
113 
114  // The worlds (sorted)
115  return edge(thisWorldID, otherWorld, true);
116 }
117 
118 
119 Foam::edge Foam::multiWorldConnections::worldPair(const word& otherWorld)
120 {
121  if (otherWorld.empty() || !Pstream::parRun())
122  {
123  Perr<< "ignore: no world or non-parallel" << endl;
124  return edge(-1, -1);
125  }
126 
127  const label thisWorldID = UPstream::myWorldID();
128  const label otherWorldID = UPstream::allWorlds().find(otherWorld);
129 
130  if (otherWorldID < 0)
131  {
133  << "Cannot find world " << otherWorld
134  << " in set of worlds " << flatOutput(UPstream::allWorlds())
135  << exit(FatalError);
136  }
137 
138  // The worlds (sorted)
139  return edge(thisWorldID, otherWorldID, true);
140 }
141 
142 
143 Foam::label Foam::multiWorldConnections::createCommunicator(const edge& worlds)
144 {
145  // Fallback: do not create, just use local world
146  label comm = UPstream::worldComm;
147 
148  if (!worlds.good())
149  {
150  return comm;
151  }
152 
153  const labelList& worldIDs = UPstream::worldIDs();
154 
155  DynamicList<label> subRanks(worldIDs.size());
156  forAll(worldIDs, proci)
157  {
158  if (worlds.found(worldIDs[proci]))
159  {
160  subRanks.push_back(proci);
161  }
162  }
163 
164  // Allocate new communicator with global world
166 
167  if (debug & 2)
168  {
169  Pout<< "multiWorld::communicator :"
170  << " between " << UPstream::allWorlds()[worlds.first()]
171  << " and " << UPstream::allWorlds()[worlds.second()]
172  << " sub-ranks: " << subRanks
173  << " comm:" << comm << endl;
174  }
176  return comm;
177 }
178 
179 
180 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
181 
183 :
185 {}
186 
187 
188 // * * * * * * * * * * * * * * * * Selectors * * * * * * * * * * * * * * * * //
189 
192 {
194 }
195 
196 
197 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
200 {}
201 
202 
203 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
206 {
207  return table_.empty();
208 }
209 
212 {
213  return table_.size();
214 }
215 
216 
218 {
219  // Need new communicator(s)
220 
221  const label thisWorldID = UPstream::myWorldID();
222 
223  EdgeMap<unsigned> allConnections;
224  forAllConstIters(table_, iter)
225  {
226  const edge& connect = iter.key();
227 
228  allConnections.insert
229  (
230  connect,
231  (connect.first() == thisWorldID ? 1u : 2u)
232  );
233  }
234 
235 
236  // Use MPI_COMM_WORLD
237  const label oldWarnComm = UPstream::commWarn(UPstream::commGlobal());
238  const label oldWorldComm = UPstream::commWorld(UPstream::commGlobal());
239 
240  Pstream::combineReduce(allConnections, worldConnectBitOrEq());
241 
242  // Check for mismatched connections
243  label brokenConnections = 0;
244 
245  forAllConstIters(allConnections, iter)
246  {
247  // Mismatched forward/backward connections?
248  if (iter.val() != 3u)
249  {
250  ++brokenConnections;
251  }
252  }
253 
254  if (brokenConnections)
255  {
256  // Restore communicator settings
257  UPstream::commWarn(oldWarnComm);
258  UPstream::commWorld(oldWorldComm);
259 
261  << "Has " << brokenConnections
262  << " broken world-world connections";
263 
264  printDOT(FatalError, allConnections);
265 
267  }
268  else
269  {
270  // NOTE: process in sorted order to ensure proper
271  // synchronization on all worlds and all processors
272 
273  for (const edge& connect : allConnections.sortedToc())
274  {
275  // Process known connections without communicators.
276  // - create a communicator and cache its value
277 
278  auto iter = table_.find(connect);
279  if (iter.good() && iter.val() == -1)
280  {
281  iter.val() = createCommunicator(connect);
282  }
283  }
284 
285  // Restore communicator settings
286  UPstream::commWarn(oldWarnComm);
287  UPstream::commWorld(oldWorldComm);
288  }
289 
290  if (debug)
291  {
292  printDOT(Info, allConnections);
293  }
294 }
295 
296 
297 bool Foam::multiWorldConnections::addConnectionById(const label otherWorld)
298 {
299  // The worlds (sorted)
300  edge worlds(worldPair(otherWorld));
301 
302  if (!worlds.good())
303  {
304  return false;
305  }
306 
307  const bool added = table_.insert(worlds, -1);
308 
309  Pout<< (added ? "Add" : "Existing") << " connection from "
310  << UPstream::myWorld() << " to " << otherWorld << nl;
311 
312  return added;
313 }
314 
315 
317 {
318  // The worlds (sorted)
319  edge worlds(worldPair(otherWorld));
320 
321  if (!worlds.good())
322  {
323  return false;
324  }
325 
326  const bool added = table_.insert(worlds, -1);
327 
328  Pout<< (added ? "Add" : "Existing") << " connection from "
329  << UPstream::myWorld() << " to " << otherWorld << nl;
330 
331  return added;
332 }
333 
334 
336 (
337  const label otherWorldID
338 ) const
339 {
340  // Default: use local world
341  label comm = UPstream::worldComm;
342 
343  // The communication worlds (sorted)
344  edge worlds(worldPair(otherWorldID));
345 
346  if (!worlds.good())
347  {
348  return comm;
349  }
350 
351  const auto iter = table_.cfind(worlds);
352 
353  if (!iter.good())
354  {
356  << "No connection registered for worlds " << worlds
357  << exit(FatalError);
358  }
359 
360  // Get cached value, or allocate ALL known communicators
361  comm = iter.val();
362 
363  if (comm == -1)
364  {
365  // Need new communicator(s)
366  const_cast<multiWorldConnections&>(*this).createComms();
367 
368  // Retrieve from table cache
369  comm = table_.lookup(worlds, UPstream::worldComm);
370  }
371 
372  return comm;
373 }
374 
375 
377 (
378  const word& otherWorld
379 ) const
380 {
381  // Default: use local world
382  label comm = UPstream::worldComm;
383 
384  // The communication worlds (sorted)
385  edge worlds(worldPair(otherWorld));
386 
387  if (!worlds.good())
388  {
389  return comm;
390  }
391 
392  const auto iter = table_.cfind(worlds);
393 
394  if (!iter.good())
395  {
397  << "No connection registered for worlds " << worlds
398  << exit(FatalError);
399  }
400 
401  // Get cached value, or allocate ALL known communicators
402  comm = iter.val();
403 
404  if (comm == -1)
405  {
406  // Need new communicator(s)
407  const_cast<multiWorldConnections&>(*this).createComms();
408 
409  // Retrieve from table cache
410  comm = table_.lookup(worlds, UPstream::worldComm);
411  }
412 
413  return comm;
414 }
415 
416 
418 {
419  labelList list(table_.size());
420 
421  if (list.empty())
422  {
423  // Default: use local world
424  list.resize(1, UPstream::worldComm);
425  }
426  else
427  {
428  forAllConstIters(table_, iter)
429  {
430  if (iter.val() == -1)
431  {
432  // Need new communicator(s)
433  const_cast<multiWorldConnections&>(*this).createComms();
434  break;
435  }
436  }
437 
438  // Retrieve values from table cache
439  label i = 0;
440 
441  forAllConstIters(table_, iter)
442  {
443  list[i] = iter.val();
444  ++i;
445  }
446 
447  Foam::sort(list); // Consistent order!
448  }
449 
450  return list;
451 }
452 
453 
454 // ************************************************************************* //
static const word & myWorld()
My world.
Definition: UPstream.H:1165
label getCommById(const label otherWorld) const
Get communicator for myWorld to other world connection by ID.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
const T & first() const noexcept
Access the first element.
Definition: Pair.H:137
void operator()(EdgeMap< unsigned > &a, const EdgeMap< unsigned > &b) const
static const labelList & worldIDs() noexcept
The indices into allWorlds for all processes.
Definition: UPstream.H:1149
static void printDOT(Ostream &os, const EdgeMap< unsigned > &connections)
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
void resize(const label len)
Adjust allocated size of list.
Definition: ListI.H:160
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
Centralized handling of multi-world MPI connections.
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:50
static const multiWorldConnections & New(const Time &runTime)
Access mesh object.
engineTime & runTime
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:531
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:1049
tmp< DimensionedField< TypeR, GeoMesh > > New(const tmp< DimensionedField< TypeR, GeoMesh >> &tf1, const word &name, const dimensionSet &dimensions, const bool initCopy=false)
Global function forwards to reuseTmpDimensionedField::New.
labelList comms() const
Get communicators used for myWorld to other worlds in sorted order.
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:409
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:69
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:421
bool insert(const edge &key, const T &obj)
Copy insert a new entry, not overwriting existing entries.
Definition: HashTableI.H:152
bool empty() const noexcept
True if no world-to-world connections are defined.
static label commWarn(const label communicator) noexcept
Alter communicator debugging setting. Warns for use of any communicator differing from specified...
Definition: UPstream.H:449
An edge is a list of two vertex labels. This can correspond to a directed graph edge or an edge on a ...
Definition: edge.H:59
const dimensionedScalar b
Wien displacement law constant: default SI units: [m.K].
Definition: createFields.H:27
A class for handling words, derived from Foam::string.
Definition: word.H:63
void sort(UList< T > &list)
Sort the list.
Definition: UList.C:296
virtual Ostream & endBlock()
Write end block group.
Definition: Ostream.C:108
multiWorldConnections(const Time &runTime)
Construct.
label find(const T &val) const
Find index of the first occurrence of the value.
Definition: UList.C:173
static void combineReduce(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine value from different processors...
const direction noexcept
Definition: Scalar.H:258
bool good() const noexcept
True if the vertices are unique and non-negative.
Definition: edgeI.H:100
label getCommByName(const word &otherWorld) const
Get communicator for myWorld to other world connection by NAME.
int debug
Static debugging option.
Map from edge (expressed as its endpoints) to value. Hashing (and ==) on an edge is symmetric...
Definition: edgeHashes.H:56
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
static label commWorld() noexcept
Communicator for all ranks (respecting any local worlds)
Definition: UPstream.H:429
label size() const noexcept
Number of world-to-world connections defined.
bool addConnectionByName(const word &otherWorld)
Define a connection from myWorld to other world by NAME.
virtual void indent() override
Add indentation characters.
Definition: OSstream.C:285
List< edge > sortedToc() const
The table of contents (the keys) in sorted order.
Definition: HashTable.C:139
messageStream Info
Information stream (stdout output on master, null elsewhere)
virtual Ostream & beginBlock(const keyType &kw)
Write begin block group with the given name.
Definition: Ostream.C:90
void createComms()
Create all communicators. Low-level, not normally called directly.
List< label > labelList
A List of labels.
Definition: List.H:62
static const wordList & allWorlds() noexcept
All worlds.
Definition: UPstream.H:1141
bool addConnectionById(const label otherWorld)
Define a connection from myWorld to other world by ID.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static label allocateCommunicator(const label parent, const labelRange &subRanks, const bool withComponents=true)
Allocate new communicator with contiguous sub-ranks on the parent communicator.
Definition: UPstream.C:258
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static label myWorldID()
My worldID.
Definition: UPstream.H:1157
FlatOutput::OutputAdaptor< Container, Delimiters > flatOutput(const Container &obj, Delimiters delim)
Global flatOutput() function with specified output delimiters.
Definition: FlatOutput.H:225
static constexpr label commGlobal() noexcept
Communicator for all ranks, irrespective of any local worlds.
Definition: UPstream.H:419