52 static bool ourMpi =
false;
72 if (str.empty() || !
Foam::read(str, len) || len <= 0)
84 Foam::Pout<<
"UPstream::init : buffer-size " << len <<
'\n';
87 char* buf =
new char[len];
89 if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
92 Foam::Pout<<
"UPstream::init : could not attach buffer\n";
114 if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
130 validParOptions.insert(
"np",
"");
131 validParOptions.insert(
"p4pg",
"PI file");
132 validParOptions.insert(
"p4wd",
"directory");
133 validParOptions.insert(
"p4amslave",
"");
134 validParOptions.insert(
"p4yourname",
"hostname");
135 validParOptions.insert(
"machinefile",
"machine file");
143 MPI_Finalized(&flag);
148 <<
"MPI was already finalized - cannot perform MPI_Init\n" 154 MPI_Initialized(&flag);
159 Pout<<
"UPstream::initNull : was already initialized\n";
185 int numprocs = 0, myRank = 0;
186 int provided_thread_support = 0;
189 MPI_Finalized(&flag);
194 <<
"MPI was already finalized - cannot perform MPI_Init" <<
endl 200 MPI_Initialized(&flag);
209 <<
"MPI was already initialized - cannot perform MPI_Init" <<
nl 210 <<
"This could indicate an application programming error!" 217 Pout<<
"UPstream::init : was already initialized\n";
228 ? MPI_THREAD_MULTIPLE
231 &provided_thread_support
238 label worldIndex = -1;
240 for (
int argi = 1; argi < argc; ++argi)
242 if (strcmp(argv[argi],
"-world") == 0)
248 <<
"Missing world name to argument \"world\"" 257 if (worldIndex != -1)
259 for (label i = worldIndex+2; i < argc; i++)
266 MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
267 MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
271 Pout<<
"UPstream::init :" 272 <<
" thread-support : wanted:" << needsThread
275 provided_thread_support == MPI_THREAD_MULTIPLE
276 ?
"MPI_THREAD_MULTIPLE" 277 :
"MPI_THREAD_SINGLE" 279 <<
" procs:" << numprocs
280 <<
" rank:" << myRank
281 <<
" world:" << world <<
endl;
284 if (worldIndex == -1 && numprocs <= 1)
287 <<
"attempt to run parallel on 1 processor" 292 setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
294 if (worldIndex != -1)
305 DynamicList<word> worldNames(numprocs);
306 worldIDs_.resize_nocopy(numprocs);
310 const word& world = worlds[proci];
312 worldIDs_[proci] = worldNames.find(world);
314 if (worldIDs_[proci] == -1)
316 worldIDs_[proci] = worldNames.size();
317 worldNames.push_back(world);
321 allWorlds_.transfer(worldNames);
325 const label myWorldId =
328 DynamicList<label> subRanks;
331 if (worldIDs_[proci] == myWorldId)
333 subRanks.push_back(proci);
338 const label subComm =
355 int subNumProcs, subRank;
367 Pout<<
"UPstream::init : in world:" << world
368 <<
" using local communicator:" << subComm
369 <<
" rank " << subRank
370 <<
" of " << subNumProcs
375 Pout.
prefix() =
'[' + world +
'/' +
name(myProcNo(subComm)) +
"] ";
381 worldIDs_.resize_nocopy(numprocs);
395 Pout<<
"UPstream::shutdown\n";
400 MPI_Initialized(&flag);
407 MPI_Finalized(&flag);
414 <<
"MPI was already finalized (by a connected program?)\n";
418 Pout<<
"UPstream::shutdown : was already finalized\n";
429 label nOutstanding = 0;
444 <<
"There were still " << nOutstanding
445 <<
" outstanding MPI_Requests." <<
nl 446 <<
"Which means your code exited before doing a " 447 <<
" UPstream::waitRequests()." <<
nl 448 <<
"This should not happen for a normal code exit." 454 forAll(myProcNo_, communicator)
456 if (myProcNo_[communicator] >= 0)
458 freePstreamCommunicator(communicator);
469 <<
"Finalizing MPI, but was initialized elsewhere\n";
479 MPI_Abort(MPI_COMM_WORLD, errNo);
494 MPI_Abort(MPI_COMM_WORLD, 1);
500 void Foam::UPstream::allocatePstreamCommunicator
502 const label parentIndex,
509 MPI_Comm newComm = MPI_COMM_NULL;
510 MPI_Group newGroup = MPI_GROUP_NULL;
517 <<
"PstreamGlobals out of sync with UPstream data. Problem." 522 if (parentIndex == -1)
529 <<
"world communicator should always be index " 536 MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_[index]);
540 MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
543 procIDs_[index].resize_nocopy(numProcs);
544 forAll(procIDs_[index], i)
546 procIDs_[index][i] = i;
549 else if (parentIndex == -2)
555 MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);
561 MPI_Comm_size(MPI_COMM_SELF, &numProcs);
567 <<
"MPI_COMM_SELF had " << numProcs <<
" != 1 ranks!\n" 572 procIDs_[index].resize_nocopy(1);
581 procIDs_[index].size(),
582 procIDs_[index].cdata(),
586 #if defined(MSMPI_VER) 596 MPI_Comm_create_group
607 myProcNo_[index] = -1;
622 <<
" when allocating communicator at " << index
623 <<
" from ranks " << procIDs_[index]
624 <<
" of parent " << parentIndex
625 <<
" cannot find my own rank" 633 void Foam::UPstream::freePstreamCommunicator(
const label communicator)
639 Pout<<
"freePstreamCommunicator: " << communicator
644 if (communicator > 1)
685 Pout<<
"UPstream::waitRequests : starting wait for " 687 <<
" outstanding requests starting at " << start <<
endl;
695 SubList<MPI_Request> waitRequests
716 <<
"MPI_Waitall returned with error" <<
Foam::endl;
726 Pout<<
"UPstream::waitRequests : finished wait." <<
endl;
740 Pout<<
"UPstream::waitRequest : starting wait for request:" << i
747 <<
"You asked for request=" << i
749 <<
" outstanding requests!" <<
nl 750 <<
"Mixing use of blocking/non-blocking comms?" 767 <<
"MPI_Wait returned with error" <<
Foam::endl;
776 Pout<<
"UPstream::waitRequest : finished wait for request:" << i
791 Pout<<
"UPstream::finishedRequest : checking request:" << i
798 <<
"You asked for request=" << i
800 <<
" outstanding requests!" <<
nl 801 <<
"Mixing use of blocking/non-blocking comms?" 816 Pout<<
"UPstream::finishedRequest : finished request:" << i
839 Pout<<
"UPstream::allocateTag";
840 if (msg)
Pout<<
' ' << msg;
852 Pout<<
"UPstream::freeTag ";
853 if (msg)
Pout<<
' ' << msg;
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
DynamicList< label > freedRequests_
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
Inter-processor communication reduction functions.
errorManipArg< error, int > exit(error &err, const int errNo=1)
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and stack trace.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
DynamicList< int > freedTags_
Free'd message tags.
static label nRequests() noexcept
Number of outstanding requests.
constexpr char nl
The newline '\n' character (0x0a)
DynamicList< MPI_Comm > MPICommunicators_
Ostream & endl(Ostream &os)
Add newline and flush stream.
static bool initNull()
Special purpose initialisation function.
static bool & parRun() noexcept
Test if this is a parallel run.
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
static int & msgType() noexcept
Message tag of standard messages.
string getEnv(const std::string &envName)
Get environment value for given envName.
static int myProcNo(const label communicator=worldComm)
Number of this process (starting from masterNo() = 0)
static label worldComm
Default world communicator (all processors). May differ from globalComm if local worlds are in use.
static label allocateCommunicator(const label parent, const labelUList &subRanks, const bool doPstream=true)
Allocate a new communicator with subRanks of parent communicator.
void pop_back(label n=1)
Reduce size by 1 or more elements. Can be called on an empty list.
static void addWaitTime()
Add time increment to wait time.
static void shutdown(int errNo=0)
Shutdown (finalize) MPI as required.
bool read(const char *buf, int32_t &val)
Same as readInt32.
#define forAll(list, i)
Loop across all elements in list.
static int allocateTag(const char *const msg=nullptr)
static constexpr label globalComm
Communicator for all processors, irrespective of any local worlds.
static void detachOurBuffers()
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
Gather data, but keep individual values separate. Uses the specified communication schedule.
static void exit(int errNo=1)
Shutdown (finalize) MPI as required and exit program with errNo.
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
int nTags_
Max outstanding message tag operations.
const string & prefix() const noexcept
Return the stream prefix.
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
errorManip< error > abort(error &err)
static void beginTiming()
Update timer prior to measurement.
static void resetRequests(const label n)
Truncate outstanding requests to given length.
int debug
Static debugging option.
DynamicList< MPI_Group > MPIGroups_
static void attachOurBuffers()
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
void push_back(const T &val)
Copy append an element to the end of this list.
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
static void abort()
Call MPI_Abort with no other checks or cleanup.
List< word > wordList
A List of words.
#define WarningInFunction
Report a warning using Foam::Warning.
static bool master(const label communicator=worldComm)
Am I the master rank?
T & back()
Access last element of the list, position [size()-1].
static void freeTag(const int tag, const char *const msg=nullptr)
static const int mpiBufferSize
MPI buffer-size (bytes)
static void waitRequest(const label i)
Wait until request i has finished.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid options that this type of communications library adds/requires on the command line.