mapDistributeBaseTemplates.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2015-2017 OpenFOAM Foundation
9  Copyright (C) 2015-2023 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "Pstream.H"
30 #include "PstreamBuffers.H"
31 #include "flipOp.H"
32 
33 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
34 
35 template<class T, class CombineOp, class NegateOp>
37 (
38  List<T>& lhs,
39  const UList<T>& rhs,
40 
41  const labelUList& map,
42  const bool hasFlip,
43  const CombineOp& cop,
44  const NegateOp& negOp
45 )
46 {
47  const label len = map.size();
48 
49  if (hasFlip)
50  {
51  for (label i = 0; i < len; ++i)
52  {
53  const label index = map[i];
54 
55  if (index > 0)
56  {
57  cop(lhs[index-1], rhs[i]);
58  }
59  else if (index < 0)
60  {
61  cop(lhs[-index-1], negOp(rhs[i]));
62  }
63  else
64  {
66  << "Illegal flip index '0' at " << i << '/' << map.size()
67  << " for list:" << rhs.size() << nl
68  << exit(FatalError);
69  }
70  }
71  }
72  else
73  {
74  for (label i = 0; i < len; ++i)
75  {
76  cop(lhs[map[i]], rhs[i]);
77  }
78  }
79 }
80 
81 
82 template<class T, class NegateOp>
84 (
85  List<T>& output,
86  const UList<T>& values,
87  const labelUList& map,
88  const bool hasFlip,
89  const NegateOp& negOp
90 )
91 {
92  const label len = map.size();
93 
94  // FULLDEBUG: if (output.size() < len) FatalError ...;
95 
96  if (hasFlip)
97  {
98  for (label i = 0; i < len; ++i)
99  {
100  const label index = map[i];
101 
102  if (index > 0)
103  {
104  output[i] = values[index-1];
105  }
106  else if (index < 0)
107  {
108  output[i] = negOp(values[-index-1]);
109  }
110  else
111  {
113  << "Illegal flip index '0' at " << i << '/' << map.size()
114  << " for list:" << values.size() << nl
115  << exit(FatalError);
116  }
117  }
118  }
119  else
120  {
121  // Like indirect list
122  for (label i = 0; i < len; ++i)
123  {
124  output[i] = values[map[i]];
125  }
126  }
127 }
128 
129 
130 template<class T, class NegateOp>
132 (
133  const UList<T>& values,
134  const labelUList& map,
135  const bool hasFlip,
136  const NegateOp& negOp
137 )
138 {
139  List<T> output(map.size());
140  accessAndFlip(output, values, map, hasFlip, negOp);
141  return output;
142 }
143 
144 
145 template<class T, class negateOp>
147 (
148  const labelListList& subMap,
149  const bool subHasFlip,
150  const labelListList& constructMap,
151  const bool constructHasFlip,
152  const UList<T>& field,
153 
154  labelRange& sendRequests,
155  PtrList<List<T>>& sendFields,
156 
157  labelRange& recvRequests,
158  PtrList<List<T>>& recvFields,
159 
160  const negateOp& negOp,
161  const int tag,
162  const label comm
163 )
164 {
166  {
168  << "Only contiguous is currently supported"
169  << exit(FatalError);
170  }
171 
172  const auto myRank = UPstream::myProcNo(comm);
173  const auto nProcs = UPstream::nProcs(comm);
174 
175 
176  // Set up receives from neighbours
177  recvRequests.start() = UPstream::nRequests();
178  recvRequests.size() = 0;
179 
180  recvFields.resize(nProcs);
181 
182  for (const int proci : UPstream::allProcs(comm))
183  {
184  const labelList& map = constructMap[proci];
185 
186  if (proci == myRank)
187  {
188  // No communication for myself - but recvFields may be used
189  }
190  else if (map.empty())
191  {
192  // No receive necessary
193  (void) recvFields.release(proci);
194  }
195  else
196  {
197  List<T>& subField = recvFields.try_emplace(proci);
198  subField.resize_nocopy(map.size());
199 
201  (
203  proci,
204  subField.data_bytes(),
205  subField.size_bytes(),
206  tag,
207  comm
208  );
209  }
210  }
211 
212  // Finished setting up the receives
213  recvRequests.size() = (UPstream::nRequests() - recvRequests.start());
214 
215 
216  // Set up sends to neighbours
217  sendRequests.start() = UPstream::nRequests();
218  sendRequests.size() = 0;
219 
220  sendFields.resize(nProcs);
221 
222  for (const int proci : UPstream::allProcs(comm))
223  {
224  const labelList& map = subMap[proci];
225 
226  if (proci == myRank)
227  {
228  // No communication - sendFields not needed
229  (void) sendFields.release(proci);
230  }
231  else if (map.empty())
232  {
233  // No send necessary
234  (void) sendFields.release(proci);
235  }
236  else
237  {
238  List<T>& subField = sendFields.try_emplace(proci);
239  subField.resize_nocopy(map.size());
240 
241  accessAndFlip(subField, field, map, subHasFlip, negOp);
242 
244  (
246  proci,
247  subField.cdata_bytes(),
248  subField.size_bytes(),
249  tag,
250  comm
251  );
252  }
253  }
254 
255  // Finished setting up the sends
256  sendRequests.size() = (UPstream::nRequests() - sendRequests.start());
257 
258 
259  // Set up 'send' to myself - copy directly into recvFields
260  {
261  const labelList& map = subMap[myRank];
262 
263  if (map.empty())
264  {
265  // Nothing to send/recv
266  (void) recvFields.release(myRank);
267  }
268  else
269  {
270  List<T>& subField = recvFields.try_emplace(myRank);
271  subField.resize_nocopy(map.size());
272 
273  accessAndFlip(subField, field, map, subHasFlip, negOp);
274  }
275  }
276 }
277 
278 
279 template<class T>
281 (
282  const UList<T>& field,
283  labelRange& sendRequests,
284  PtrList<List<T>>& sendFields,
285  labelRange& recvRequests,
286  PtrList<List<T>>& recvFields,
287  const int tag
288 ) const
289 {
290  send
291  (
292  subMap_,
293  subHasFlip_,
294  constructMap_,
295  constructHasFlip_,
296  field,
297  sendRequests, sendFields,
298  recvRequests, recvFields,
299  flipOp(),
300  tag,
301  comm_
302  );
303 }
304 
305 
306 template<class T, class CombineOp, class negateOp>
308 (
309  const label constructSize,
310  const labelListList& constructMap,
311  const bool constructHasFlip,
312  const labelRange& requests,
313  const UPtrList<List<T>>& recvFields,
314  List<T>& field,
315  const CombineOp& cop,
316  const negateOp& negOp,
317  const int tag,
318  const label comm
319 )
320 {
322  {
324  << "Only contiguous is currently supported"
325  << exit(FatalError);
326  }
327 
328  const auto myRank = UPstream::myProcNo(comm);
329  const auto nProcs = UPstream::nProcs(comm);
330 
331 
332  // Receiving from which procs - according to map information
333 
334  DynamicList<int> recvProcs(nProcs);
335  for (const int proci : UPstream::allProcs(comm))
336  {
337  const labelList& map = constructMap[proci];
338 
339  if (proci != myRank && map.size())
340  {
341  recvProcs.push_back(proci);
342 
343  const auto* subFieldPtr = recvFields.get(proci);
344  if (subFieldPtr)
345  {
346  checkReceivedSize(proci, map.size(), subFieldPtr->size());
347  }
348  else
349  {
351  << "From processor " << proci
352  << " : unallocated receive field" << nl
353  << exit(FatalError);
354  }
355  }
356  }
357 
358 
359  // Combining bits - can reuse field storage
360  field.resize_nocopy(constructSize);
361 
362 
363  // Received sub field from myself : recvFields[myRank]
364  if (recvFields.test(myRank))
365  {
366  const labelList& map = constructMap[myRank];
367  const List<T>& subField = recvFields[myRank];
368 
369  // Unlikely to need a size check
370  // checkReceivedSize(myRank, map.size(), subField.size());
371 
372  flipAndCombine
373  (
374  field,
375  subField,
376  map,
377  constructHasFlip,
378  cop,
379  negOp
380  );
381  }
382 
383 
384  // NB: do NOT use polling and dispatch here.
385  // There is no certainty if the listed requests are still pending or
386  // have already been waited on before calling this method.
387 
388  // Wait for (receive) requests, but the range may also include
389  // other requests depending on what the caller provided
390 
391  UPstream::waitRequests(requests.start(), requests.size());
392 
393 
394  // Process received fields
395 
396  {
397  for (const int proci : recvProcs)
398  {
399  const labelList& map = constructMap[proci];
400  const List<T>& subField = recvFields[proci];
401 
402  // Already checked the sizes previously
403  // checkReceivedSize(proci, map.size(), subField.size());
404 
405  flipAndCombine
406  (
407  field,
408  subField,
409  map,
410  constructHasFlip,
411  cop,
412  negOp
413  );
414  }
415  }
416 }
417 
418 
419 template<class T>
421 (
422  const labelRange& requests,
423  const UPtrList<List<T>>& recvFields,
424  List<T>& field,
425  const int tag
426 ) const
427 {
428  receive
429  (
430  constructSize_,
431  constructMap_,
432  constructHasFlip_,
433  requests,
434  recvFields,
435  field,
436  eqOp<T>(),
437  flipOp(),
438  tag,
439  comm_
440  );
441 }
442 
443 
444 template<class T, class CombineOp, class NegateOp>
446 (
447  const UPstream::commsTypes commsType,
448  const List<labelPair>& schedule,
449  const label constructSize,
450  const labelListList& subMap,
451  const bool subHasFlip,
452  const labelListList& constructMap,
453  const bool constructHasFlip,
454  List<T>& field,
455  const T& nullValue,
456  const CombineOp& cop,
457  const NegateOp& negOp,
458  const int tag,
459  const label comm
460 )
461 {
462  const auto myRank = UPstream::myProcNo(comm);
463  const auto nProcs = UPstream::nProcs(comm);
464 
465  if (!UPstream::parRun())
466  {
467  // Do only me to me.
468 
469  List<T> subField
470  (
471  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
472  );
473 
474  // Receive sub field from myself (subField)
475  const labelList& map = constructMap[myRank];
476 
477  // Combining bits - can now reuse field storage
478  field.resize_nocopy(constructSize);
479  field = nullValue;
480 
481  flipAndCombine
482  (
483  field,
484  subField,
485  map,
486  constructHasFlip,
487  cop,
488  negOp
489  );
490 
491  return;
492  }
493 
494  if (commsType == UPstream::commsTypes::blocking)
495  {
496  // Since buffered sending can reuse the field to collect the
497  // received data.
498 
499  // Send sub field to neighbour
500  for (const int proci : UPstream::allProcs(comm))
501  {
502  const labelList& map = subMap[proci];
503 
504  if (proci != myRank && map.size())
505  {
506  OPstream os
507  (
509  proci,
510  0,
511  tag,
512  comm
513  );
514  List<T> subField
515  (
516  accessAndFlip(field, map, subHasFlip, negOp)
517  );
518 
519  os << subField;
520  }
521  }
522 
523  {
524  // Subset myself
525  List<T> subField
526  (
527  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
528  );
529 
530  // Receive sub field from myself (subField)
531  const labelList& map = constructMap[myRank];
532 
533  // Combining bits - can now reuse field storage
534  field.resize_nocopy(constructSize);
535  field = nullValue;
536 
537  flipAndCombine
538  (
539  field,
540  subField,
541  map,
542  constructHasFlip,
543  cop,
544  negOp
545  );
546  }
547 
548  // Receive and process sub-field from neighbours
549  for (const int proci : UPstream::allProcs(comm))
550  {
551  const labelList& map = constructMap[proci];
552 
553  if (proci != myRank && map.size())
554  {
555  IPstream is
556  (
558  proci,
559  0,
560  tag,
561  comm
562  );
563  List<T> subField(is);
564 
565  checkReceivedSize(proci, map.size(), subField.size());
566 
567  flipAndCombine
568  (
569  field,
570  subField,
571  map,
572  constructHasFlip,
573  cop,
574  negOp
575  );
576  }
577  }
578  }
579  else if (commsType == UPstream::commsTypes::scheduled)
580  {
581  // Need to make sure I don't overwrite field with received data
582  // since the data might need to be sent to another processor. So
583  // allocate a new field for the results.
584  List<T> newField;
585  newField.resize_nocopy(constructSize);
586  newField = nullValue;
587 
588  // First handle self
589  {
590  // Subset myself
591  List<T> subField
592  (
593  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
594  );
595 
596  // Receive sub field from myself (subField)
597  const labelList& map = constructMap[myRank];
598 
599  flipAndCombine
600  (
601  newField,
602  subField,
603  map,
604  constructHasFlip,
605  cop,
606  negOp
607  );
608  }
609 
610 
611  // Schedule will already have pruned 0-sized comms
612  for (const labelPair& twoProcs : schedule)
613  {
614  // twoProcs is a swap pair of processors. The first one is the
615  // one that needs to send first and then receive.
616 
617  if (twoProcs.first() == myRank)
618  {
619  // I am send first, receive next
620  const label nbrProc = twoProcs.second();
621 
622  {
623  OPstream os
624  (
626  nbrProc,
627  0,
628  tag,
629  comm
630  );
631 
632  const labelList& map = subMap[nbrProc];
633 
634  List<T> subField
635  (
636  accessAndFlip(field, map, subHasFlip, negOp)
637  );
638 
639  os << subField;
640  }
641  {
642  IPstream is
643  (
645  nbrProc,
646  0,
647  tag,
648  comm
649  );
650  List<T> subField(is);
651  const labelList& map = constructMap[nbrProc];
652 
653  checkReceivedSize(nbrProc, map.size(), subField.size());
654 
655  flipAndCombine
656  (
657  newField,
658  subField,
659  map,
660  constructHasFlip,
661  cop,
662  negOp
663  );
664  }
665  }
666  else
667  {
668  // I am receive first, send next
669  const label nbrProc = twoProcs.first();
670 
671  {
672  IPstream is
673  (
675  nbrProc,
676  0,
677  tag,
678  comm
679  );
680  List<T> subField(is);
681  const labelList& map = constructMap[nbrProc];
682 
683  checkReceivedSize(nbrProc, map.size(), subField.size());
684 
685  flipAndCombine
686  (
687  newField,
688  subField,
689  map,
690  constructHasFlip,
691  cop,
692  negOp
693  );
694  }
695  {
696  OPstream os
697  (
699  nbrProc,
700  0,
701  tag,
702  comm
703  );
704 
705  const labelList& map = subMap[nbrProc];
706 
707  List<T> subField
708  (
709  accessAndFlip(field, map, subHasFlip, negOp)
710  );
711 
712  os << subField;
713  }
714  }
715  }
716  field.transfer(newField);
717  }
718  else if (commsType == UPstream::commsTypes::nonBlocking)
719  {
720  const label startOfRequests = UPstream::nRequests();
721 
722  if (!is_contiguous<T>::value)
723  {
724  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking, tag, comm);
725 
726  // Stream data into buffer
727  for (const int proci : UPstream::allProcs(comm))
728  {
729  const labelList& map = subMap[proci];
730 
731  if (proci != myRank && map.size())
732  {
733  UOPstream os(proci, pBufs);
734 
735  List<T> subField
736  (
737  accessAndFlip(field, map, subHasFlip, negOp)
738  );
739 
740  os << subField;
741  }
742  }
743 
744  // Initiate receiving - do yet not block
745  pBufs.finishedSends(false);
746 
747  {
748  // Set up 'send' to myself
749  List<T> subField
750  (
751  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
752  );
753 
754  // Combining bits - can now reuse field storage
755  field.resize_nocopy(constructSize);
756  field = nullValue;
757 
758  // Receive sub field from myself
759  const labelList& map = constructMap[myRank];
760 
761  flipAndCombine
762  (
763  field,
764  subField,
765  map,
766  constructHasFlip,
767  cop,
768  negOp
769  );
770  }
771 
772  // Wait for receive requests (and the send requests too)
773  UPstream::waitRequests(startOfRequests);
774 
775  // Receive and process neighbour fields
776  for (const int proci : UPstream::allProcs(comm))
777  {
778  const labelList& map = constructMap[proci];
779 
780  if (proci != myRank && map.size())
781  {
782  UIPstream is(proci, pBufs);
783  List<T> subField(is);
784 
785  checkReceivedSize(proci, map.size(), subField.size());
786 
787  flipAndCombine
788  (
789  field,
790  subField,
791  map,
792  constructHasFlip,
793  cop,
794  negOp
795  );
796  }
797  }
798  }
799  else
800  {
801  // Set up receives from neighbours
802 
803  List<List<T>> recvFields(nProcs);
804  DynamicList<int> recvProcs(nProcs);
805 
806  for (const int proci : UPstream::allProcs(comm))
807  {
808  const labelList& map = constructMap[proci];
809 
810  if (proci != myRank && map.size())
811  {
812  recvProcs.push_back(proci);
813  List<T>& subField = recvFields[proci];
814  subField.resize_nocopy(map.size());
815 
817  (
819  proci,
820  subField.data_bytes(),
821  subField.size_bytes(),
822  tag,
823  comm
824  );
825  }
826  }
827 
828 
829  // Set up sends to neighbours
830 
831  List<List<T>> sendFields(nProcs);
832 
833  for (const int proci : UPstream::allProcs(comm))
834  {
835  const labelList& map = subMap[proci];
836 
837  if (proci != myRank && map.size())
838  {
839  List<T>& subField = sendFields[proci];
840  subField.resize_nocopy(map.size());
841 
842  accessAndFlip(subField, field, map, subHasFlip, negOp);
843 
845  (
847  proci,
848  subField.cdata_bytes(),
849  subField.size_bytes(),
850  tag,
851  comm
852  );
853  }
854  }
855 
856  // Set up 'send' to myself - copy directly into recvFields
857  {
858  const labelList& map = subMap[myRank];
859  List<T>& subField = recvFields[myRank];
860  subField.resize_nocopy(map.size());
861 
862  accessAndFlip(subField, field, map, subHasFlip, negOp);
863  }
864 
865 
866  // Combining bits - can now reuse field storage
867  field.resize_nocopy(constructSize);
868  field = nullValue;
869 
870  // Receive sub field from myself : recvFields[myRank]
871  {
872  const labelList& map = constructMap[myRank];
873  const List<T>& subField = recvFields[myRank];
874 
875  // Probably don't need a size check
876  // checkReceivedSize(myRank, map.size(), subField.size());
877 
878  flipAndCombine
879  (
880  field,
881  subField,
882  map,
883  constructHasFlip,
884  cop,
885  negOp
886  );
887  }
888 
889 
890  // Poll for completed receive requests and dispatch
891  DynamicList<int> indices(recvProcs.size());
892  while
893  (
895  (
896  startOfRequests,
897  recvProcs.size(),
898  &indices
899  )
900  )
901  {
902  for (const int idx : indices)
903  {
904  const int proci = recvProcs[idx];
905  const labelList& map = constructMap[proci];
906  const List<T>& subField = recvFields[proci];
907 
908  // No size check - was dimensioned above
909  // checkReceivedSize(proci, map.size(), subField.size());
910 
911  flipAndCombine
912  (
913  field,
914  subField,
915  map,
916  constructHasFlip,
917  cop,
918  negOp
919  );
920  }
921  }
922 
923  // Wait for any remaining requests
924  UPstream::waitRequests(startOfRequests);
925  }
926  }
927  else
928  {
930  << "Unknown communication schedule " << int(commsType)
932  }
933 }
934 
935 
936 template<class T, class NegateOp>
938 (
939  const UPstream::commsTypes commsType,
940  const List<labelPair>& schedule,
941  const label constructSize,
942  const labelListList& subMap,
943  const bool subHasFlip,
944  const labelListList& constructMap,
945  const bool constructHasFlip,
946  List<T>& field,
947  const NegateOp& negOp,
948  const int tag,
949  const label comm
950 )
951 {
952  const auto myRank = UPstream::myProcNo(comm);
953  const auto nProcs = UPstream::nProcs(comm);
954 
955  if (!UPstream::parRun())
956  {
957  // Do only me to me.
958 
959  List<T> subField
960  (
961  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
962  );
963 
964  // Receive sub field from myself (subField)
965  const labelList& map = constructMap[myRank];
966 
967  // Combining bits - can now reuse field storage
968  field.resize_nocopy(constructSize);
969 
970  flipAndCombine
971  (
972  field,
973  subField,
974  map,
975  constructHasFlip,
976  eqOp<T>(),
977  negOp
978  );
979 
980  return;
981  }
982 
983  if (commsType == UPstream::commsTypes::blocking)
984  {
985  // Since buffered sending can reuse the field to collect the
986  // received data.
987 
988  // Send sub field to neighbour
989  for (const int proci : UPstream::allProcs(comm))
990  {
991  const labelList& map = subMap[proci];
992 
993  if (proci != myRank && map.size())
994  {
995  OPstream os
996  (
998  proci,
999  0,
1000  tag,
1001  comm
1002  );
1003 
1004  List<T> subField
1005  (
1006  accessAndFlip(field, map, subHasFlip, negOp)
1007  );
1008 
1009  os << subField;
1010  }
1011  }
1012 
1013  {
1014  // Subset myself
1015  List<T> subField
1016  (
1017  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
1018  );
1019 
1020  // Receive sub field from myself (subField)
1021  const labelList& map = constructMap[myRank];
1022 
1023  // Combining bits - can now reuse field storage
1024  field.resize_nocopy(constructSize);
1025 
1026  flipAndCombine
1027  (
1028  field,
1029  subField,
1030  map,
1031  constructHasFlip,
1032  eqOp<T>(),
1033  negOp
1034  );
1035  }
1036 
1037  // Receive and process sub-field from neighbours
1038  for (const int proci : UPstream::allProcs(comm))
1039  {
1040  const labelList& map = constructMap[proci];
1041 
1042  if (proci != myRank && map.size())
1043  {
1044  IPstream is
1045  (
1047  proci,
1048  0,
1049  tag,
1050  comm
1051  );
1052  List<T> subField(is);
1053 
1054  checkReceivedSize(proci, map.size(), subField.size());
1055 
1056  flipAndCombine
1057  (
1058  field,
1059  subField,
1060  map,
1061  constructHasFlip,
1062  eqOp<T>(),
1063  negOp
1064  );
1065  }
1066  }
1067  }
1068  else if (commsType == UPstream::commsTypes::scheduled)
1069  {
1070  // Need to make sure I don't overwrite field with received data
1071  // since the data might need to be sent to another processor. So
1072  // allocate a new field for the results.
1073  List<T> newField;
1074  newField.resize_nocopy(constructSize);
1075 
1076  // First handle self
1077  {
1078  // Subset myself
1079  List<T> subField
1080  (
1081  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
1082  );
1083 
1084  // Receive sub field from myself (subField)
1085  const labelList& map = constructMap[myRank];
1086 
1087  flipAndCombine
1088  (
1089  newField,
1090  subField,
1091  map,
1092  constructHasFlip,
1093  eqOp<T>(),
1094  negOp
1095  );
1096  }
1097 
1098  // Schedule will already have pruned 0-sized comms
1099  for (const labelPair& twoProcs : schedule)
1100  {
1101  // twoProcs is a swap pair of processors. The first one is the
1102  // one that needs to send first and then receive.
1103 
1104  if (twoProcs.first() == myRank)
1105  {
1106  // I am send first, receive next
1107  const label nbrProc = twoProcs.second();
1108 
1109  {
1110  OPstream os
1111  (
1113  nbrProc,
1114  0,
1115  tag,
1116  comm
1117  );
1118 
1119  const labelList& map = subMap[nbrProc];
1120  List<T> subField
1121  (
1122  accessAndFlip(field, map, subHasFlip, negOp)
1123  );
1124 
1125  os << subField;
1126  }
1127  {
1128  IPstream is
1129  (
1131  nbrProc,
1132  0,
1133  tag,
1134  comm
1135  );
1136  List<T> subField(is);
1137 
1138  const labelList& map = constructMap[nbrProc];
1139 
1140  checkReceivedSize(nbrProc, map.size(), subField.size());
1141 
1142  flipAndCombine
1143  (
1144  newField,
1145  subField,
1146  map,
1147  constructHasFlip,
1148  eqOp<T>(),
1149  negOp
1150  );
1151  }
1152  }
1153  else
1154  {
1155  // I am receive first, send next
1156  const label nbrProc = twoProcs.first();
1157 
1158  {
1159  IPstream is
1160  (
1162  nbrProc,
1163  0,
1164  tag,
1165  comm
1166  );
1167  List<T> subField(is);
1168 
1169  const labelList& map = constructMap[nbrProc];
1170 
1171  checkReceivedSize(nbrProc, map.size(), subField.size());
1172 
1173  flipAndCombine
1174  (
1175  newField,
1176  subField,
1177  map,
1178  constructHasFlip,
1179  eqOp<T>(),
1180  negOp
1181  );
1182  }
1183  {
1184  OPstream os
1185  (
1187  nbrProc,
1188  0,
1189  tag,
1190  comm
1191  );
1192 
1193  const labelList& map = subMap[nbrProc];
1194  List<T> subField
1195  (
1196  accessAndFlip(field, map, subHasFlip, negOp)
1197  );
1198 
1199  os << subField;
1200  }
1201  }
1202  }
1203  field.transfer(newField);
1204  }
1205  else if (commsType == UPstream::commsTypes::nonBlocking)
1206  {
1207  const label startOfRequests = UPstream::nRequests();
1208 
1209  if (!is_contiguous<T>::value)
1210  {
1211  PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking, tag, comm);
1212 
1213  // Stream data into buffer
1214  for (const int proci : UPstream::allProcs(comm))
1215  {
1216  const labelList& map = subMap[proci];
1217 
1218  if (proci != myRank && map.size())
1219  {
1220  UOPstream os(proci, pBufs);
1221 
1222  List<T> subField
1223  (
1224  accessAndFlip(field, map, subHasFlip, negOp)
1225  );
1226 
1227  os << subField;
1228  }
1229  }
1230 
1231  // Initiate receiving - do yet not block
1232  pBufs.finishedSends(false);
1233 
1234  {
1235  // Set up 'send' to myself
1236  List<T> subField
1237  (
1238  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
1239  );
1240 
1241  // Combining bits - can now reuse field storage
1242  field.resize_nocopy(constructSize);
1243 
1244  // Receive sub field from myself
1245  const labelList& map = constructMap[myRank];
1246 
1247  flipAndCombine
1248  (
1249  field,
1250  subField,
1251  map,
1252  constructHasFlip,
1253  eqOp<T>(),
1254  negOp
1255  );
1256  }
1257 
1258  // Wait for receive requests (and the send requests too)
1259  UPstream::waitRequests(startOfRequests);
1260 
1261  // Receive and process neighbour fields
1262  for (const int proci : UPstream::allProcs(comm))
1263  {
1264  const labelList& map = constructMap[proci];
1265 
1266  if (proci != myRank && map.size())
1267  {
1268  UIPstream is(proci, pBufs);
1269  List<T> subField(is);
1270 
1271  checkReceivedSize(proci, map.size(), subField.size());
1272 
1273  flipAndCombine
1274  (
1275  field,
1276  subField,
1277  map,
1278  constructHasFlip,
1279  eqOp<T>(),
1280  negOp
1281  );
1282  }
1283  }
1284  }
1285  else
1286  {
1287  // Set up receives from neighbours
1288 
1289  List<List<T>> recvFields(nProcs);
1290  DynamicList<int> recvProcs(nProcs);
1291 
1292  for (const int proci : UPstream::allProcs(comm))
1293  {
1294  const labelList& map = constructMap[proci];
1295 
1296  if (proci != myRank && map.size())
1297  {
1298  recvProcs.push_back(proci);
1299  List<T>& subField = recvFields[proci];
1300  subField.resize_nocopy(map.size());
1301 
1303  (
1305  proci,
1306  subField.data_bytes(),
1307  subField.size_bytes(),
1308  tag,
1309  comm
1310  );
1311  }
1312  }
1313 
1314 
1315  // Set up sends to neighbours
1316 
1317  List<List<T>> sendFields(nProcs);
1318 
1319  for (const int proci : UPstream::allProcs(comm))
1320  {
1321  const labelList& map = subMap[proci];
1322 
1323  if (proci != myRank && map.size())
1324  {
1325  List<T>& subField = sendFields[proci];
1326  subField.resize_nocopy(map.size());
1327 
1328  accessAndFlip(subField, field, map, subHasFlip, negOp);
1329 
1331  (
1333  proci,
1334  subField.cdata_bytes(),
1335  subField.size_bytes(),
1336  tag,
1337  comm
1338  );
1339  }
1340  }
1341 
1342  // Set up 'send' to myself - copy directly into recvFields
1343  {
1344  const labelList& map = subMap[myRank];
1345  List<T>& subField = recvFields[myRank];
1346  subField.resize_nocopy(map.size());
1347 
1348  accessAndFlip(subField, field, map, subHasFlip, negOp);
1349  }
1350 
1351 
1352  // Combining bits - can now reuse field storage
1353  field.resize_nocopy(constructSize);
1354 
1355 
1356  // Receive sub field from myself : recvFields[myRank]
1357  {
1358  const labelList& map = constructMap[myRank];
1359  const List<T>& subField = recvFields[myRank];
1360 
1361  // Probably don't need a size check
1362  // checkReceivedSize(myRank, map.size(), subField.size());
1363 
1364  flipAndCombine
1365  (
1366  field,
1367  subField,
1368  map,
1369  constructHasFlip,
1370  eqOp<T>(),
1371  negOp
1372  );
1373  }
1374 
1375 
1376  // Poll for completed receive requests and dispatch
1377  DynamicList<int> indices(recvProcs.size());
1378  while
1379  (
1381  (
1382  startOfRequests,
1383  recvProcs.size(),
1384  &indices
1385  )
1386  )
1387  {
1388  for (const int idx : indices)
1389  {
1390  const int proci = recvProcs[idx];
1391  const labelList& map = constructMap[proci];
1392  const List<T>& subField = recvFields[proci];
1393 
1394  // No size check - was dimensioned above
1395  // checkReceivedSize(proci, map.size(), subField.size());
1396 
1397  flipAndCombine
1398  (
1399  field,
1400  subField,
1401  map,
1402  constructHasFlip,
1403  eqOp<T>(),
1404  negOp
1405  );
1406  }
1407  }
1408 
1409  // Wait for any remaining requests
1410  UPstream::waitRequests(startOfRequests);
1411  }
1412  }
1413  else
1414  {
1416  << "Unknown communication schedule " << int(commsType)
1418  }
1419 }
1420 
1421 
1422 template<class T>
1424 (
1425  PstreamBuffers& pBufs,
1426  const List<T>& field
1427 ) const
1428 {
1429  // Stream data into buffer
1430  for (const int proci : UPstream::allProcs(comm_))
1431  {
1432  const labelList& map = subMap_[proci];
1433 
1434  if (map.size())
1435  {
1436  UOPstream os(proci, pBufs);
1437 
1438  List<T> subField
1439  (
1440  accessAndFlip(field, map, subHasFlip_, flipOp())
1441  );
1442 
1443  os << subField;
1444  }
1445  }
1446 
1447  // Start sending and receiving but do not block.
1448  pBufs.finishedSends(false);
1449 }
1450 
1451 
1452 template<class T>
1454 (
1455  PstreamBuffers& pBufs,
1456  List<T>& field
1457 ) const
1458 {
1459  // Consume
1460  field.resize_nocopy(constructSize_);
1461 
1462  for (const int proci : UPstream::allProcs(comm_))
1463  {
1464  const labelList& map = constructMap_[proci];
1465 
1466  if (map.size())
1467  {
1468  UIPstream is(proci, pBufs);
1469  List<T> subField(is);
1470 
1471  checkReceivedSize(proci, map.size(), subField.size());
1472 
1473  flipAndCombine
1474  (
1475  field,
1476  subField,
1477  map,
1478  constructHasFlip_,
1479  eqOp<T>(),
1480  flipOp()
1481  );
1482  }
1483  }
1484 }
1485 
1486 
1487 template<class T, class NegateOp>
1489 (
1490  const UPstream::commsTypes commsType,
1491  List<T>& values,
1492  const NegateOp& negOp,
1493  const int tag
1494 ) const
1495 {
1496  distribute
1497  (
1498  commsType,
1499  whichSchedule(commsType),
1500  constructSize_,
1501  subMap_,
1502  subHasFlip_,
1503  constructMap_,
1504  constructHasFlip_,
1505  values,
1506  negOp,
1507  tag,
1508  comm_
1509  );
1510 }
1511 
1512 
1513 template<class T, class NegateOp>
1515 (
1516  const UPstream::commsTypes commsType,
1517  const T& nullValue,
1518  List<T>& values,
1519  const NegateOp& negOp,
1520  const int tag
1521 ) const
1522 {
1523  distribute
1524  (
1525  commsType,
1526  whichSchedule(commsType),
1527  constructSize_,
1528  subMap_,
1529  subHasFlip_,
1530  constructMap_,
1531  constructHasFlip_,
1532  values,
1533  nullValue,
1534  eqOp<T>(),
1535  negOp,
1536  tag,
1537  comm_
1538  );
1539 }
1540 
1541 
1542 template<class T, class NegateOp>
1544 (
1545  List<T>& values,
1546  const NegateOp& negOp,
1547  const int tag
1548 ) const
1549 {
1550  distribute
1551  (
1553  );
1554 }
1555 
1556 
1557 template<class T>
1559 (
1560  const UPstream::commsTypes commsType,
1561  List<T>& values,
1562  const int tag
1563 ) const
1565  distribute(commsType, values, flipOp(), tag);
1566 }
1567 
1568 
1569 template<class T>
1571 (
1572  const UPstream::commsTypes commsType,
1574  const int tag
1575 ) const
1576 {
1577  values.shrink();
1578 
1579  List<T>& list = static_cast<List<T>&>(values);
1580 
1581  distribute(commsType, list, tag);
1583  values.setCapacity(list.size());
1584 }
1585 
1586 
1587 template<class T>
1589 (
1590  List<T>& values,
1591  const int tag
1592 ) const
1594  distribute(UPstream::defaultCommsType, values, tag);
1595 }
1596 
1597 
1598 template<class T>
1600 (
1602  const int tag
1603 ) const
1605  distribute(UPstream::defaultCommsType, values, tag);
1606 }
1607 
1608 
1609 template<class T>
1611 (
1612  const UPstream::commsTypes commsType,
1613  const label constructSize,
1614  List<T>& values,
1615  const int tag
1616 ) const
1617 {
1618  reverseDistribute<T, flipOp>
1619  (
1620  commsType,
1621  constructSize,
1622  values,
1623  flipOp(),
1624  tag
1625  );
1626 }
1627 
1628 
1629 template<class T, class NegateOp>
1631 (
1632  const UPstream::commsTypes commsType,
1633  const label constructSize,
1634  List<T>& values,
1635  const NegateOp& negOp,
1636  const int tag
1637 ) const
1638 {
1639  distribute
1640  (
1641  commsType,
1642  whichSchedule(commsType),
1643  constructSize,
1644  constructMap_,
1645  constructHasFlip_,
1646  subMap_,
1647  subHasFlip_,
1648  values,
1649  negOp,
1650  tag,
1651  comm_
1652  );
1653 }
1654 
1655 
1656 template<class T>
1658 (
1659  const UPstream::commsTypes commsType,
1660  const label constructSize,
1661  const T& nullValue,
1662  List<T>& values,
1663  const int tag
1664 ) const
1665 {
1666  distribute
1667  (
1668  commsType,
1669  whichSchedule(commsType),
1670  constructSize,
1671  constructMap_,
1672  constructHasFlip_,
1673  subMap_,
1674  subHasFlip_,
1675  values,
1676 
1677  nullValue,
1678  eqOp<T>(),
1679  flipOp(),
1680 
1681  tag,
1682  comm_
1683  );
1684 }
1685 
1686 
1687 template<class T>
1689 (
1690  const label constructSize,
1691  List<T>& values,
1692  const int tag
1693 ) const
1694 {
1695  reverseDistribute
1696  (
1698  constructSize,
1699  values,
1700  tag
1701  );
1702 }
1703 
1704 
1705 template<class T>
1707 (
1708  const label constructSize,
1709  const T& nullValue,
1710  List<T>& values,
1711  const int tag
1712 ) const
1713 {
1714  reverseDistribute
1715  (
1717  constructSize,
1718  nullValue,
1719  values,
1720  tag
1721  );
1722 }
1723 
1724 
1725 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
"blocking" : (MPI_Bsend, MPI_Recv)
Definition: ops.H:67
rDeltaTY field()
static label read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Communications types.
Definition: UPstream.H:72
error FatalError
Error stream (stdout output on all processes), with additional &#39;FOAM FATAL ERROR&#39; header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:598
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
static void accessAndFlip(List< T > &output, const UList< T > &values, const labelUList &map, const bool hasFlip, const NegateOp &negOp)
Lookup field values at specified map indices and save after any flip negation operations.
void send(PstreamBuffers &pBufs, const List< T > &field) const
Do all sends using PstreamBuffers.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
static rangeType allProcs(const label communicator=worldComm)
Range of process indices for all processes.
Definition: UPstream.H:1176
A range or interval of labels defined by a start and a size.
Definition: labelRange.H:52
constexpr char nl
The newline &#39;\n&#39; character (0x0a)
Definition: Ostream.H:50
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:1049
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:175
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1074
void push_back(const T &val)
Append an element at the end of the list.
Definition: ListI.H:227
List< labelList > labelListList
List of labelList.
Definition: labelList.H:38
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1538
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:164
Input inter-processor communications stream using MPI send/recv etc. - operating on external buffer...
Definition: UIPstream.H:287
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1065
"scheduled" : (MPI_Send, MPI_Recv)
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
Definition: DynamicList.H:51
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void reverseDistribute(const label constructSize, List< T > &values, const int tag=UPstream::msgType()) const
Reverse distribute data using default commsType and the default flip/negate operator.
IntType start() const noexcept
The (inclusive) lower value of the range.
Definition: IntRange.H:204
A list of pointers to objects of type <T>, without allocation/deallocation management of the pointers...
Definition: HashTable.H:106
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
Pair< label > labelPair
A pair of labels.
Definition: Pair.H:51
OBJstream os(runTime.globalPath()/outputName)
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
void receive(PstreamBuffers &pBufs, List< T > &field) const
Do all receives using PstreamBuffers.
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:385
static void flipAndCombine(List< T > &lhs, const UList< T > &rhs, const labelUList &map, const bool hasFlip, const CombineOp &cop, const NegateOp &negOp)
Combine field values (after any flip negation operation) into the specified mapped target locations...
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers...
Definition: List.H:55
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" : (MPI_Isend, MPI_Irecv)
static Ostream & output(Ostream &os, const IntRange< T > &range)
Definition: IntRanges.C:44
List< label > labelList
A List of labels.
Definition: List.H:62
IntType size() const noexcept
The size of the range.
Definition: IntRange.H:194
Functor to negate primitives. Dummy for most other types.
Definition: flipOp.H:66
static void distribute(const UPstream::commsTypes commsType, const List< labelPair > &schedule, const label constructSize, const labelListList &subMap, const bool subHasFlip, const labelListList &constructMap, const bool constructHasFlip, List< T > &field, const T &nullValue, const CombineOp &cop, const NegateOp &negOp, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Distribute combine data with specified combine operation and negate operator (for flips)...
static bool waitSomeRequests(const label pos, label len=-1, DynamicList< int > *indices=nullptr)
Wait until some requests (from position onwards) have finished. Corresponds to MPI_Waitsome() ...