multiWorldConnectionsObject.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2021-2023 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 \*---------------------------------------------------------------------------*/
27 
29 #include "Pstream.H"
30 
31 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
32 
33 namespace Foam
34 {
35  defineTypeNameAndDebug(multiWorldConnections, 0);
36 }
37 
38 
39 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
40 
41 namespace Foam
42 {
43 
44 // Combine world-to-world connections.
45 // Forward connection = 1, Backward connection = 2, Both = 3
47 {
48  void operator()(EdgeMap<unsigned>& a, const EdgeMap<unsigned>& b) const
49  {
50  forAllConstIters(b, iter)
51  {
52  a(iter.key()) |= iter.val();
53  }
54  }
55 };
56 
57 
58 static void printDOT(Ostream& os, const EdgeMap<unsigned>& connections)
59 {
60  os << nl << "// Multiworld communication graph:" << nl;
61  os.beginBlock("graph");
62 
63  // Graph Nodes == worlds
64  label worldi = 0;
65  for (const word& worldName : UPstream::allWorlds())
66  {
67  os.indent();
68  os << worldi << " [xlabel=" << worldi
69  << ",label=\"" << worldName << "\"]" << nl;
70 
71  ++worldi;
72  }
73  os << nl;
74 
75  // Graph Edges == connections
76  for (const edge& connect : connections.sortedToc())
77  {
78  os.indent();
79  os << connect.first() << " -- " << connect.second();
80 
81  // Mismatched forward/backward connections?
82  if (connections.lookup(connect, 0u) != 3u)
83  {
84  os << " [style=dashed] // mismatched?";
85  }
86  os << nl;
87  }
88 
89  os.endBlock();
90 
91  os << "// end graph" << nl;
92 }
93 
94 } // End namespace Foam
95 
96 
97 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
98 
99 Foam::edge Foam::multiWorldConnections::worldPair(const label otherWorld)
100 {
101  if (otherWorld < 0 || !Pstream::parRun())
102  {
103  Perr<< "ignore: no world or non-parallel" << endl;
104  return edge(-1, -1);
105  }
106  else if (UPstream::allWorlds().size() <= otherWorld)
107  {
108  Perr<< "ignore: invalid world: " << otherWorld << endl;
109  return edge(-1, -1);
110  }
111 
112  const label thisWorldID = UPstream::myWorldID();
113 
114  // The worlds (sorted)
115  return edge(thisWorldID, otherWorld, true);
116 }
117 
118 
119 Foam::edge Foam::multiWorldConnections::worldPair(const word& otherWorld)
120 {
121  if (otherWorld.empty() || !Pstream::parRun())
122  {
123  Perr<< "ignore: no world or non-parallel" << endl;
124  return edge(-1, -1);
125  }
126 
127  const label thisWorldID = UPstream::myWorldID();
128  const label otherWorldID = UPstream::allWorlds().find(otherWorld);
129 
130  if (otherWorldID < 0)
131  {
133  << "Cannot find world " << otherWorld
134  << " in set of worlds " << flatOutput(UPstream::allWorlds())
135  << exit(FatalError);
136  }
137 
138  // The worlds (sorted)
139  return edge(thisWorldID, otherWorldID, true);
140 }
141 
142 
143 Foam::label Foam::multiWorldConnections::createCommunicator(const edge& worlds)
144 {
145  // Fallback: do not create, just use local world
146  label comm = UPstream::worldComm;
147 
148  if (!worlds.valid())
149  {
150  return comm;
151  }
152 
153  const labelList& worldIDs = UPstream::worldIDs();
154 
155  DynamicList<label> subRanks(worldIDs.size());
156  forAll(worldIDs, proci)
157  {
158  if (worlds.found(worldIDs[proci]))
159  {
160  subRanks.push_back(proci);
161  }
162  }
163 
164  // Allocate new communicator with global world
166 
167  if (debug & 2)
168  {
169  Pout<< "multiWorld::communicator :"
170  << " between " << UPstream::allWorlds()[worlds.first()]
171  << " and " << UPstream::allWorlds()[worlds.second()]
172  << " sub-ranks: " << subRanks
173  << " comm:" << comm << endl;
174  }
176  return comm;
177 }
178 
179 
180 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
181 
183 :
185 {}
186 
187 
188 // * * * * * * * * * * * * * * * * Selectors * * * * * * * * * * * * * * * * //
189 
192 {
194 }
195 
196 
197 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
200 {}
201 
202 
203 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
206 {
207  return table_.empty();
208 }
209 
212 {
213  return table_.size();
214 }
215 
216 
218 {
219  // Need new communicator(s)
220 
221  const label thisWorldID = UPstream::myWorldID();
222 
223  EdgeMap<unsigned> allConnections;
224  forAllConstIters(table_, iter)
225  {
226  const edge& connect = iter.key();
227 
228  allConnections.insert
229  (
230  connect,
231  (connect.first() == thisWorldID ? 1u : 2u)
232  );
233  }
234 
235 
236  // Use MPI_COMM_WORLD
237  const label oldWarnComm = UPstream::commWarn(UPstream::commGlobal());
238  const label oldWorldComm = UPstream::commWorld(UPstream::commGlobal());
239 
240  if (Pstream::parRun())
241  {
242  Pstream::combineReduce(allConnections, worldConnectBitOrEq());
243  }
244 
245  // Check for mismatched connections
246  label brokenConnections = 0;
247 
248  forAllConstIters(allConnections, iter)
249  {
250  // Mismatched forward/backward connections?
251  if (iter.val() != 3u)
252  {
253  ++brokenConnections;
254  }
255  }
256 
257  if (brokenConnections)
258  {
259  // Restore communicator settings
260  UPstream::commWarn(oldWarnComm);
261  UPstream::commWorld(oldWorldComm);
262 
264  << "Has " << brokenConnections
265  << " broken world-world connections";
266 
267  printDOT(FatalError, allConnections);
268 
270  }
271  else
272  {
273  // NOTE: process in sorted order to ensure proper
274  // synchronization on all worlds and all processors
275 
276  for (const edge& connect : allConnections.sortedToc())
277  {
278  // Process known connections without communicators.
279  // - create a communicator and cache its value
280 
281  auto iter = table_.find(connect);
282  if (iter.good() && iter.val() == -1)
283  {
284  iter.val() = createCommunicator(connect);
285  }
286  }
287 
288  // Restore communicator settings
289  UPstream::commWarn(oldWarnComm);
290  UPstream::commWorld(oldWorldComm);
291  }
292 
293  if (debug)
294  {
295  printDOT(Info, allConnections);
296  }
297 }
298 
299 
300 bool Foam::multiWorldConnections::addConnectionById(const label otherWorld)
301 {
302  // The worlds (sorted)
303  edge worlds(worldPair(otherWorld));
304 
305  if (!worlds.valid())
306  {
307  return false;
308  }
309 
310  const bool added = table_.insert(worlds, -1);
311 
312  Pout<< (added ? "Add" : "Existing") << " connection from "
313  << UPstream::myWorld() << " to " << otherWorld << nl;
314 
315  return added;
316 }
317 
318 
320 {
321  // The worlds (sorted)
322  edge worlds(worldPair(otherWorld));
323 
324  if (!worlds.valid())
325  {
326  return false;
327  }
328 
329  const bool added = table_.insert(worlds, -1);
330 
331  Pout<< (added ? "Add" : "Existing") << " connection from "
332  << UPstream::myWorld() << " to " << otherWorld << nl;
333 
334  return added;
335 }
336 
337 
339 (
340  const label otherWorldID
341 ) const
342 {
343  // Default: use local world
344  label comm = UPstream::worldComm;
345 
346  // The communication worlds (sorted)
347  edge worlds(worldPair(otherWorldID));
348 
349  if (!worlds.valid())
350  {
351  return comm;
352  }
353 
354  const auto iter = table_.cfind(worlds);
355 
356  if (!iter.good())
357  {
359  << "No connection registered for worlds " << worlds
360  << exit(FatalError);
361  }
362 
363  // Get cached value, or allocate ALL known communicators
364  comm = iter.val();
365 
366  if (comm == -1)
367  {
368  // Need new communicator(s)
369  const_cast<multiWorldConnections&>(*this).createComms();
370 
371  // Retrieve from table cache
372  comm = table_.lookup(worlds, UPstream::worldComm);
373  }
374 
375  return comm;
376 }
377 
378 
380 (
381  const word& otherWorld
382 ) const
383 {
384  // Default: use local world
385  label comm = UPstream::worldComm;
386 
387  // The communication worlds (sorted)
388  edge worlds(worldPair(otherWorld));
389 
390  if (!worlds.valid())
391  {
392  return comm;
393  }
394 
395  const auto iter = table_.cfind(worlds);
396 
397  if (!iter.good())
398  {
400  << "No connection registered for worlds " << worlds
401  << exit(FatalError);
402  }
403 
404  // Get cached value, or allocate ALL known communicators
405  comm = iter.val();
406 
407  if (comm == -1)
408  {
409  // Need new communicator(s)
410  const_cast<multiWorldConnections&>(*this).createComms();
411 
412  // Retrieve from table cache
413  comm = table_.lookup(worlds, UPstream::worldComm);
414  }
415 
416  return comm;
417 }
418 
419 
421 {
422  labelList list(table_.size());
423 
424  if (list.empty())
425  {
426  // Default: use local world
427  list.resize(1, UPstream::worldComm);
428  }
429  else
430  {
431  forAllConstIters(table_, iter)
432  {
433  if (iter.val() == -1)
434  {
435  // Need new communicator(s)
436  const_cast<multiWorldConnections&>(*this).createComms();
437  break;
438  }
439  }
440 
441  // Retrieve values from table cache
442  label i = 0;
443 
444  forAllConstIters(table_, iter)
445  {
446  list[i] = iter.val();
447  ++i;
448  }
449 
450  Foam::sort(list); // Consistent order!
451  }
452 
453  return list;
454 }
455 
456 
457 // ************************************************************************* //
static const word & myWorld()
My world.
Definition: UPstream.H:1120
label getCommById(const label otherWorld) const
Get communicator for myWorld to other world connection by ID.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
const T & first() const noexcept
Access the first element.
Definition: Pair.H:137
void operator()(EdgeMap< unsigned > &a, const EdgeMap< unsigned > &b) const
static const labelList & worldIDs() noexcept
The indices into allWorlds for all processes.
Definition: UPstream.H:1104
static void printDOT(Ostream &os, const EdgeMap< unsigned > &connections)
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
bool valid() const noexcept
Same as good()
Definition: edge.H:466
void resize(const label len)
Adjust allocated size of list.
Definition: ListI.H:132
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
label find(const T &val, label pos=0) const
Find index of the first occurrence of the value.
Definition: UList.C:204
Centralized handling of multi-world MPI connections.
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:49
static const multiWorldConnections & New(const Time &runTime)
Access mesh object.
engineTime & runTime
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:1004
tmp< DimensionedField< TypeR, GeoMesh > > New(const tmp< DimensionedField< TypeR, GeoMesh >> &tf1, const word &name, const dimensionSet &dimensions, const bool initCopy=false)
Global function forwards to reuseTmpDimensionedField::New.
labelList comms() const
Get communicators used for myWorld to other worlds in sorted order.
virtual void indent()
Add indentation characters.
Definition: OSstream.C:256
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition: UPstream.H:411
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:69
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:414
bool insert(const edge &key, const T &obj)
Copy insert a new entry, not overwriting existing entries.
Definition: HashTableI.H:152
bool empty() const noexcept
True if no world-to-world connections are defined.
static label commWarn(const label communicator) noexcept
Alter communicator debugging setting. Warns for use of any communicator differing from specified...
Definition: UPstream.H:451
An edge is a list of two vertex labels. This can correspond to a directed graph edge or an edge on a ...
Definition: edge.H:59
const dimensionedScalar b
Wien displacement law constant: default SI units: [m.K].
Definition: createFields.H:27
A class for handling words, derived from Foam::string.
Definition: word.H:63
void sort(UList< T > &list)
Sort the list.
Definition: UList.C:348
virtual Ostream & endBlock()
Write end block group.
Definition: Ostream.C:98
multiWorldConnections(const Time &runTime)
Construct.
static void combineReduce(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine value from different processors...
const direction noexcept
Definition: Scalar.H:258
label getCommByName(const word &otherWorld) const
Get communicator for myWorld to other world connection by NAME.
int debug
Static debugging option.
Map from edge (expressed as its endpoints) to value. Hashing (and ==) on an edge is symmetric...
Definition: edgeHashes.H:56
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
static label commWorld() noexcept
Communicator for all ranks (respecting any local worlds)
Definition: UPstream.H:431
label size() const noexcept
Number of world-to-world connections defined.
bool addConnectionByName(const word &otherWorld)
Define a connection from myWorld to other world by NAME.
List< edge > sortedToc() const
The table of contents (the keys) in sorted order.
Definition: HashTable.C:130
messageStream Info
Information stream (stdout output on master, null elsewhere)
virtual Ostream & beginBlock(const keyType &kw)
Write begin block group with the given name.
Definition: Ostream.C:80
void createComms()
Create all communicators. Low-level, not normally called directly.
List< label > labelList
A List of labels.
Definition: List.H:62
static const wordList & allWorlds() noexcept
All worlds.
Definition: UPstream.H:1096
bool addConnectionById(const label otherWorld)
Define a connection from myWorld to other world by ID.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static label allocateCommunicator(const label parent, const labelRange &subRanks, const bool withComponents=true)
Allocate new communicator with contiguous sub-ranks on the parent communicator.
Definition: UPstream.C:258
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static label myWorldID()
My worldID.
Definition: UPstream.H:1112
FlatOutput::OutputAdaptor< Container, Delimiters > flatOutput(const Container &obj, Delimiters delim)
Global flatOutput() function with specified output delimiters.
Definition: FlatOutput.H:225
static constexpr label commGlobal() noexcept
Communicator for all ranks, irrespective of any local worlds.
Definition: UPstream.H:421