Skip to content

Commit e5ee95e

Browse files
xush6528facebook-github-bot
authored andcommitted
[RPC] Add to confirmed users immediately if the fork is shared from owner, instead of adding nothing to pending users (#34988)
Summary: Pull Request resolved: #34988 In #31893, we introduced a confirmedUsers_ map in RRefContext. For the case that the fork is shared from the owner, there is no `pendingUsers_` intermediate phase for this fork, we should put this fork into `confirmedUsers_` immediately. Test Plan: ``` buck test mode/dev-nosan //caffe2/test/distributed/rpc:rpc_fork ``` ``` buck test mode/dev-nosan //caffe2/test/distributed/rpc/jit:rpc_fork ``` Differential Revision: D7735909 fbshipit-source-id: 14c36a16486f0cc9618dcfb111fe5223781b647d
1 parent b8e043a commit e5ee95e

2 files changed

Lines changed: 19 additions & 4 deletions

File tree

torch/csrc/distributed/rpc/rref_context.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ void RRefContext::notifyOwnerAndParentOfFork(
344344
const ForkId& forkId,
345345
worker_id_t parent,
346346
const c10::intrusive_ptr<RRef>& rref) {
347+
// Fork is shared from owner.
347348
if (parent == rref->owner()) {
348349
if (parent == agent_->getWorkerInfo().id_) {
349350
// Owner sending RRef to self, remove the forkId as it was added during
@@ -365,10 +366,13 @@ void RRefContext::notifyOwnerAndParentOfFork(
365366
// Hence, it is not necessary to send another RREF_CHILD_ACCEPT or
366367
// RREF_FORK_REQUEST back to the owner. See Note [Early Fork
367368
// Registration].
369+
std::lock_guard<std::mutex> lock(mutex_);
370+
addConfirmedUser(forkId, rref);
368371
}
369372
return;
370373
}
371374

375+
// Fork is shared from user.
372376
if (rref->isOwner()) {
373377
// See Note [Useful Phantom Fork ID for User to Owner Call]
374378
// In this case, the owner is the caller, and it does not add the fork id
@@ -494,17 +498,25 @@ void RRefContext::delPendingUser(const ForkId& forkId) {
494498
// hiding the subtle logic using a reentrant lock.
495499
deletedState = iter->second; // Increase refcount
496500

497-
confirmedUsers_.emplace(
498-
std::piecewise_construct,
499-
std::forward_as_tuple(forkId),
500-
std::forward_as_tuple(iter->second->rref_));
501+
addConfirmedUser(forkId, iter->second->rref_);
501502
pendingUsers_.erase(iter); // Decrease refcount.
502503
}
503504
deletedState->confirm();
504505
deleteAllUsersCV_.notify_all();
505506
deletedState.reset(); // Decrease refcount.
506507
}
507508

509+
void RRefContext::addConfirmedUser(
510+
const ForkId& forkId,
511+
const c10::intrusive_ptr<RRef>& rref) {
512+
// Notice, caller need to hold the mutex for confirmedUsers_.
513+
// std::lock_guard<std::mutex> lock(mutex_);
514+
confirmedUsers_.emplace(
515+
std::piecewise_construct,
516+
std::forward_as_tuple(forkId),
517+
std::forward_as_tuple(rref));
518+
}
519+
508520
void RRefContext::recordThreadLocalPendingRRefs() {
509521
TORCH_INTERNAL_ASSERT(
510522
userTable_.empty(),

torch/csrc/distributed/rpc/rref_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ class TORCH_API RRefContext {
147147
const ForkId& forkId,
148148
const c10::intrusive_ptr<RRef>& rref);
149149
void delPendingUser(const ForkId& forkId);
150+
void addConfirmedUser(
151+
const ForkId& forkId,
152+
const c10::intrusive_ptr<RRef>& rref);
150153

151154
// Start recroding new pending UserRRefs. All pending UserRRefs introduced
152155
// after this point will be put into the thread_local userTable_, which will

0 commit comments

Comments
 (0)