UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011-2017 OpenFOAM Foundation
9  Copyright (C) 2015-2022 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 Note
28  Included by global/globals.C
29 
30 \*---------------------------------------------------------------------------*/
31 
32 #include "UPstream.H"
33 #include "debug.H"
34 #include "registerSwitch.H"
35 #include "dictionary.H"
36 #include "IOstreams.H"
37 
38 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
39 
// Type-name and debug-switch definition for UPstream, placed inside
// namespace Foam. The second macro argument (0) is presumably the initial
// debug value - the macro itself is defined elsewhere, TODO confirm.
40 namespace Foam
41 {
42  defineTypeNameAndDebug(UPstream, 0);
43 }
44 
45 const Foam::Enum
46 <
48 >
50 ({
51  { commsTypes::blocking, "blocking" },
52  { commsTypes::scheduled, "scheduled" },
53  { commsTypes::nonBlocking, "nonBlocking" },
54 });
55 
56 
57 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
58 
// Record the parallel-run state and (re)allocate the two predefined
// communicators.
//
// nProcs      : parallel mode is enabled when this is > 0
// haveThreads : stored unchanged in haveThreads_
//
// Both branches allocate the communicators in (world, self) order and
// compare each returned index against UPstream::globalComm and
// UPstream::selfComm respectively as a sanity check.
// Serial mode clears the Pout/Perr prefixes; parallel mode sets both to
// "[<rank>] " using the rank in the freshly allocated world communicator.
59 void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
60 {
61  parRun_ = (nProcs > 0);
62  haveThreads_ = haveThreads;
63 
64  label comm = -1;
65 
66  if (!parRun_)
67  {
68  // These are already correct from the static initialisation,
69  // but just in case of future changes
70 
71  // Using (world, self) ordering
74 
75  // 0: worldComm
 // Single rank (0); last argument (doPstream) is false in serial mode
76  comm = allocateCommunicator(-1, Foam::labelList(Foam::one{}, 0), false);
77  if (comm != UPstream::globalComm)
78  {
79  // Failed sanity check
81  << "problem : comm:" << comm
82  << " UPstream::globalComm:" << UPstream::globalComm
84  }
85 
86  // 1: selfComm
 // Same single-rank list, but with parent index -2
87  comm = allocateCommunicator(-2, Foam::labelList(Foam::one{}, 0), false);
88  if (comm != UPstream::selfComm)
89  {
90  // Failed sanity check
92  << "problem : comm:" << comm
93  << " UPstream::selfComm:" << UPstream::selfComm
95  }
96 
 // No per-rank prefix on the output streams in serial mode
97  Pout.prefix().clear();
98  Perr.prefix().clear();
99  }
100  else
101  {
102  // Redo communicators that were created during static initialisation
103  // but this time with Pstream components
104 
105  // Using (world, self) ordering
108 
109  // 0: worldComm
 // All ranks 0..nProcs-1 (identity map); doPstream is true here
110  comm = allocateCommunicator(-1, identity(nProcs), true);
111  if (comm != UPstream::globalComm)
112  {
113  // Failed sanity check
115  << "problem : comm:" << comm
116  << " UPstream::globalComm:" << UPstream::globalComm
118  }
119 
 // Prefix is built while 'comm' still refers to the world communicator
120  Pout.prefix() = '[' + Foam::name(myProcNo(comm)) + "] ";
121  Perr.prefix() = Pout.prefix();
122 
123  // 1: selfComm
124  comm = allocateCommunicator(-2, Foam::labelList(Foam::one{}, 0), true);
125  if (comm != UPstream::selfComm)
126  {
127  // Failed sanity check
129  << "problem : comm:" << comm
130  << " UPstream::selfComm:" << UPstream::selfComm
132  }
133  }
134 
 // Optional trace of the arguments this call was made with
135  if (debug)
136  {
137  Pout<< "UPstream::setParRun :"
138  << " nProcs:" << nProcs
139  << " haveThreads:" << haveThreads
140  << endl;
141  }
142 }
143 
144 
146 (
147  const label parentIndex,
148  const labelUList& subRanks,
149  const bool doPstream
150 )
151 {
152  label index;
153  if (!freeComms_.empty())
154  {
155  // LIFO pop
156  index = freeComms_.back();
157  freeComms_.pop_back();
158  }
159  else
160  {
161  // Extend storage
162  index = parentComm_.size();
163 
164  myProcNo_.append(-1);
165  procIDs_.append(List<int>());
166  parentComm_.append(-1);
167  linearCommunication_.append(List<commsStruct>());
168  treeCommunication_.append(List<commsStruct>());
169  }
170 
171  if (debug)
172  {
173  Pout<< "Communicators : Allocating communicator " << index << endl
174  << " parent : " << parentIndex << endl
175  << " procs : " << subRanks << endl
176  << endl;
177  }
178 
179  // Initialise; overwritten by allocatePstreamCommunicator
180  myProcNo_[index] = 0;
181 
182  // The selected sub-ranks.
183  // - transcribe from label to int. Treat negative values as 'ignore'
184  // - enforce incremental order (so index is rank in next communicator)
185 
186  auto& procIds = procIDs_[index];
187  procIds.resize_nocopy(subRanks.size());
188 
189  label numSubRanks = 0;
190  bool monotonicOrder = true;
191  for (const label subRanki : subRanks)
192  {
193  if (subRanki < 0)
194  {
195  continue;
196  }
197  if (monotonicOrder && numSubRanks)
198  {
199  monotonicOrder = (procIds[numSubRanks-1] < subRanki);
200  }
201 
202  procIds[numSubRanks] = subRanki;
203  ++numSubRanks;
204  }
205 
206  if (!monotonicOrder)
207  {
208  auto last = procIds.begin() + numSubRanks;
209  std::sort(procIds.begin(), last);
210  last = std::unique(procIds.begin(), last);
211  numSubRanks = label(last - procIds.begin());
212  }
213 
214  procIds.resize(numSubRanks);
215 
216  parentComm_[index] = parentIndex;
217 
218  // Size but do not fill structure - this is done on-the-fly
219  linearCommunication_[index] = List<commsStruct>(numSubRanks);
220  treeCommunication_[index] = List<commsStruct>(numSubRanks);
221 
222  if (doPstream && parRun())
223  {
224  allocatePstreamCommunicator(parentIndex, index);
225 
226  // Could 'remember' locations of uninvolved ranks
239  }
240 
241  return index;
242 }
243 
244 
246 (
247  const label communicator,
248  const bool doPstream
249 )
250 {
251  // Filter out any placeholders
252  if (communicator < 0)
253  {
254  return;
255  }
256 
257  if (debug)
258  {
259  Pout<< "Communicators : Freeing communicator " << communicator
260  << " parent: " << parentComm_[communicator]
261  << " myProcNo: " << myProcNo_[communicator]
262  << endl;
263  }
264 
265  if (doPstream && parRun())
266  {
267  freePstreamCommunicator(communicator);
268  }
269 
270  myProcNo_[communicator] = -1;
271  //procIDs_[communicator].clear();
272  parentComm_[communicator] = -1;
273  linearCommunication_[communicator].clear();
274  treeCommunication_[communicator].clear();
275 
276  // LIFO push
277  freeComms_.push_back(communicator);
278 }
279 
280 
281 void Foam::UPstream::freeCommunicators(const bool doPstream)
282 {
283  forAll(myProcNo_, communicator)
284  {
285  if (myProcNo_[communicator] >= 0)
286  {
287  freeCommunicator(communicator, doPstream);
288  }
289  }
290 }
291 
292 
293 int Foam::UPstream::baseProcNo(label comm, int procID)
294 {
295  while (parent(comm) >= 0 && procID >= 0)
296  {
297  const auto& parentRanks = UPstream::procID(comm);
298  procID = parentRanks[procID];
299  comm = parent(comm);
300  }
301 
302  return procID;
303 }
304 
305 
306 Foam::label Foam::UPstream::procNo(const label comm, const int baseProcID)
307 {
308  const auto& parentRanks = procID(comm);
309  label parentComm = parent(comm);
310 
311  int procID = baseProcID;
312 
313  if (parentComm >= 0)
314  {
315  procID = procNo(parentComm, baseProcID);
316  }
317 
318  return parentRanks.find(procID);
319 }
320 
321 
322 Foam::label Foam::UPstream::procNo
323 (
324  const label comm,
325  const label currentComm,
326  const int currentProcID
327 )
328 {
329  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
330  return procNo(comm, physProcID);
331 }
332 
333 
334 template<>
337 {
338  UPstream::commsStruct& t = v_[procID];
339 
340  if (t.allBelow().size() + t.allNotBelow().size() + 1 != size())
341  {
342  // Not yet allocated
343 
344  label above(-1);
345  labelList below;
346  labelList allBelow;
347 
348  if (size() < UPstream::nProcsSimpleSum)
349  {
350  // Linear schedule
351 
352  if (procID == 0)
353  {
354  below.setSize(size()-1);
355  for (label procI = 1; procI < size(); procI++)
356  {
357  below[procI-1] = procI;
358  }
359  }
360  else
361  {
362  above = 0;
363  }
364  }
365  else
366  {
367  // Use tree like schedule. For 8 procs:
368  // (level 0)
369  // 0 receives from 1
370  // 2 receives from 3
371  // 4 receives from 5
372  // 6 receives from 7
373  // (level 1)
374  // 0 receives from 2
375  // 4 receives from 6
376  // (level 2)
377  // 0 receives from 4
378  //
379  // The sends/receives for all levels are collected per processor
380  // (one send per processor; multiple receives possible) creating
381  // a table:
382  //
383  // So per processor:
384  // proc receives from sends to
385  // ---- ------------- --------
386  // 0 1,2,4 -
387  // 1 - 0
388  // 2 3 0
389  // 3 - 2
390  // 4 5 0
391  // 5 - 4
392  // 6 7 4
393  // 7 - 6
394 
395  label mod = 0;
396 
397  for (label step = 1; step < size(); step = mod)
398  {
399  mod = step * 2;
400 
401  if (procID % mod)
402  {
403  above = procID - (procID % mod);
404  break;
405  }
406  else
407  {
408  for
409  (
410  label j = procID + step;
411  j < size() && j < procID + mod;
412  j += step
413  )
414  {
415  below.append(j);
416  }
417  for
418  (
419  label j = procID + step;
420  j < size() && j < procID + mod;
421  j++
422  )
423  {
424  allBelow.append(j);
425  }
426  }
427  }
428  }
429  t = UPstream::commsStruct(size(), procID, above, below, allBelow);
430  }
431  return t;
432 }
433 
434 
435 template<>
438 {
439  return const_cast<UList<UPstream::commsStruct>&>(*this).operator[](procID);
440 }
441 
442 
443 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
444 
// Parallel-run flag; setParRun() assigns it (nProcs > 0)
445 bool Foam::UPstream::parRun_(false);
446 
// Thread-support flag; setParRun() assigns it from its haveThreads argument
447 bool Foam::UPstream::haveThreads_(false);
448 
// Initialised to 1
449 int Foam::UPstream::msgType_(1);
450 
451 
// The per-communicator tables below are indexed by communicator index.
// NOTE(review): the constructor argument 10 is presumably a reserved
// capacity rather than an initial size, since allocateCommunicator() uses
// parentComm_.size() as the next new index - confirm against DynamicList.

