mapDistributeBaseTemplates.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | www.openfoam.com
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8  Copyright (C) 2015-2017 OpenFOAM Foundation
9  Copyright (C) 2015-2024 OpenCFD Ltd.
10 -------------------------------------------------------------------------------
11 License
12  This file is part of OpenFOAM.
13 
14  OpenFOAM is free software: you can redistribute it and/or modify it
15  under the terms of the GNU General Public License as published by
16  the Free Software Foundation, either version 3 of the License, or
17  (at your option) any later version.
18 
19  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22  for more details.
23 
24  You should have received a copy of the GNU General Public License
25  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "Pstream.H"
30 #include "PstreamBuffers.H"
31 #include "flipOp.H"
32 
33 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
34 
// Combine the values of rhs into lhs at the positions given by map.
//
// For each i in [0, map.size()):
//  - hasFlip == false : cop(lhs[map[i]], rhs[i])
//  - hasFlip == true  : map[i] is a signed, 1-based index:
//        index > 0  -> cop(lhs[index-1], rhs[i])
//        index < 0  -> cop(lhs[-index-1], negOp(rhs[i]))
//        index == 0 -> error branch (illegal flip index)
//
// NOTE(review): this listing omits the declarator line (listing line 36);
// the parameter list matches the flipAndCombine(...) calls further down
// the file - confirm against the original source.
35 template<class T, class CombineOp, class NegateOp>
37 (
38  List<T>& lhs,
39  const UList<T>& rhs,
40 
41  const labelUList& map,
42  const bool hasFlip,
43  const CombineOp& cop,
44  const NegateOp& negOp
45 )
46 {
47  const label len = map.size();
48 
49  if (hasFlip)
50  {
51  for (label i = 0; i < len; ++i)
52  {
53  const label index = map[i];
54 
55  if (index > 0)
56  {
57  cop(lhs[index-1], rhs[i]);
58  }
59  else if (index < 0)
60  {
    // Negative entry: same slot as its positive counterpart, but the
    // source value is passed through negOp before combining
61  cop(lhs[-index-1], negOp(rhs[i]));
62  }
63  else
64  {
    // NOTE(review): the head of this error statement (listing line 65)
    // is missing from the listing
66  << "Illegal flip index '0' at " << i << '/' << map.size()
67  << " for list:" << rhs.size() << nl
68  << exit(FatalError);
69  }
70  }
71  }
72  else
73  {
    // No flip encoding: map entries are used directly as indices into lhs
74  for (label i = 0; i < len; ++i)
75  {
76  cop(lhs[map[i]], rhs[i]);
77  }
78  }
79 }
80 
81 
// Look up values at the indices given by map and store them in output,
// applying negOp where the map requests a flip.
//
// For each i in [0, map.size()):
//  - hasFlip == false : output[i] = values[map[i]]
//  - hasFlip == true  : map[i] is a signed, 1-based index:
//        index > 0  -> output[i] = values[index-1]
//        index < 0  -> output[i] = negOp(values[-index-1])
//        index == 0 -> error branch (illegal flip index)
//
// output is written at positions 0..map.size()-1 and is not resized here,
// so the caller must have sized it beforehand.
//
// NOTE(review): this listing omits the declarator line (listing line 83);
// the parameter list matches the five-argument accessAndFlip(...) calls
// elsewhere in the file - confirm against the original source.
82 template<class T, class NegateOp>
84 (
85  List<T>& output,
86  const UList<T>& values,
87  const labelUList& map,
88  const bool hasFlip,
89  const NegateOp& negOp
90 )
91 {
92  const label len = map.size();
93 
94  // FULLDEBUG: if (output.size() < len) FatalError ...;
95 
96  if (hasFlip)
97  {
98  for (label i = 0; i < len; ++i)
99  {
100  const label index = map[i];
101 
102  if (index > 0)
103  {
104  output[i] = values[index-1];
105  }
106  else if (index < 0)
107  {
108  output[i] = negOp(values[-index-1]);
109  }
110  else
111  {
    // NOTE(review): the head of this error statement (listing line 112)
    // is missing from the listing
113  << "Illegal flip index '0' at " << i << '/' << map.size()
114  << " for list:" << values.size() << nl
115  << exit(FatalError);
116  }
117  }
118  }
119  else
120  {
121  // Like indirect list
122  for (label i = 0; i < len; ++i)
123  {
124  output[i] = values[map[i]];
125  }
126  }
127 }
128 
129 
// Convenience form of accessAndFlip that returns the result by value:
// allocates a list of map.size() elements, fills it with the
// five-argument accessAndFlip and returns it.
//
// NOTE(review): the declarator line (listing line 131) is missing from
// this listing; name and return type are inferred from the body and from
// the four-argument accessAndFlip(...) calls elsewhere in the file.
130 template<class T, class NegateOp>
132 (
133  const UList<T>& values,
134  const labelUList& map,
135  const bool hasFlip,
136  const NegateOp& negOp
137 )
138 {
139  List<T> output(map.size());
140  accessAndFlip(output, values, map, hasFlip, negOp);
141  return output;
142 }
143 
144 
// Set up the receives and sends of a field exchange, leaving completion
// to the caller.
//
// On return:
//  - recvRequests spans the requests registered while setting up receives
//    (start = UPstream::nRequests() beforehand, size = number added)
//  - sendRequests spans the requests registered while setting up sends
//  - recvFields[proci] is a buffer of constructMap[proci].size() elements
//    for every other rank with a non-empty construct map
//  - recvFields[myRank] holds the local contribution (built from
//    subMap[myRank]) if that map is non-empty
//  - sendFields[proci] holds the (possibly flipped) values selected by
//    subMap[proci] for every other rank with a non-empty sub map
//  - slots that are not needed are released
//
// constructHasFlip is not used in this function body.
//
// NOTE(review): several lines are missing from this listing - the
// declarator (146), the condition and head of the leading error statement
// (165, 167) and the heads of the two transfer calls (200/202, 243/245).
// Name inferred from the send(...) call in the member wrapper below;
// confirm against the original source.
145 template<class T, class negateOp>
147 (
148  const labelListList& subMap,
149  const bool subHasFlip,
150  const labelListList& constructMap,
151  const bool constructHasFlip,
152  const UList<T>& field,
153 
154  labelRange& sendRequests,
155  PtrList<List<T>>& sendFields,
156 
157  labelRange& recvRequests,
158  PtrList<List<T>>& recvFields,
159 
160  const negateOp& negOp,
161  const int tag,
162  const label comm
163 )
164 {
    // NOTE(review): the guarding condition is missing from the listing;
    // per the message it presumably rejects non-contiguous T - confirm
166  {
168  << "Only contiguous is currently supported"
169  << exit(FatalError);
170  }
171 
172  const auto myRank = UPstream::myProcNo(comm);
173  const auto nProcs = UPstream::nProcs(comm);
174 
175 
176  // Set up receives from neighbours
177  recvRequests.start() = UPstream::nRequests();
178  recvRequests.size() = 0;
179 
180  recvFields.resize(nProcs);
181 
182  for (const int proci : UPstream::allProcs(comm))
183  {
184  const labelList& map = constructMap[proci];
185 
186  if (proci == myRank)
187  {
188  // No communication for myself - but recvFields may be used
189  }
190  else if (map.empty())
191  {
192  // No receive necessary
193  (void) recvFields.release(proci);
194  }
195  else
196  {
197  List<T>& subField = recvFields.try_emplace(proci);
198  subField.resize_nocopy(map.size());
199 
    // NOTE(review): call head missing from the listing; the arguments
    // (rank, writable byte pointer, byte count, tag, comm) suggest a
    // non-blocking raw read into subField - confirm
201  (
203  proci,
204  subField.data_bytes(),
205  subField.size_bytes(),
206  tag,
207  comm
208  );
209  }
210  }
211 
212  // Finished setting up the receives
213  recvRequests.size() = (UPstream::nRequests() - recvRequests.start());
214 
215 
216  // Set up sends to neighbours
217  sendRequests.start() = UPstream::nRequests();
218  sendRequests.size() = 0;
219 
220  sendFields.resize(nProcs);
221 
222  for (const int proci : UPstream::allProcs(comm))
223  {
224  const labelList& map = subMap[proci];
225 
226  if (proci == myRank)
227  {
228  // No communication - sendFields not needed
229  (void) sendFields.release(proci);
230  }
231  else if (map.empty())
232  {
233  // No send necessary
234  (void) sendFields.release(proci);
235  }
236  else
237  {
238  List<T>& subField = sendFields.try_emplace(proci);
239  subField.resize_nocopy(map.size());
240 
241  accessAndFlip(subField, field, map, subHasFlip, negOp);
242 
    // NOTE(review): call head missing from the listing; the arguments
    // (rank, const byte pointer, byte count, tag, comm) suggest a
    // non-blocking raw write of subField - confirm
244  (
246  proci,
247  subField.cdata_bytes(),
248  subField.size_bytes(),
249  tag,
250  comm
251  );
252  }
253  }
254 
255  // Finished setting up the sends
256  sendRequests.size() = (UPstream::nRequests() - sendRequests.start());
257 
258 
259  // Set up 'send' to myself - copy directly into recvFields
260  {
261  const labelList& map = subMap[myRank];
262 
263  if (map.empty())
264  {
265  // Nothing to send/recv
266  (void) recvFields.release(myRank);
267  }
268  else
269  {
270  List<T>& subField = recvFields.try_emplace(myRank);
271  subField.resize_nocopy(map.size());
272 
273  accessAndFlip(subField, field, map, subHasFlip, negOp);
274  }
275  }
276 }
277 
278 
// Member convenience wrapper: forwards to the static send(...) using this
// object's subMap_, subHasFlip_, constructMap_, constructHasFlip_ and
// comm_, with flipOp() as the negate operation.
//
// NOTE(review): the declarator line (listing line 280) is missing from
// this listing; name inferred from the forwarded call.
279 template<class T>
281 (
282  const UList<T>& field,
283  labelRange& sendRequests,
284  PtrList<List<T>>& sendFields,
285  labelRange& recvRequests,
286  PtrList<List<T>>& recvFields,
287  const int tag
288 ) const
289 {
290  send
291  (
292  subMap_,
293  subHasFlip_,
294  constructMap_,
295  constructHasFlip_,
296  field,
297  sendRequests, sendFields,
298  recvRequests, recvFields,
299  flipOp(),
300  tag,
301  comm_
302  );
303 }
304 
305 
// Wait for previously started requests and combine the received
// sub-fields into field.
//
// Steps, in order:
//  1. Collect the ranks (other than myself) with a non-empty construct
//     map, checking that each has an allocated recvFields entry of the
//     expected size.
//  2. Resize field to constructSize (old content not preserved).
//  3. Combine the local contribution recvFields[myRank], if set.
//  4. Wait on the request range [requests.start(), +requests.size()).
//  5. Combine the sub-field from each rank collected in step 1.
//
// The tag parameter is not used in this function body.
//
// NOTE(review): several lines are missing from this listing - the
// declarator (307), the condition and head of the leading error statement
// (321, 323) and the head of the "unallocated receive field" error (350).
// Name inferred from the receive(...) call in the member wrapper below.
306 template<class T, class CombineOp, class negateOp>
308 (
309  const label constructSize,
310  const labelListList& constructMap,
311  const bool constructHasFlip,
312  const labelRange& requests,
313  const UPtrList<List<T>>& recvFields,
314  List<T>& field,
315  const CombineOp& cop,
316  const negateOp& negOp,
317  const int tag,
318  const label comm
319 )
320 {
    // NOTE(review): the guarding condition is missing from the listing;
    // per the message it presumably rejects non-contiguous T - confirm
322  {
324  << "Only contiguous is currently supported"
325  << exit(FatalError);
326  }
327 
328  const auto myRank = UPstream::myProcNo(comm);
329  const auto nProcs = UPstream::nProcs(comm);
330 
331 
332  // Receiving from which procs - according to map information
333 
334  DynamicList<int> recvProcs(nProcs);
335  for (const int proci : UPstream::allProcs(comm))
336  {
337  const labelList& map = constructMap[proci];
338 
339  if (proci != myRank && map.size())
340  {
341  recvProcs.push_back(proci);
342 
343  const auto* subFieldPtr = recvFields.get(proci);
344  if (subFieldPtr)
345  {
346  checkReceivedSize(proci, map.size(), subFieldPtr->size());
347  }
348  else
349  {
    // NOTE(review): the head of this error statement (listing line 350)
    // is missing from the listing
351  << "From processor " << proci
352  << " : unallocated receive field."
353  << " Expected size " << map.size()
354  << " on comm " << comm
355  << " with procs " << UPstream::nProcs(comm) << nl
356  << exit(FatalError);
357  }
358  }
359  }
360 
361 
362  // Combining bits - can reuse field storage
363  field.resize_nocopy(constructSize);
364 
365 
366  // Received sub field from myself : recvFields[myRank]
367  if (recvFields.test(myRank))
368  {
369  const labelList& map = constructMap[myRank];
370  const List<T>& subField = recvFields[myRank];
371 
372  // Unlikely to need a size check
373  // checkReceivedSize(myRank, map.size(), subField.size());
374 
375  flipAndCombine
376  (
377  field,
378  subField,
379  map,
380  constructHasFlip,
381  cop,
382  negOp
383  );
384  }
385 
386 
387  // NB: do NOT use polling and dispatch here.
388  // There is no certainty if the listed requests are still pending or
389  // have already been waited on before calling this method.
390 
391  // Wait for (receive) requests, but the range may also include
392  // other requests depending on what the caller provided
393 
394  UPstream::waitRequests(requests.start(), requests.size());
395 
396 
397  // Process received fields
398 
399  {
400  for (const int proci : recvProcs)
401  {
402  const labelList& map = constructMap[proci];
403  const List<T>& subField = recvFields[proci];
404 
405  // Already checked the sizes previously
406  // checkReceivedSize(proci, map.size(), subField.size());
407 
408  flipAndCombine
409  (
410  field,
411  subField,
412  map,
413  constructHasFlip,
414  cop,
415  negOp
416  );
417  }
418  }
419 }
420 
421 
// Member convenience wrapper: forwards to the static receive(...) using
// this object's constructSize_, constructMap_, constructHasFlip_ and
// comm_, with eqOp<T>() as the combine operation and flipOp() as the
// negate operation.
//
// NOTE(review): the declarator line (listing line 423) is missing from
// this listing; name inferred from the forwarded call.
422 template<class T>
424 (
425  const labelRange& requests,
426  const UPtrList<List<T>>& recvFields,
427  List<T>& field,
428  const int tag
429 ) const
430 {
431  receive
432  (
433  constructSize_,
434  constructMap_,
435  constructHasFlip_,
436  requests,
437  recvFields,
438  field,
439  eqOp<T>(),
440  flipOp(),
441  tag,
442  comm_
443  );
444 }
445 
446 
// Distribute field according to subMap/constructMap, initialising the
// result with nullValue and merging contributions with cop.
//
// In every branch the values to send are selected from field with
// accessAndFlip(..., subMap[proc], subHasFlip, negOp), and received
// values are merged into the result (sized to constructSize) with
// flipAndCombine(..., constructMap[proc], constructHasFlip, cop, negOp).
//
// Branches:
//  - not a parallel run : only the self-to-self contribution is handled
//  - buffered           : send to all neighbours first, then reuse field
//                         for the result and receive rank by rank
//  - scheduled          : results are built in a separate list, because
//                         field is still needed for later sends, and the
//                         exchange follows the pairs in schedule
//  - nonBlocking        : non-contiguous T is streamed via PstreamBuffers;
//                         contiguous T uses per-rank send/receive lists
//                         and processes receives as they complete
//  - anything else      : error branch
//
// NOTE(review): several lines are missing from this listing - the
// declarator (448), the heads of the raw transfer calls (775/777,
// 803/805), the call polled in the while-condition (853) and the
// head/terminator of the final error statement (888, 890). Name inferred
// from the distribute(...) calls in the member wrappers further down.
447 template<class T, class CombineOp, class NegateOp>
449 (
450  const UPstream::commsTypes commsType,
451  const List<labelPair>& schedule,
452  const label constructSize,
453  const labelListList& subMap,
454  const bool subHasFlip,
455  const labelListList& constructMap,
456  const bool constructHasFlip,
457  List<T>& field,
458  const T& nullValue,
459  const CombineOp& cop,
460  const NegateOp& negOp,
461  const int tag,
462  const label comm
463 )
464 {
465  const auto myRank = UPstream::myProcNo(comm);
466  const auto nProcs = UPstream::nProcs(comm);
467 
468  if (!UPstream::parRun())
469  {
470  // Do only me to me.
471 
472  List<T> subField
473  (
474  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
475  );
476 
477  // Receive sub field from myself (subField)
478  const labelList& map = constructMap[myRank];
479 
480  // Combining bits - can now reuse field storage
481  field.resize_nocopy(constructSize);
482  field = nullValue;
483 
484  flipAndCombine
485  (
486  field,
487  subField,
488  map,
489  constructHasFlip,
490  cop,
491  negOp
492  );
493 
494  return;
495  }
496 
497  if (commsType == UPstream::commsTypes::buffered)
498  {
499  // With buffered sending the field can be reused to collect the
500  // received data.
501 
502  // Send sub field to neighbour
503  for (const int proci : UPstream::allProcs(comm))
504  {
505  const labelList& map = subMap[proci];
506 
507  if (proci != myRank && map.size())
508  {
509  List<T> subField
510  (
511  accessAndFlip(field, map, subHasFlip, negOp)
512  );
513 
514  // buffered send
515  OPstream os(commsType, proci, 0, tag, comm);
516  os << subField;
517  }
518  }
519 
520  {
521  // Subset myself
522  List<T> subField
523  (
524  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
525  );
526 
527  // Receive sub field from myself (subField)
528  const labelList& map = constructMap[myRank];
529 
530  // Combining bits - can now reuse field storage
531  field.resize_nocopy(constructSize);
532  field = nullValue;
533 
534  flipAndCombine
535  (
536  field,
537  subField,
538  map,
539  constructHasFlip,
540  cop,
541  negOp
542  );
543  }
544 
545  // Receive and process sub-field from neighbours
546  for (const int proci : UPstream::allProcs(comm))
547  {
548  const labelList& map = constructMap[proci];
549 
550  if (proci != myRank && map.size())
551  {
552  List<T> subField;
553  IPstream::recv(subField, proci, tag, comm);
554 
555  checkReceivedSize(proci, map.size(), subField.size());
556 
557  flipAndCombine
558  (
559  field,
560  subField,
561  map,
562  constructHasFlip,
563  cop,
564  negOp
565  );
566  }
567  }
568  }
569  else if (commsType == UPstream::commsTypes::scheduled)
570  {
571  // Need to make sure I don't overwrite field with received data
572  // since the data might need to be sent to another processor. So
573  // allocate a new field for the results.
574  List<T> newField;
575  newField.resize_nocopy(constructSize);
576  newField = nullValue;
577 
578  // First handle self
579  {
580  // Subset myself
581  List<T> subField
582  (
583  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
584  );
585 
586  // Receive sub field from myself (subField)
587  const labelList& map = constructMap[myRank];
588 
589  flipAndCombine
590  (
591  newField,
592  subField,
593  map,
594  constructHasFlip,
595  cop,
596  negOp
597  );
598  }
599 
600 
601  // Schedule will already have pruned 0-sized comms
602  for (const labelPair& twoProcs : schedule)
603  {
604  // twoProcs is a swap pair of processors. The first one is the
605  // one that needs to send first and then receive.
606 
607  if (twoProcs.first() == myRank)
608  {
609  // I am send first, receive next
610  const label nbrProc = twoProcs.second();
611 
612  {
613  const labelList& map = subMap[nbrProc];
614 
615  List<T> subField
616  (
617  accessAndFlip(field, map, subHasFlip, negOp)
618  );
619 
620  OPstream::send(subField, nbrProc, tag, comm);
621  }
622  {
623  List<T> subField;
624  IPstream::recv(subField, nbrProc, tag, comm);
625 
626  const labelList& map = constructMap[nbrProc];
627 
628  checkReceivedSize(nbrProc, map.size(), subField.size());
629 
630  flipAndCombine
631  (
632  newField,
633  subField,
634  map,
635  constructHasFlip,
636  cop,
637  negOp
638  );
639  }
640  }
641  else
642  {
643  // I am receive first, send next
644  const label nbrProc = twoProcs.first();
645 
646  {
647  List<T> subField;
648  IPstream::recv(subField, nbrProc, tag, comm);
649 
650  const labelList& map = constructMap[nbrProc];
651 
652  checkReceivedSize(nbrProc, map.size(), subField.size());
653 
654  flipAndCombine
655  (
656  newField,
657  subField,
658  map,
659  constructHasFlip,
660  cop,
661  negOp
662  );
663  }
664  {
665  const labelList& map = subMap[nbrProc];
666 
667  List<T> subField
668  (
669  accessAndFlip(field, map, subHasFlip, negOp)
670  );
671 
672  OPstream::send(subField, nbrProc, tag, comm);
673  }
674  }
675  }
676  field.transfer(newField);
677  }
678  else if (commsType == UPstream::commsTypes::nonBlocking)
679  {
680  const label startOfRequests = UPstream::nRequests();
681 
682  if (!is_contiguous<T>::value)
683  {
684  PstreamBuffers pBufs(comm, tag);
685 
686  // Stream data into buffer
687  for (const int proci : UPstream::allProcs(comm))
688  {
689  const labelList& map = subMap[proci];
690 
691  if (proci != myRank && map.size())
692  {
693  List<T> subField
694  (
695  accessAndFlip(field, map, subHasFlip, negOp)
696  );
697 
698  UOPstream os(proci, pBufs);
699  os << subField;
700  }
701  }
702 
703  // Initiate receiving - do not yet block
704  pBufs.finishedSends(false);
705 
706  {
707  // Set up 'send' to myself
708  List<T> subField
709  (
710  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
711  );
712 
713  // Combining bits - can now reuse field storage
714  field.resize_nocopy(constructSize);
715  field = nullValue;
716 
717  // Receive sub field from myself
718  const labelList& map = constructMap[myRank];
719 
720  flipAndCombine
721  (
722  field,
723  subField,
724  map,
725  constructHasFlip,
726  cop,
727  negOp
728  );
729  }
730 
731  // Wait for receive requests (and the send requests too)
732  UPstream::waitRequests(startOfRequests);
733 
734  // Receive and process neighbour fields
735  for (const int proci : UPstream::allProcs(comm))
736  {
737  const labelList& map = constructMap[proci];
738 
739  if (proci != myRank && map.size())
740  {
741  UIPstream is(proci, pBufs);
742  List<T> subField(is);
743 
744  checkReceivedSize(proci, map.size(), subField.size());
745 
746  flipAndCombine
747  (
748  field,
749  subField,
750  map,
751  constructHasFlip,
752  cop,
753  negOp
754  );
755  }
756  }
757  }
758  else
759  {
760  // Set up receives from neighbours
761 
762  List<List<T>> recvFields(nProcs);
763  DynamicList<int> recvProcs(nProcs);
764 
765  for (const int proci : UPstream::allProcs(comm))
766  {
767  const labelList& map = constructMap[proci];
768 
769  if (proci != myRank && map.size())
770  {
771  recvProcs.push_back(proci);
772  List<T>& subField = recvFields[proci];
773  subField.resize_nocopy(map.size());
774 
    // NOTE(review): call head missing from the listing; the arguments
    // suggest a non-blocking raw read into subField - confirm
776  (
778  proci,
779  subField.data_bytes(),
780  subField.size_bytes(),
781  tag,
782  comm
783  );
784  }
785  }
786 
787 
788  // Set up sends to neighbours
789 
790  List<List<T>> sendFields(nProcs);
791 
792  for (const int proci : UPstream::allProcs(comm))
793  {
794  const labelList& map = subMap[proci];
795 
796  if (proci != myRank && map.size())
797  {
798  List<T>& subField = sendFields[proci];
799  subField.resize_nocopy(map.size());
800 
801  accessAndFlip(subField, field, map, subHasFlip, negOp);
802 
    // NOTE(review): call head missing from the listing; the arguments
    // suggest a non-blocking raw write of subField - confirm
804  (
806  proci,
807  subField.cdata_bytes(),
808  subField.size_bytes(),
809  tag,
810  comm
811  );
812  }
813  }
814 
815  // Set up 'send' to myself - copy directly into recvFields
816  {
817  const labelList& map = subMap[myRank];
818  List<T>& subField = recvFields[myRank];
819  subField.resize_nocopy(map.size());
820 
821  accessAndFlip(subField, field, map, subHasFlip, negOp);
822  }
823 
824 
825  // Combining bits - can now reuse field storage
826  field.resize_nocopy(constructSize);
827  field = nullValue;
828 
829  // Receive sub field from myself : recvFields[myRank]
830  {
831  const labelList& map = constructMap[myRank];
832  const List<T>& subField = recvFields[myRank];
833 
834  // Probably don't need a size check
835  // checkReceivedSize(myRank, map.size(), subField.size());
836 
837  flipAndCombine
838  (
839  field,
840  subField,
841  map,
842  constructHasFlip,
843  cop,
844  negOp
845  );
846  }
847 
848 
849  // Poll for completed receive requests and dispatch
    // The receive requests were registered first, so position idx in
    // the polled range corresponds to recvProcs[idx].
    // NOTE(review): the polled call (listing line 853) is missing from
    // the listing - confirm which UPstream wait function is used
850  DynamicList<int> indices(recvProcs.size());
851  while
852  (
854  (
855  startOfRequests,
856  recvProcs.size(),
857  &indices
858  )
859  )
860  {
861  for (const int idx : indices)
862  {
863  const int proci = recvProcs[idx];
864  const labelList& map = constructMap[proci];
865  const List<T>& subField = recvFields[proci];
866 
867  // No size check - was dimensioned above
868  // checkReceivedSize(proci, map.size(), subField.size());
869 
870  flipAndCombine
871  (
872  field,
873  subField,
874  map,
875  constructHasFlip,
876  cop,
877  negOp
878  );
879  }
880  }
881 
882  // Wait for any remaining requests
883  UPstream::waitRequests(startOfRequests);
884  }
885  }
886  else
887  {
    // NOTE(review): the head and terminator of this error statement
    // (listing lines 888 and 890) are missing from the listing
889  << "Unknown communication schedule " << int(commsType)
891  }
892 }
893 
894 
// Distribute field according to subMap/constructMap, assigning received
// values directly (eqOp<T>) without any null-value initialisation.
//
// In every branch the values to send are selected from field with
// accessAndFlip(..., subMap[proc], subHasFlip, negOp), and received
// values are assigned into the result (sized to constructSize) with
// flipAndCombine(..., constructMap[proc], constructHasFlip, eqOp<T>(),
// negOp). The result is resized with resize_nocopy and never filled, so
// only the elements addressed by the construct maps are assigned here.
//
// Branches:
//  - not a parallel run : only the self-to-self contribution is handled
//  - buffered           : send to all neighbours first, then reuse field
//                         for the result and receive rank by rank
//  - scheduled          : results are built in a separate list, because
//                         field is still needed for later sends, and the
//                         exchange follows the pairs in schedule
//  - nonBlocking        : non-contiguous T is streamed via PstreamBuffers;
//                         contiguous T uses per-rank send/receive lists
//                         and processes receives as they complete
//  - anything else      : error branch
//
// NOTE(review): several lines are missing from this listing - the
// declarator (896), the heads of the raw transfer calls (1214/1216,
// 1242/1244), the call polled in the while-condition (1292) and the
// head/terminator of the final error statement (1327, 1329). Name
// inferred from the distribute(...) calls in the member wrappers below.
895 template<class T, class NegateOp>
897 (
898  const UPstream::commsTypes commsType,
899  const List<labelPair>& schedule,
900  const label constructSize,
901  const labelListList& subMap,
902  const bool subHasFlip,
903  const labelListList& constructMap,
904  const bool constructHasFlip,
905  List<T>& field,
906  const NegateOp& negOp,
907  const int tag,
908  const label comm
909 )
910 {
911  const auto myRank = UPstream::myProcNo(comm);
912  const auto nProcs = UPstream::nProcs(comm);
913 
914  if (!UPstream::parRun())
915  {
916  // Do only me to me.
917 
918  List<T> subField
919  (
920  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
921  );
922 
923  // Receive sub field from myself (subField)
924  const labelList& map = constructMap[myRank];
925 
926  // Combining bits - can now reuse field storage
927  field.resize_nocopy(constructSize);
928 
929  flipAndCombine
930  (
931  field,
932  subField,
933  map,
934  constructHasFlip,
935  eqOp<T>(),
936  negOp
937  );
938 
939  return;
940  }
941 
942  if (commsType == UPstream::commsTypes::buffered)
943  {
944  // With buffered sending the field can be reused to collect the
945  // received data.
946 
947  // Send sub field to neighbour
948  for (const int proci : UPstream::allProcs(comm))
949  {
950  const labelList& map = subMap[proci];
951 
952  if (proci != myRank && map.size())
953  {
954  List<T> subField
955  (
956  accessAndFlip(field, map, subHasFlip, negOp)
957  );
958 
959  // buffered send
960  OPstream os(commsType, proci, 0, tag, comm);
961  os << subField;
962  }
963  }
964 
965  {
966  // Subset myself
967  List<T> subField
968  (
969  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
970  );
971 
972  // Receive sub field from myself (subField)
973  const labelList& map = constructMap[myRank];
974 
975  // Combining bits - can now reuse field storage
976  field.resize_nocopy(constructSize);
977 
978  flipAndCombine
979  (
980  field,
981  subField,
982  map,
983  constructHasFlip,
984  eqOp<T>(),
985  negOp
986  );
987  }
988 
989  // Receive and process sub-field from neighbours
990  for (const int proci : UPstream::allProcs(comm))
991  {
992  const labelList& map = constructMap[proci];
993 
994  if (proci != myRank && map.size())
995  {
996  List<T> subField;
997  IPstream::recv(subField, proci, tag, comm);
998 
999  checkReceivedSize(proci, map.size(), subField.size());
1000 
1001  flipAndCombine
1002  (
1003  field,
1004  subField,
1005  map,
1006  constructHasFlip,
1007  eqOp<T>(),
1008  negOp
1009  );
1010  }
1011  }
1012  }
1013  else if (commsType == UPstream::commsTypes::scheduled)
1014  {
1015  // Need to make sure I don't overwrite field with received data
1016  // since the data might need to be sent to another processor. So
1017  // allocate a new field for the results.
1018  List<T> newField;
1019  newField.resize_nocopy(constructSize);
1020 
1021  // First handle self
1022  {
1023  // Subset myself
1024  List<T> subField
1025  (
1026  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
1027  );
1028 
1029  // Receive sub field from myself (subField)
1030  const labelList& map = constructMap[myRank];
1031 
1032  flipAndCombine
1033  (
1034  newField,
1035  subField,
1036  map,
1037  constructHasFlip,
1038  eqOp<T>(),
1039  negOp
1040  );
1041  }
1042 
1043  // Schedule will already have pruned 0-sized comms
1044  for (const labelPair& twoProcs : schedule)
1045  {
1046  // twoProcs is a swap pair of processors. The first one is the
1047  // one that needs to send first and then receive.
1048 
1049  if (twoProcs.first() == myRank)
1050  {
1051  // I am send first, receive next
1052  const label nbrProc = twoProcs.second();
1053 
1054  {
1055  const labelList& map = subMap[nbrProc];
1056  List<T> subField
1057  (
1058  accessAndFlip(field, map, subHasFlip, negOp)
1059  );
1060 
1061  OPstream::send(subField, nbrProc, tag, comm);
1062  }
1063  {
1064  List<T> subField;
1065  IPstream::recv(subField, nbrProc, tag, comm);
1066 
1067  const labelList& map = constructMap[nbrProc];
1068 
1069  checkReceivedSize(nbrProc, map.size(), subField.size());
1070 
1071  flipAndCombine
1072  (
1073  newField,
1074  subField,
1075  map,
1076  constructHasFlip,
1077  eqOp<T>(),
1078  negOp
1079  );
1080  }
1081  }
1082  else
1083  {
1084  // I am receive first, send next
1085  const label nbrProc = twoProcs.first();
1086 
1087  {
1088  List<T> subField;
1089  IPstream::recv(subField, nbrProc, tag, comm);
1090 
1091  const labelList& map = constructMap[nbrProc];
1092 
1093  checkReceivedSize(nbrProc, map.size(), subField.size());
1094 
1095  flipAndCombine
1096  (
1097  newField,
1098  subField,
1099  map,
1100  constructHasFlip,
1101  eqOp<T>(),
1102  negOp
1103  );
1104  }
1105  {
1106  const labelList& map = subMap[nbrProc];
1107  List<T> subField
1108  (
1109  accessAndFlip(field, map, subHasFlip, negOp)
1110  );
1111 
1112  OPstream::send(subField, nbrProc, tag, comm);
1113  }
1114  }
1115  }
1116  field.transfer(newField);
1117  }
1118  else if (commsType == UPstream::commsTypes::nonBlocking)
1119  {
1120  const label startOfRequests = UPstream::nRequests();
1121 
1122  if (!is_contiguous<T>::value)
1123  {
1124  PstreamBuffers pBufs(comm, tag);
1125 
1126  // Stream data into buffer
1127  for (const int proci : UPstream::allProcs(comm))
1128  {
1129  const labelList& map = subMap[proci];
1130 
1131  if (proci != myRank && map.size())
1132  {
1133  List<T> subField
1134  (
1135  accessAndFlip(field, map, subHasFlip, negOp)
1136  );
1137 
1138  UOPstream os(proci, pBufs);
1139  os << subField;
1140  }
1141  }
1142 
1143  // Initiate receiving - do not yet block
1144  pBufs.finishedSends(false);
1145 
1146  {
1147  // Set up 'send' to myself
1148  List<T> subField
1149  (
1150  accessAndFlip(field, subMap[myRank], subHasFlip, negOp)
1151  );
1152 
1153  // Combining bits - can now reuse field storage
1154  field.resize_nocopy(constructSize);
1155 
1156  // Receive sub field from myself
1157  const labelList& map = constructMap[myRank];
1158 
1159  flipAndCombine
1160  (
1161  field,
1162  subField,
1163  map,
1164  constructHasFlip,
1165  eqOp<T>(),
1166  negOp
1167  );
1168  }
1169 
1170  // Wait for receive requests (and the send requests too)
1171  UPstream::waitRequests(startOfRequests);
1172 
1173  // Receive and process neighbour fields
1174  for (const int proci : UPstream::allProcs(comm))
1175  {
1176  const labelList& map = constructMap[proci];
1177 
1178  if (proci != myRank && map.size())
1179  {
1180  UIPstream is(proci, pBufs);
1181  List<T> subField(is);
1182 
1183  checkReceivedSize(proci, map.size(), subField.size());
1184 
1185  flipAndCombine
1186  (
1187  field,
1188  subField,
1189  map,
1190  constructHasFlip,
1191  eqOp<T>(),
1192  negOp
1193  );
1194  }
1195  }
1196  }
1197  else
1198  {
1199  // Set up receives from neighbours
1200 
1201  List<List<T>> recvFields(nProcs);
1202  DynamicList<int> recvProcs(nProcs);
1203 
1204  for (const int proci : UPstream::allProcs(comm))
1205  {
1206  const labelList& map = constructMap[proci];
1207 
1208  if (proci != myRank && map.size())
1209  {
1210  recvProcs.push_back(proci);
1211  List<T>& subField = recvFields[proci];
1212  subField.resize_nocopy(map.size());
1213 
    // NOTE(review): call head missing from the listing; the arguments
    // suggest a non-blocking raw read into subField - confirm
1215  (
1217  proci,
1218  subField.data_bytes(),
1219  subField.size_bytes(),
1220  tag,
1221  comm
1222  );
1223  }
1224  }
1225 
1226 
1227  // Set up sends to neighbours
1228 
1229  List<List<T>> sendFields(nProcs);
1230 
1231  for (const int proci : UPstream::allProcs(comm))
1232  {
1233  const labelList& map = subMap[proci];
1234 
1235  if (proci != myRank && map.size())
1236  {
1237  List<T>& subField = sendFields[proci];
1238  subField.resize_nocopy(map.size());
1239 
1240  accessAndFlip(subField, field, map, subHasFlip, negOp);
1241 
    // NOTE(review): call head missing from the listing; the arguments
    // suggest a non-blocking raw write of subField - confirm
1243  (
1245  proci,
1246  subField.cdata_bytes(),
1247  subField.size_bytes(),
1248  tag,
1249  comm
1250  );
1251  }
1252  }
1253 
1254  // Set up 'send' to myself - copy directly into recvFields
1255  {
1256  const labelList& map = subMap[myRank];
1257  List<T>& subField = recvFields[myRank];
1258  subField.resize_nocopy(map.size());
1259 
1260  accessAndFlip(subField, field, map, subHasFlip, negOp);
1261  }
1262 
1263 
1264  // Combining bits - can now reuse field storage
1265  field.resize_nocopy(constructSize);
1266 
1267 
1268  // Receive sub field from myself : recvFields[myRank]
1269  {
1270  const labelList& map = constructMap[myRank];
1271  const List<T>& subField = recvFields[myRank];
1272 
1273  // Probably don't need a size check
1274  // checkReceivedSize(myRank, map.size(), subField.size());
1275 
1276  flipAndCombine
1277  (
1278  field,
1279  subField,
1280  map,
1281  constructHasFlip,
1282  eqOp<T>(),
1283  negOp
1284  );
1285  }
1286 
1287 
1288  // Poll for completed receive requests and dispatch
    // The receive requests were registered first, so position idx in
    // the polled range corresponds to recvProcs[idx].
    // NOTE(review): the polled call (listing line 1292) is missing from
    // the listing - confirm which UPstream wait function is used
1289  DynamicList<int> indices(recvProcs.size());
1290  while
1291  (
1293  (
1294  startOfRequests,
1295  recvProcs.size(),
1296  &indices
1297  )
1298  )
1299  {
1300  for (const int idx : indices)
1301  {
1302  const int proci = recvProcs[idx];
1303  const labelList& map = constructMap[proci];
1304  const List<T>& subField = recvFields[proci];
1305 
1306  // No size check - was dimensioned above
1307  // checkReceivedSize(proci, map.size(), subField.size());
1308 
1309  flipAndCombine
1310  (
1311  field,
1312  subField,
1313  map,
1314  constructHasFlip,
1315  eqOp<T>(),
1316  negOp
1317  );
1318  }
1319  }
1320 
1321  // Wait for any remaining requests
1322  UPstream::waitRequests(startOfRequests);
1323  }
1324  }
1325  else
1326  {
    // NOTE(review): the head and terminator of this error statement
    // (listing lines 1327 and 1329) are missing from the listing
1328  << "Unknown communication schedule " << int(commsType)
1330  }
1331 }
1332 
1333 
// Do all sends using PstreamBuffers.
//
// For every rank in comm_ with a non-empty subMap_ entry (this rank
// included - there is no self check here), the selected and possibly
// flipped values are streamed into pBufs. Sending is then started without
// blocking; a later call must consume the data from pBufs.
//
// NOTE(review): the declarator line (listing line 1335) is missing from
// this listing; the signature matches the send(PstreamBuffers&, ...)
// entry in the listing's reference notes.
1334 template<class T>
1336 (
1337  PstreamBuffers& pBufs,
1338  const List<T>& field
1339 ) const
1340 {
1341  // Stream data into buffer
1342  for (const int proci : UPstream::allProcs(comm_))
1343  {
1344  const labelList& map = subMap_[proci];
1345 
1346  if (map.size())
1347  {
1348  List<T> subField
1349  (
1350  accessAndFlip(field, map, subHasFlip_, flipOp())
1351  );
1352 
1353  UOPstream os(proci, pBufs);
1354  os << subField;
1355  }
1356  }
1357 
1358  // Start sending and receiving but do not block.
1359  pBufs.finishedSends(false);
1360 }
1361 
1362 
// Consume the data held in pBufs and assemble the distributed field.
//
// field is resized to constructSize_ (old content not preserved). For
// every rank in comm_ with a non-empty constructMap_ entry (this rank
// included), a sub-field is read from pBufs, its size is checked against
// the map and its values are assigned into field with eqOp<T>() and
// flipOp().
//
// NOTE(review): the declarator line (listing line 1364) is missing from
// this listing; the name is presumed from the parameter list, which
// mirrors the PstreamBuffers-based send above - confirm.
1363 template<class T>
1365 (
1366  PstreamBuffers& pBufs,
1367  List<T>& field
1368 ) const
1369 {
1370  // Consume
1371  field.resize_nocopy(constructSize_);
1372 
1373  for (const int proci : UPstream::allProcs(comm_))
1374  {
1375  const labelList& map = constructMap_[proci];
1376 
1377  if (map.size())
1378  {
1379  UIPstream is(proci, pBufs);
1380  List<T> subField(is);
1381 
1382  checkReceivedSize(proci, map.size(), subField.size());
1383 
1384  flipAndCombine
1385  (
1386  field,
1387  subField,
1388  map,
1389  constructHasFlip_,
1390  eqOp<T>(),
1391  flipOp()
1392  );
1393  }
1394  }
1395 }
1396 
1397 
// Member wrapper: distribute values with the given comms type and negate
// operation, forwarding this object's maps, flip flags, construct size
// and communicator, plus the schedule from whichSchedule(commsType), to
// the eleven-argument distribute(...).
//
// NOTE(review): the declarator line (listing line 1399) is missing from
// this listing; name inferred from the forwarded call.
1398 template<class T, class NegateOp>
1400 (
1401  const UPstream::commsTypes commsType,
1402  List<T>& values,
1403  const NegateOp& negOp,
1404  const int tag
1405 ) const
1406 {
1407  distribute
1408  (
1409  commsType,
1410  whichSchedule(commsType),
1411  constructSize_,
1412  subMap_,
1413  subHasFlip_,
1414  constructMap_,
1415  constructHasFlip_,
1416  values,
1417  negOp,
1418  tag,
1419  comm_
1420  );
1421 }
1422 
1423 
// Member wrapper: as the wrapper without nullValue, but forwards to the
// distribute(...) form that takes a null value and a combine operation,
// passing nullValue and eqOp<T>().
//
// NOTE(review): the declarator line (listing line 1425) is missing from
// this listing; name inferred from the forwarded call.
1424 template<class T, class NegateOp>
1426 (
1427  const UPstream::commsTypes commsType,
1428  const T& nullValue,
1429  List<T>& values,
1430  const NegateOp& negOp,
1431  const int tag
1432 ) const
1433 {
1434  distribute
1435  (
1436  commsType,
1437  whichSchedule(commsType),
1438  constructSize_,
1439  subMap_,
1440  subHasFlip_,
1441  constructMap_,
1442  constructHasFlip_,
1443  values,
1444  nullValue,
1445  eqOp<T>(),
1446  negOp,
1447  tag,
1448  comm_
1449  );
1450 }
1451 
1452 
// Member wrapper taking values, a negate operation and a tag, and
// delegating to another distribute(...) overload.
//
// NOTE(review): the declarator line (listing line 1454) and the argument
// line of the forwarded call (listing line 1463) are missing from this
// listing, so the forwarded arguments cannot be seen here. Presumably the
// default comms type is supplied together with values, negOp and tag -
// confirm against the original source.
1453 template<class T, class NegateOp>
1455 (
1456  List<T>& values,
1457  const NegateOp& negOp,
1458  const int tag
1459 ) const
1460 {
1461  distribute
1462  (
1464  );
1465 }
1466 
1467 
// Member wrapper: distribute values with the given comms type, using
// flipOp() as the negate operation.
//
// NOTE(review): the declarator line (listing line 1469) and the opening
// brace line (listing line 1475) are missing from this listing.
1468 template<class T>
1470 (
1471  const UPstream::commsTypes commsType,
1472  List<T>& values,
1473  const int tag
1474 ) const
1476  distribute(commsType, values, flipOp(), tag);
1477 }
1478 
1479 
// Member wrapper for a container that has a capacity separate from its
// size: shrinks it, distributes through its List<T> base view, then sets
// the capacity to the resulting list size.
//
// NOTE(review): the declarator line (listing line 1481), the declaration
// of the `values` parameter (listing line 1484) and listing line 1493 are
// missing from this listing. `values` is presumably a DynamicList<T>&,
// given the shrink()/setCapacity() calls - confirm.
1480 template<class T>
1482 (
1483  const UPstream::commsTypes commsType,
1485  const int tag
1486 ) const
1487 {
1488  values.shrink();
1489 
1490  List<T>& list = static_cast<List<T>&>(values);
1491 
1492  distribute(commsType, list, tag);
1494  values.setCapacity(list.size());
1495 }
1496 
1497 
// Member wrapper: distribute a list using UPstream::defaultCommsType.
//
// NOTE(review): the declarator line (listing line 1499) and the opening
// brace line (listing line 1504) are missing from this listing.
1498 template<class T>
1500 (
1501  List<T>& values,
1502  const int tag
1503 ) const
1505  distribute(UPstream::defaultCommsType, values, tag);
1506 }
1507 
1508 
// Member wrapper: distribute values using UPstream::defaultCommsType.
//
// NOTE(review): the declarator line (listing line 1510), the declaration
// of the `values` parameter (listing line 1512) and the opening brace
// line (listing line 1515) are missing from this listing, so the type of
// `values` cannot be seen here - confirm against the original source.
1509 template<class T>
1511 (
1513  const int tag
1514 ) const
1516  distribute(UPstream::defaultCommsType, values, tag);
1517 }
1518 
1519 
// Member wrapper: reverse-distribute values with the given comms type and
// construct size, using flipOp() as the negate operation.
//
// NOTE(review): the declarator line (listing line 1521) is missing from
// this listing; name inferred from the forwarded call.
1520 template<class T>
1522 (
1523  const UPstream::commsTypes commsType,
1524  const label constructSize,
1525  List<T>& values,
1526  const int tag
1527 ) const
1528 {
1529  reverseDistribute<T, flipOp>
1530  (
1531  commsType,
1532  constructSize,
1533  values,
1534  flipOp(),
1535  tag
1536  );
1537 }
1538 
1539 
// Member wrapper: run the distribution in the reverse direction.
//
// Forwards to the eleven-argument distribute(...) with the roles of the
// maps swapped: constructMap_/constructHasFlip_ are passed in the sub-map
// positions and subMap_/subHasFlip_ in the construct-map positions. The
// result size is the caller-supplied constructSize, not constructSize_.
//
// NOTE(review): the declarator line (listing line 1541) is missing from
// this listing; name inferred from the reverseDistribute<T, flipOp>(...)
// call with this argument order elsewhere in the file.
1540 template<class T, class NegateOp>
1542 (
1543  const UPstream::commsTypes commsType,
1544  const label constructSize,
1545  List<T>& values,
1546  const NegateOp& negOp,
1547  const int tag
1548 ) const
1549 {
1550  distribute
1551  (
1552  commsType,
1553  whichSchedule(commsType),
1554  constructSize,
1555  constructMap_,
1556  constructHasFlip_,
1557  subMap_,
1558  subHasFlip_,
1559  values,
1560  negOp,
1561  tag,
1562  comm_
1563  );
1564 }
1565 
1566 
// Member wrapper: run the distribution in the reverse direction with a
// null value.
//
// Forwards to the distribute(...) form that takes a null value and a
// combine operation, with constructMap_/constructHasFlip_ passed in the
// sub-map positions and subMap_/subHasFlip_ in the construct-map
// positions. Uses eqOp<T>() to combine and flipOp() to negate.
//
// NOTE(review): the declarator line (listing line 1568) is missing from
// this listing; name presumed from the swapped map arguments - confirm.
1567 template<class T>
1569 (
1570  const UPstream::commsTypes commsType,
1571  const label constructSize,
1572  const T& nullValue,
1573  List<T>& values,
1574  const int tag
1575 ) const
1576 {
1577  distribute
1578  (
1579  commsType,
1580  whichSchedule(commsType),
1581  constructSize,
1582  constructMap_,
1583  constructHasFlip_,
1584  subMap_,
1585  subHasFlip_,
1586  values,
1587 
1588  nullValue,
1589  eqOp<T>(),
1590  flipOp(),
1591 
1592  tag,
1593  comm_
1594  );
1595 }
1596 
1597 
// Member wrapper: reverse-distribute values for the given construct size
// by delegating to another reverseDistribute(...) overload.
//
// NOTE(review): the declarator line (listing line 1599) and the first
// argument of the forwarded call (listing line 1608) are missing from
// this listing; the missing argument is presumably the default comms
// type - confirm against the original source.
1598 template<class T>
1600 (
1601  const label constructSize,
1602  List<T>& values,
1603  const int tag
1604 ) const
1605 {
1606  reverseDistribute
1607  (
1609  constructSize,
1610  values,
1611  tag
1612  );
1613 }
1614 
1615 
// Member wrapper: reverse-distribute values for the given construct size
// and null value by delegating to another reverseDistribute(...)
// overload.
//
// NOTE(review): the declarator line (listing line 1617) and the first
// argument of the forwarded call (listing line 1627) are missing from
// this listing; the missing argument is presumably the default comms
// type - confirm against the original source.
1616 template<class T>
1618 (
1619  const label constructSize,
1620  const T& nullValue,
1621  List<T>& values,
1622  const int tag
1623 ) const
1624 {
1625  reverseDistribute
1626  (
1628  constructSize,
1629  nullValue,
1630  values,
1631  tag
1632  );
1633 }
1634 
1635 
1636 // ************************************************************************* //
void size(const label n)
Older name for setAddressableSize.
Definition: UList.H:116
Definition: ops.H:67
rDeltaTY field()
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:125
commsTypes
Communications types.
Definition: UPstream.H:77
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:608
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: BitOps.H:56
static void accessAndFlip(List< T > &output, const UList< T > &values, const labelUList &map, const bool hasFlip, const NegateOp &negOp)
Lookup field values at specified map indices and save after any flip negation operations.
void send(PstreamBuffers &pBufs, const List< T > &field) const
Do all sends using PstreamBuffers.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests)
static rangeType allProcs(const label communicator=worldComm)
Range of process indices for all processes.
Definition: UPstream.H:1188
A range or interval of labels defined by a start and a size.
Definition: labelRange.H:52
constexpr char nl
The newline '\n' character (0x0a)
Definition: Ostream.H:50
static bool & parRun() noexcept
Test if this a parallel run.
Definition: UPstream.H:1061
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition: ListI.H:168
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Can be negative if the process i...
Definition: UPstream.H:1086
void push_back(const T &val)
Append an element at the end of the list.
Definition: ListI.H:220
List< labelList > labelListList
List of labelList.
Definition: labelList.H:38
static void waitRequests()
Wait for all requests to finish.
Definition: UPstream.H:1561
UList< label > labelUList
A UList of labels.
Definition: UList.H:78
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition: HashOps.H:164
Input inter-processor communications stream using MPI send/recv etc. - operating on external buffer...
Definition: UIPstream.H:289
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run. ...
Definition: UPstream.H:1077
static void recv(Type &value, const int fromProcNo, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Receive and deserialize a value. Uses operator>> for de-serialization.
Definition: IPstream.H:81
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects...
Definition: DynamicList.H:51
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void reverseDistribute(const label constructSize, List< T > &values, const int tag=UPstream::msgType()) const
Reverse distribute data using default commsType and the default flip/negate operator.
IntType start() const noexcept
The (inclusive) lower value of the range.
Definition: IntRange.H:204
A list of pointers to objects of type <T>, without allocation/deallocation management of the pointers...
Definition: HashTable.H:106
errorManip< error > abort(error &err)
Definition: errorManip.H:139
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:105
Pair< label > labelPair
A pair of labels.
Definition: Pair.H:51
OBJstream os(runTime.globalPath()/outputName)
Buffers for inter-processor communications streams (UOPstream, UIPstream).
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
void receive(PstreamBuffers &pBufs, List< T > &field) const
Do all receives using PstreamBuffers.
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:397
static void flipAndCombine(List< T > &lhs, const UList< T > &rhs, const labelUList &map, const bool hasFlip, const CombineOp &cop, const NegateOp &negOp)
Combine field values (after any flip negation operation) into the specified mapped target locations...
A list of pointers to objects of type <T>, with allocation/deallocation management of the pointers...
Definition: List.H:55
A template class to specify that a data type can be considered as being contiguous in memory...
Definition: contiguous.H:70
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents to given processor.
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static Ostream & output(Ostream &os, const IntRange< T > &range)
Definition: IntRanges.C:44
List< label > labelList
A List of labels.
Definition: List.H:62
IntType size() const noexcept
The size of the range.
Definition: IntRange.H:194
"buffered" : (MPI_Bsend, MPI_Recv)
bool send()
Send buffer contents now and not in destructor [advanced usage]. Returns true on success.
Definition: OPstreams.C:84
Functor to negate primitives. Dummy for most other types.
Definition: flipOp.H:66
static void distribute(const UPstream::commsTypes commsType, const List< labelPair > &schedule, const label constructSize, const labelListList &subMap, const bool subHasFlip, const labelListList &constructMap, const bool constructHasFlip, List< T > &field, const T &nullValue, const CombineOp &cop, const NegateOp &negOp, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm)
Distribute combine data with specified combine operation and negate operator (for flips)...
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, UPstream::Request *req=nullptr)
Read buffer contents from given processor.
Definition: UIPstreamRead.C:35
static bool waitSomeRequests(const label pos, label len=-1, DynamicList< int > *indices=nullptr)
Wait until some requests (from position onwards) have finished. Corresponds to MPI_Waitsome() ...