Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
37 //
38 // ************************************************************************
39 // @HEADER
40 
41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Teuchos_StandardParameterEntryValidators.hpp"
44 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
45 
46 namespace Tpetra {
47  namespace Details {
48  std::string
49  DistributorSendTypeEnumToString (EDistributorSendType sendType)
50  {
51  if (sendType == DISTRIBUTOR_ISEND) {
52  return "Isend";
53  }
54  else if (sendType == DISTRIBUTOR_RSEND) {
55  return "Rsend";
56  }
57  else if (sendType == DISTRIBUTOR_SEND) {
58  return "Send";
59  }
60  else if (sendType == DISTRIBUTOR_SSEND) {
61  return "Ssend";
62  }
63  else {
64  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
65  "EDistributorSendType enum value " << sendType << ".");
66  }
67  }
68 
69  std::string
70  DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
71  {
72  switch (how) {
73  case Details::DISTRIBUTOR_NOT_INITIALIZED:
74  return "Not initialized yet";
75  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
76  return "By createFromSends";
77  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
78  return "By createFromRecvs";
79  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
80  return "By createReverseDistributor";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
82  return "By copy constructor";
83  default:
84  return "INVALID";
85  }
86  }
87  } // namespace Details
88 
89  Teuchos::Array<std::string>
90  distributorSendTypes ()
91  {
92  Teuchos::Array<std::string> sendTypes;
93  sendTypes.push_back ("Isend");
94  sendTypes.push_back ("Rsend");
95  sendTypes.push_back ("Send");
96  sendTypes.push_back ("Ssend");
97  return sendTypes;
98  }
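 // Editorial example (not part of the original file): a minimal sketch of
 // how a caller might select one of the send types listed above through the
 // "Send type" parameter. The communicator object "comm" is assumed to
 // exist already.
 //
 //   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList ();
 //   plist->set ("Send type", std::string ("Isend")); // or "Rsend", "Send", "Ssend"
 //   Tpetra::Distributor distributor (comm, plist);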
99 
100  // We set default values of Distributor's Boolean parameters here,
101  // in this one place. That way, if we want to change the default
102  // value of a parameter, we don't have to search the whole file to
103  // ensure a consistent setting.
104  namespace {
105  // Default value of the "Debug" parameter.
106  const bool tpetraDistributorDebugDefault = false;
107  // Default value of the "Barrier between receives and sends" parameter.
108  const bool barrierBetween_default = false;
109  // Default value of the "Use distinct tags" parameter.
110  const bool useDistinctTags_default = true;
111  } // namespace (anonymous)
112 
113  int Distributor::getTag (const int pathTag) const {
114  return useDistinctTags_ ? pathTag : comm_->getTag ();
115  }
116 
117 
118 #ifdef TPETRA_DISTRIBUTOR_TIMERS
119  void Distributor::makeTimers () {
120  const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
121  const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
122  const std::string name_doWaits = "Tpetra::Distributor: doWaits";
123  const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
124  const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
125  const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
126  const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
127  const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
128  const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";
129 
130  timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
131  timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
132  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
133  timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
134  timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
135  timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
136  timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
137  timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
138  timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
139  }
140 #endif // TPETRA_DISTRIBUTOR_TIMERS
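 // Editorial note (not part of the original file): when
 // TPETRA_DISTRIBUTOR_TIMERS is defined, the timers above are ordinary
 // Teuchos::TimeMonitor timers, so an application can report them at the
 // end of a run with, for example,
 //
 //   Teuchos::TimeMonitor::summarize (std::cout);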
141 
142  void
143  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
144  const Teuchos::RCP<Teuchos::FancyOStream>& out,
145  const Teuchos::RCP<Teuchos::ParameterList>& plist)
146  {
147  this->out_ = out.is_null () ?
148  Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
149  if (! plist.is_null ()) {
150  this->setParameterList (plist);
151  }
152 
153 #ifdef TPETRA_DISTRIBUTOR_TIMERS
154  makeTimers ();
155 #endif // TPETRA_DISTRIBUTOR_TIMERS
156 
157  if (debug_) {
158  TEUCHOS_TEST_FOR_EXCEPTION
159  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
160  "is true but out_ (pointer to the output stream) is NULL. Please "
161  "report this bug to the Tpetra developers.");
162  Teuchos::OSTab tab (out_);
163  std::ostringstream os;
164  os << comm_->getRank ()
165  << ": Distributor ctor done" << std::endl;
166  *out_ << os.str ();
167  }
168  }
169 
170  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
171  : comm_ (comm)
172  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
173  , sendType_ (Details::DISTRIBUTOR_SEND)
174  , barrierBetween_ (barrierBetween_default)
175  , debug_ (tpetraDistributorDebugDefault)
176  , selfMessage_ (false)
177  , numSends_ (0)
178  , maxSendLength_ (0)
179  , numReceives_ (0)
180  , totalReceiveLength_ (0)
181  , lastRoundBytesSend_ (0)
182  , lastRoundBytesRecv_ (0)
183  , useDistinctTags_ (useDistinctTags_default)
184  {
185  init (comm, Teuchos::null, Teuchos::null);
186  }
187 
188  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
189  const Teuchos::RCP<Teuchos::FancyOStream>& out)
190  : comm_ (comm)
191  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
192  , sendType_ (Details::DISTRIBUTOR_SEND)
193  , barrierBetween_ (barrierBetween_default)
194  , debug_ (tpetraDistributorDebugDefault)
195  , selfMessage_ (false)
196  , numSends_ (0)
197  , maxSendLength_ (0)
198  , numReceives_ (0)
199  , totalReceiveLength_ (0)
200  , lastRoundBytesSend_ (0)
201  , lastRoundBytesRecv_ (0)
202  , useDistinctTags_ (useDistinctTags_default)
203  {
204  init (comm, out, Teuchos::null);
205  }
206 
207  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
208  const Teuchos::RCP<Teuchos::ParameterList>& plist)
209  : comm_ (comm)
210  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
211  , sendType_ (Details::DISTRIBUTOR_SEND)
212  , barrierBetween_ (barrierBetween_default)
213  , debug_ (tpetraDistributorDebugDefault)
214  , selfMessage_ (false)
215  , numSends_ (0)
216  , maxSendLength_ (0)
217  , numReceives_ (0)
218  , totalReceiveLength_ (0)
219  , lastRoundBytesSend_ (0)
220  , lastRoundBytesRecv_ (0)
221  , useDistinctTags_ (useDistinctTags_default)
222  {
223  init (comm, Teuchos::null, plist);
224  }
225 
226  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
227  const Teuchos::RCP<Teuchos::FancyOStream>& out,
228  const Teuchos::RCP<Teuchos::ParameterList>& plist)
229  : comm_ (comm)
230  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
231  , sendType_ (Details::DISTRIBUTOR_SEND)
232  , barrierBetween_ (barrierBetween_default)
233  , debug_ (tpetraDistributorDebugDefault)
234  , selfMessage_ (false)
235  , numSends_ (0)
236  , maxSendLength_ (0)
237  , numReceives_ (0)
238  , totalReceiveLength_ (0)
239  , lastRoundBytesSend_ (0)
240  , lastRoundBytesRecv_ (0)
241  , useDistinctTags_ (useDistinctTags_default)
242  {
243  init (comm, out, plist);
244  }
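 // Editorial example (not part of the original file): the constructors above
 // differ only in whether the caller supplies a debug output stream and/or a
 // parameter list. A fully specified construction might look like the
 // following sketch, where "comm" is an existing Teuchos::Comm.
 //
 //   Teuchos::RCP<Teuchos::FancyOStream> err =
 //     Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr));
 //   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList ();
 //   plist->set ("Debug", true);
 //   Tpetra::Distributor distributor (comm, err, plist);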
245 
246  Distributor::Distributor (const Distributor & distributor)
247  : comm_ (distributor.comm_)
248  , out_ (distributor.out_)
249  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
250  , sendType_ (distributor.sendType_)
251  , barrierBetween_ (distributor.barrierBetween_)
252  , debug_ (distributor.debug_)
253  , selfMessage_ (distributor.selfMessage_)
254  , numSends_ (distributor.numSends_)
255  , procsTo_ (distributor.procsTo_)
256  , startsTo_ (distributor.startsTo_)
257  , lengthsTo_ (distributor.lengthsTo_)
258  , maxSendLength_ (distributor.maxSendLength_)
259  , indicesTo_ (distributor.indicesTo_)
260  , numReceives_ (distributor.numReceives_)
261  , totalReceiveLength_ (distributor.totalReceiveLength_)
262  , lengthsFrom_ (distributor.lengthsFrom_)
263  , procsFrom_ (distributor.procsFrom_)
264  , startsFrom_ (distributor.startsFrom_)
265  , indicesFrom_ (distributor.indicesFrom_)
266  , reverseDistributor_ (distributor.reverseDistributor_)
267  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
268  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
269  , useDistinctTags_ (distributor.useDistinctTags_)
270  {
271  using Teuchos::ParameterList;
272  using Teuchos::parameterList;
273  using Teuchos::RCP;
274  using Teuchos::rcp;
275 
276  // Clone the right-hand side's ParameterList, so that this' list
277  // is decoupled from the right-hand side's list. We don't need to
278  // do validation, since the right-hand side already has validated
279  // its parameters, so just call setMyParamList(). Note that this
280  // won't work if the right-hand side doesn't have a list set yet,
281  // so we first check for null.
282  RCP<const ParameterList> rhsList = distributor.getParameterList ();
283  if (! rhsList.is_null ()) {
284  this->setMyParamList (parameterList (* rhsList));
285  }
286 
287 #ifdef TPETRA_DISTRIBUTOR_TIMERS
288  makeTimers ();
289 #endif // TPETRA_DISTRIBUTOR_TIMERS
290 
291  if (debug_) {
292  TEUCHOS_TEST_FOR_EXCEPTION
293  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
294  "is true but out_ (pointer to the output stream) is NULL. Please "
295  "report this bug to the Tpetra developers.");
296  Teuchos::OSTab tab (out_);
297  std::ostringstream os;
298  os << comm_->getRank ()
299  << ": Distributor copy ctor done" << std::endl;
300  *out_ << os.str ();
301  }
302  }
303 
304  void Distributor::swap (Distributor& rhs) {
305  using Teuchos::ParameterList;
306  using Teuchos::parameterList;
307  using Teuchos::RCP;
308 
309  std::swap (comm_, rhs.comm_);
310  std::swap (out_, rhs.out_);
311  std::swap (howInitialized_, rhs.howInitialized_);
312  std::swap (sendType_, rhs.sendType_);
313  std::swap (barrierBetween_, rhs.barrierBetween_);
314  std::swap (debug_, rhs.debug_);
315  std::swap (selfMessage_, rhs.selfMessage_);
316  std::swap (numSends_, rhs.numSends_);
317  std::swap (procsTo_, rhs.procsTo_);
318  std::swap (startsTo_, rhs.startsTo_);
319  std::swap (lengthsTo_, rhs.lengthsTo_);
320  std::swap (maxSendLength_, rhs.maxSendLength_);
321  std::swap (indicesTo_, rhs.indicesTo_);
322  std::swap (numReceives_, rhs.numReceives_);
323  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
324  std::swap (lengthsFrom_, rhs.lengthsFrom_);
325  std::swap (procsFrom_, rhs.procsFrom_);
326  std::swap (startsFrom_, rhs.startsFrom_);
327  std::swap (indicesFrom_, rhs.indicesFrom_);
328  std::swap (reverseDistributor_, rhs.reverseDistributor_);
329  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
330  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
331  std::swap (useDistinctTags_, rhs.useDistinctTags_);
332 
333  // Swap parameter lists. If they are the same object, make a deep
334  // copy first, so that modifying one won't modify the other one.
335  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
336  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
337  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
338  rhsList = parameterList (*rhsList);
339  }
340  if (! rhsList.is_null ()) {
341  this->setMyParamList (rhsList);
342  }
343  if (! lhsList.is_null ()) {
344  rhs.setMyParamList (lhsList);
345  }
346 
347  // We don't need to swap timers, because all instances of
348  // Distributor use the same timers.
349  }
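 // Editorial example (not part of the original file): a sketch of using
 // swap, assuming it is callable by the user. After the call, each
 // Distributor holds the other's communication plan and parameter list.
 //
 //   Tpetra::Distributor planA (comm);
 //   Tpetra::Distributor planB (comm);
 //   // ... initialize planA, e.g. via createFromSends ...
 //   planA.swap (planB);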
350 
351  Distributor::~Distributor()
352  {
353  // We shouldn't have any outstanding communication requests at
354  // this point.
355  TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
356  "Tpetra::Distributor: Destructor called with " << requests_.size()
357  << " outstanding posts (unfulfilled communication requests). There "
358  "should be none at this point. Please report this bug to the Tpetra "
359  "developers.");
360  }
361 
362  void
363  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
364  {
365  using Teuchos::FancyOStream;
366  using Teuchos::getIntegralValue;
367  using Teuchos::includesVerbLevel;
368  using Teuchos::OSTab;
369  using Teuchos::ParameterList;
370  using Teuchos::parameterList;
371  using Teuchos::RCP;
372  using std::endl;
373 
374  RCP<const ParameterList> validParams = getValidParameters ();
375  plist->validateParametersAndSetDefaults (*validParams);
376 
377  const bool barrierBetween =
378  plist->get<bool> ("Barrier between receives and sends");
379  const Details::EDistributorSendType sendType =
380  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
381  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
382  const bool debug = plist->get<bool> ("Debug");
383  {
384  // mfh 03 May 2016: We keep this option only for backwards
385  // compatibility, but it must always be true. See discussion of
386  // Github Issue #227.
387  const bool enable_cuda_rdma =
388  plist->get<bool> ("Enable MPI CUDA RDMA support");
389  TEUCHOS_TEST_FOR_EXCEPTION
390  (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
391  "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
392  "support\" = false. This is no longer valid. You don't need to "
393  "specify this option any more; Tpetra assumes it is always true. "
394  "This is a very light assumption on the MPI implementation, and in "
395  "fact does not actually involve hardware or system RDMA support. "
396  "Tpetra just assumes that the MPI implementation can tell whether a "
397  "pointer points to host memory or CUDA device memory.");
398  }
399 
400  // We check this property explicitly, since we haven't yet learned
401  // how to make a validator that can cross-check properties.
402  // Later, turn this into a validator so that it can be embedded in
403  // the valid ParameterList and used in Optika.
404  TEUCHOS_TEST_FOR_EXCEPTION(
405  ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
406  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
407  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
408  "between receives and sends." << endl << "This is invalid; you must "
409  "include the barrier if you use ready sends." << endl << "Ready sends "
410  "require that their corresponding receives have already been posted, "
411  "and the only way to guarantee that in general is with a barrier.");
412 
413  // Now that we've validated the input list, save the results.
414  sendType_ = sendType;
415  barrierBetween_ = barrierBetween;
416  useDistinctTags_ = useDistinctTags;
417  debug_ = debug;
418 
419  // ParameterListAcceptor semantics require pointer identity of the
420  // sublist passed to setParameterList(), so we save the pointer.
421  this->setMyParamList (plist);
422  }
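 // Editorial example (not part of the original file): a parameter list that
 // this method accepts. Note the cross-check above: "Rsend" requires
 // "Barrier between receives and sends" to be true.
 //
 //   Teuchos::RCP<Teuchos::ParameterList> plist =
 //     Teuchos::parameterList ("Tpetra::Distributor");
 //   plist->set ("Send type", std::string ("Rsend"));
 //   plist->set ("Barrier between receives and sends", true); // required for Rsend
 //   plist->set ("Use distinct tags", true);
 //   plist->set ("Debug", false);
 //   distributor.setParameterList (plist);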
423 
424  Teuchos::RCP<const Teuchos::ParameterList>
425  Distributor::getValidParameters () const
426  {
427  using Teuchos::Array;
428  using Teuchos::ParameterList;
429  using Teuchos::parameterList;
430  using Teuchos::RCP;
431  using Teuchos::setStringToIntegralParameter;
432 
433  const bool barrierBetween = barrierBetween_default;
434  const bool useDistinctTags = useDistinctTags_default;
435  const bool debug = tpetraDistributorDebugDefault;
436 
437  Array<std::string> sendTypes = distributorSendTypes ();
438  const std::string defaultSendType ("Send");
439  Array<Details::EDistributorSendType> sendTypeEnums;
440  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
441  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
442  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
443  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
444 
445  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
446  plist->set ("Barrier between receives and sends", barrierBetween,
447  "Whether to execute a barrier between receives and sends in do"
448  "[Reverse]Posts(). Required for correctness when \"Send type\""
449  "=\"Rsend\", otherwise correct but not recommended.");
450  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
451  defaultSendType, "When using MPI, the variant of send to use in "
452  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
453  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
454  "MPI message tags for different code paths. Highly recommended"
455  " to avoid message collisions.");
456  plist->set ("Debug", debug, "Whether to print copious debugging output on "
457  "all processes.");
458  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
459  "tell whether a pointer points to host memory or CUDA device "
460  "memory. You don't need to specify this option any more; "
461  "Tpetra assumes it is always true. This is a very light "
462  "assumption on the MPI implementation, and in fact does not "
463  "actually involve hardware or system RDMA support.");
464 
465  // mfh 24 Dec 2015: Tpetra no longer inherits from
466  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
467  // sublist. However, we retain the "VerboseObject" sublist
468  // anyway, for backwards compatibility (otherwise the above
469  // validation would fail with an invalid parameter name, should
470  // the user still want to provide this list).
471  Teuchos::setupVerboseObjectSublist (&*plist);
472  return Teuchos::rcp_const_cast<const ParameterList> (plist);
473  }
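 // Editorial example (not part of the original file): to see every accepted
 // parameter with its default value and documentation string, print the
 // valid list, e.g.
 //
 //   distributor.getValidParameters ()->print (std::cout);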
474 
475 
476  size_t Distributor::getTotalReceiveLength() const
477  { return totalReceiveLength_; }
478 
479  size_t Distributor::getNumReceives() const
480  { return numReceives_; }
481 
482  bool Distributor::hasSelfMessage() const
483  { return selfMessage_; }
484 
485  size_t Distributor::getNumSends() const
486  { return numSends_; }
487 
488  size_t Distributor::getMaxSendLength() const
489  { return maxSendLength_; }
490 
491  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
492  { return procsFrom_; }
493 
494  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
495  { return lengthsFrom_; }
496 
497  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
498  { return procsTo_; }
499 
500  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
501  { return lengthsTo_; }
502 
503  Teuchos::RCP<Distributor>
504  Distributor::getReverse() const {
505  if (reverseDistributor_.is_null ()) {
506  createReverseDistributor ();
507  }
508  TEUCHOS_TEST_FOR_EXCEPTION
509  (reverseDistributor_.is_null (), std::logic_error, "The reverse "
510  "Distributor is null after createReverseDistributor returned. "
511  "Please report this bug to the Tpetra developers.");
512  return reverseDistributor_;
513  }
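 // Editorial example (not part of the original file): the reverse
 // Distributor implements the "transposed" communication pattern (sends
 // become receives and vice versa). A sketch of round-trip use, assuming
 // doPostsAndWaits / doReversePostsAndWaits with the usual three-argument
 // interface:
 //
 //   Teuchos::Array<double> fwdSendBuf (...), fwdRecvBuf (...);
 //   distributor.doPostsAndWaits<double> (fwdSendBuf (), 1, fwdRecvBuf ());
 //   // Send one reply per received item back along the reversed pattern:
 //   Teuchos::Array<double> revRecvBuf (fwdSendBuf.size ());
 //   distributor.doReversePostsAndWaits<double> (fwdRecvBuf (), 1, revRecvBuf ());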
514 
515 
516  void
517  Distributor::createReverseDistributor() const
518  {
519  reverseDistributor_ = Teuchos::rcp (new Distributor (comm_, out_));
520  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
521  reverseDistributor_->sendType_ = sendType_;
522  reverseDistributor_->barrierBetween_ = barrierBetween_;
523  reverseDistributor_->debug_ = debug_;
524 
525  // The total length of all the sends of this Distributor. We
526  // calculate it because it's the total length of all the receives
527  // of the reverse Distributor.
528  size_t totalSendLength =
529  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
530 
531  // The maximum length of any of the receives of this Distributor.
532  // We calculate it because it's the maximum length of any of the
533  // sends of the reverse Distributor.
534  size_t maxReceiveLength = 0;
535  const int myProcID = comm_->getRank();
536  for (size_t i=0; i < numReceives_; ++i) {
537  if (procsFrom_[i] != myProcID) {
538  // Don't count receives for messages sent by myself to myself.
539  if (lengthsFrom_[i] > maxReceiveLength) {
540  maxReceiveLength = lengthsFrom_[i];
541  }
542  }
543  }
544 
545  // Initialize all of reverseDistributor's data members. This
546  // mainly just involves flipping "send" and "receive," or the
547  // equivalent "to" and "from."
548 
549  reverseDistributor_->selfMessage_ = selfMessage_;
550  reverseDistributor_->numSends_ = numReceives_;
551  reverseDistributor_->procsTo_ = procsFrom_;
552  reverseDistributor_->startsTo_ = startsFrom_;
553  reverseDistributor_->lengthsTo_ = lengthsFrom_;
554  reverseDistributor_->maxSendLength_ = maxReceiveLength;
555  reverseDistributor_->indicesTo_ = indicesFrom_;
556  reverseDistributor_->numReceives_ = numSends_;
557  reverseDistributor_->totalReceiveLength_ = totalSendLength;
558  reverseDistributor_->lengthsFrom_ = lengthsTo_;
559  reverseDistributor_->procsFrom_ = procsTo_;
560  reverseDistributor_->startsFrom_ = startsTo_;
561  reverseDistributor_->indicesFrom_ = indicesTo_;
562 
563  // requests_: Allocated on demand.
564  // reverseDistributor_: See note below
565 
566  // mfh 31 Mar 2016: These are statistics, kept on calls to
567  // doPostsAndWaits or doReversePostsAndWaits. They weren't here
568  // when I started, and I didn't add them, so I don't know if they
569  // are accurate.
570  reverseDistributor_->lastRoundBytesSend_ = 0;
571  reverseDistributor_->lastRoundBytesRecv_ = 0;
572 
573  reverseDistributor_->useDistinctTags_ = useDistinctTags_;
574 
575  // I am my reverse Distributor's reverse Distributor.
576  // Thus, it would be legit to do the following:
577  //
578  // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
579  //
580  // (Note use of a "weak reference" to avoid a circular RCP
581  // dependency.) The only issue is that if users hold on to the
582  // reverse Distributor but let go of the forward one, this
583  // reference won't be valid anymore. However, the reverse
584  // Distributor is really an implementation detail of Distributor
585  // and not meant to be used directly, so we don't need to do this.
586  reverseDistributor_->reverseDistributor_ = Teuchos::null;
587  }
588 
589 
590  void Distributor::doWaits() {
591  using Teuchos::Array;
592  using Teuchos::CommRequest;
593  using Teuchos::FancyOStream;
594  using Teuchos::includesVerbLevel;
595  using Teuchos::is_null;
596  using Teuchos::OSTab;
597  using Teuchos::RCP;
598  using Teuchos::waitAll;
599  using std::endl;
600 
601  Teuchos::OSTab tab (out_);
602 
603 #ifdef TPETRA_DISTRIBUTOR_TIMERS
604  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
605 #endif // TPETRA_DISTRIBUTOR_TIMERS
606 
607  const int myRank = comm_->getRank ();
608 
609  if (debug_) {
610  std::ostringstream os;
611  os << myRank << ": doWaits: # reqs = "
612  << requests_.size () << endl;
613  *out_ << os.str ();
614  }
615 
616  if (requests_.size() > 0) {
617  waitAll (*comm_, requests_());
618 
619 #ifdef HAVE_TEUCHOS_DEBUG
620  // Make sure that waitAll() nulled out all the requests.
621  for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
622  it != requests_.end(); ++it)
623  {
624  TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
625  Teuchos::typeName(*this) << "::doWaits(): Communication requests "
626  "should all be null after calling Teuchos::waitAll() on them, but "
627  "at least one request is not null.");
628  }
629 #endif // HAVE_TEUCHOS_DEBUG
630  // Restore the invariant that requests_.size() is the number of
631  // outstanding nonblocking communication requests.
632  requests_.resize (0);
633  }
634 
635 #ifdef HAVE_TEUCHOS_DEBUG
636  {
637  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
638  int globalSizeNonzero = 0;
639  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
640  localSizeNonzero,
641  Teuchos::outArg (globalSizeNonzero));
642  TEUCHOS_TEST_FOR_EXCEPTION(
643  globalSizeNonzero != 0, std::runtime_error,
644  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
645  "a nonzero number of outstanding posts. There should be none at this "
646  "point. Please report this bug to the Tpetra developers.");
647  }
648 #endif // HAVE_TEUCHOS_DEBUG
649 
650  if (debug_) {
651  std::ostringstream os;
652  os << myRank << ": doWaits done" << endl;
653  *out_ << os.str ();
654  }
655  }
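 // Editorial example (not part of the original file): doWaits pairs with a
 // preceding doPosts call. A sketch of the nonblocking pattern, assuming
 // the ArrayRCP-based doPosts overload (the buffers must persist until
 // doWaits returns):
 //
 //   Teuchos::ArrayRCP<const double> exports = ...; // persists until doWaits
 //   Teuchos::ArrayRCP<double> imports = ...;       // persists until doWaits
 //   distributor.doPosts<double> (exports, 1, imports); // post sends/receives
 //   // ... overlap computation with communication here ...
 //   distributor.doWaits ();                             // complete all posts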
656 
657  void Distributor::doReverseWaits() {
658  // call doWaits() on the reverse Distributor, if it exists
659  if (! reverseDistributor_.is_null()) {
660  reverseDistributor_->doWaits();
661  }
662  }
663 
664  std::string Distributor::description () const {
665  std::ostringstream out;
666 
667  out << "\"Tpetra::Distributor\": {";
668  const std::string label = this->getObjectLabel ();
669  if (label != "") {
670  out << "Label: " << label << ", ";
671  }
672  out << "How initialized: "
673  << Details::DistributorHowInitializedEnumToString (howInitialized_)
674  << ", Parameters: {"
675  << "Send type: "
676  << DistributorSendTypeEnumToString (sendType_)
677  << ", Barrier between receives and sends: "
678  << (barrierBetween_ ? "true" : "false")
679  << ", Use distinct tags: "
680  << (useDistinctTags_ ? "true" : "false")
681  << ", Debug: " << (debug_ ? "true" : "false")
682  << "}}";
683  return out.str ();
684  }
685 
686  std::string
687  Distributor::
688  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
689  {
690  using Teuchos::toString;
691  using Teuchos::VERB_HIGH;
692  using Teuchos::VERB_EXTREME;
693  using std::endl;
694 
695  // This preserves current behavior of Distributor.
696  if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
697  return std::string ();
698  }
699 
700  auto outStringP = Teuchos::rcp (new std::ostringstream ());
701  auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
702  Teuchos::FancyOStream& out = *outp;
703 
704  const int myRank = comm_->getRank ();
705  const int numProcs = comm_->getSize ();
706  out << "Process " << myRank << " of " << numProcs << ":" << endl;
707  Teuchos::OSTab tab1 (out);
708 
709  out << "selfMessage: " << hasSelfMessage () << endl;
710  out << "numSends: " << getNumSends () << endl;
711  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
712  out << "procsTo: " << toString (procsTo_) << endl;
713  out << "lengthsTo: " << toString (lengthsTo_) << endl;
714  out << "maxSendLength: " << getMaxSendLength () << endl;
715  }
716  if (vl == VERB_EXTREME) {
717  out << "startsTo: " << toString (startsTo_) << endl;
718  out << "indicesTo: " << toString (indicesTo_) << endl;
719  }
720  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
721  out << "numReceives: " << getNumReceives () << endl;
722  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
723  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
724  out << "startsFrom: " << toString (startsFrom_) << endl;
725  out << "procsFrom: " << toString (procsFrom_) << endl;
726  }
727 
728  out.flush (); // make sure the ostringstream got everything
729  return outStringP->str ();
730  }
731 
732  void
733  Distributor::
734  describe (Teuchos::FancyOStream &out,
735  const Teuchos::EVerbosityLevel verbLevel) const
736  {
737  using std::endl;
738  using Teuchos::VERB_DEFAULT;
739  using Teuchos::VERB_NONE;
740  using Teuchos::VERB_LOW;
741  using Teuchos::VERB_MEDIUM;
742  using Teuchos::VERB_HIGH;
743  using Teuchos::VERB_EXTREME;
744  const Teuchos::EVerbosityLevel vl =
745  (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
746 
747  if (vl == VERB_NONE) {
748  return; // don't print anything
749  }
750  // If this Distributor's Comm is null, then the calling
751  // process does not participate in Distributor-related collective
752  // operations with the other processes. In that case, it is not
753  // even legal to call this method. The reasonable thing to do in
754  // that case is nothing.
755  if (comm_.is_null ()) {
756  return;
757  }
758  const int myRank = comm_->getRank ();
759  const int numProcs = comm_->getSize ();
760 
761  // Only Process 0 should touch the output stream, but this method
762  // in general may need to do communication. Thus, we may need to
763  // preserve the current tab level across multiple "if (myRank ==
764  // 0) { ... }" inner scopes. This is why we sometimes create
765  // OSTab instances by pointer, instead of by value. We only need
766  // to create them by pointer if the tab level must persist through
767  // multiple inner scopes.
768  Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
769 
770  if (myRank == 0) {
771  // At every verbosity level but VERB_NONE, Process 0 prints.
772  // By convention, describe() always begins with a tab before
773  // printing.
774  tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
775  // We quote the class name because it contains colons.
776  // This makes the output valid YAML.
777  out << "\"Tpetra::Distributor\":" << endl;
778  tab1 = Teuchos::rcp (new Teuchos::OSTab (out));
779 
780  const std::string label = this->getObjectLabel ();
781  if (label != "") {
782  out << "Label: " << label << endl;
783  }
784  out << "Number of processes: " << numProcs << endl
785  << "How initialized: "
786  << Details::DistributorHowInitializedEnumToString (howInitialized_)
787  << endl;
788  {
789  out << "Parameters: " << endl;
790  Teuchos::OSTab tab2 (out);
791  out << "\"Send type\": "
792  << DistributorSendTypeEnumToString (sendType_) << endl
793  << "\"Barrier between receives and sends\": "
794  << (barrierBetween_ ? "true" : "false") << endl
795  << "\"Use distinct tags\": "
796  << (useDistinctTags_ ? "true" : "false") << endl
797  << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
798  }
799  } // if myRank == 0
800 
801  // This is collective over the Map's communicator.
802  if (vl > VERB_LOW) {
803  const std::string lclStr = this->localDescribeToString (vl);
804  Tpetra::Details::gathervPrint (out, lclStr, *comm_);
805  }
806 
807  out << "Reverse Distributor:";
808  if (reverseDistributor_.is_null ()) {
809  out << " null" << endl;
810  }
811  else {
812  out << endl;
813  reverseDistributor_->describe (out, vl);
814  }
815  }
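 // Editorial example (not part of the original file): a sketch of calling
 // describe collectively on all processes, writing its YAML-style output to
 // std::cout via a FancyOStream.
 //
 //   Teuchos::RCP<Teuchos::FancyOStream> fos =
 //     Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cout));
 //   distributor.describe (*fos, Teuchos::VERB_HIGH);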
816 
817  void
818  Distributor::computeReceives ()
819  {
820  using Teuchos::Array;
821  using Teuchos::ArrayRCP;
822  using Teuchos::as;
823  using Teuchos::CommStatus;
824  using Teuchos::CommRequest;
825  using Teuchos::ireceive;
826  using Teuchos::RCP;
827  using Teuchos::rcp;
828  using Teuchos::REDUCE_SUM;
829  using Teuchos::receive;
830  using Teuchos::reduce;
831  using Teuchos::scatter;
832  using Teuchos::send;
833  using Teuchos::waitAll;
834  using std::endl;
835 
836  Teuchos::OSTab tab (out_);
837  const int myRank = comm_->getRank();
838  const int numProcs = comm_->getSize();
839 
840  // MPI tag for nonblocking receives and blocking sends in this method.
841  const int pathTag = 2;
842  const int tag = this->getTag (pathTag);
843 
844  if (debug_) {
845  std::ostringstream os;
846  os << myRank << ": computeReceives: "
847  "{selfMessage_: " << (selfMessage_ ? "true" : "false")
848  << ", tag: " << tag << "}" << endl;
849  *out_ << os.str ();
850  }
851 
852  // toProcsFromMe[i] == the number of messages sent by this process
853  // to process i. The data in numSends_, procsTo_, and lengthsTo_
854  // concern the contiguous sends. Therefore, each process will be
855  // listed in procsTo_ at most once, and so toProcsFromMe[i] will
856  // either be 0 or 1.
857  {
858  Array<int> toProcsFromMe (numProcs, 0);
859 #ifdef HAVE_TEUCHOS_DEBUG
860  bool counting_error = false;
861 #endif // HAVE_TEUCHOS_DEBUG
862  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
863 #ifdef HAVE_TEUCHOS_DEBUG
864  if (toProcsFromMe[procsTo_[i]] != 0) {
865  counting_error = true;
866  }
867 #endif // HAVE_TEUCHOS_DEBUG
868  toProcsFromMe[procsTo_[i]] = 1;
869  }
870 #ifdef HAVE_TEUCHOS_DEBUG
871  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
872  "Tpetra::Distributor::computeReceives: There was an error on at least "
873  "one process in counting the number of messages sent by that process to "
874  "the other processes. Please report this bug to the Tpetra developers.",
875  *comm_);
876 #endif // HAVE_TEUCHOS_DEBUG
877 
878  if (debug_) {
879  std::ostringstream os;
880  os << myRank << ": computeReceives: Calling reduce and scatter" << endl;
881  *out_ << os.str ();
882  }
883 
884  // Compute the number of receives that this process needs to
885  // post. The number of receives includes any self sends (i.e.,
886  // messages sent by this process to itself).
887  //
888  // (We will use numReceives_ below to post exactly that
889  // number of receives, with MPI_ANY_SOURCE as the sending rank.
890  // This will tell us from which processes this process expects
891  // to receive, and how many packets of data we expect to receive
892  // from each process.)
893  //
894  // toProcsFromMe[i] is the number of messages sent by this
895  // process to process i. Compute the sum (elementwise) of all
896  // the toProcsFromMe arrays on all processes in the
897  // communicator. If the array x is that sum, then if this
898  // process has rank j, x[j] is the number of messages sent
899  // to process j, that is, the number of receives on process j
900  // (including any messages sent by process j to itself).
901  //
902  // Yes, this requires storing and operating on an array of
903  // length P, where P is the number of processes in the
904  // communicator. Epetra does this too. Avoiding this O(P)
905  // memory bottleneck would require some research.
906  //
907  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
908  // implement this O(P) memory algorithm.
909  //
910  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
911  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
912  // Then, scatter the latter, so that each process p gets
913  // numRecvsOnEachProc[p].
914  //
915  // 2. Like #1, but use MPI_Reduce_scatter instead of
916  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
917  // optimized to reduce the number of messages, but
918  // MPI_Reduce_scatter is more general than we need (it
919  // allows the equivalent of MPI_Scatterv). See Bug 6336.
920  //
921  // 3. Do an all-reduce on toProcsFromMe, and let my process
922  // (with rank myRank) get numReceives_ from
923  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
924  // all-reduce method.
925  //
926  // Approaches 1 and 3 have the same critical path length.
927  // However, #3 moves more data. This is because the final
928  // result is just one integer, but #3 moves a whole array of
929  // results to all the processes. This is why we use Approach 1
930  // here.
931  //
932  // mfh 12 Apr 2013: See discussion in createFromSends() about
933  // how we could use this communication to propagate an error
934  // flag for "free" in a release build.
935 
936  const int root = 0; // rank of root process of the reduction
937  Array<int> numRecvsOnEachProc; // temp; only needed on root
938  if (myRank == root) {
939  numRecvsOnEachProc.resize (numProcs);
940  }
941  int numReceivesAsInt = 0; // output
942  reduce<int, int> (toProcsFromMe.getRawPtr (),
943  numRecvsOnEachProc.getRawPtr (),
944  numProcs, REDUCE_SUM, root, *comm_);
945  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
946  &numReceivesAsInt, 1, root, *comm_);
947  numReceives_ = static_cast<size_t> (numReceivesAsInt);
948  }
949 
950  // Now we know numReceives_, which is this process' number of
951  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
952  // with this number of entries.
953  lengthsFrom_.assign (numReceives_, 0);
954  procsFrom_.assign (numReceives_, 0);
955 
956  //
957  // Ask (via nonblocking receive) each process from which we are
958  // receiving how many packets we should expect from it in the
959  // communication pattern.
960  //
961 
962  // At this point, numReceives_ includes any self message that
963  // there may be. At the end of this routine, we'll subtract off
964  // the self message (if there is one) from numReceives_. In this
965  // routine, we don't need to receive a message from ourselves in
966  // order to figure out our lengthsFrom_ and source process ID; we
967  // can just ask ourselves directly. Thus, the actual number of
968  // nonblocking receives we post here does not include the self
969  // message.
970  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
971 
972  // Teuchos' wrapper for nonblocking receives requires receive
973  // buffers that it knows won't go away. This is why we use RCPs,
974  // one RCP per nonblocking receive request. They get allocated in
975  // the loop below.
976  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
977  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
978  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
979 
980  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
981  // (receive data from any process).
982 #ifdef HAVE_MPI
983  const int anySourceProc = MPI_ANY_SOURCE;
984 #else
985  const int anySourceProc = -1;
986 #endif
987 
988  if (debug_) {
989  std::ostringstream os;
990  os << myRank << ": computeReceives: Posting "
991  << actualNumReceives << " irecvs" << endl;
992  *out_ << os.str ();
993  }
994 
995  // Post the (nonblocking) receives.
996  for (size_t i = 0; i < actualNumReceives; ++i) {
997  // Once the receive completes, we can ask the corresponding
998  // CommStatus object (output by wait()) for the sending process'
999  // ID (which we'll assign to procsFrom_[i] -- don't forget to
1000  // do that!).
1001  lengthsFromBuffers[i].resize (1);
1002  lengthsFromBuffers[i][0] = as<size_t> (0);
1003  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
1004  if (debug_) {
1005  std::ostringstream os;
1006  os << myRank << ": computeReceives: "
1007  "Posted any-proc irecv w/ specified tag " << tag << endl;
1008  *out_ << os.str ();
1009  }
1010  }
1011 
1012  if (debug_) {
1013  std::ostringstream os;
1014  os << myRank << ": computeReceives: "
1015  "posting " << numSends_ << " sends" << endl;
1016  *out_ << os.str ();
1017  }
1018  // Post the sends: Tell each process to which we are sending how
1019  // many packets it should expect from us in the communication
1020  // pattern. We could use nonblocking sends here, as long as we do
1021  // a waitAll() on all the sends and receives at once.
1022  //
1023  // We assume that numSends_ and selfMessage_ have already been
1024  // set. The value of numSends_ (my process' number of sends) does
1025  // not include any message that it might send to itself.
1026  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
1027  if (procsTo_[i] != myRank) {
1028  // Send a message to procsTo_[i], telling that process that
1029  // this communication pattern will send that process
1030  // lengthsTo_[i] blocks of packets.
1031  const size_t* const lengthsTo_i = &lengthsTo_[i];
1032  send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
1033  if (debug_) {
1034  std::ostringstream os;
1035  os << myRank << ": computeReceives: "
1036  "Posted send to Proc " << procsTo_[i] << " w/ specified tag "
1037  << tag << endl;
1038  *out_ << os.str ();
1039  }
1040  }
1041  else {
1042  // We don't need a send in the self-message case. If this
1043  // process will send a message to itself in the communication
1044  // pattern, then the last element of lengthsFrom_ and
1045  // procsFrom_ corresponds to the self-message. Of course
1046  // this process knows how long the message is, and the process
1047  // ID is its own process ID.
1048  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1049  procsFrom_[numReceives_-1] = myRank;
1050  }
1051  }
1052 
1053  if (debug_) {
1054  std::ostringstream os;
1055  os << myRank << ": computeReceives: waitAll on "
1056  << requests.size () << " requests" << endl;
1057  *out_ << os.str ();
1058  }
1059  //
1060  // Wait on all the receives. When they arrive, check the status
1061  // output of wait() for the receiving process ID, unpack the
1062  // request buffers into lengthsFrom_, and set procsFrom_ from the
1063  // status.
1064  //
1065  waitAll (*comm_, requests (), statuses ());
1066  for (size_t i = 0; i < actualNumReceives; ++i) {
1067  lengthsFrom_[i] = *lengthsFromBuffers[i];
1068  procsFrom_[i] = statuses[i]->getSourceRank ();
1069  }
1070 
1071  // Sort the procsFrom_ array, and apply the same permutation to
1072  // lengthsFrom_. This ensures that procsFrom_[i] and
1073  // lengthsFrom_[i] refer to the same thing.
1074  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1075 
1076  // Compute indicesFrom_
1077  totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
1078  indicesFrom_.clear ();
1079  indicesFrom_.reserve (totalReceiveLength_);
1080  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1081  indicesFrom_.push_back(i);
1082  }
1083 
1084  startsFrom_.clear ();
1085  startsFrom_.reserve (numReceives_);
1086  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1087  startsFrom_.push_back(j);
1088  j += lengthsFrom_[i];
1089  }
1090 
1091  if (selfMessage_) {
1092  --numReceives_;
1093  }
1094 
1095  if (debug_) {
1096  std::ostringstream os;
1097  os << myRank << ": computeReceives: done" << endl;
1098  *out_ << os.str ();
1099  }
1100  }
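 // Editorial worked example (not part of the original file) for the
 // reduce-then-scatter counting step above: with 3 processes, suppose
 //
 //   toProcsFromMe on proc 0 = [0, 1, 1]
 //   toProcsFromMe on proc 1 = [1, 0, 1]
 //   toProcsFromMe on proc 2 = [0, 1, 0]
 //
 // The elementwise sum on the root is [1, 2, 2]; after the scatter, proc 0
 // learns numReceives_ = 1 and procs 1 and 2 each learn numReceives_ = 2
 // (including any self message, which computeReceives subtracts off at the
 // end).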
1101 
1102  size_t
1103  Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportProcIDs)
1104  {
1105  using Teuchos::outArg;
1106  using Teuchos::REDUCE_MAX;
1107  using Teuchos::reduceAll;
1108  using std::endl;
1109 
1110  Teuchos::OSTab tab (out_);
1111  const size_t numExports = exportProcIDs.size();
1112  const int myProcID = comm_->getRank();
1113  const int numProcs = comm_->getSize();
1114  if (debug_) {
1115  std::ostringstream os;
1116  os << myProcID << ": createFromSends" << endl;
1117  *out_ << os.str ();
1118  }
1119 
1120  // exportProcIDs tells us the communication pattern for this
1121  // distributor. It dictates the way that the export data will be
1122  // interpreted in doPosts(). We want to perform at most one
1123  // send per process in doPosts; this is for two reasons:
1124  // * minimize latency / overhead in the comm routines (nice)
1125  // * match the number of receives and sends between processes
1126  // (necessary)
1127  //
1128  // Teuchos::Comm requires that the data for a send are contiguous
1129  // in a send buffer. Therefore, if the data in the send buffer
1130  // for doPosts() are not contiguous, they will need to be copied
1131  // into a contiguous buffer. The user has specified this
1132  // noncontiguous pattern and we can't do anything about it.
1133  // However, if they do not provide an efficient pattern, we will
1134  // warn them if one of the following compile-time options has been
1135  // set:
1136  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1137  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1138  //
1139  // If the data are contiguous, then we can post the sends in situ
1140  // (i.e., without needing to copy them into a send buffer).
1141  //
1142  // Determine contiguity. There are a number of ways to do this:
1143  // * If the export IDs are sorted, then all exports to a
1144  // particular proc must be contiguous. This is what Epetra does.
1145  // * If the export ID of the current export already has been
1146  // listed, then the previous listing should correspond to the
1147  // same export. This tests contiguity, but not sortedness.
1148  //
1149  // Both of these tests require O(n), where n is the number of
1150  // exports. However, the latter will positively identify a greater
1151  // portion of contiguous patterns. We use the latter method.
1152  //
1153  // Check to see if values are grouped by procs without gaps
1154  // If so, indices_to -> 0.
1155 
1156  // Set up data structures for quick traversal of arrays.
1157  // This contains the number of sends for each process ID.
1158  //
1159  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1160  // that create an array of length the number of processes in the
1161  // communicator (plus one). Given how this code uses this array,
1162  // it should be straightforward to replace it with a hash table or
1163  // some other more space-efficient data structure. In practice,
1164  // most of the entries of starts should be zero for a sufficiently
1165  // large process count, unless the communication pattern is dense.
1166  // Note that it's important to be able to iterate through keys (i
1167  // for which starts[i] is nonzero) in increasing order.
1168  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1169 
1170  // numActive is the number of sends that are not Null
1171  size_t numActive = 0;
1172  int needSendBuff = 0; // Boolean
1173 
1174 #ifdef HAVE_TPETRA_DEBUG
1175  int badID = -1; // only used in a debug build
1176 #endif // HAVE_TPETRA_DEBUG
1177  for (size_t i = 0; i < numExports; ++i) {
1178  const int exportID = exportProcIDs[i];
1179  if (exportID >= numProcs) {
1180 #ifdef HAVE_TPETRA_DEBUG
1181  badID = myProcID;
1182 #endif // HAVE_TPETRA_DEBUG
1183  break;
1184  }
1185  else if (exportID >= 0) {
1186  // exportID is a valid process ID. Increment the number of
1187  // messages this process will send to that process.
1188  ++starts[exportID];
1189 
1190  // If we're sending more than one message to process exportID,
1191  // then it is possible that the data are not contiguous.
1192  // Check by seeing if the previous process ID in the list
1193  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
1194  // because if starts[exportID] > 1, then i must be > 1 (since
1195  // the starts array was filled with zeros initially).
1196 
1197  // null entries break continuity.
1198  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1199  if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
1200  needSendBuff = 1;
1201  }
1202  ++numActive;
1203  }
1204  }
1205 
1206 #ifdef HAVE_TPETRA_DEBUG
1207  // Test whether any process in the communicator got an invalid
1208  // process ID. If badID != -1 on this process, then it equals
1209  // this process' rank. The max of all badID over all processes is
1210  // the max rank which has an invalid process ID.
1211  {
1212  int gbl_badID;
1213  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1214  TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1215  Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
1216  << ", perhaps among other processes, got a bad send process ID.");
1217  }
1218 #else
1219  // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
1220  // ignoring this information, we should think about how to pass it
1221  // along so that all the processes find out about it. In a
1222  // release build with efficiency warnings turned off, the next
1223  // collective communication happens in computeReceives(). We
1224  // could figure out how to encode the error flag in that
1225  // operation, for example by adding an extra entry to the
1226  // collective's output array that encodes the error condition (0
1227  // on all processes if no error, else 1 on any process with the
1228  // error, so that the sum will produce a nonzero value if any
1229  // process had an error). I'll defer this change for now and
1230  // recommend instead that people with troubles try a debug build.
1231 #endif // HAVE_TPETRA_DEBUG
1232 
1233 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1234  {
1235  int global_needSendBuff;
1236  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1237  outArg (global_needSendBuff));
1238  TPETRA_EFFICIENCY_WARNING(
1239  global_needSendBuff != 0, std::runtime_error,
1240  "::createFromSends: Grouping export IDs together by process rank often "
1241  "improves performance.");
1242  }
1243 #endif
1244 
1245  // Determine from the caller's data whether or not the current
1246  // process should send (a) message(s) to itself.
1247  if (starts[myProcID] != 0) {
1248  selfMessage_ = true;
1249  }
1250  else {
1251  selfMessage_ = false;
1252  }
1253 
1254 #ifdef HAVE_TEUCHOS_DEBUG
1255  bool index_neq_numActive = false;
1256  bool send_neq_numSends = false;
1257 #endif
1258  if (! needSendBuff) {
1259  // grouped by proc, no send buffer or indicesTo_ needed
1260  numSends_ = 0;
1261  // Count total number of sends, i.e., total number of procs to
1262  // which we are sending. This includes myself, if applicable.
1263  for (int i = 0; i < numProcs; ++i) {
1264  if (starts[i]) {
1265  ++numSends_;
1266  }
1267  }
1268 
1269  // Not only do we not need these, but we must clear them, as
1270  // empty status of indicesTo is a flag used later.
1271  indicesTo_.resize(0);
1272  // Size these to numSends_; note, at the moment, numSends_
1273  // includes self sends. Set their values to zeros.
1274  procsTo_.assign(numSends_,0);
1275  startsTo_.assign(numSends_,0);
1276  lengthsTo_.assign(numSends_,0);
1277 
1278  // set startsTo to the offset for each send (i.e., each proc ID)
1279  // set procsTo to the proc ID for each send
1280  // in interpreting this code, remember that we are assuming contiguity
1281  // that is why index skips through the ranks
1282  {
1283  size_t index = 0, procIndex = 0;
1284  for (size_t i = 0; i < numSends_; ++i) {
1285  while (exportProcIDs[procIndex] < 0) {
1286  ++procIndex; // skip all negative proc IDs
1287  }
1288  startsTo_[i] = procIndex;
1289  int procID = exportProcIDs[procIndex];
1290  procsTo_[i] = procID;
1291  index += starts[procID];
1292  procIndex += starts[procID];
1293  }
1294 #ifdef HAVE_TEUCHOS_DEBUG
1295  if (index != numActive) {
1296  index_neq_numActive = true;
1297  }
1298 #endif
1299  }
1300  // sort the startsTo and proc IDs together, in ascending order, according
1301  // to proc IDs
1302  if (numSends_ > 0) {
1303  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1304  }
1305  // compute the maximum send length
1306  maxSendLength_ = 0;
1307  for (size_t i = 0; i < numSends_; ++i) {
1308  int procID = procsTo_[i];
1309  lengthsTo_[i] = starts[procID];
1310  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1311  maxSendLength_ = lengthsTo_[i];
1312  }
1313  }
1314  }
1315  else {
1316  // not grouped by proc, need send buffer and indicesTo_
1317 
1318  // starts[i] is the number of sends to proc i
1319  // numActive equals number of sends total, \sum_i starts[i]
1320 
1321  // this loop starts at starts[1], so explicitly check starts[0]
1322  if (starts[0] == 0 ) {
1323  numSends_ = 0;
1324  }
1325  else {
1326  numSends_ = 1;
1327  }
1328  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1329  im1=starts.begin();
1330  i != starts.end(); ++i)
1331  {
1332  if (*i != 0) ++numSends_;
1333  *i += *im1;
1334  im1 = i;
1335  }
1336  // starts[i] now contains the number of exports to procs 0 through i
1337 
1338  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1339  i=starts.rbegin()+1;
1340  i != starts.rend(); ++i)
1341  {
1342  *ip1 = *i;
1343  ip1 = i;
1344  }
1345  starts[0] = 0;
1346  // starts[i] now contains the number of exports to procs 0 through
1347  // i-1, i.e., all procs before proc i
1348 
1349  indicesTo_.resize(numActive);
1350 
1351  for (size_t i = 0; i < numExports; ++i) {
1352  if (exportProcIDs[i] >= 0) {
1353  // record the offset to the sendBuffer for this export
1354  indicesTo_[starts[exportProcIDs[i]]] = i;
1355  // now increment the offset for this proc
1356  ++starts[exportProcIDs[i]];
1357  }
1358  }
1359  // our send buffer will contain the export data for each of the procs
1360  // we communicate with, in order by proc id
1361  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
1362  // indicesTo now maps each export to the location in our send buffer
1363  // associated with the export
1364  // data for export i located at sendBuffer[indicesTo[i]]
1365  //
1366  // starts[i] once again contains the number of exports to
1367  // procs 0 through i
1368  for (int proc = numProcs-1; proc != 0; --proc) {
1369  starts[proc] = starts[proc-1];
1370  }
1371  starts.front() = 0;
1372  starts[numProcs] = numActive;
1373  //
1374  // starts[proc] once again contains the number of exports to
1375  // procs 0 through proc-1
1376  // i.e., the start of my data in the sendBuffer
1377 
1378  // this contains invalid data at procs we don't care about, that is okay
1379  procsTo_.resize(numSends_);
1380  startsTo_.resize(numSends_);
1381  lengthsTo_.resize(numSends_);
1382 
1383  // for each group of sends/exports, record the destination proc,
1384  // the length, and the offset for this send into the
1385  // send buffer (startsTo_)
1386  maxSendLength_ = 0;
1387  size_t snd = 0;
1388  for (int proc = 0; proc < numProcs; ++proc ) {
1389  if (starts[proc+1] != starts[proc]) {
1390  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1391  startsTo_[snd] = starts[proc];
1392  // record max length for all off-proc sends
1393  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1394  maxSendLength_ = lengthsTo_[snd];
1395  }
1396  procsTo_[snd] = proc;
1397  ++snd;
1398  }
1399  }
1400 #ifdef HAVE_TEUCHOS_DEBUG
1401  if (snd != numSends_) {
1402  send_neq_numSends = true;
1403  }
1404 #endif
1405  }
1406 #ifdef HAVE_TEUCHOS_DEBUG
1407  SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
1408  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1409  SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
1410  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1411 #endif
1412 
1413  if (selfMessage_) --numSends_;
1414 
1415  // Invert map to see what msgs are received and what length
1416  computeReceives();
1417 
1418  if (debug_) {
1419  std::ostringstream os;
1420  os << myProcID << ": createFromSends: done" << endl;
1421  *out_ << os.str ();
1422  }
1423 
1424  // createFromRecvs() calls createFromSends(), but will set
1425  // howInitialized_ again after calling createFromSends().
1426  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1427 
1428  return totalReceiveLength_;
1429  }
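 // Editorial example (not part of the original file): on each process,
 // exportProcIDs[i] names the process that should receive export item i.
 // A sketch of typical use:
 //
 //   Teuchos::Array<int> exportProcIDs;   // e.g. [1, 1, 2] on this process
 //   Tpetra::Distributor distributor (comm);
 //   const size_t numImports =
 //     distributor.createFromSends (exportProcIDs ());
 //   // numImports items will arrive when doPostsAndWaits is called.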
1430 
1431  void
1432  Distributor::
1433  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
1434  const Teuchos::ArrayView<const int>& remoteProcIDs)
1435  {
1436  // Note that exportProcIDs and remoteProcIDs _must_ have an entry for
1437  // each GID. If the export/remote proc ID lists are taken from the
1438  // getProcs{From|To} lists extracted from a previous Distributor, this
1439  // will give a wrong answer, because those lists have a unique entry per
1440  // process ID. A version of this method that takes lengthsTo and
1441  // lengthsFrom should be written.
1442 
1443  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1444 
1445 
1446  int myProcID = comm_->getRank ();
1447  int numProcs = comm_->getSize();
1448 
1449  const size_t numExportIDs = exportProcIDs.size();
1450  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1451 
1452  size_t numActive = 0;
1453  int needSendBuff = 0; // Boolean
1454 
1455  for(size_t i = 0; i < numExportIDs; i++ )
1456  {
1457  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1458  needSendBuff = 1;
1459  if( exportProcIDs[i] >= 0 )
1460  {
1461  ++starts[ exportProcIDs[i] ];
1462  ++numActive;
1463  }
1464  }
1465 
1466  selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1467 
1468  numSends_ = 0;
1469 
 1470  if( needSendBuff ) // not grouped by proc; we need a send buffer and indicesTo_
1471  {
1472  if (starts[0] == 0 ) {
1473  numSends_ = 0;
1474  }
1475  else {
1476  numSends_ = 1;
1477  }
1478  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1479  im1=starts.begin();
1480  i != starts.end(); ++i)
1481  {
1482  if (*i != 0) ++numSends_;
1483  *i += *im1;
1484  im1 = i;
1485  }
1486  // starts[i] now contains the number of exports to procs 0 through i
1487 
1488  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1489  i=starts.rbegin()+1;
1490  i != starts.rend(); ++i)
1491  {
1492  *ip1 = *i;
1493  ip1 = i;
1494  }
1495  starts[0] = 0;
1496  // starts[i] now contains the number of exports to procs 0 through
1497  // i-1, i.e., all procs before proc i
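 // Illustrative example: for exportProcIDs = {2, 0, 2, 1} on 3 procs, the
 // per-proc counts {1, 1, 2} become the inclusive sums {1, 2, 4}, and the
 // shift above leaves starts = {0, 1, 2, 4}: the offset of each proc's block.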
1498 
1499  indicesTo_.resize(numActive);
1500 
1501  for (size_t i = 0; i < numExportIDs; ++i) {
1502  if (exportProcIDs[i] >= 0) {
1503  // record the offset to the sendBuffer for this export
1504  indicesTo_[starts[exportProcIDs[i]]] = i;
1505  // now increment the offset for this proc
1506  ++starts[exportProcIDs[i]];
1507  }
1508  }
1509  for (int proc = numProcs-1; proc != 0; --proc) {
1510  starts[proc] = starts[proc-1];
1511  }
1512  starts.front() = 0;
1513  starts[numProcs] = numActive;
1514  procsTo_.resize(numSends_);
1515  startsTo_.resize(numSends_);
1516  lengthsTo_.resize(numSends_);
1517  maxSendLength_ = 0;
1518  size_t snd = 0;
1519  for (int proc = 0; proc < numProcs; ++proc ) {
1520  if (starts[proc+1] != starts[proc]) {
1521  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1522  startsTo_[snd] = starts[proc];
1523  // record max length for all off-proc sends
1524  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1525  maxSendLength_ = lengthsTo_[snd];
1526  }
1527  procsTo_[snd] = proc;
1528  ++snd;
1529  }
1530  }
1531  }
1532  else {
1533  // grouped by proc, no send buffer or indicesTo_ needed
1534  numSends_ = 0;
1535  // Count total number of sends, i.e., total number of procs to
1536  // which we are sending. This includes myself, if applicable.
1537  for (int i = 0; i < numProcs; ++i) {
1538  if (starts[i]) {
1539  ++numSends_;
1540  }
1541  }
1542 
1543  // Not only do we not need these, but we must clear them, as
1544  // empty status of indicesTo is a flag used later.
1545  indicesTo_.resize(0);
1546  // Size these to numSends_; note, at the moment, numSends_
1547  // includes self sends. Set their values to zeros.
1548  procsTo_.assign(numSends_,0);
1549  startsTo_.assign(numSends_,0);
1550  lengthsTo_.assign(numSends_,0);
1551 
1552  // set startsTo to the offset for each send (i.e., each proc ID)
1553  // set procsTo to the proc ID for each send
 1554  // when interpreting this code, remember that we assume the exports are
 1555  // grouped contiguously by proc; that is why the index skips through the ranks
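 // Illustrative example: if exportProcIDs = {0, 0, 3, 3, 3, 7} (already
 // grouped by proc), this produces procsTo_ = {0, 3, 7} and
 // startsTo_ = {0, 2, 5}; the loop further below gives lengthsTo_ = {2, 3, 1}.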
1556  {
1557  size_t index = 0, procIndex = 0;
1558  for (size_t i = 0; i < numSends_; ++i) {
1559  while (exportProcIDs[procIndex] < 0) {
1560  ++procIndex; // skip all negative proc IDs
1561  }
1562  startsTo_[i] = procIndex;
1563  int procID = exportProcIDs[procIndex];
1564  procsTo_[i] = procID;
1565  index += starts[procID];
1566  procIndex += starts[procID];
1567  }
1568  }
1569  // sort the startsTo and proc IDs together, in ascending order, according
1570  // to proc IDs
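 // Illustrative example: sort2 sorts procsTo_ and applies the same permutation
 // to startsTo_, so procsTo_ = {5, 2, 9}, startsTo_ = {0, 4, 7} becomes
 // procsTo_ = {2, 5, 9}, startsTo_ = {4, 0, 7}.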
1571  if (numSends_ > 0) {
1572  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1573  }
1574  // compute the maximum send length
1575  maxSendLength_ = 0;
1576  for (size_t i = 0; i < numSends_; ++i) {
1577  int procID = procsTo_[i];
1578  lengthsTo_[i] = starts[procID];
1579  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1580  maxSendLength_ = lengthsTo_[i];
1581  }
1582  }
1583  }
1584 
1585 
1586  numSends_ -= selfMessage_;
1587  std::vector<int> recv_list;
1588  recv_list.reserve(numSends_); //reserve an initial guess for size needed
1589 
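 // remoteProcIDs must list the sending proc of every incoming value, grouped
 // and sorted by proc; e.g. (illustrative) remoteProcIDs = {1, 1, 4, 4, 4, 6}
 // gives recv_list = {1, 4, 6}, and the loop further below then fills
 // lengthsFrom_ = {2, 3, 1} and startsFrom_ = {0, 2, 5}.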
1590  int last_pid=-2;
1591  for(int i=0; i<remoteProcIDs.size(); i++) {
1592  if(remoteProcIDs[i]>last_pid) {
1593  recv_list.push_back(remoteProcIDs[i]);
1594  last_pid = remoteProcIDs[i];
1595  }
1596  else if (remoteProcIDs[i]<last_pid)
 1597  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected remoteProcIDs to be in sorted order");
1598  }
1599  numReceives_ = recv_list.size();
1600  if(numReceives_) {
1601  procsFrom_.assign(numReceives_,0);
1602  lengthsFrom_.assign(numReceives_,0);
1603  indicesFrom_.assign(numReceives_,0);
1604  startsFrom_.assign(numReceives_,0);
1605  }
1606  for(size_t i=0,j=0; i<numReceives_; ++i) {
1607  int jlast=j;
1608  procsFrom_[i] = recv_list[i];
1609  startsFrom_[i] = j;
1610  for( ; j<(size_t)remoteProcIDs.size() &&
1611  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1612  lengthsFrom_[i] = j-jlast;
1613  }
1614  totalReceiveLength_ = remoteProcIDs.size();
1615  indicesFrom_.clear ();
1616  indicesFrom_.reserve (totalReceiveLength_);
1617  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1618  indicesFrom_.push_back(i);
1619  }
1620 
 1621  numReceives_ -= selfMessage_;
1622  }
1623 
1624 } // namespace Tpetra
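The sketch below shows one way the plan-construction path above might be exercised. It is illustrative only: the function name exampleShiftPlan and the two-values-per-neighbor pattern are assumptions for the example, not part of Tpetra.

// Illustrative sketch (not from the Tpetra sources): each rank sends two
// values to the next rank and receives two from the previous rank.
#include "Tpetra_Distributor.hpp"
#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"
#include <cstddef>

size_t
exampleShiftPlan (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
{
  const int myRank   = comm->getRank ();
  const int numProcs = comm->getSize ();
  const int sendTo   = (myRank + 1) % numProcs;
  const int recvFrom = (myRank + numProcs - 1) % numProcs;

  // One entry per exported / received value, grouped and sorted by proc,
  // as the comments in createFromSendsAndRecvs above require.
  Teuchos::Array<int> exportProcIDs (2, sendTo);
  Teuchos::Array<int> remoteProcIDs (2, recvFrom);

  Tpetra::Distributor distributor (comm);
  distributor.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());
  return distributor.getTotalReceiveLength (); // 2 in this example
}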
Namespace Tpetra contains the classes and methods constituting the Tpetra library.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumReceives() const
The number of processes from which we will receive data.
size_t getNumSends() const
The number of processes to which we will send data.
Implementation details of Tpetra.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which...
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Sets up and executes a communication plan for a Tpetra DistObject.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
virtual ~Distributor()
Destructor (virtual for memory safety).
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
std::string description() const
Return a one-line description of this object.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.