// Excerpts from OpenFOAM's MPI-specific UPstream implementation (UPstream.C)

#undef Pstream_use_MPI_Get_count

// True if this code initialised MPI itself (and must also finalize it)
static bool ourMpi = false;


// attachOurBuffers : allocate and attach a buffer for MPI_Bsend.
// The length comes from an environment variable when set (read into str),
// otherwise the compile-time default (mpiBufferSize) is used.
if (str.empty() || !Foam::read(str, len) || len <= 0)
{
    // ... fall back to the default buffer length
}

char* buf = new char[len];

if (MPI_SUCCESS == MPI_Buffer_attach(buf, len))
{
    Foam::Perr<< "UPstream::init : buffer-size " << len << '\n';
}
else
{
    Foam::Perr<< "UPstream::init : could not attach buffer\n";
}


// detachOurBuffers : detach the buffer again and recover the memory
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
    delete[] buf;
}
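// Illustration (not part of the source): a minimal standalone sketch of
// the attach/send/detach cycle that attachOurBuffers/detachOurBuffers
// manage above, using plain MPI only. The function name and buffer size
// are hypothetical. MPI_Bsend copies the message into the attached
// buffer and returns without waiting for the matching receive.

#include <mpi.h>

static void bufferedSendSketch(int dest, MPI_Comm comm)
{
    int len = 4096 + MPI_BSEND_OVERHEAD;   // illustrative size
    char* buf = new char[len];
    MPI_Buffer_attach(buf, len);

    const char msg[] = "hello";
    MPI_Bsend(msg, sizeof(msg), MPI_BYTE, dest, /*tag=*/0, comm);

    // Detach drains any pending buffered messages, then returns the
    // original pointer and length so the memory can be freed
    MPI_Buffer_detach(&buf, &len);
    delete[] buf;
}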
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("machinefile", "machine file");
}
// UPstream::initNull : special-purpose initialisation
int flag = 0;

MPI_Finalized(&flag);
if (flag)
{
    WarningInFunction
        << "MPI was already finalized - cannot perform MPI_Init\n";
    // ...
}

MPI_Initialized(&flag);
if (flag)
{
    Perr<< "UPstream::initNull : was already initialized\n";
}
// UPstream::init : initialisation function called from main()
int numprocs = 0, myRank = 0;
int provided_thread_support = 0;
int flag = 0;

MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init" << endl
        << Foam::abort(FatalError);
}

MPI_Initialized(&flag);
if (flag)
{
    WarningInFunction
        << "MPI was already initialized - cannot perform MPI_Init" << nl
        << "This could indicate an application programming error!" << endl;

    Perr<< "UPstream::init : was already initialized\n";
}
else
{
    MPI_Init_thread
    (
        &argc,
        &argv,
        (needsThread ? MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE),
        &provided_thread_support
    );
    ourMpi = true;
}
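// Illustration (not part of the source): requesting threaded MPI and
// checking the granted level, as the code above does. A compliant MPI
// may grant less than requested, so the 'provided' value must be
// inspected rather than assumed.

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    int provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    if (provided < MPI_THREAD_MULTIPLE)
    {
        std::fprintf(stderr, "no full thread support (level %d)\n", provided);
    }

    MPI_Finalize();
    return 0;
}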
// Check the argument list for a local world ("-world <name>")
label worldIndex = -1;
for (int argi = 1; argi < argc; ++argi)
{
    if (strcmp(argv[argi], "-world") == 0)
    {
        worldIndex = argi;
        if (argi+1 >= argc)
        {
            FatalErrorInFunction
                << "Missing world name to argument \"world\""
                << Foam::abort(FatalError);
        }
        break;
    }
}

if (worldIndex != -1)
{
    // Filter the option and its argument out of the argument list
    for (label i = worldIndex+2; i < argc; i++)
    {
        argv[i-2] = argv[i];
    }
    argc -= 2;
}

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (UPstream::debug)
{
    Perr<< "UPstream::init :"
        << " thread-support : requested:" << needsThread
        << " obtained:"
        << (
               (provided_thread_support == MPI_THREAD_SINGLE)
             ? "SINGLE"
             : (provided_thread_support == MPI_THREAD_SERIALIZED)
             ? "SERIALIZED"
             : (provided_thread_support == MPI_THREAD_MULTIPLE)
             ? "MULTIPLE"
             : "other"
           )
        << " procs:" << numprocs
        << " rank:" << myRank
        << " world:" << world << endl;
}

if (worldIndex == -1 && numprocs <= 1)
{
    FatalErrorInFunction
        << "attempt to run parallel on 1 processor"
        << Foam::abort(FatalError);
}

// Initialise parallel structure
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
if (worldIndex != -1)
{
    // Using local worlds. 'worlds' holds the per-rank world names
    // (gathered and broadcast over the global communicator earlier);
    // assign a compact world ID to every rank.

    DynamicList<word> worldNames(numprocs);
    worldIDs_.resize_nocopy(numprocs);

    forAll(worlds, proci)
    {
        const word& world = worlds[proci];

        worldIDs_[proci] = worldNames.find(world);

        if (worldIDs_[proci] == -1)
        {
            // Name not seen before: a new world
            worldIDs_[proci] = worldNames.size();
            worldNames.push_back(world);
        }
    }

    allWorlds_.transfer(worldNames);

    const label myWorldId =
        worldIDs_[UPstream::myProcNo(UPstream::commGlobal())];

    // The ranks belonging to my own world
    DynamicList<label> subRanks;
    forAll(worldIDs_, proci)
    {
        if (worldIDs_[proci] == myWorldId)
        {
            subRanks.push_back(proci);
        }
    }

    // Allocate the local (within-world) communicator
    const label subComm =
        UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);

    int subNumProcs, subRank;
    // ...

    if (UPstream::debug)
    {
        Perr<< "UPstream::init : in world:" << world
            << " using local communicator:" << subComm
            << " rank " << subRank
            << " of " << subNumProcs << endl;
    }

    // Prefix all parallel output with the world name and local rank
    Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
}
else
{
    // A single world: every rank has world ID 0
    worldIDs_.resize_nocopy(numprocs);
    // ...
}
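// Illustration (not part of the source): the same grouping idea with
// MPI_Comm_split instead of explicit rank lists; 'splitByWorld' is a
// hypothetical name. Ranks passing the same (non-negative) color end up
// in the same sub-communicator, much like ranks sharing a world name.

#include <mpi.h>

static MPI_Comm splitByWorld(int worldId, MPI_Comm parent)
{
    int rank = 0;
    MPI_Comm_rank(parent, &rank);

    MPI_Comm sub = MPI_COMM_NULL;
    MPI_Comm_split(parent, /*color=*/worldId, /*key=*/rank, &sub);
    return sub;   // caller releases it with MPI_Comm_free
}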
// UPstream::shutdown
int flag = 0;

MPI_Initialized(&flag);
if (!flag) return;    // never initialized: nothing to do

MPI_Finalized(&flag);
if (flag)
{
    // Already finalized elsewhere
    WarningInFunction
        << "MPI was already finalized (by a connected program?)\n";
    // ... or, when only debugging:
    Perr<< "UPstream::shutdown : was already finalized\n";
    return;
}

if (!ourMpi)
{
    WarningInFunction
        << "Finalizing MPI, but was initialized elsewhere\n";
}

if (errNo != 0)
{
    MPI_Abort(MPI_COMM_WORLD, errNo);   // abnormal exit
}

Perr<< "UPstream::shutdown\n";

// Warn about non-blocking requests still pending at exit
label nOutstanding = 0;
for (MPI_Request request : PstreamGlobals::outstandingRequests_)
{
    if (MPI_REQUEST_NULL != request) ++nOutstanding;
}
if (nOutstanding)
{
    WarningInFunction
        << "Still have " << nOutstanding << " outstanding MPI requests."
        << " Should not happen for a normal code exit." << endl;
}

// Free communicators in reverse order, then MPI_Finalize()
forAllReverse(myProcNo_, communicator)
{
    freeCommunicatorComponents(communicator);
}

// UPstream::abort : MPI_Abort with no further checks or cleanup
MPI_Abort(MPI_COMM_WORLD, 1);
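// Illustration (not part of the source): instead of only warning about
// outstanding requests at shutdown, a caller could drain them first;
// 'drainRequests' is a hypothetical helper. MPI_Waitall ignores slots
// holding MPI_REQUEST_NULL, which is what completed requests leave behind.

#include <mpi.h>
#include <vector>

static void drainRequests(std::vector<MPI_Request>& requests)
{
    if (!requests.empty())
    {
        MPI_Waitall
        (
            static_cast<int>(requests.size()),
            requests.data(),
            MPI_STATUSES_IGNORE
        );
    }
    requests.clear();
}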
void Foam::UPstream::allocateCommunicatorComponents
(
    const label parentIndex,
    const label index
)
{
    if (index != PstreamGlobals::MPICommunicators_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
            << Foam::exit(FatalError);
    }

    if (parentIndex == -1)
    {
        // The global communicator: must be the first one allocated
        // ("base world communicator should always be index ...")
        // ...
        procIDs_[index].resize_nocopy(numProcs);
        std::iota(procIDs_[index].begin(), procIDs_[index].end(), 0);
    }
    else if (parentIndex == -2)
    {
        // The self communicator
        MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);

        int numProcs = 0;
        MPI_Comm_size(MPI_COMM_SELF, &numProcs);
        // ... warn if the sanity check fails:
        //     "MPI_COMM_SELF had " << numProcs << " != 1 ranks!\n"

        procIDs_[index].resize_nocopy(1);
        MPI_Comm_rank(MPI_COMM_WORLD, &procIDs_[index].front());
    }
    else
    {
        // A sub-communicator over selected ranks of the parent
        MPI_Group parent_group;
        // ... MPI_Comm_group(parent communicator, &parent_group);

        MPI_Group active_group;
        MPI_Group_incl
        (
            parent_group,
            procIDs_[index].size(),
            procIDs_[index].cdata(),
            &active_group
        );

        #if defined(MSMPI_VER)
        // MS-MPI lacks MPI_Comm_create_group: fall back to MPI_Comm_create
        #else
        // ... MPI_Comm_create_group(parent comm, active_group, tag, &newComm);
        #endif

        MPI_Group_free(&parent_group);
        MPI_Group_free(&active_group);

        // This rank is not in the sub-communicator:
        myProcNo_[index] = -1;
        // A rank inside the new communicator must be able to locate
        // itself; otherwise the bookkeeping is inconsistent:
        FatalErrorInFunction
            << " when allocating communicator at " << index
            << " from ranks " << procIDs_[index]
            << " of parent " << parentIndex
            << " cannot find my own rank"
            << Foam::exit(FatalError);
    }
}


void Foam::UPstream::freeCommunicatorComponents(const label index)
{
    if (UPstream::debug)
    {
        Perr<< "freeCommunicatorComponents: " << index << endl;
    }
    // ...
}
// UPstream::barrier : non-blocking (MPI_Ibarrier) when a request pointer
// is supplied; the MPI return code is always checked:
// ... on failure:
//     FatalErrorInFunction << "MPI_Ibarrier returned with error" ...
*req = UPstream::Request(request);

// ... blocking variant (MPI_Barrier), on failure:
//     FatalErrorInFunction << "MPI_Barrier returned with error" ...
std::pair<int,int64_t> Foam::UPstream::probeMessage
(
    const UPstream::commsTypes commsType,
    const int fromProcNo,
    const int tag,
    const label communicator
)
{
    std::pair<int,int64_t> result(-1, 0);
    const int source = (fromProcNo < 0) ? MPI_ANY_SOURCE : fromProcNo;
    MPI_Status status;

    // Blocking probe:
    // ... MPI_Probe(source, tag, comm, &status);
    //     on failure: "MPI_Probe returned with error"

    // Non-blocking probe:
    // ... MPI_Iprobe(source, tag, comm, &flag, &status);
    //     on failure: "MPI_Iprobe returned with error"

    // Determine the message size (in bytes)
    #ifdef Pstream_use_MPI_Get_count
    int count(0);
    MPI_Get_count(&status, MPI_BYTE, &count);
    #else
    MPI_Count count(0);
    MPI_Get_elements_x(&status, MPI_BYTE, &count);
    #endif

    if (count == MPI_UNDEFINED || int64_t(count) < 0)
    {
        FatalErrorInFunction
            << "MPI_Get_count() or MPI_Get_elements_x() : "
               "returned undefined or negative value"
            << Foam::abort(FatalError);
    }
    else if (int64_t(count) > int64_t(INT_MAX))
    {
        FatalErrorInFunction
            << "MPI_Get_count() or MPI_Get_elements_x() : "
               "count is larger than INT_MAX bytes"
            << Foam::abort(FatalError);
    }

    result.first = status.MPI_SOURCE;
    result.second = int64_t(count);

    return result;
}
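// Illustration (not part of the source): the probe-then-receive idiom
// that probeMessage supports, in plain MPI; 'probeAndReceive' is a
// hypothetical name. Probing first lets the receiver size its buffer
// from the message actually pending.

#include <mpi.h>
#include <vector>

static std::vector<char> probeAndReceive(int source, int tag, MPI_Comm comm)
{
    MPI_Status status;
    MPI_Probe(source, tag, comm, &status);      // block until a message

    int count = 0;
    MPI_Get_count(&status, MPI_BYTE, &count);   // its size in bytes

    std::vector<char> buffer(count);
    MPI_Recv
    (
        buffer.data(), count, MPI_BYTE,
        status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE
    );
    return buffer;
}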