Track newly created actor's parent actor #5098
stephanie-wang merged 17 commits into ray-project:master
Conversation
src/ray/raylet/node_manager.cc
Outdated
| auto parent_task_id = task.GetTaskSpecification().ParentTaskId(); | ||
| RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( | ||
| JobID::Nil(), parent_task_id, | ||
| //success_callback |
Our convention is to use /*success_callback=*/ for comments that describe a kwarg.
src/ray/raylet/node_manager.cc
Outdated
| // get the parent actor id. | ||
| auto message = flatbuffers::GetRoot<protocol::Task>(parent_task_data.task().data()); | ||
| Task parent_task(*message); | ||
| new_actor_data.set_parent_actor_id(parent_task.GetTaskSpecification().ActorId().Binary()); |
It's a bit ugly, but there are actually two cases where an actor might start another actor, either in a task for the parent actor (which you have here), or during the parent actor's creation task. You can check for the latter with parent_task.GetTaskSpecification().IsActorCreationTask(), and the actor ID will be parent_task.GetTaskSpecification().ActorCreationId().
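The two cases described above could be handled roughly as follows. This is only a sketch: `ParentTaskSpec` and `GetParentActorId` are hypothetical stand-ins introduced for illustration, with fields that mirror the accessors named in the comment, and an empty string is assumed to represent a nil ID.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-in for the parent's task specification.
// The fields mirror IsActorCreationTask(), ActorId() and ActorCreationId().
struct ParentTaskSpec {
  bool is_actor_creation_task;
  std::string actor_id;           // empty string stands in for a nil ID
  std::string actor_creation_id;  // empty string stands in for a nil ID
};

// Returns the ID of the actor that launched the new actor. The result is
// empty (nil) when the parent task was not run by an actor at all.
std::string GetParentActorId(const ParentTaskSpec &parent) {
  if (parent.is_actor_creation_task) {
    // Case 2: the new actor was started during the parent's creation task.
    return parent.actor_creation_id;
  }
  // Case 1: the new actor was started from a regular task of the parent.
  return parent.actor_id;
}
```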
src/ray/raylet/node_manager.cc
Outdated
| if(!resumed_from_checkpoint){ | ||
| // The actor was not resumed from a checkpoint. We extend the actor's | ||
| // frontier as usual since there is no frontier to restore. | ||
| ExtendActorFrontier(task, actor_id, actor_handle_id); |
Can we move this if statement into FinishAssignedActorTaskHelper?
src/ray/raylet/node_manager.cc
Outdated
| } | ||
|
|
||
| if (!resumed_from_checkpoint) { | ||
| else if (!resumed_from_checkpoint) { |
This looks like it could just be an else (you would never have resumed_from_checkpoint for regular actor tasks). Also, please move to the previous line.
The if condition existed before, so I kept it. I can remove it if it's unnecessary.
Yeah it used to be necessary because we also wanted to call ExtendActorFrontier for actor creation tasks that didn't resume from a checkpoint, but now we're calling that in the callback for the actor table lookup. So we can just make this an else now.
src/ray/raylet/node_manager.cc
Outdated
| } | ||
| } | ||
|
|
||
| void NodeManager::ExtendActorFrontier(const Task &task, ActorID &actor_id, ActorHandleID &actor_handle_id){ |
This looks like it only needs to take in task.GetTaskSpecification().ActorDummyObject(), not the whole task. This would allow us to avoid capturing task in FinishAssignedActorTask.
That makes sense. However, we would still need to capture either the TaskSpecification or the ActorTableData, because CreateActorTableDataFromCreationTask requires a TaskSpecification if we move that call into the helper function.
Takes dummy_object now instead of the entire task.
src/ray/raylet/node_manager.cc
Outdated
| RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( | ||
| JobID::Nil(), parent_task_id, | ||
| //success_callback | ||
| [this, task, actor_id, actor_handle_id, new_actor_data, resumed_from_checkpoint] |
To avoid capturing new_actor_data, we can just call CreateActorTableDataFromCreationTask inside the callback, and maybe pass in the parent actor ID to that helper function.
Skipped this for now as it would require capturing TaskSpecification for CreateActorTableDataFromCreationTask.
Hmm I would prefer capturing the TaskSpecification over new_actor_data so we can avoid marking the lambda as mutable.
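To illustrate the point about `mutable`: a value captured by copy is const inside a lambda unless the lambda is marked `mutable`, so a callback that only reads the task specification and builds the actor data itself needs no such marker. The sketch below uses hypothetical stand-in types (`TaskSpec`, `ActorData`) and helpers (`CreateActorData`, `MakeLookupCallback`), not the real Ray classes or signatures.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-ins for TaskSpecification and ActorTableData.
struct TaskSpec {
  std::string actor_creation_id;
};
struct ActorData {
  std::string actor_id;
  std::string parent_actor_id;
};

// Hypothetical analogue of CreateActorTableDataFromCreationTask that also
// receives the parent actor ID.
ActorData CreateActorData(const TaskSpec &spec, const std::string &parent_actor_id) {
  return ActorData{spec.actor_creation_id, parent_actor_id};
}

// Builds a lookup callback. The spec is captured by value and only read, so
// the lambda does not need to be marked mutable.
std::function<ActorData(const std::string &)> MakeLookupCallback(const TaskSpec &spec) {
  return [spec](const std::string &parent_actor_id) {
    return CreateActorData(spec, parent_actor_id);
  };
}
```

Capturing a pre-built `ActorData` and then setting its parent field inside the callback would instead require `mutable`, which is what the suggestion avoids.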
src/ray/raylet/node_manager.cc
Outdated
| HandleObjectLocal(dummy_object); | ||
| } | ||
|
|
||
| void NodeManager::FinishAssignedActorTaskHelper(const ActorID& actor_id, const ActorTableData new_actor_data, bool resumed_from_checkpoint) { |
I would name this something more descriptive, like FinishAssignedActorCreationTask.
src/ray/raylet/node_manager.h
Outdated
| /// \return Void. | ||
| void FinishAssignedActorTaskHelper(const ActorID& actor_id, | ||
| const ActorTableData new_actor_data, bool resumed_from_checkpoint); | ||
| /// Extend actor frontier when an actor task or actor creation task arrives. |
| /// Extend actor frontier when an actor task or actor creation task arrives. | |
| /// Extend actor frontier after an actor task or actor creation task executes. |
Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>
Test FAILed.
Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>
Test results, in order: FAILed, FAILed, FAILed.
src/ray/raylet/node_manager.cc
Outdated
| void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { | ||
| ActorID actor_id; | ||
| ActorHandleID actor_handle_id; | ||
| TaskSpecification task_spec = task.GetTaskSpecification(); |
const TaskSpecification &task_spec
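The difference the suggestion makes can be seen in a small sketch, assuming the accessor returns a const reference. `Spec`, `Task`, and `GetSpec` are hypothetical stand-ins that count copies; they are not the real Ray types.

```cpp
#include <cassert>

// Hypothetical stand-in for a specification object that counts its copies.
struct Spec {
  static int copies;
  Spec() = default;
  Spec(const Spec &) { ++copies; }
};
int Spec::copies = 0;

// Hypothetical stand-in for a task whose accessor returns a const reference.
struct Task {
  Spec spec;
  const Spec &GetSpec() const { return spec; }
};

// Binding the result to a value copies the whole specification.
int CopiesWhenBoundByValue(const Task &task) {
  const int before = Spec::copies;
  Spec copy = task.GetSpec();
  (void)copy;
  return Spec::copies - before;
}

// Binding the result to a const reference copies nothing.
int CopiesWhenBoundByConstRef(const Task &task) {
  const int before = Spec::copies;
  const Spec &ref = task.GetSpec();
  (void)ref;
  return Spec::copies - before;
}
```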
stephanie-wang left a comment
Thanks, a couple small comments, but otherwise looks good!
src/ray/raylet/node_manager.cc
Outdated
| (ray::gcs::AsyncGcsClient *client, const TaskID &parent_task_id) mutable { | ||
| // The parent task was not in the GCS task table. It must therefore be in the | ||
| // lineage cache. | ||
| RAY_CHECK(lineage_cache_.ContainsTask(parent_task_id)) |
Unfortunately, one of the Travis tests for the new C++ worker is failing on this check, I think because it does not add the driver task to the GCS. Let's make this a non-fatal check for now. If the task is not in the GCS or the lineage cache, we can just set the parent actor ID to nil (and leave a note/TODO explaining this case).
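A minimal sketch of the non-fatal fallback, under stated assumptions: the GCS task table and the lineage cache are modelled as plain maps from task ID to actor ID, an empty string stands in for a nil ID, and `ResolveParentActorId` is a hypothetical helper. The real lookup is asynchronous and uses Ray's own ID types.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

using TaskId = std::string;
using ActorId = std::string;
using TaskTable = std::unordered_map<TaskId, ActorId>;

// Assumption for this sketch: an empty string represents a nil actor ID.
const ActorId kNilActorId = "";

// Looks for the parent task in the GCS task table first, then in the lineage
// cache, and falls back to a nil parent actor ID instead of aborting.
ActorId ResolveParentActorId(const TaskId &parent_task_id,
                             const TaskTable &gcs_task_table,
                             const TaskTable &lineage_cache) {
  auto it = gcs_task_table.find(parent_task_id);
  if (it != gcs_task_table.end()) {
    return it->second;
  }
  it = lineage_cache.find(parent_task_id);
  if (it != lineage_cache.end()) {
    return it->second;
  }
  // TODO: the parent task can be missing from both places, for example when
  // a driver task was never added to the GCS. Treat the parent as unknown.
  return kNilActorId;
}
```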
Test results, in order: FAILed, FAILed, PASSed, FAILed, FAILed, FAILed, FAILed, PASSed, FAILed.
Change-Id: I9c1a65134dc23a2d175047e96b86ab9d9cf61971
Test results, in order: FAILed, PASSed.
Change-Id: I1def06218130b399d2527b999258aecf9abb98dd
Test results, in order: FAILed, PASSed.
@vipulharsh @stephanie-wang CI seems broken after merging this PR. Can you take a look?