// Rank of this process per communicator; set to -1 when the slot is freed
452 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
453 
// Selected (non-negative, sorted, unique) sub-ranks per communicator
454 Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
455 
// Parent communicator index per communicator; set to -1 when freed
456 Foam::DynamicList<Foam::label> Foam::UPstream::parentComm_(10);
457 
// Freed communicator indices, reused last-in first-out on allocation
458 Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
459 
// World names / world IDs.
// NOTE(review): presumably single-element lists ("" and 0) - confirm the
// (Foam::one, value) constructor semantics.
460 Foam::wordList Foam::UPstream::allWorlds_(Foam::one{}, "");
461 Foam::labelList Foam::UPstream::worldIDs_(Foam::one{}, 0);
462 
464 Foam::UPstream::linearCommunication_(10);
465 
467 Foam::UPstream::treeCommunication_(10);
468 
469 
471 
472 Foam::label Foam::UPstream::warnComm(-1);
473 
474 
475 // Predefine worldComm, selfComm slots.
476 // These are overwritten in parallel mode (by UPstream::setParRun())
477 const Foam::label nPredefinedComm = []()
478 {
479  const Foam::labelList singleProc(Foam::one{}, 0);
480 
481  // 0: worldComm
482  (void) Foam::UPstream::allocateCommunicator(-1, singleProc, false);
483 
484  // 1: selfComm
485  (void) Foam::UPstream::allocateCommunicator(-2, singleProc, false);
486 
487  return Foam::UPstream::nComms();
488 }();
489 
490 
492 (
493  Foam::debug::optimisationSwitch("floatTransfer", 0)
494 );
496 (
497  "floatTransfer",
498  bool,
500 );
501 
503 (
504  Foam::debug::optimisationSwitch("nProcsSimpleSum", 0)
505 );
507 (
508  "nProcsSimpleSum",
509  int,
511 );
512 
514 (
515  commsTypeNames.get
516  (
517  "commsType",
519  )
520 );
521 
522 namespace Foam
523 {
524  //- Registered reader for UPstream::defaultCommsType
525  class addcommsTypeToOpt
526  :
528  {
529  public:
530 
 //- Construct from the name of the switch
531  addcommsTypeToOpt(const char* name)
532  :
534  {}
535 
 //- Destructor
536  virtual ~addcommsTypeToOpt() = default;
537 
 //- Read from the given input stream
538  virtual void readData(Foam::Istream& is)
539  {
542  }
543 
 //- Write to the given output stream
544  virtual void writeData(Foam::Ostream& os) const
545  {
547  }
548  };
549 
 // Namespace-scope instance, constructed with the name "commsType"
550  addcommsTypeToOpt addcommsTypeToOpt_("commsType");
551 }
552 
554 (
555  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
556 );
558 (
559  "nPollProcInterfaces",
560  int,
562 );
563 
564 
566 (
567  Foam::debug::optimisationSwitch("maxCommsSize", 0)
568 );
570 (
571  "maxCommsSize",
572  int,
574 );
575 
576 
578 (
579  Foam::debug::optimisationSwitch("mpiBufferSize", 0)
580 );
581 
582 
583 // ************************************************************************* //
registerOptSwitch("fa:geometryOrder", int, faMesh::geometryOrder_)
static bool floatTransfer
Should compact transfer be used in which floats replace doubles reducing the bandwidth requirement at...
Definition: UPstream.H:327
Abstract base class for registered object with I/O. Used in debug symbol registration.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
EnumType read(Istream &is) const
Read a word from Istream and return the corresponding enumeration.
Definition: Enum.C:102
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:118
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition: UPstream.H:76
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Types of communications.
Definition: UPstream.H:66
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:578
void append(const T &val)
Append an element at the end of the list.
Definition: List.H:491
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
static int maxCommsSize
Optional maximum message size (bytes)
Definition: UPstream.H:347
static int nProcsSimpleSum
Number of processors to change from linear to tree communication.
Definition: UPstream.H:332
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:487
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor number) (= reverse of baseProcNo) ...
Definition: UPstream.C:299
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:688
static label worldComm
Default world communicator (all processors). May differ from globalComm if local worlds are in use...
Definition: UPstream.H:361
static label allocateCommunicator(const label parent, const labelUList &subRanks, const bool doPstream=true)
Allocate a new communicator with subRanks of parent communicator.
Definition: UPstream.C:139
UList< label > labelUList
A UList of labels.
Definition: UList.H:80
#define forAll(list, i)
Loop across all elements in list.
Definition: stdFoam.H:413
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
T & operator[](const label i)
Return element of UList.
Definition: UListI.H:292
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:274
static constexpr label globalComm
Communicator for all processors, irrespective of any local worlds.
Definition: UPstream.H:371
virtual ~addcommsTypeToOpt()=default
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator) is 1 for serial run.
Definition: UPstream.H:656
void setSize(const label n)
Alias for resize()
Definition: List.H:289
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:342
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
Definition: exprTraits.C:52
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:539
labelList identity(const label len, label start=0)
Return an identity map of the given length with (map[i] == i)
Definition: labelList.C:31
void sort(UList< T > &list)
Sort the list.
Definition: UList.C:334
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:533
const string & prefix() const noexcept
Return the stream prefix.
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:366
Structure for communicating between processors.
Definition: UPstream.H:84
const labelList & allBelow() const noexcept
The procIDs of all processors below (so not just directly below)
Definition: UPstream.H:171
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:55
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition: debug.C:234
int debug
Static debugging option.
OBJstream os(runTime.globalPath()/outputName)
defineTypeNameAndDebug(combustionModel, 0)
Wrapper class for allocating/freeing communicators.
Definition: UPstream.H:440
Registered reader for UPstream::defaultCommsType.
Definition: UPstream.C:520
const Foam::label nPredefinedComm
Definition: UPstream.C:470
const labelList & allNotBelow() const noexcept
The procIDs of all processors that are above. The inverse set of allBelow without myProcNo...
Definition: UPstream.H:180
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:337
Enum is a wrapper around a list of names/values that represent particular enumeration (or int) values...
static List< int > & procID(const label communicator)
Process IDs within a given communicator.
Definition: UPstream.H:704
static label nComms() noexcept
Number of currently defined communicators.
Definition: UPstream.H:381
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict(s).
Definition: debug.C:216
static bool haveThreads() noexcept
Have support for threads.
Definition: UPstream.H:647
static const int mpiBufferSize
MPI buffer-size (bytes)
Definition: UPstream.H:352
void addOptimisationObject(const char *name, simpleRegIOobject *obj)
Register optimisation switch read/write object.
Definition: debug.C:259
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:526
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static constexpr label selfComm
A communicator within the current rank only.
Definition: UPstream.H:376
static int baseProcNo(label comm, int procID)
Return physical processor number (i.e. processor number in worldComm) given communicator and processo...
Definition: UPstream.C:286
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:239
Namespace for OpenFOAM.
A class representing the concept of 1 (one) that can be used to avoid manipulating objects known to b...
Definition: one.H:57