multiWorldConnectionsObject.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2021-2022 OpenCFD Ltd.
9 -------------------------------------------------------------------------------
10 License
11  This file is part of OpenFOAM.
12 
13  OpenFOAM is free software: you can redistribute it and/or modify it
14  under the terms of the GNU General Public License as published by
15  the Free Software Foundation, either version 3 of the License, or
16  (at your option) any later version.
17 
18  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  for more details.
22 
23  You should have received a copy of the GNU General Public License
24  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25 
26 \*---------------------------------------------------------------------------*/
27 
29 #include "Pstream.H"
30 
31 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
32 
// Define the type name and the static debug switch (initial value 0)
// for multiWorldConnections.
33 namespace Foam
34 {
35  defineTypeNameAndDebug(multiWorldConnections, 0);
36 }
37 
38 
39 // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
40 
41 namespace Foam
42 {
43 
44 // Combine world-to-world connections.
45 // Forward connection = 1, Backward connection = 2, Both = 3
47 {
48  void operator()(EdgeMap<unsigned>& a, const EdgeMap<unsigned>& b) const
49  {
50  forAllConstIters(b, iter)
51  {
52  a(iter.key()) |= iter.val();
53  }
54  }
55 };
56 
57 
58 static void printDOT(Ostream& os, const EdgeMap<unsigned>& connections)
59 {
60  os << nl << "// Multiworld communication graph:" << nl;
61  os.beginBlock("graph");
62 
63  // Graph Nodes == worlds
64  label worldi = 0;
65  for (const word& worldName : UPstream::allWorlds())
66  {
67  os.indent();
68  os << worldi << " [xlabel=" << worldi
69  << ",label=\"" << worldName << "\"]" << nl;
70 
71  ++worldi;
72  }
73  os << nl;
74 
75  // Graph Edges == connections
76  for (const edge& connect : connections.sortedToc())
77  {
78  os.indent();
79  os << connect.first() << " -- " << connect.second();
80 
81  // Mismatched forward/backward connections?
82  if (connections.lookup(connect, 0u) != 3u)
83  {
84  os << " [style=dashed] // mismatched?";
85  }
86  os << nl;
87  }
88 
89  os.endBlock();
90 
91  os << "// end graph" << nl;
92 }
93 
94 } // End namespace Foam
95 
96 
97 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
98 
99 Foam::edge Foam::multiWorldConnections::worldPair(const label otherWorld)
100 {
101  if (otherWorld < 0 || !Pstream::parRun())
102  {
103  Perr<< "ignore: no world or non-parallel" << endl;
104  return edge(-1, -1);
105  }
106  else if (UPstream::allWorlds().size() <= otherWorld)
107  {
108  Perr<< "ignore: invalid world: " << otherWorld << endl;
109  return edge(-1, -1);
110  }
111 
112  const label thisWorldID = UPstream::myWorldID();
113 
114  // The worlds (sorted)
115  return edge(thisWorldID, otherWorld, true);
116 }
117 
118 
119 Foam::edge Foam::multiWorldConnections::worldPair(const word& otherWorld)
120 {
121  if (otherWorld.empty() || !Pstream::parRun())
122  {
123  Perr<< "ignore: no world or non-parallel" << endl;
124  return edge(-1, -1);
125  }
126 
127  const label thisWorldID = UPstream::myWorldID();
128  const label otherWorldID = UPstream::allWorlds().find(otherWorld);
129 
130  if (otherWorldID < 0)
131  {
133  << "Cannot find world " << otherWorld
134  << " in set of worlds " << flatOutput(UPstream::allWorlds())
135  << exit(FatalError);
136  }
137 
138  // The worlds (sorted)
139  return edge(thisWorldID, otherWorldID, true);
140 }
141 
142 
143 Foam::label Foam::multiWorldConnections::createCommunicator(const edge& worlds)
144 {
145  // Fallback: do not create, just use local world
146  label comm = UPstream::worldComm;
147 
148  if (!worlds.valid())
149  {
150  return comm;
151  }
152 
153  const labelList& worldIDs = UPstream::worldIDs();
154 
155  DynamicList<label> subRanks(worldIDs.size());
156  forAll(worldIDs, proci)
157  {
158  if (worlds.found(worldIDs[proci]))
159  {
160  subRanks.append(proci);
161  }
162  }
163 
164  // Allocate new communicator with global world
166 
167  if (debug & 2)
168  {
169  Pout<< "multiWorld::communicator :"
170  << " between " << UPstream::allWorlds()[worlds.first()]
171  << " and " << UPstream::allWorlds()[worlds.second()]
172  << " sub-ranks: " << subRanks
173  << " comm:" << comm << endl;
174  }
176  return comm;
177 }
178 
179 
180 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
181 
183 :
185 {}
186 
187 
188 // * * * * * * * * * * * * * * * * Selectors * * * * * * * * * * * * * * * * //
189 
192 {
194 }
195 
196 
197 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
200 {}
201 
202 
203 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
206 {
207  return table_.empty();
208 }
209 
212 {
213  return table_.size();
214 }
215 
216 
218 {
219  // Need new communicator(s)
220 
221  const label thisWorldID = UPstream::myWorldID();
222 
223  EdgeMap<unsigned> allConnections;
224  forAllConstIters(table_, iter)
225  {
226  const edge& connect = iter.key();
227 
228  allConnections.insert
229  (
230  connect,
231  (connect.first() == thisWorldID ? 1u : 2u)
232  );
233  }
234 
235 
236  // Use MPI_COMM_WORLD
237  const label oldWorldComm(UPstream::worldComm);
238  const label oldWarnComm(UPstream::warnComm);
241 
242  if (Pstream::parRun())
243  {
244  Pstream::combineReduce(allConnections, worldConnectBitOrEq());
245  }
246 
247  // Check for mismatched connections
248  label brokenConnections = 0;
249 
250  forAllConstIters(allConnections, iter)
251  {
252  // Mismatched forward/backward connections?
253  if (iter.val() != 3u)
254  {
255  ++brokenConnections;
256  }
257  }
258 
259  if (brokenConnections)
260  {
261  Pstream::warnComm = oldWarnComm;
262  Pstream::worldComm = oldWorldComm;
263 
265  << "Has " << brokenConnections
266  << " broken world-world connections";
267 
268  printDOT(FatalError, allConnections);
269 
271  }
272  else
273  {
274  // NOTE: process in sorted order to ensure proper
275  // synchronization on all worlds and all processors
276 
277  for (const edge& connect : allConnections.sortedToc())
278  {
279  // Process known connections without communicators.
280  // - create a communicator and cache its value
281 
282  auto iter = table_.find(connect);
283  if (iter.found() && iter.val() == -1)
284  {
285  iter.val() = createCommunicator(connect);
286  }
287  }
288 
289  Pstream::warnComm = oldWarnComm;
290  Pstream::worldComm = oldWorldComm;
291  }
292 
293  if (debug)
294  {
295  printDOT(Info, allConnections);
296  }
297 }
298 
299 
300 bool Foam::multiWorldConnections::addConnectionById(const label otherWorld)
301 {
302  // The worlds (sorted)
303  edge worlds(worldPair(otherWorld));
304 
305  if (!worlds.valid())
306  {
307  return false;
308  }
309 
310  const bool added = table_.insert(worlds, -1);
311 
312  Pout<< (added ? "Add" : "Existing") << " connection from "
313  << UPstream::myWorld() << " to " << otherWorld << nl;
314 
315  return added;
316 }
317 
318 
320 {
321  // The worlds (sorted)
322  edge worlds(worldPair(otherWorld));
323 
324  if (!worlds.valid())
325  {
326  return false;
327  }
328 
329  const bool added = table_.insert(worlds, -1);
330 
331  Pout<< (added ? "Add" : "Existing") << " connection from "
332  << UPstream::myWorld() << " to " << otherWorld << nl;
333 
334  return added;
335 }
336 
337 
339 (
340  const label otherWorldID
341 ) const
342 {
343  // Default: use local world
344  label comm = UPstream::worldComm;
345 
346  // The communication worlds (sorted)
347  edge worlds(worldPair(otherWorldID));
348 
349  if (!worlds.valid())
350  {
351  return comm;
352  }
353 
354  const auto iter = table_.cfind(worlds);
355 
356  if (!iter.found())
357  {
359  << "No connection registered for worlds " << worlds
360  << exit(FatalError);
361  }
362 
363  // Get cached value, or allocate ALL known communicators
364  comm = iter.val();
365 
366  if (comm == -1)
367  {
368  // Need new communicator(s)
369  const_cast<multiWorldConnections&>(*this).createComms();
370 
371  // Retrieve from table cache
372  comm = table_.lookup(worlds, UPstream::worldComm);
373  }
374 
375  return comm;
376 }
377 
378 
380 (
381  const word& otherWorld
382 ) const
383 {
384  // Default: use local world
385  label comm = UPstream::worldComm;
386 
387  // The communication worlds (sorted)
388  edge worlds(worldPair(otherWorld));
389 
390  if (!worlds.valid())
391  {
392  return comm;
393  }
394 
395  const auto iter = table_.cfind(worlds);
396 
397  if (!iter.found())
398  {
400  << "No connection registered for worlds " << worlds
401  << exit(FatalError);
402  }
403 
404  // Get cached value, or allocate ALL known communicators
405  comm = iter.val();
406 
407  if (comm == -1)
408  {
409  // Need new communicator(s)
410  const_cast<multiWorldConnections&>(*this).createComms();
411 
412  // Retrieve from table cache
413  comm = table_.lookup(worlds, UPstream::worldComm);
414  }
415 
416  return comm;
417 }
418 
419 
421 {
422  labelList list(table_.size());
423 
424  if (list.empty())
425  {
426  // Default: use local world
427  list.resize(1, UPstream::worldComm);
428  }
429  else
430  {
431  forAllConstIters(table_, iter)
432  {
433  if (iter.val() == -1)
434  {
435  // Need new communicator(s)
436  const_cast<multiWorldConnections&>(*this).createComms();
437  break;
438  }
439  }
440 
441  // Retrieve values from table cache
442  label i = 0;
443 
444  forAllConstIters(table_, iter)
445  {
446  list[i] = iter.val();
447  ++i;
448  }
449 
450  Foam::sort(list); // Consistent order!
451  }
452 
453  return list;
454 }
455 
456 
457 // ************************************************************************* //
static const word & myWorld()
My world.
Definition: UPstream.H:739
label getCommById(const label otherWorld) const
Get communicator for myWorld to other world connection by ID.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
const T & first() const noexcept
Access the first element.
Definition: Pair.H:136
void operator()(EdgeMap< unsigned > &a, const EdgeMap< unsigned > &b) const
static const labelList & worldIDs() noexcept
The indices into allWorlds for all processes.
Definition: UPstream.H:723
static void printDOT(Ostream &os, const EdgeMap< unsigned > &connections)
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
bool valid() const noexcept
True if the vertices are unique and non-negative.
Definition: edge.H:194
void resize(const label len)
Adjust allocated size of list.
Definition: ListI.H:132
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
label find(const T &val, label pos=0) const
Find index of the first occurrence of the value.
Definition: UList.C:204
Centralized handling of multi-world MPI connections.
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:49
static const multiWorldConnections & New(const Time &runTime)
Access mesh object.
engineTime & runTime
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
static bool & parRun() noexcept
Test if this is a parallel run.
Definition: UPstream.H:639
labelList comms() const
Get communicators used for myWorld to other worlds in sorted order.
virtual void indent()
Add indentation characters.
Definition: OSstream.C:256
tmp< DimensionedField< TypeR, GeoMesh > > New(const tmp< DimensionedField< TypeR, GeoMesh >> &tdf1, const word &name, const dimensionSet &dimensions, const bool initCopy=false)
Global function forwards to reuseTmpDimensionedField::New.
static label worldComm
Default world communicator (all processors). May differ from globalComm if local worlds are in use...
Definition: UPstream.H:361
static label allocateCommunicator(const label parent, const labelUList &subRanks, const bool doPstream=true)
Allocate a new communicator with subRanks of parent communicator.
Definition: UPstream.C:139
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:69
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:413
bool insert(const edge &key, const T &obj)
Copy insert a new entry, not overwriting existing entries.
Definition: HashTableI.H:173
bool empty() const noexcept
True if no world-to-world connections are defined.
static constexpr label globalComm
Communicator for all processors, irrespective of any local worlds.
Definition: UPstream.H:371
An edge is a list of two point labels. The functionality it provides supports the discretisation on a...
Definition: edge.H:59
const dimensionedScalar b
Wien displacement law constant: default SI units: [m.K].
Definition: createFields.H:27
A class for handling words, derived from Foam::string.
Definition: word.H:63
void sort(UList< T > &list)
Sort the list.
Definition: UList.C:334
virtual Ostream & endBlock()
Write end block group.
Definition: Ostream.C:98
multiWorldConnections(const Time &runTime)
Construct.
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:366
static void combineReduce(const List< commsStruct > &comms, T &value, const CombineOp &cop, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce) applying cop to inplace combine value from different processors...
const direction noexcept
Definition: Scalar.H:258
label getCommByName(const word &otherWorld) const
Get communicator for myWorld to other world connection by NAME.
int debug
Static debugging option.
Map from edge (expressed as its endpoints) to value. For easier forward declaration it is currently i...
Definition: EdgeMap.H:45
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
label size() const noexcept
Number of world-to-world connections defined.
bool addConnectionByName(const word &otherWorld)
Define a connection from myWorld to other world by NAME.
List< edge > sortedToc() const
The table of contents (the keys) in sorted order.
Definition: HashTable.C:130
messageStream Info
Information stream (stdout output on master, null elsewhere)
virtual Ostream & beginBlock(const keyType &kw)
Write begin block group with the given name.
Definition: Ostream.C:80
void createComms()
Create all communicators. Low-level, not normally called directly.
List< label > labelList
A List of labels.
Definition: List.H:62
static const wordList & allWorlds() noexcept
All worlds.
Definition: UPstream.H:715
bool addConnectionById(const label otherWorld)
Define a connection from myWorld to other world by ID.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
Namespace for OpenFOAM.
forAllConstIters(mixture.phases(), phase)
Definition: pEqn.H:28
static label myWorldID()
My worldID.
Definition: UPstream.H:731
FlatOutput::OutputAdaptor< Container, Delimiters > flatOutput(const Container &obj, Delimiters delim)
Global flatOutput() function with specified output delimiters.
Definition: FlatOutput.H:225