#include "Tpetra_Distributor.hpp"
#include "Tpetra_Details_gathervPrint.hpp"
#include "Teuchos_StandardParameterEntryValidators.hpp"
#include "Teuchos_VerboseObjectParameterListHelpers.hpp"

  std::string
  DistributorSendTypeEnumToString (EDistributorSendType sendType)
  {
    if (sendType == DISTRIBUTOR_ISEND) {
      return "Isend";
    }
    else if (sendType == DISTRIBUTOR_RSEND) {
      return "Rsend";
    }
    else if (sendType == DISTRIBUTOR_SEND) {
      return "Send";
    }
    else if (sendType == DISTRIBUTOR_SSEND) {
      return "Ssend";
    }
    else {
      TEUCHOS_TEST_FOR_EXCEPTION(
        true, std::invalid_argument, "Invalid "
        "EDistributorSendType enum value " << sendType << ".");
    }
  }
  std::string
  DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
  {
    switch (how) {
    case Details::DISTRIBUTOR_NOT_INITIALIZED:
      return "Not initialized yet";
    case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
      return "By createFromSends";
    case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
      return "By createFromRecvs";
    case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
      return "By createReverseDistributor";
    case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
      return "By copy constructor";
    default:
      return "Invalid";
    }
  }
  Teuchos::Array<std::string>
  distributorSendTypes ()
  {
    Teuchos::Array<std::string> sendTypes;
    sendTypes.push_back ("Isend");
    sendTypes.push_back ("Rsend");
    sendTypes.push_back ("Send");
    sendTypes.push_back ("Ssend");
    return sendTypes;
  }
  const bool tpetraDistributorDebugDefault = false;
  const bool barrierBetween_default = false;
  const bool useDistinctTags_default = true;
  int Distributor::getTag (const int pathTag) const {
    return useDistinctTags_ ? pathTag : comm_->getTag ();
  }
#ifdef TPETRA_DISTRIBUTOR_TIMERS
  void Distributor::makeTimers () {
    const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
    const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
    const std::string name_doWaits = "Tpetra::Distributor: doWaits";
    const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
    const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
    const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
    const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
    const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
    const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";

    timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
    timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
    timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
    timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
    timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
    timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
    timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
    timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
    timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
  }
#endif // TPETRA_DISTRIBUTOR_TIMERS

  void
  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                     const Teuchos::RCP<Teuchos::FancyOStream>& out,
                     const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    this->out_ = out.is_null () ?
      Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
    if (! plist.is_null ()) {
      this->setParameterList (plist);
    }

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS

    if (debug_) {
      TEUCHOS_TEST_FOR_EXCEPTION
        (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
         "is true but out_ (pointer to the output stream) is NULL.  Please "
         "report this bug to the Tpetra developers.");
      Teuchos::OSTab tab (out_);
      std::ostringstream os;
      os << comm_->getRank () << ": Distributor ctor done" << std::endl;
      *out_ << os.str ();
    }
  }
  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
    : comm_ (comm)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, Teuchos::null, Teuchos::null);
  }
  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
               const Teuchos::RCP<Teuchos::FancyOStream>& out)
    : comm_ (comm)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, out, Teuchos::null);
  }
  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
               const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, Teuchos::null, plist);
  }
  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
               const Teuchos::RCP<Teuchos::FancyOStream>& out,
               const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, out, plist);
  }
  Distributor::
  Distributor (const Distributor& distributor)
    : comm_ (distributor.comm_)
    , out_ (distributor.out_)
    , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
    , sendType_ (distributor.sendType_)
    , barrierBetween_ (distributor.barrierBetween_)
    , debug_ (distributor.debug_)
    , selfMessage_ (distributor.selfMessage_)
    , numSends_ (distributor.numSends_)
    , procsTo_ (distributor.procsTo_)
    , startsTo_ (distributor.startsTo_)
    , lengthsTo_ (distributor.lengthsTo_)
    , maxSendLength_ (distributor.maxSendLength_)
    , indicesTo_ (distributor.indicesTo_)
    , numReceives_ (distributor.numReceives_)
    , totalReceiveLength_ (distributor.totalReceiveLength_)
    , lengthsFrom_ (distributor.lengthsFrom_)
    , procsFrom_ (distributor.procsFrom_)
    , startsFrom_ (distributor.startsFrom_)
    , indicesFrom_ (distributor.indicesFrom_)
    , reverseDistributor_ (distributor.reverseDistributor_)
    , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
    , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
    , useDistinctTags_ (distributor.useDistinctTags_)
  {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;

    // Clone the right-hand side's ParameterList, so that this new
    // instance gets its own copy.
    RCP<const ParameterList> rhsList = distributor.getParameterList ();
    if (! rhsList.is_null ()) {
      this->setMyParamList (parameterList (* rhsList));
    }

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS

    if (debug_) {
      TEUCHOS_TEST_FOR_EXCEPTION
        (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
         "is true but out_ (pointer to the output stream) is NULL.  Please "
         "report this bug to the Tpetra developers.");
      Teuchos::OSTab tab (out_);
      std::ostringstream os;
      os << comm_->getRank () << ": Distributor copy ctor done" << std::endl;
      *out_ << os.str ();
    }
  }
  void Distributor::swap (Distributor& rhs) {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;

    std::swap (comm_, rhs.comm_);
    std::swap (out_, rhs.out_);
    std::swap (howInitialized_, rhs.howInitialized_);
    std::swap (sendType_, rhs.sendType_);
    std::swap (barrierBetween_, rhs.barrierBetween_);
    std::swap (debug_, rhs.debug_);
    std::swap (selfMessage_, rhs.selfMessage_);
    std::swap (numSends_, rhs.numSends_);
    std::swap (procsTo_, rhs.procsTo_);
    std::swap (startsTo_, rhs.startsTo_);
    std::swap (lengthsTo_, rhs.lengthsTo_);
    std::swap (maxSendLength_, rhs.maxSendLength_);
    std::swap (indicesTo_, rhs.indicesTo_);
    std::swap (numReceives_, rhs.numReceives_);
    std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
    std::swap (lengthsFrom_, rhs.lengthsFrom_);
    std::swap (procsFrom_, rhs.procsFrom_);
    std::swap (startsFrom_, rhs.startsFrom_);
    std::swap (indicesFrom_, rhs.indicesFrom_);
    std::swap (reverseDistributor_, rhs.reverseDistributor_);
    std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
    std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
    std::swap (useDistinctTags_, rhs.useDistinctTags_);

    // Swap the parameter lists.  If they are the same object, make a
    // deep copy first, so that modifying one won't modify the other.
    RCP<ParameterList> lhsList = this->getNonconstParameterList ();
    RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
    if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
      rhsList = parameterList (*rhsList);
    }
    if (! rhsList.is_null ()) {
      this->setMyParamList (rhsList);
    }
    if (! lhsList.is_null ()) {
      rhs.setMyParamList (lhsList);
    }

    // We don't need to swap timers, because all instances of
    // Distributor use the same timers.
  }
  Distributor::~Distributor ()
  {
    // We shouldn't have any outstanding communication requests at
    // this point.
    TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
      "Tpetra::Distributor: Destructor called with " << requests_.size()
      << " outstanding posts (unfulfilled communication requests).  There "
      "should be none at this point.  Please report this bug to the Tpetra "
      "developers.");
  }

  void
  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    using Teuchos::FancyOStream;
    using Teuchos::getIntegralValue;
    using Teuchos::includesVerbLevel;
    using Teuchos::OSTab;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using std::endl;

    RCP<const ParameterList> validParams = getValidParameters ();
    plist->validateParametersAndSetDefaults (*validParams);
    const bool barrierBetween =
      plist->get<bool> ("Barrier between receives and sends");
    const Details::EDistributorSendType sendType =
      getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
    const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
    const bool debug = plist->get<bool> ("Debug");
    {
      // We only keep this parameter for backwards compatibility; its
      // value must always be true.
      const bool enable_cuda_rdma =
        plist->get<bool> ("Enable MPI CUDA RDMA support");
      TEUCHOS_TEST_FOR_EXCEPTION
        (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
         "setParameterList: You specified \"Enable MPI CUDA RDMA "
         "support\" = false.  This is no longer valid.  You don't need to "
         "specify this option any more; Tpetra assumes it is always true.  "
         "This is a very light assumption on the MPI implementation, and in "
         "fact does not actually involve hardware or system RDMA support.  "
         "Tpetra just assumes that the MPI implementation can tell whether a "
         "pointer points to host memory or CUDA device memory.");
    }
    // Ready sends require the barrier between receives and sends, so
    // check that combination of parameters explicitly.
    TEUCHOS_TEST_FOR_EXCEPTION(
      ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
      std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
      << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
      "between receives and sends." << endl << "This is invalid; you must "
      "include the barrier if you use ready sends." << endl << "Ready sends "
      "require that their corresponding receives have already been posted, "
      "and the only way to guarantee that in general is with a barrier.");

    // Now that we've validated the input list, save the results.
    sendType_ = sendType;
    barrierBetween_ = barrierBetween;
    useDistinctTags_ = useDistinctTags;
    debug_ = debug;

    // Keep a pointer to the input list, per ParameterListAcceptor
    // semantics.
    this->setMyParamList (plist);
  }
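
  // Example (added commentary; not part of the original source): a
  // minimal sketch of configuring a Distributor through this method.
  // The helper name exampleConfigureDistributor is hypothetical; the
  // parameter names are the ones validated above.
#if 0 // illustration only
  static Tpetra::Distributor
  exampleConfigureDistributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
  {
    Teuchos::RCP<Teuchos::ParameterList> plist =
      Teuchos::parameterList ("Tpetra::Distributor");
    plist->set ("Send type", std::string ("Isend")); // nonblocking sends
    plist->set ("Use distinct tags", true);          // avoid tag collisions
    // The constructor forwards plist to setParameterList().
    return Tpetra::Distributor (comm, plist);
  }
#endif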
  Teuchos::RCP<const Teuchos::ParameterList>
  Distributor::getValidParameters () const
  {
    using Teuchos::Array;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using Teuchos::setStringToIntegralParameter;

    const bool barrierBetween = barrierBetween_default;
    const bool useDistinctTags = useDistinctTags_default;
    const bool debug = tpetraDistributorDebugDefault;

    Array<std::string> sendTypes = Details::distributorSendTypes ();
    const std::string defaultSendType ("Send");
    Array<Details::EDistributorSendType> sendTypeEnums;
    sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);

    RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
    plist->set ("Barrier between receives and sends", barrierBetween,
                "Whether to execute a barrier between receives and sends in do"
                "[Reverse]Posts().  Required for correctness when \"Send type\""
                "=\"Rsend\", otherwise correct but not recommended.");
    setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
      defaultSendType, "When using MPI, the variant of send to use in "
      "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
    plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
                "MPI message tags for different code paths.  Highly recommended"
                " to avoid message collisions.");
    plist->set ("Debug", debug, "Whether to print copious debugging output on "
                "all processes.");
    plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
                "tell whether a pointer points to host memory or CUDA device "
                "memory.  You don't need to specify this option any more; "
                "Tpetra assumes it is always true.  This is a very light "
                "assumption on the MPI implementation, and in fact does not "
                "actually involve hardware or system RDMA support.");

    Teuchos::setupVerboseObjectSublist (&*plist);
    return Teuchos::rcp_const_cast<const ParameterList> (plist);
  }
  size_t Distributor::getTotalReceiveLength () const
  { return totalReceiveLength_; }

  size_t Distributor::getNumReceives () const
  { return numReceives_; }

  bool Distributor::hasSelfMessage () const
  { return selfMessage_; }

  size_t Distributor::getNumSends () const
  { return numSends_; }

  size_t Distributor::getMaxSendLength () const
  { return maxSendLength_; }

  Teuchos::ArrayView<const int> Distributor::getProcsFrom () const
  { return procsFrom_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom () const
  { return lengthsFrom_; }

  Teuchos::ArrayView<const int> Distributor::getProcsTo () const
  { return procsTo_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo () const
  { return lengthsTo_; }
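
  // Example (added commentary; not part of the original source): once
  // a plan exists, the accessors above summarize it.  The helper name
  // examplePrintPlan is hypothetical.
#if 0 // illustration only
  static void
  examplePrintPlan (const Tpetra::Distributor& distributor, std::ostream& os)
  {
    os << "sends to " << distributor.getNumSends () << " process(es); "
       << "receives " << distributor.getTotalReceiveLength ()
       << " value(s) from " << distributor.getNumReceives () << " process(es)"
       << (distributor.hasSelfMessage () ? "; includes a self-message" : "")
       << std::endl;
  }
#endif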
  Teuchos::RCP<Distributor>
  Distributor::getReverse() const {
    if (reverseDistributor_.is_null ()) {
      createReverseDistributor ();
    }
    TEUCHOS_TEST_FOR_EXCEPTION
      (reverseDistributor_.is_null (), std::logic_error, "The reverse "
       "Distributor is null after createReverseDistributor returned.  "
       "Please report this bug to the Tpetra developers.");
    return reverseDistributor_;
  }

  void
  Distributor::createReverseDistributor() const
  {
    reverseDistributor_ = Teuchos::rcp (new Distributor (comm_, out_));
    reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
    reverseDistributor_->sendType_ = sendType_;
    reverseDistributor_->barrierBetween_ = barrierBetween_;
    reverseDistributor_->debug_ = debug_;
    // The total length of all the sends of this Distributor.  We
    // calculate it because it's the total length of all the receives
    // of the reverse Distributor.
    size_t totalSendLength =
      std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);

    // The maximum length of any of the receives of this Distributor.
    // We calculate it because it's the maximum length of any of the
    // sends of the reverse Distributor.
    size_t maxReceiveLength = 0;
    const int myProcID = comm_->getRank();
    for (size_t i=0; i < numReceives_; ++i) {
      if (procsFrom_[i] != myProcID) {
        // Don't count receives for messages sent by myself to myself.
        if (lengthsFrom_[i] > maxReceiveLength) {
          maxReceiveLength = lengthsFrom_[i];
        }
      }
    }
    // Initialize all of reverseDistributor's data members.  This
    // mostly just means flipping "send" and "receive," or the
    // equivalent "to" and "from."
    reverseDistributor_->selfMessage_ = selfMessage_;
    reverseDistributor_->numSends_ = numReceives_;
    reverseDistributor_->procsTo_ = procsFrom_;
    reverseDistributor_->startsTo_ = startsFrom_;
    reverseDistributor_->lengthsTo_ = lengthsFrom_;
    reverseDistributor_->maxSendLength_ = maxReceiveLength;
    reverseDistributor_->indicesTo_ = indicesFrom_;
    reverseDistributor_->numReceives_ = numSends_;
    reverseDistributor_->totalReceiveLength_ = totalSendLength;
    reverseDistributor_->lengthsFrom_ = lengthsTo_;
    reverseDistributor_->procsFrom_ = procsTo_;
    reverseDistributor_->startsFrom_ = startsTo_;
    reverseDistributor_->indicesFrom_ = indicesTo_;

    // Leave the per-round byte counts at zero; the next communication
    // round that uses the reverse Distributor will set them.
    reverseDistributor_->lastRoundBytesSend_ = 0;
    reverseDistributor_->lastRoundBytesRecv_ = 0;

    reverseDistributor_->useDistinctTags_ = useDistinctTags_;

    // Technically this object is its reverse Distributor's reverse
    // Distributor, but we leave the pointer null rather than create a
    // circular dependency between the two objects.
    reverseDistributor_->reverseDistributor_ = Teuchos::null;
  }
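
  // Example (added commentary; not part of the original source): the
  // reverse plan swaps the send and receive sides of the forward plan,
  // so data can be returned along the same communication edges.
#if 0 // illustration only
  static void
  exampleReverse (const Tpetra::Distributor& forward)
  {
    Teuchos::RCP<Tpetra::Distributor> reverse = forward.getReverse ();
    // forward.getProcsTo() matches reverse->getProcsFrom(), and vice versa.
  }
#endif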
  void Distributor::doWaits() {
    using Teuchos::Array;
    using Teuchos::CommRequest;
    using Teuchos::FancyOStream;
    using Teuchos::includesVerbLevel;
    using Teuchos::is_null;
    using Teuchos::OSTab;
    using Teuchos::RCP;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    Teuchos::TimeMonitor timeMon (*timer_doWaits_);
#endif // TPETRA_DISTRIBUTOR_TIMERS

    const int myRank = comm_->getRank ();

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": doWaits: # reqs = " << requests_.size () << endl;
      *out_ << os.str ();
    }

    if (requests_.size() > 0) {
      waitAll (*comm_, requests_());

#ifdef HAVE_TEUCHOS_DEBUG
      // Make sure that waitAll() nulled out all the requests.
      for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
           it != requests_.end(); ++it)
      {
        TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
          Teuchos::typeName(*this) << "::doWaits(): Communication requests "
          "should all be null after calling Teuchos::waitAll() on them, but "
          "at least one request is not null.");
      }
#endif // HAVE_TEUCHOS_DEBUG
      // Restore the invariant that requests_.size() equals the number
      // of outstanding nonblocking communication requests.
      requests_.resize (0);
    }
#ifdef HAVE_TEUCHOS_DEBUG
    {
      const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
      int globalSizeNonzero = 0;
      Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
                                    localSizeNonzero,
                                    Teuchos::outArg (globalSizeNonzero));
      TEUCHOS_TEST_FOR_EXCEPTION(
        globalSizeNonzero != 0, std::runtime_error,
        "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
        "a nonzero number of outstanding posts.  There should be none at this "
        "point.  Please report this bug to the Tpetra developers.");
    }
#endif // HAVE_TEUCHOS_DEBUG

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": doWaits done" << endl;
      *out_ << os.str ();
    }
  }

  void Distributor::doReverseWaits() {
    // Call doWaits() on the reverse Distributor, if it exists.
    if (! reverseDistributor_.is_null()) {
      reverseDistributor_->doWaits();
    }
  }
  std::string Distributor::description () const {
    std::ostringstream out;

    out << "\"Tpetra::Distributor\": {";
    const std::string label = this->getObjectLabel ();
    if (label != "") {
      out << "Label: " << label << ", ";
    }
    out << "How initialized: "
        << Details::DistributorHowInitializedEnumToString (howInitialized_)
        << ", Send type: "
        << DistributorSendTypeEnumToString (sendType_)
        << ", Barrier between receives and sends: "
        << (barrierBetween_ ? "true" : "false")
        << ", Use distinct tags: "
        << (useDistinctTags_ ? "true" : "false")
        << ", Debug: " << (debug_ ? "true" : "false")
        << "}";
    return out.str ();
  }
  std::string
  Distributor::
  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
  {
    using Teuchos::toString;
    using Teuchos::VERB_HIGH;
    using Teuchos::VERB_EXTREME;
    using std::endl;

    // This preserves the current behavior of Distributor: print
    // nothing at low verbosity.
    if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
      return std::string ();
    }

    auto outStringP = Teuchos::rcp (new std::ostringstream ());
    auto outp = Teuchos::getFancyOStream (outStringP);
    Teuchos::FancyOStream& out = *outp;

    const int myRank = comm_->getRank ();
    const int numProcs = comm_->getSize ();
    out << "Process " << myRank << " of " << numProcs << ":" << endl;
    Teuchos::OSTab tab1 (out);

    if (vl == VERB_HIGH || vl == VERB_EXTREME) {
      out << "procsTo: " << toString (procsTo_) << endl;
      out << "lengthsTo: " << toString (lengthsTo_) << endl;
    }
    if (vl == VERB_EXTREME) {
      out << "startsTo: " << toString (startsTo_) << endl;
      out << "indicesTo: " << toString (indicesTo_) << endl;
    }
    if (vl == VERB_HIGH || vl == VERB_EXTREME) {
      out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
      out << "startsFrom: " << toString (startsFrom_) << endl;
      out << "procsFrom: " << toString (procsFrom_) << endl;
    }

    out.flush (); // make sure the ostringstream got everything
    return outStringP->str ();
  }
  void
  Distributor::
  describe (Teuchos::FancyOStream& out,
            const Teuchos::EVerbosityLevel verbLevel) const
  {
    using std::endl;
    using Teuchos::VERB_DEFAULT;
    using Teuchos::VERB_NONE;
    using Teuchos::VERB_LOW;
    using Teuchos::VERB_MEDIUM;
    using Teuchos::VERB_HIGH;
    using Teuchos::VERB_EXTREME;
    const Teuchos::EVerbosityLevel vl =
      (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;

    if (vl == VERB_NONE) {
      return; // don't print anything
    }
    // If this Distributor's Comm is null, then the calling process
    // does not participate in Distributor-related collective
    // operations with the other processes, and it is not even legal
    // to call this method.  The reasonable thing to do in that case
    // is nothing.
    if (comm_.is_null ()) {
      return;
    }
    const int myRank = comm_->getRank ();
    const int numProcs = comm_->getSize ();

    // Only Process 0 should touch the output stream, but this method
    // in general may need to do communication, so we preserve the
    // current tab level across the "if (myRank == 0)" scopes below.
    Teuchos::RCP<Teuchos::OSTab> tab0, tab1;

    if (myRank == 0) {
      // By convention, describe() always begins with a tab.
      tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
      // We quote the class name because it contains colons, which
      // keeps the output valid YAML.
      out << "\"Tpetra::Distributor\":" << endl;
      tab1 = Teuchos::rcp (new Teuchos::OSTab (out));

      const std::string label = this->getObjectLabel ();
      if (label != "") {
        out << "Label: " << label << endl;
      }
      out << "Number of processes: " << numProcs << endl
          << "How initialized: "
          << Details::DistributorHowInitializedEnumToString (howInitialized_)
          << endl;
      {
        out << "Parameters: " << endl;
        Teuchos::OSTab tab2 (out);
        out << "\"Send type\": "
            << DistributorSendTypeEnumToString (sendType_) << endl
            << "\"Barrier between receives and sends\": "
            << (barrierBetween_ ? "true" : "false") << endl
            << "\"Use distinct tags\": "
            << (useDistinctTags_ ? "true" : "false") << endl
            << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
      }
    }

    // This is collective over the communicator.
    if (vl > VERB_LOW) {
      const std::string lclStr = this->localDescribeToString (vl);
      Tpetra::Details::gathervPrint (out, lclStr, *comm_);
    }

    out << "Reverse Distributor:";
    if (reverseDistributor_.is_null ()) {
      out << " null" << endl;
    }
    else {
      out << endl;
      reverseDistributor_->describe (out, vl);
    }
  }
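
  // Example (added commentary; not part of the original source):
  // describe() may communicate, so every process in the communicator
  // must call it.  The helper name exampleDescribe is hypothetical.
#if 0 // illustration only
  static void
  exampleDescribe (const Tpetra::Distributor& distributor)
  {
    Teuchos::RCP<Teuchos::FancyOStream> out =
      Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cout));
    distributor.describe (*out, Teuchos::VERB_HIGH); // collective call
  }
#endif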
  void
  Distributor::computeReceives ()
  {
    using Teuchos::Array;
    using Teuchos::ArrayRCP;
    using Teuchos::as;
    using Teuchos::CommStatus;
    using Teuchos::CommRequest;
    using Teuchos::ireceive;
    using Teuchos::RCP;
    using Teuchos::rcp;
    using Teuchos::REDUCE_SUM;
    using Teuchos::receive;
    using Teuchos::reduce;
    using Teuchos::scatter;
    using Teuchos::send;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);
    const int myRank = comm_->getRank();
    const int numProcs = comm_->getSize();

    // MPI tag for the nonblocking receives and blocking sends in
    // this method.
    const int pathTag = 2;
    const int tag = this->getTag (pathTag);

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: "
        "{selfMessage_: " << (selfMessage_ ? "true" : "false")
         << ", tag: " << tag << "}" << endl;
      *out_ << os.str ();
    }
    // toProcsFromMe[i] is the number of messages sent by this process
    // to process i.  The data in numSends_, procsTo_, and lengthsTo_
    // concern the contiguous sends, so each process is listed in
    // procsTo_ at most once, and toProcsFromMe[i] is either 0 or 1.
    {
      Array<int> toProcsFromMe (numProcs, 0);
#ifdef HAVE_TEUCHOS_DEBUG
      bool counting_error = false;
#endif // HAVE_TEUCHOS_DEBUG
      for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
#ifdef HAVE_TEUCHOS_DEBUG
        if (toProcsFromMe[procsTo_[i]] != 0) {
          counting_error = true;
        }
#endif // HAVE_TEUCHOS_DEBUG
        toProcsFromMe[procsTo_[i]] = 1;
      }
#ifdef HAVE_TEUCHOS_DEBUG
      SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
        "Tpetra::Distributor::computeReceives: There was an error on at least "
        "one process in counting the number of messages sent by that process "
        "to the other processes.  Please report this bug to the Tpetra "
        "developers.", *comm_);
#endif // HAVE_TEUCHOS_DEBUG

      if (debug_) {
        std::ostringstream os;
        os << myRank << ": computeReceives: Calling reduce and scatter" << endl;
        *out_ << os.str ();
      }

      // Compute the number of receives that this process needs to
      // post.  The number of receives includes any self sends (i.e.,
      // messages sent by this process to itself).
      const int root = 0; // rank of the root process of the reduction
      Array<int> numRecvsOnEachProc; // temp array (root process only)
      if (myRank == root) {
        numRecvsOnEachProc.resize (numProcs);
      }
      int numReceivesAsInt = 0;
      reduce<int, int> (toProcsFromMe.getRawPtr (),
                        numRecvsOnEachProc.getRawPtr (),
                        numProcs, REDUCE_SUM, root, *comm_);
      scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
                         &numReceivesAsInt, 1, root, *comm_);
      numReceives_ = static_cast<size_t> (numReceivesAsInt);
    }
    // Now we know numReceives_, this process' number of receives.
    // Allocate the lengthsFrom_ and procsFrom_ arrays with that many
    // entries.
    lengthsFrom_.assign (numReceives_, 0);
    procsFrom_.assign (numReceives_, 0);

    // Ask (via nonblocking receive) each process from which we are
    // receiving how many packets we should expect from it.  The self
    // message doesn't need a receive, hence "actual."
    const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);

    // Teuchos' wrapper for nonblocking receives requires receive
    // buffers that it knows won't go away, which is why we use RCPs:
    // one per nonblocking receive request.
    Array<RCP<CommRequest<int> > > requests (actualNumReceives);
    Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
    Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

    // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
    // (receive data from any process).
#ifdef HAVE_MPI
    const int anySourceProc = MPI_ANY_SOURCE;
#else
    const int anySourceProc = -1;
#endif

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: Posting "
         << actualNumReceives << " irecvs" << endl;
      *out_ << os.str ();
    }
    // Post the (nonblocking) receives.
    for (size_t i = 0; i < actualNumReceives; ++i) {
      // Once the receive completes, we can ask the corresponding
      // CommStatus object (output by wait()) for the sending
      // process' ID, which we will assign to procsFrom_[i].
      lengthsFromBuffers[i].resize (1);
      lengthsFromBuffers[i][0] = as<size_t> (0);
      requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
      if (debug_) {
        std::ostringstream os;
        os << myRank << ": computeReceives: "
          "Posted any-proc irecv w/ specified tag " << tag << endl;
        *out_ << os.str ();
      }
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: "
        "posting " << numSends_ << " sends" << endl;
      *out_ << os.str ();
    }
    // Post the sends: Tell each process to which we are sending how
    // many packets it should expect from us in the communication
    // pattern.  We could use nonblocking sends here, as long as we
    // did a waitAll() on all the sends and receives at once.
    for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
      if (procsTo_[i] != myRank) {
        // Send a message to procsTo_[i], telling that process that
        // this communication pattern will send it lengthsTo_[i]
        // blocks of packets.
        const size_t* const lengthsTo_i = &lengthsTo_[i];
        send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
        if (debug_) {
          std::ostringstream os;
          os << myRank << ": computeReceives: "
            "Posted send to Proc " << procsTo_[i] << " w/ specified tag "
             << tag << endl;
          *out_ << os.str ();
        }
      }
      else {
        // We don't need a send in the self-message case.  If this
        // process will send a message to itself in the communication
        // pattern, then the last entries of lengthsFrom_ and
        // procsFrom_ correspond to the self-message.  This process
        // already knows how long the message is and who sent it.
        lengthsFrom_[numReceives_-1] = lengthsTo_[i];
        procsFrom_[numReceives_-1] = myRank;
      }
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: waitAll on "
         << requests.size () << " requests" << endl;
      *out_ << os.str ();
    }
    // Wait on all the receives.  When they arrive, unpack the request
    // buffers into lengthsFrom_, and ask the status output of wait()
    // for each sending process ID to fill in procsFrom_.
    waitAll (*comm_, requests (), statuses ());
    for (size_t i = 0; i < actualNumReceives; ++i) {
      lengthsFrom_[i] = *lengthsFromBuffers[i];
      procsFrom_[i] = statuses[i]->getSourceRank ();
    }

    // Sort procsFrom_, and apply the same permutation to
    // lengthsFrom_, so that procsFrom_[i] and lengthsFrom_[i] refer
    // to the same message.
    sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());

    // Compute indicesFrom_.
    totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
    indicesFrom_.clear ();
    indicesFrom_.reserve (totalReceiveLength_);
    for (size_t i = 0; i < totalReceiveLength_; ++i) {
      indicesFrom_.push_back(i);
    }

    startsFrom_.clear ();
    startsFrom_.reserve (numReceives_);
    for (size_t i = 0, j = 0; i < numReceives_; ++i) {
      startsFrom_.push_back(j);
      j += lengthsFrom_[i];
    }

    if (selfMessage_) {
      --numReceives_;
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: done" << endl;
      *out_ << os.str ();
    }
  }
  size_t
  Distributor::createFromSends (const Teuchos::ArrayView<const int>& exportProcIDs)
  {
    using Teuchos::outArg;
    using Teuchos::REDUCE_MAX;
    using Teuchos::reduceAll;
    using std::endl;

    Teuchos::OSTab tab (out_);
    const size_t numExports = exportProcIDs.size();
    const int myProcID = comm_->getRank();
    const int numProcs = comm_->getSize();
    if (debug_) {
      std::ostringstream os;
      os << myProcID << ": createFromSends" << endl;
      *out_ << os.str ();
    }
    // Tally, for each process, how many entries this process will
    // send it.  starts[i] eventually becomes the offset of process
    // i's block in the (grouped) send buffer.
    Teuchos::Array<size_t> starts (numProcs + 1, 0);

    // numActive is the number of sends that are not Null.
    size_t numActive = 0;
    int needSendBuff = 0; // Boolean

#ifdef HAVE_TPETRA_DEBUG
    int badID = -1; // only used in debug mode
#endif // HAVE_TPETRA_DEBUG
    for (size_t i = 0; i < numExports; ++i) {
      const int exportID = exportProcIDs[i];
      if (exportID >= numProcs) {
#ifdef HAVE_TPETRA_DEBUG
        badID = myProcID;
#endif // HAVE_TPETRA_DEBUG
        break;
      }
      else if (exportID >= 0) {
        // The export process ID is valid; tally it.
        ++starts[exportID];
        ++numActive;
        // If sends to the same process are not grouped together in
        // exportProcIDs, we will need a send buffer.
        if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
          needSendBuff = 1;
        }
      }
    }

#ifdef HAVE_TPETRA_DEBUG
    {
      // Test whether any process in the communicator got an invalid
      // process ID.  If badID != -1 on this process, then it equals
      // this process' rank.
      int gbl_badID;
      reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
      TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
        Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
        << ", perhaps among other processes, got a bad send process ID.");
    }
#endif // HAVE_TPETRA_DEBUG

#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
    {
      int global_needSendBuff;
      reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                           outArg (global_needSendBuff));
      TPETRA_EFFICIENCY_WARNING(
        global_needSendBuff != 0, std::runtime_error,
        "::createFromSends: Grouping export IDs together by process rank often "
        "improves performance.");
    }
#endif
    // Determine from the caller's data whether or not the current
    // process should send (a) message(s) to itself.
    if (starts[myProcID] != 0) {
      selfMessage_ = true;
    }
    else {
      selfMessage_ = false;
    }

#ifdef HAVE_TEUCHOS_DEBUG
    bool index_neq_numActive = false;
    bool send_neq_numSends = false;
#endif // HAVE_TEUCHOS_DEBUG
    if (! needSendBuff) {
      // The exports are already grouped by process, so no send
      // buffer or indicesTo_ is needed.

      // Count the number of distinct processes to which this process
      // will send, including itself if applicable.
      numSends_ = 0;
      for (int i = 0; i < numProcs; ++i) {
        if (starts[i]) {
          ++numSends_;
        }
      }

      // Not only do we not need indicesTo_, we must clear it: an
      // empty indicesTo_ is a flag used later.
      indicesTo_.resize(0);
      // Size these to numSends_; at this point, numSends_ still
      // includes the self message, if there is one.
      procsTo_.assign(numSends_,0);
      startsTo_.assign(numSends_,0);
      lengthsTo_.assign(numSends_,0);

      // Set startsTo_ to the offset of each send, and procsTo_ to
      // the process ID of each send.  The data are contiguous, which
      // is why index can skip through the ranks.
      {
        size_t index = 0, procIndex = 0;
        for (size_t i = 0; i < numSends_; ++i) {
          while (exportProcIDs[procIndex] < 0) {
            ++procIndex; // skip all negative process IDs
          }
          startsTo_[i] = procIndex;
          int procID = exportProcIDs[procIndex];
          procsTo_[i] = procID;
          index += starts[procID];
          procIndex += starts[procID];
        }
#ifdef HAVE_TEUCHOS_DEBUG
        if (index != numActive) {
          index_neq_numActive = true;
        }
#endif // HAVE_TEUCHOS_DEBUG
      }

      // Sort the startsTo_ and process IDs together, in ascending
      // order by process ID.
      if (numSends_ > 0) {
        sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
      }

      // Compute the maximum send length.
      maxSendLength_ = 0;
      for (size_t i = 0; i < numSends_; ++i) {
        int procID = procsTo_[i];
        lengthsTo_[i] = starts[procID];
        if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[i];
        }
      }
    }
    else {
      // The exports are not grouped by process, so we need a send
      // buffer and indicesTo_.

      // starts[i] is the number of sends to proc i; numActive is the
      // total number of sends.  This loop starts at starts[1], so
      // check starts[0] explicitly first.
      if (starts[0] == 0 ) {
        numSends_ = 0;
      }
      else {
        numSends_ = 1;
      }
      for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                            im1=starts.begin();
           i != starts.end(); ++i)
      {
        if (*i != 0) ++numSends_;
        *i += *im1;
        im1 = i;
      }
      // starts[i] now contains the number of exports to procs 0
      // through i.

      for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                    i=starts.rbegin()+1;
           i != starts.rend(); ++i)
      {
        *ip1 = *i;
        ip1 = i;
      }
      starts[0] = 0;
      // starts[i] now contains the number of exports to procs 0
      // through i-1, i.e., the offset of proc i's block in the send
      // buffer.

      indicesTo_.resize(numActive);

      for (size_t i = 0; i < numExports; ++i) {
        if (exportProcIDs[i] >= 0) {
          // Record the offset into the send buffer for this export,
          indicesTo_[starts[exportProcIDs[i]]] = i;
          // then increment the offset for this proc.
          ++starts[exportProcIDs[i]];
        }
      }
      // The send buffer will hold each process' data in ascending
      // order by process ID, and indicesTo_ now maps each export to
      // its location in the send buffer.  The increments above
      // shifted starts, so shift it back: starts[i] is once again
      // the offset of proc i's block.
      for (int proc = numProcs-1; proc != 0; --proc) {
        starts[proc] = starts[proc-1];
      }
      starts.front() = 0;
      starts[numProcs] = numActive;

      // These contain invalid data at procs we don't care about;
      // that is okay.
      procsTo_.resize(numSends_);
      startsTo_.resize(numSends_);
      lengthsTo_.resize(numSends_);

      // For each group of sends/exports, record the destination
      // process, the length, and the offset of this send into the
      // send buffer (startsTo_).
      maxSendLength_ = 0;
      size_t snd = 0;
      for (int proc = 0; proc < numProcs; ++proc ) {
        if (starts[proc+1] != starts[proc]) {
          lengthsTo_[snd] = starts[proc+1] - starts[proc];
          startsTo_[snd] = starts[proc];
          // Record the maximum length of all off-process sends.
          if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
            maxSendLength_ = lengthsTo_[snd];
          }
          procsTo_[snd] = proc;
          ++snd;
        }
      }
#ifdef HAVE_TEUCHOS_DEBUG
      if (snd != numSends_) {
        send_neq_numSends = true;
      }
#endif // HAVE_TEUCHOS_DEBUG
    }
#ifdef HAVE_TEUCHOS_DEBUG
    SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1410 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1413 if (selfMessage_) --numSends_;
1419 std::ostringstream os;
1420 os << myProcID <<
": createFromSends: done" << endl;
1426 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1428 return totalReceiveLength_;
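
  // Example (added commentary; not part of the original source): a
  // minimal hypothetical round-robin plan.  Each process sends one
  // double to rank (myRank+1) % numProcs, then executes the plan with
  // doPostsAndWaits (declared in Tpetra_Distributor.hpp).
#if 0 // illustration only
  static void
  exampleRoundRobin (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
  {
    Tpetra::Distributor distributor (comm);
    const int nextRank = (comm->getRank () + 1) % comm->getSize ();
    Teuchos::Array<int> exportProcIDs (1, nextRank);
    // createFromSends returns the number of values to receive.
    const size_t numImports = distributor.createFromSends (exportProcIDs ());

    Teuchos::Array<double> exports (1, static_cast<double> (comm->getRank ()));
    Teuchos::Array<double> imports (numImports);
    // The middle argument is the number of values ("packets") per export.
    distributor.doPostsAndWaits<double> (exports ().getConst (), 1, imports ());
  }
#endif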
  void
  Distributor::
  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
                           const Teuchos::ArrayView<const int>& remoteProcIDs)
  {
    // Note: exportProcIDs and remoteProcIDs _must_ be lists that have
    // an entry for each GID.  If the export/remote process IDs are
    // taken from the getProcs{To|From} lists, which include the self
    // message, then this routine will not work correctly.
    howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;

    int myProcID = comm_->getRank ();
    int numProcs = comm_->getSize();

    const size_t numExportIDs = exportProcIDs.size();
    Teuchos::Array<size_t> starts (numProcs + 1, 0);

    size_t numActive = 0;
    int needSendBuff = 0; // Boolean

    for(size_t i = 0; i < numExportIDs; i++ )
    {
      if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
        needSendBuff = 1;
      if( exportProcIDs[i] >= 0 )
      {
        ++starts[ exportProcIDs[i] ];
        ++numActive;
      }
    }

    selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
    numSends_ = 0;

    if (needSendBuff) {
      // The exports are not grouped by process, so we need a send
      // buffer and indicesTo_.

      // This loop starts at starts[1], so check starts[0] explicitly.
      if (starts[0] == 0 ) {
        numSends_ = 0;
      }
      else {
        numSends_ = 1;
      }
      for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                            im1=starts.begin();
           i != starts.end(); ++i)
      {
        if (*i != 0) ++numSends_;
        *i += *im1;
        im1 = i;
      }
      // starts[i] now contains the number of exports to procs 0
      // through i.

      for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                    i=starts.rbegin()+1;
           i != starts.rend(); ++i)
      {
        *ip1 = *i;
        ip1 = i;
      }
      starts[0] = 0;
      // starts[i] now contains the offset of proc i's block in the
      // send buffer.

      indicesTo_.resize(numActive);

      for (size_t i = 0; i < numExportIDs; ++i) {
        if (exportProcIDs[i] >= 0) {
          // Record the offset into the send buffer for this export,
          indicesTo_[starts[exportProcIDs[i]]] = i;
          // then increment the offset for this proc.
          ++starts[exportProcIDs[i]];
        }
      }
      // Shift starts back so that starts[proc] is again the offset
      // of proc's block in the send buffer.
      for (int proc = numProcs-1; proc != 0; --proc) {
        starts[proc] = starts[proc-1];
      }
      starts.front() = 0;
      starts[numProcs] = numActive;
      procsTo_.resize(numSends_);
      startsTo_.resize(numSends_);
      lengthsTo_.resize(numSends_);

      // For each group of sends/exports, record the destination
      // process, the length, and the offset in the send buffer.
      maxSendLength_ = 0;
      size_t snd = 0;
      for (int proc = 0; proc < numProcs; ++proc ) {
        if (starts[proc+1] != starts[proc]) {
          lengthsTo_[snd] = starts[proc+1] - starts[proc];
          startsTo_[snd] = starts[proc];
          // Record the maximum length of all off-process sends.
          if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
            maxSendLength_ = lengthsTo_[snd];
          }
          procsTo_[snd] = proc;
          ++snd;
        }
      }
    }
    else {
      // The exports are already grouped by process, so no send
      // buffer or indicesTo_ is needed.

      // Count the number of distinct processes to which this process
      // will send, including itself if applicable.
      for (int i = 0; i < numProcs; ++i) {
        if (starts[i]) {
          ++numSends_;
        }
      }

      // Not only do we not need indicesTo_, we must clear it: an
      // empty indicesTo_ is a flag used later.
      indicesTo_.resize(0);
      // Size these to numSends_; at this point, numSends_ still
      // includes the self message, if there is one.
      procsTo_.assign(numSends_,0);
      startsTo_.assign(numSends_,0);
      lengthsTo_.assign(numSends_,0);

      // Set startsTo_ to the offset of each send, and procsTo_ to
      // the process ID of each send.  The data are contiguous, which
      // is why index can skip through the ranks.
      {
        size_t index = 0, procIndex = 0;
        for (size_t i = 0; i < numSends_; ++i) {
          while (exportProcIDs[procIndex] < 0) {
            ++procIndex; // skip all negative process IDs
          }
          startsTo_[i] = procIndex;
          int procID = exportProcIDs[procIndex];
          procsTo_[i] = procID;
          index += starts[procID];
          procIndex += starts[procID];
        }
      }

      // Sort the startsTo_ and process IDs together, in ascending
      // order by process ID.
      if (numSends_ > 0) {
        sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
      }

      // Compute the maximum send length.
      maxSendLength_ = 0;
      for (size_t i = 0; i < numSends_; ++i) {
        int procID = procsTo_[i];
        lengthsTo_[i] = starts[procID];
        if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[i];
        }
      }
    }
    numSends_ -= selfMessage_;

    // Build the list of distinct sending processes from the (sorted)
    // remoteProcIDs list.
    std::vector<int> recv_list;
    recv_list.reserve(numSends_); // reserve an initial guess for size needed

    int last_pid = -2;
    for(int i=0; i<remoteProcIDs.size(); i++) {
      if(remoteProcIDs[i]>last_pid) {
        recv_list.push_back(remoteProcIDs[i]);
        last_pid = remoteProcIDs[i];
      }
      else if (remoteProcIDs[i]<last_pid)
        throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs: expected remoteProcIDs to be in sorted order");
    }
    numReceives_ = recv_list.size();
    if(numReceives_) {
      procsFrom_.assign(numReceives_,0);
      lengthsFrom_.assign(numReceives_,0);
      indicesFrom_.assign(numReceives_,0);
      startsFrom_.assign(numReceives_,0);
    }
    for(size_t i=0,j=0; i<numReceives_; ++i) {
      size_t jlast = j;
      procsFrom_[i] = recv_list[i];
      startsFrom_[i] = j;
      for( ; j<(size_t)remoteProcIDs.size() &&
             remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
      lengthsFrom_[i] = j-jlast;
    }
    totalReceiveLength_ = remoteProcIDs.size();
    indicesFrom_.clear ();
    indicesFrom_.reserve (totalReceiveLength_);
    for (size_t i = 0; i < totalReceiveLength_; ++i) {
      indicesFrom_.push_back(i);
    }
    numReceives_ -= selfMessage_;
  }
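
  // Example (added commentary; not part of the original source): a
  // hypothetical ring pattern for createFromSendsAndRecvs.  Unlike
  // createFromSends, the receive side is given by the caller, so no
  // communication is needed to discover it; remoteProcIDs must be
  // sorted.
#if 0 // illustration only
  static void
  exampleRing (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
  {
    const int numProcs = comm->getSize ();
    const int myRank = comm->getRank ();
    Teuchos::Array<int> exportProcIDs (1, (myRank + 1) % numProcs);
    Teuchos::Array<int> remoteProcIDs (1, (myRank + numProcs - 1) % numProcs);
    Tpetra::Distributor distributor (comm);
    distributor.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());
  }
#endif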