UPstreamRequest.C
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
4    \\    /   O peration     |
5     \\  /    A nd           | www.openfoam.com
6      \\/     M anipulation  |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2011 OpenFOAM Foundation
9  Copyright (C) 2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UPstreamWrapping.H"
30 #include "PstreamGlobals.H"
31 #include "profilingPstream.H"
32 
33 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
34 
35 Foam::UPstream::Request::Request() noexcept
36 :
37  UPstream::Request(MPI_REQUEST_NULL)
38 {}
39 
40 
41 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
42 
43 bool Foam::UPstream::Request::good() const noexcept
44 {
45  return MPI_REQUEST_NULL != PstreamDetail::Request::get(*this);
46 }
47 
48 
49 void Foam::UPstream::Request::reset() noexcept
50 {
51  *this = UPstream::Request(MPI_REQUEST_NULL);
52 }
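// Usage sketch (illustrative, not from this file): a default-constructed
// UPstream::Request wraps MPI_REQUEST_NULL, so good() is false until a wrapped
// handle is assigned, and reset() returns it to that inactive state:
//
//     UPstream::Request req;              // good() == false
//     // ... some non-blocking call assigns a wrapped MPI handle to req ...
//     if (req.good()) { /* still outstanding */ }
//     req.reset();                        // back to MPI_REQUEST_NULL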
53 
54 
55 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
56 
57 Foam::label Foam::UPstream::nRequests() noexcept
58 {
59  return PstreamGlobals::outstandingRequests_.size();
60 }
61 
62 
63 void Foam::UPstream::resetRequests(const label n)
64 {
65  if (n >= 0 && n < PstreamGlobals::outstandingRequests_.size())
66  {
67  PstreamGlobals::outstandingRequests_.resize(n);
68  }
69 }
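// Illustrative note: resetRequests() only truncates the internal list back to
// a previously recorded length (see nRequests()); it neither waits for nor
// cancels the dropped entries. A possible bracket, as a sketch:
//
//     const label nReq = UPstream::nRequests();   // remember current depth
//     // ... speculative non-blocking operations appended here ...
//     UPstream::resetRequests(nReq);              // forget anything added since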
70 
71 
72 void Foam::UPstream::addRequest(UPstream::Request& req)
73 {
74  // No-op for non-parallel
75  if (!UPstream::parRun())
76  {
77  return;
78  }
79 
80  // Transcribe as an MPI_Request
81  PstreamGlobals::outstandingRequests_.push_back
82  (
83  PstreamDetail::Request::get(req)
84  );
85 
86  // Invalidate parameter
87  req = UPstream::Request(MPI_REQUEST_NULL);
88 }
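// Usage sketch (illustrative): addRequest() transfers a wrapped request onto
// the internal global list, after which it is tracked by index:
//
//     UPstream::Request req;
//     // ... start a non-blocking operation that fills req ...
//     UPstream::addRequest(req);   // now on the global list; req is inactive
//     // later: UPstream::waitRequests(...) or UPstream::finishedRequest(i)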
89 
90 
91 void Foam::UPstream::cancelRequest(const label i)
92 {
93  // No-op for non-parallel, or out-of-range (eg, placeholder indices)
94  if
95  (
96  !UPstream::parRun()
97  || i < 0
98  || i >= PstreamGlobals::outstandingRequests_.size()
99  )
100  {
101  return;
102  }
103 
104  {
105  auto& request = PstreamGlobals::outstandingRequests_[i];
106  if (MPI_REQUEST_NULL != request) // Active handle is mandatory
107  {
108  MPI_Cancel(&request);
109  MPI_Request_free(&request); //<- Sets to MPI_REQUEST_NULL
110  }
111  }
112 }
113 
114 
115 void Foam::UPstream::cancelRequest(UPstream::Request& req)
116 {
117  // No-op for non-parallel
118  if (!UPstream::parRun())
119  {
120  return;
121  }
122 
123  {
124  MPI_Request request = PstreamDetail::Request::get(req);
125  if (MPI_REQUEST_NULL != request) // Active handle is mandatory
126  {
127  MPI_Cancel(&request);
128  MPI_Request_free(&request);
129  }
130  req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
131  }
132 }
133 
134 
135 void Foam::UPstream::cancelRequests(UList<UPstream::Request>& requests)
136 {
137  // No-op for non-parallel
138  if (!UPstream::parRun())
139  {
140  return;
141  }
142 
143  for (auto& req : requests)
144  {
145  MPI_Request request = PstreamDetail::Request::get(req);
146  if (MPI_REQUEST_NULL != request) // Active handle is mandatory
147  {
148  MPI_Cancel(&request);
149  MPI_Request_free(&request);
150  }
151  req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
152  }
153 }
154 
155 
156 void Foam::UPstream::removeRequests(const label pos, label len)
157 {
158  // No-op for non-parallel, no pending requests or out-of-range
159  if
160  (
161  !UPstream::parRun()
162  || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
163  || !len
164  )
165  {
166  return;
167  }
168 
170 
171  // Apply range-checking on slice with (len < 0) behaving like npos
172  // (ie, the rest of the list)
173  if (len >= 0 && len < count)
174  {
175  // A non-trailing slice
176  count = len;
177  }
178  // Have count >= 1
179 
180  const labelRange range(pos, count);
181 
182  for (const label i : range)
183  {
184  auto& request = PstreamGlobals::outstandingRequests_[i];
185  if (MPI_REQUEST_NULL != request) // Active handle is mandatory
186  {
187  MPI_Cancel(&request);
188  MPI_Request_free(&request); //<- Sets to MPI_REQUEST_NULL
189  }
190  }
191 
192  // Remove from list of outstanding requests and move down
193  PstreamGlobals::outstandingRequests_.remove(range);
194 }
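// Slice semantics, illustrated: with 10 outstanding requests,
//     removeRequests(6)       // cancels/frees entries 6..9 (len < 0 acts like npos)
//     removeRequests(6, 2)    // cancels/frees entries 6..7 only
// and the handled entries are then erased from the internal list.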
195 
196 
197 void Foam::UPstream::freeRequest(UPstream::Request& req)
198 {
199  // No-op for non-parallel
200  if (!UPstream::parRun())
201  {
202  return;
203  }
204 
205  {
206  MPI_Request request = PstreamDetail::Request::get(req);
207  if (MPI_REQUEST_NULL != request) // Active handle is mandatory
208  {
209  // if (cancel)
210  // {
211  // MPI_Cancel(&request);
212  // }
213  MPI_Request_free(&request);
214  }
215  req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
216  }
217 }
218 
219 
220 void Foam::UPstream::freeRequests(UList<UPstream::Request>& requests)
221 {
222  // No-op for non-parallel
223  if (!UPstream::parRun())
224  {
225  return;
226  }
227 
228  for (auto& req : requests)
229  {
230  MPI_Request request = PstreamDetail::Request::get(req);
231  if (MPI_REQUEST_NULL != request) // Active handle is mandatory
232  {
233  // if (cancel)
234  // {
235  // MPI_Cancel(&request);
236  // }
237  MPI_Request_free(&request);
238  }
239  req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
240  }
241 }
242 
243 
244 void Foam::UPstream::waitRequests(const label pos, label len)
245 {
246  // No-op for non-parallel, no pending requests or out-of-range
247  if
248  (
249  !UPstream::parRun()
250  || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
251  || !len
252  )
253  {
254  return;
255  }
256 
257  label count = (PstreamGlobals::outstandingRequests_.size() - pos);
258  bool trim = true; // Can trim the trailing part of the list
259 
260  // Apply range-checking on slice with (len < 0) behaving like npos
261  // (ie, the rest of the list)
262  if (len >= 0 && len < count)
263  {
264  // A non-trailing slice
265  count = len;
266  trim = false;
267  }
268  // Have count >= 1
269 
270  auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
271 
272  if (UPstream::debug)
273  {
274  Pout<< "UPstream::waitRequests : starting wait for "
275  << count << " requests starting at " << pos << endl;
276  }
277 
278  profilingPstream::beginTiming();
279 
280  if (count == 1)
281  {
282  // On success: sets request to MPI_REQUEST_NULL
283  if (MPI_Wait(waitRequests, MPI_STATUS_IGNORE))
284  {
285  FatalErrorInFunction
286  << "MPI_Wait returned with error"
287  << Foam::abort(FatalError);
288  }
289  }
290  else if (count > 1)
291  {
292  // On success: sets each request to MPI_REQUEST_NULL
293  if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
294  {
295  FatalErrorInFunction
296  << "MPI_Waitall returned with error"
297  << Foam::abort(FatalError);
298  }
299  }
300 
301  profilingPstream::addWaitTime();
302 
303  if (trim)
304  {
305  // Trim the length of outstanding requests
306  PstreamGlobals::outstandingRequests_.resize(pos);
307  }
308 
309  if (UPstream::debug)
310  {
311  Pout<< "UPstream::waitRequests : finished wait." << endl;
312  }
313 }
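// Typical usage (sketch): bracket a non-blocking exchange with the request
// count taken beforehand, then wait only for the requests added in between:
//
//     const label nReq = UPstream::nRequests();
//     // ... post non-blocking sends/receives that register requests ...
//     UPstream::waitRequests(nReq);    // Waitall on the trailing slice, then trim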
314 
315 
316 void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
317 {
318  // No-op for non-parallel or no pending requests
319  if (!UPstream::parRun() || requests.empty())
320  {
321  return;
322  }
323 
324  // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
325  // which is always large enough to hold an MPI_Request (int or pointer)
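// The size guarantee being relied on could be restated as a compile-time
// check, roughly (sketch only):
//
//     static_assert
//     (
//         sizeof(MPI_Request) <= sizeof(UPstream::Request::value_type),
//         "MPI_Request must fit into the UPstream::Request storage"
//     );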
326 
327  label count = 0;
328  auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
329 
330  for (auto& req : requests)
331  {
332  MPI_Request request = PstreamDetail::Request::get(req);
333 
334  if (MPI_REQUEST_NULL != request) // Apply some prefiltering
335  {
336  waitRequests[count] = request;
337  ++count;
338  }
339  }
340 
341  if (!count)
342  {
343  // No active request handles
344  return;
345  }
346 
347  profilingPstream::beginTiming();
348 
349  // On success: sets each request to MPI_REQUEST_NULL
350  if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
351  {
352  FatalErrorInFunction
353  << "MPI_Waitall returned with error"
354  << Foam::abort(FatalError);
355  }
356 
357  profilingPstream::addWaitTime();
358 
359  // Everything handled, reset all to MPI_REQUEST_NULL
360  requests = UPstream::Request(MPI_REQUEST_NULL);
361 }
362 
363 
364 bool Foam::UPstream::waitAnyRequest(const label pos, label len)
365 {
366  // No-op for non-parallel, no pending requests or out-of-range
367  if
368  (
369  !UPstream::parRun()
370  || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
371  || !len
372  )
373  {
374  return false;
375  }
376 
377  label count = (PstreamGlobals::outstandingRequests_.size() - pos);
378 
379  // Apply range-checking on slice with (len < 0) behaving like npos
380  // (ie, the rest of the list)
381  if (len >= 0 && len < count)
382  {
383  // A non-trailing slice
384  count = len;
385  }
386  // Have count >= 1
387 
388  auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
389 
390  if (UPstream::debug)
391  {
392  Pout<< "UPstream::waitAnyRequest : starting wait for any of "
393  << count << " requests starting at " << pos << endl;
394  }
395 
396  profilingPstream::beginTiming();
397 
398  // On success: sets request to MPI_REQUEST_NULL
399  int index = MPI_UNDEFINED;
400  if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
401  {
402  FatalErrorInFunction
403  << "MPI_Waitany returned with error"
404  << Foam::abort(FatalError);
405  }
406 
407  profilingPstream::addWaitTime();
408 
409  if (index == MPI_UNDEFINED)
410  {
411  // No active request handles
412  return false;
413  }
414 
415  return true;
416 }
417 
418 
419 bool Foam::UPstream::waitSomeRequests
420 (
421  const label pos,
422  label len,
423  DynamicList<int>* indices
424 )
425 {
426  // No-op for non-parallel, no pending requests or out-of-range
427  if
428  (
429  !UPstream::parRun()
430  || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
431  || !len
432  )
433  {
434  if (indices) indices->clear();
435  return false;
436  }
437 
438  label count = (PstreamGlobals::outstandingRequests_.size() - pos);
439 
440  // Apply range-checking on slice with (len < 0) behaving like npos
441  // (ie, the rest of the list)
442  if (len >= 0 && len < count)
443  {
444  // A non-trailing slice
445  count = len;
446  }
447  // Have count >= 1
448 
449  auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
450 
451  if (UPstream::debug)
452  {
453  Pout<< "UPstream::waitSomeRequests : starting wait for some of "
454  << count << " requests starting at " << pos << endl;
455  }
456 
457 
458  // Local temporary storage, or return via calling parameter
459  List<int> tmpIndices;
460  if (indices)
461  {
462  indices->resize_nocopy(count);
463  }
464  else
465  {
466  tmpIndices.resize(count);
467  }
468 
469  profilingPstream::beginTiming();
470 
471  // On success: sets non-blocking requests to MPI_REQUEST_NULL
472  int outcount = 0;
473  if
474  (
475  MPI_Waitsome
476  (
477  count,
478  waitRequests,
479  &outcount,
480  (indices ? indices->data() : tmpIndices.data()),
481  MPI_STATUSES_IGNORE
482  )
483  )
484  {
485  FatalErrorInFunction
486  << "MPI_Waitsome returned with error"
487  << Foam::abort(FatalError);
488  }
489 
490  profilingPstream::addWaitTime();
491 
492  if (outcount == MPI_UNDEFINED || outcount < 1)
493  {
494  // No active request handles
495  if (indices) indices->clear();
496  return false;
497  }
498 
499  if (indices)
500  {
501  indices->resize(outcount);
502  }
503 
504  return true;
505 }
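// Usage sketch (illustrative): drain a slice of outstanding requests as they
// complete, processing whichever ones finished in each pass. Here 'startReq'
// is assumed to be a previously recorded nRequests() value:
//
//     DynamicList<int> ready;
//     while (UPstream::waitSomeRequests(startReq, -1, &ready))
//     {
//         for (const int idx : ready)
//         {
//             // handle the operation behind request (startReq + idx) ...
//         }
//     }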
506 
507 
508 bool Foam::UPstream::waitSomeRequests
509 (
510  UList<UPstream::Request>& requests,
511  DynamicList<int>* indices
512 )
513 {
514  // No-op for non-parallel or no pending requests
515  if (!UPstream::parRun() || requests.empty())
516  {
517  if (indices) indices->clear();
518  return false;
519  }
520 
521  // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
522  // which is always large enough to hold an MPI_Request (int or pointer)
523 
524  label count = 0;
525  auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
526 
527  for (auto& req : requests)
528  {
529  waitRequests[count] = PstreamDetail::Request::get(req);
530  ++count;
531  }
532 
533  // Local temporary storage, or return via calling parameter
534  List<int> tmpIndices;
535  if (indices)
536  {
537  indices->resize_nocopy(count);
538  }
539  else
540  {
541  tmpIndices.resize(count);
542  }
543 
544  if (UPstream::debug)
545  {
546  Pout<< "UPstream::waitSomeRequests : starting wait for some of "
547  << requests.size() << " requests" << endl;
548  }
549 
550  profilingPstream::beginTiming();
551 
552  // On success: sets non-blocking requests to MPI_REQUEST_NULL
553  int outcount = 0;
554  if
555  (
556  MPI_Waitsome
557  (
558  count,
559  waitRequests,
560  &outcount,
561  (indices ? indices->data() : tmpIndices.data()),
562  MPI_STATUSES_IGNORE
563  )
564  )
565  {
566  FatalErrorInFunction
567  << "MPI_Waitsome returned with error"
568  << Foam::abort(FatalError);
569  }
570 
571  profilingPstream::addWaitTime();
572 
573  if (outcount == MPI_UNDEFINED || outcount < 1)
574  {
575  // No active request handles
576  if (indices) indices->clear();
577 
578  // Everything handled or inactive, reset all to MPI_REQUEST_NULL
579  requests = UPstream::Request(MPI_REQUEST_NULL);
580  return false;
581  }
582 
583  if (indices)
584  {
585  indices->resize(outcount);
586  }
587 
588  // Transcribe MPI_Request back into UPstream::Request
589  // - do in reverse order - see note in finishedRequests()
590  {
591  for (label i = requests.size()-1; i >= 0; --i)
592  {
593  requests[i] = UPstream::Request(waitRequests[i]);
594  }
595  }
596 
597  return true;
598 }
599 
600 
601 Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
602 {
603  // No-op for non-parallel or no pending requests
604  if (!UPstream::parRun() || requests.empty())
605  {
606  return -1;
607  }
608 
609  // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
610  // which is always large enough to hold an MPI_Request (int or pointer)
611 
612  label count = 0;
613  auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
614 
615  // Transcribe UPstream::Request into MPI_Request
616  // - do not change locations within the list since these are relevant
617  // for the return index.
618  for (auto& req : requests)
619  {
620  waitRequests[count] = PstreamDetail::Request::get(req);
621  ++count;
622  }
623 
624  profilingPstream::beginTiming();
625 
626  // On success: sets request to MPI_REQUEST_NULL
627  int index = MPI_UNDEFINED;
628  if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
629  {
630  FatalErrorInFunction
631  << "MPI_Waitany returned with error"
632  << Foam::abort(FatalError);
633  }
634 
635  profilingPstream::addWaitTime();
636 
637  if (index == MPI_UNDEFINED)
638  {
639  index = -1; // No outstanding requests
640  }
641 
642  // Transcribe MPI_Request back into UPstream::Request
643  // - do in reverse order - see note in finishedRequests()
644  {
645  for (label i = count-1; i >= 0; --i)
646  {
647  requests[i] = UPstream::Request(waitRequests[i]);
648  }
649 
650  // Trailing portion
651  for (label i = count; i < requests.size(); ++i)
652  {
653  requests[i] = UPstream::Request(MPI_REQUEST_NULL);
654  }
655  }
656 
657  return index;
658 }
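// Usage sketch (illustrative): service a local set of requests in completion
// order, using the returned index to identify which operation finished:
//
//     List<UPstream::Request> requests(n);   // filled by non-blocking calls
//     label idx;
//     while ((idx = UPstream::waitAnyRequest(requests)) >= 0)
//     {
//         // handle the operation associated with requests[idx] ...
//     }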
659 
660 
661 // FUTURE?
662 //
711 
712 
713 void Foam::UPstream::waitRequest(const label i)
714 {
715  // No-op for non-parallel, or out-of-range (eg, placeholder indices)
716  if
717  (
718  !UPstream::parRun()
719  || i < 0
720  || i >= PstreamGlobals::outstandingRequests_.size()
721  )
722  {
723  return;
724  }
725 
726  auto& request = PstreamGlobals::outstandingRequests_[i];
727 
728  // No-op for null request
729  if (MPI_REQUEST_NULL == request)
730  {
731  return;
732  }
733 
734  if (UPstream::debug)
735  {
736  Pout<< "UPstream::waitRequest : starting wait for request:"
737  << i << endl;
738  }
739 
740  profilingPstream::beginTiming();
741 
742  // On success: sets request to MPI_REQUEST_NULL
743  if (MPI_Wait(&request, MPI_STATUS_IGNORE))
744  {
745  FatalErrorInFunction
746  << "MPI_Wait returned with error"
747  << Foam::abort(FatalError);
748  }
749 
750  profilingPstream::addWaitTime();
751 
752  if (UPstream::debug)
753  {
754  Pout<< "UPstream::waitRequest : finished wait for request:"
755  << i << endl;
756  }
757 }
758 
759 
760 void Foam::UPstream::waitRequest(UPstream::Request& req)
761 {
762  // No-op for non-parallel
763  if (!UPstream::parRun())
764  {
765  return;
766  }
767 
768  MPI_Request request = PstreamDetail::Request::get(req);
769 
770  // No-op for null request
771  if (MPI_REQUEST_NULL == request)
772  {
773  return;
774  }
775 
776  profilingPstream::beginTiming();
777 
778  if (MPI_Wait(&request, MPI_STATUS_IGNORE))
779  {
780  FatalErrorInFunction
781  << "MPI_Wait returned with error"
782  << Foam::abort(FatalError);
783  }
784 
785  profilingPstream::addWaitTime();
786 
787  req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
788 }
789 
790 
791 bool Foam::UPstream::finishedRequest(const label i)
792 {
793  // No-op for non-parallel, or out-of-range (eg, placeholder indices)
794  if
795  (
796  !UPstream::parRun()
797  || i < 0
798  || i >= PstreamGlobals::outstandingRequests_.size()
799  )
800  {
801  return true;
802  }
803 
804  if (UPstream::debug)
805  {
806  Pout<< "UPstream::finishedRequest : check request:"
807  << i << endl;
808  }
809 
810  auto& request = PstreamGlobals::outstandingRequests_[i];
811 
812  // Fast-path (no-op) for null request
813  if (MPI_REQUEST_NULL == request)
814  {
815  return true;
816  }
817 
818  // On success: sets request to MPI_REQUEST_NULL
819  int flag = 0;
820  MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
821 
822  return flag != 0;
823 }
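// Polling sketch (illustrative): overlap computation with communication by
// testing a request index between work chunks instead of blocking on it:
//
//     while (!UPstream::finishedRequest(reqi))
//     {
//         // ... do some useful local work ...
//     }
//
// where 'reqi' is an index previously obtained from the global request list.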
824 
825 
826 bool Foam::UPstream::finishedRequest(UPstream::Request& req)
827 {
828  // No-op for non-parallel
829  if (!UPstream::parRun())
830  {
831  return true;
832  }
833 
834  MPI_Request request = PstreamDetail::Request::get(req);
835 
836  // Fast-path (no-op) for null request
837  if (MPI_REQUEST_NULL == request)
838  {
839  return true;
840  }
841 
842  int flag = 0;
843  MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
844 
845  if (flag)
846  {
847  // Success: now inactive
848  req = UPstream::Request(MPI_REQUEST_NULL);
849  }
850 
851  return flag != 0;
852 }
853 
854 
855 bool Foam::UPstream::finishedRequests(const label pos, label len)
856 {
857  // No-op for non-parallel, or out-of-range (eg, placeholder indices)
858  if
859  (
860  !UPstream::parRun()
861  || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
862  || !len
863  )
864  {
865  return true;
866  }
867 
868  label count = (PstreamGlobals::outstandingRequests_.size() - pos);
869 
870  // Apply range-checking on slice with (len < 0) behaving like npos
871  // (ie, the rest of the list)
872  if (len >= 0 && len < count)
873  {
874  // A non-trailing slice
875  count = len;
876  }
877  // Have count >= 1
878 
879  if (UPstream::debug)
880  {
881  Pout<< "UPstream::finishedRequests : check " << count
882  << " requests starting at " << pos << endl;
883  }
884 
885  auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
886 
887  int flag = 1;
888 
889  if (count == 1)
890  {
891  // Fast-path (no-op) for single null request
892  if (MPI_REQUEST_NULL == *waitRequests)
893  {
894  return true;
895  }
896 
897  // On success: sets request to MPI_REQUEST_NULL
898  MPI_Test(waitRequests, &flag, MPI_STATUS_IGNORE);
899  }
900  else if (count > 1)
901  {
902  // On success: sets each request to MPI_REQUEST_NULL
903  // On failure: no request is modified
904  MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);
905  }
906 
907  return flag != 0;
908 }
909 
910 
911 bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
912 {
913  // No-op for non-parallel or no pending requests
914  if (!UPstream::parRun() || requests.empty())
915  {
916  return true;
917  }
918 
919  // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
920  // which is always large enough to hold an MPI_Request (int or pointer)
921 
922  label count = 0;
923  auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
924 
925  for (auto& req : requests)
926  {
927  MPI_Request request = PstreamDetail::Request::get(req);
928 
929  if (MPI_REQUEST_NULL != request) // Apply some prefiltering
930  {
931  waitRequests[count] = request;
932  ++count;
933  }
934  }
935 
936  if (!count)
937  {
938  // No active handles
939  return true;
940  }
941 
942  // On success: sets each request to MPI_REQUEST_NULL
943  // On failure: no request is modified
944  int flag = 0;
945  MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);
946 
947  if (flag)
948  {
949  // Success: reset all requests to MPI_REQUEST_NULL
950  requests = UPstream::Request(MPI_REQUEST_NULL);
951  }
952  else
953  {
954  // Not all done. Recover wrapped representation but in reverse order
955  // since sizeof(MPI_Request) can be smaller than
956  // sizeof(UPstream::Request::value_type)
957  // eg, mpich has MPI_Request as 'int'
958  //
959  // This is uglier than we'd like, but much better than allocating
960  // and freeing a scratch buffer each time we query things.
961 
962  for (label i = count-1; i >= 0; --i)
963  {
964  requests[i] = UPstream::Request(waitRequests[i]);
965  }
966 
967  // Trailing portion
968  for (label i = count; i < requests.size(); ++i)
969  {
970  requests[i] = UPstream::Request(MPI_REQUEST_NULL);
971  }
972  }
973 
974  return flag != 0;
975 }
976 
977 
978 bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
979 {
980  // No-op for non-parallel
981  if (!UPstream::parRun())
982  {
983  req0 = -1;
984  req1 = -1;
985  return true;
986  }
987 
988  bool anyActive = false;
989  MPI_Request waitRequests[2];
990 
991  // No-op for out-of-range (eg, placeholder indices)
992 
993  if (req0 >= 0 && req0 < PstreamGlobals::outstandingRequests_.size())
994  {
995  waitRequests[0] = PstreamGlobals::outstandingRequests_[req0];
996  }
997  else
998  {
999  waitRequests[0] = MPI_REQUEST_NULL;
1000  }
1001 
1002  if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
1003  {
1004  waitRequests[1] = PstreamGlobals::outstandingRequests_[req1];
1005  }
1006  else
1007  {
1008  waitRequests[1] = MPI_REQUEST_NULL;
1009  }
1010 
1011  if (MPI_REQUEST_NULL != waitRequests[0]) // An active handle
1012  {
1013  anyActive = true;
1014  }
1015  else
1016  {
1017  req0 = -1;
1018  }
1019 
1020  if (MPI_REQUEST_NULL != waitRequests[1]) // An active handle
1021  {
1022  anyActive = true;
1023  }
1024  else
1025  {
1026  req1 = -1;
1027  }
1028 
1029  if (!anyActive)
1030  {
1031  // No active handles
1032  return true;
1033  }
1034 
1035  profilingPstream::beginTiming();
1036 
1037  // On success: sets each request to MPI_REQUEST_NULL
1038  int indices[2];
1039  int outcount = 0;
1040  if
1041  (
1042  MPI_Testsome
1043  (
1044  2,
1045  waitRequests,
1046  &outcount,
1047  indices,
1048  MPI_STATUSES_IGNORE
1049  )
1050  )
1051  {
1052  FatalErrorInFunction
1053  << "MPI_Testsome returned with error"
1054  << Foam::abort(FatalError);
1055  }
1056 
1057  profilingPstream::addWaitTime();
1058 
1059  if (outcount == MPI_UNDEFINED)
1060  {
1061  // No active request handles.
1062  // Slightly pedantic, but copy back requests in case they were altered
1063 
1064  if (req0 >= 0)
1065  {
1066  PstreamGlobals::outstandingRequests_[req0] = waitRequests[0];
1067  }
1068 
1069  if (req1 >= 0)
1070  {
1071  PstreamGlobals::outstandingRequests_[req1] = waitRequests[1];
1072  }
1073 
1074  // Flag indices as 'done'
1075  req0 = -1;
1076  req1 = -1;
1077  return true;
1078  }
1079 
1080  // Copy back requests to their 'stack' locations
1081  for (int i = 0; i < outcount; ++i)
1082  {
1083  const int idx = indices[i];
1084 
1085  if (idx == 0)
1086  {
1087  if (req0 >= 0)
1088  {
1089  PstreamGlobals::outstandingRequests_[req0] = MPI_REQUEST_NULL;
1090  req0 = -1;
1091  }
1092  }
1093  if (idx == 1)
1094  {
1095  if (req1 >= 0)
1096  {
1097  PstreamGlobals::outstandingRequests_[req1] = MPI_REQUEST_NULL;
1098  req1 = -1;
1099  }
1100  }
1101  }
1102 
1103  return (outcount > 0);
1104 }
1105 
1106 
1107 void Foam::UPstream::waitRequestPair(label& req0, label& req1)
1108 {
1109  // No-op for non-parallel. Flag indices as 'done'
1110  if (!UPstream::parRun())
1111  {
1112  req0 = -1;
1113  req1 = -1;
1114  return;
1115  }
1116 
1117  int count = 0;
1118  MPI_Request waitRequests[2];
1119 
1120  // No-op for out-of-range (eg, placeholder indices)
1121  // Prefilter inactive handles
1122 
1123  if (req0 >= 0 && req0 < PstreamGlobals::outstandingRequests_.size())
1124  {
1125  waitRequests[count] = PstreamGlobals::outstandingRequests_[req0];
1126  PstreamGlobals::outstandingRequests_[req0] = MPI_REQUEST_NULL;
1127 
1128  if (MPI_REQUEST_NULL != waitRequests[count]) // An active handle
1129  {
1130  ++count;
1131  }
1132  }
1133 
1134  if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
1135  {
1136  waitRequests[count] = PstreamGlobals::outstandingRequests_[req1];
1137  PstreamGlobals::outstandingRequests_[req1] = MPI_REQUEST_NULL;
1138 
1139  if (MPI_REQUEST_NULL != waitRequests[count]) // An active handle
1140  {
1141  ++count;
1142  }
1143  }
1144 
1145  // Flag in advance as being handled
1146  req0 = -1;
1147  req1 = -1;
1148 
1149  if (!count)
1150  {
1151  // No active handles
1152  return;
1153  }
1154 
1155  profilingPstream::beginTiming();
1156 
1157  // On success: sets each request to MPI_REQUEST_NULL
1158  if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
1159  {
1160  FatalErrorInFunction
1161  << "MPI_Waitall returned with error"
1162  << Foam::abort(FatalError);
1163  }
1164 
1165  profilingPstream::addWaitTime();
1166 }
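// Note (illustrative): finishedRequestPair()/waitRequestPair() cover the
// common case of exactly one send and one receive request, eg the pair of
// indices recorded when posting a non-blocking exchange with a single
// neighbour; each index is flagged as -1 ('done') once it has been handled.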
1167 
1168 
1169 // ************************************************************************* //