52 static bool ourMpi =
false;
72 if (str.empty() || !
Foam::read(str, len) || len <= 0)
82 char* buf =
new char[len];
84 if (MPI_SUCCESS == MPI_Buffer_attach(buf, len))
91 Foam::Pout<<
"UPstream::init : buffer-size " << len <<
'\n';
97 Foam::Pout<<
"UPstream::init : could not attach buffer\n";
122 if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
143 validParOptions.insert(
"np",
"");
144 validParOptions.insert(
"p4pg",
"PI file");
145 validParOptions.insert(
"p4wd",
"directory");
146 validParOptions.insert(
"p4amslave",
"");
147 validParOptions.insert(
"p4yourname",
"hostname");
148 validParOptions.insert(
"machinefile",
"machine file");
156 MPI_Finalized(&flag);
161 <<
"MPI was already finalized - cannot perform MPI_Init\n" 167 MPI_Initialized(&flag);
172 Pout<<
"UPstream::initNull : was already initialized\n";
198 int numprocs = 0, myRank = 0;
199 int provided_thread_support = 0;
202 MPI_Finalized(&flag);
207 <<
"MPI was already finalized - cannot perform MPI_Init" <<
endl 213 MPI_Initialized(&flag);
222 <<
"MPI was already initialized - cannot perform MPI_Init" <<
nl 223 <<
"This could indicate an application programming error!" 230 Pout<<
"UPstream::init : was already initialized\n";
241 ? MPI_THREAD_MULTIPLE
244 &provided_thread_support
251 label worldIndex = -1;
253 for (
int argi = 1; argi < argc; ++argi)
255 if (strcmp(argv[argi],
"-world") == 0)
261 <<
"Missing world name to argument \"world\"" 270 if (worldIndex != -1)
272 for (label i = worldIndex+2; i < argc; i++)
279 MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
280 MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
284 Pout<<
"UPstream::init :" 285 <<
" thread-support : requested:" << needsThread
288 (provided_thread_support == MPI_THREAD_SINGLE)
290 : (provided_thread_support == MPI_THREAD_SERIALIZED)
292 : (provided_thread_support == MPI_THREAD_MULTIPLE)
296 <<
" procs:" << numprocs
297 <<
" rank:" << myRank
298 <<
" world:" << world <<
endl;
301 if (worldIndex == -1 && numprocs <= 1)
304 <<
"attempt to run parallel on 1 processor" 309 setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
311 if (worldIndex != -1)
327 DynamicList<word> worldNames(numprocs);
328 worldIDs_.resize_nocopy(numprocs);
332 const word& world = worlds[proci];
334 worldIDs_[proci] = worldNames.find(world);
336 if (worldIDs_[proci] == -1)
338 worldIDs_[proci] = worldNames.size();
339 worldNames.push_back(world);
343 allWorlds_.transfer(worldNames);
347 const label myWorldId =
350 DynamicList<label> subRanks;
353 if (worldIDs_[proci] == myWorldId)
355 subRanks.push_back(proci);
360 const label subComm =
379 int subNumProcs, subRank;
391 Pout<<
"UPstream::init : in world:" << world
392 <<
" using local communicator:" << subComm
393 <<
" rank " << subRank
394 <<
" of " << subNumProcs
399 Pout.
prefix() =
'[' + world +
'/' +
name(myProcNo(subComm)) +
"] ";
405 worldIDs_.resize_nocopy(numprocs);
419 MPI_Initialized(&flag);
426 MPI_Finalized(&flag);
433 <<
"MPI was already finalized (by a connected program?)\n";
437 Pout<<
"UPstream::shutdown : was already finalized\n";
446 <<
"Finalizing MPI, but was initialized elsewhere\n";
456 MPI_Abort(MPI_COMM_WORLD, errNo);
466 Pout<<
"UPstream::shutdown\n";
471 label nOutstanding = 0;
475 if (MPI_REQUEST_NULL != request)
485 <<
"Still have " << nOutstanding
486 <<
" outstanding MPI requests." 487 <<
" Should not happen for a normal code exit." 500 freeCommunicatorComponents(communicator);
518 MPI_Abort(MPI_COMM_WORLD, 1);
524 void Foam::UPstream::allocateCommunicatorComponents
526 const label parentIndex,
539 <<
"PstreamGlobals out of sync with UPstream data. Problem." 544 if (parentIndex == -1)
551 <<
"world communicator should always be index " 578 procIDs_[index].resize_nocopy(numProcs);
579 std::iota(procIDs_[index].
begin(), procIDs_[index].
end(), 0);
581 else if (parentIndex == -2)
588 MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);
594 MPI_Comm_size(MPI_COMM_SELF, &numProcs);
600 <<
"MPI_COMM_SELF had " << numProcs <<
" != 1 ranks!\n" 609 procIDs_[index].resize_nocopy(1);
610 MPI_Comm_rank(MPI_COMM_WORLD, &procIDs_[index].front());
619 MPI_Group parent_group;
626 MPI_Group active_group;
630 procIDs_[index].size(),
631 procIDs_[index].cdata(),
635 #if defined(MSMPI_VER) 645 MPI_Comm_create_group
655 MPI_Group_free(&parent_group);
656 MPI_Group_free(&active_group);
661 myProcNo_[index] = -1;
677 <<
" when allocating communicator at " << index
678 <<
" from ranks " << procIDs_[index]
679 <<
" of parent " << parentIndex
680 <<
" cannot find my own rank" 688 void Foam::UPstream::freeCommunicatorComponents(
const label index)
693 Pout<<
"freeCommunicatorComponents: " << index
748 <<
"MPI_Ibarrier returned with error" 752 *req = UPstream::Request(request);
766 <<
"MPI_Barrier returned with error" 777 const int fromProcNo,
779 const label communicator
782 std::pair<int,int> result(-1, 0);
790 const int source = (fromProcNo < 0) ? MPI_ANY_SOURCE : fromProcNo;
813 <<
"MPI_Probe returned with error" 838 <<
"MPI_Iprobe returned with error" 847 result.first = status.MPI_SOURCE;
848 MPI_Get_count(&status, MPI_BYTE, &result.second);
static void addProbeTime()
Add time increment to probe time.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
"blocking" : (MPI_Bsend, MPI_Recv)
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
Inter-processor communication reduction functions.
errorManipArg< error, int > exit(error &err, const int errNo=1)
commsTypes
Communications types.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and stack trace.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Functions to wrap MPI_Bcast, MPI_Allreduce, MPI_Iallreduce etc.
constexpr char nl
The newline '\n' character (0x0a)
DynamicList< MPI_Comm > MPICommunicators_
Ostream & endl(Ostream &os)
Add newline and flush stream.
static bool initNull()
Special purpose initialisation function.
static bool & parRun() noexcept
Test if this is a parallel run.
static int & msgType() noexcept
Message tag of standard messages.
static constexpr label commSelf() noexcept
Communicator within the current rank only.
void reset_request(UPstream::Request *requestPtr, label *requestIdx=nullptr)
Reset UPstream::Request to null and/or the index of the outstanding request to -1.
string getEnv(const std::string &envName)
Get environment value for given envName.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process is not a rank in the communicator.
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
static void shutdown(int errNo=0)
Shutdown (finalize) MPI as required.
bool read(const char *buf, int32_t &val)
Same as readInt32.
#define forAll(list, i)
Loop across all elements in list.
DynamicList< bool > pendingMPIFree_
static void detachOurBuffers()
static void gatherList(const List< commsStruct > &comms, List< T > &values, const int tag, const label comm)
Gather data, but keep individual values separate. Uses the specified communication schedule.
static void exit(int errNo=1)
Shutdown (finalize) MPI as required and exit program with errNo.
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for INVALID.
static bool is_rank(const label communicator=worldComm)
True if process corresponds to any rank (master or sub-rank) in the given communicator.
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
T & emplace_back(Args &&... args)
Construct an element at the end of the list, return reference to the new list element.
const string & prefix() const noexcept
Return the stream prefix.
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
errorManip< error > abort(error &err)
static void beginTiming()
Update timer prior to measurement.
int debug
Static debugging option.
constexpr auto end(C &c) -> decltype(c.end())
Return iterator to the end of the container c.
static void attachOurBuffers()
static void addRequestTime()
Add time increment to request time.
static void broadcasts(const label comm, Type &arg1, Args &&... args)
Broadcast multiple items to all processes in communicator.
static void abort()
Call MPI_Abort with no other checks or cleanup.
List< word > wordList
List of word.
#define WarningInFunction
Report a warning using Foam::Warning.
static std::pair< int, int > probeMessage(const UPstream::commsTypes commsType, const int fromProcNo, const int tag=UPstream::msgType(), const label communicator=worldComm)
Probe for an incoming message.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static int attachedBufLen
#define forAllReverse(list, i)
Reverse loop across all elements in list.
static const int mpiBufferSize
MPI buffer-size (bytes)
constexpr auto begin(C &c) -> decltype(c.begin())
Return iterator to the beginning of the container c.
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid options that this type of communications library adds/requires on the command line.
static label allocateCommunicator(const label parent, const labelRange &subRanks, const bool withComponents=true)
Allocate new communicator with contiguous sub-ranks on the parent communicator.
static void barrier(const label communicator, UPstream::Request *req=nullptr)
Impose a synchronisation barrier (optionally non-blocking)
static constexpr label commGlobal() noexcept
Communicator for all ranks, irrespective of any local worlds.