[xray] Lineage cache requests notifications from the GCS about remote tasks#1834
Conversation
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
src/ray/raylet/lineage_cache.cc
Outdated
| // Stop listening for notifications about this task. | ||
| auto it = subscribed_tasks_.find(task_id); | ||
| if (it != subscribed_tasks_.end()) { | ||
| task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_); |
There was a problem hiding this comment.
It seems like for this use case, it would be fine for the Cancel to happen automatically once the GCS sends the notification, right?
The issue with that is that it involves special-casing the behavior in the redis module?
There was a problem hiding this comment.
Hmm yeah, for the Table interface, we could just call Cancel as soon as an entry is found since table entries are supposed to be immutable. Should I change that in this PR?
There was a problem hiding this comment.
It can be a subsequent PR if that makes sense.
src/ray/raylet/lineage_cache.cc
Outdated
| // Stop listening for notifications about this task. | ||
| auto it = subscribed_tasks_.find(task_id); | ||
| if (it != subscribed_tasks_.end()) { | ||
| task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_); |
There was a problem hiding this comment.
we should probably do something with the returned STATUS
| /// will remain unchanged. | ||
| /// | ||
| /// \param task_id The ID of the waiting task to remove. | ||
| void RemoveWaitingTask(const TaskID &task_id); |
There was a problem hiding this comment.
Should all of these methods return a status? Or should they handle failures internally?
|
I added some status checks, but I wasn't really sure if they should be |
| } | ||
|
|
||
| void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { | ||
| RAY_LOG(DEBUG) << "task committed: " << task_id; |
There was a problem hiding this comment.
Under the current design, we can't actually guarantee that this method won't fire twice (or more) for the same task ID, right? Couldn't that cause issues here?
There was a problem hiding this comment.
Hmm, it should be okay if this method fires twice. I think this function is idempotent.
There was a problem hiding this comment.
Also, the only case I can think of right now where this would fire twice is if a task gets reconstructed and added to the table again. That should be pretty rare.
There was a problem hiding this comment.
You were right :) I changed this to check that the status is already COMMITTED if the call to set the status fails.
|
Test FAILed. |
|
Thanks, @robertnishihara, I think the code you added looks good. We're basically calling |
|
Test FAILed. |
|
Test PASSed. |
|
Test PASSed. |
|
Currently seeing |
|
Test FAILed. |
|
Test PASSed. |
* master: Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929) [DataFrame] Adding read methods and tests (ray-project#1712) Allow task_table_update to fail when tasks are finished. (ray-project#1927) [rllib] Contribute DDPG to RLlib (ray-project#1877) [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920) Raylet task dispatch and throttling worker startup (ray-project#1912) [DataFrame] Eval fix (ray-project#1903) [tune] Polishing docs (ray-project#1846) [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848) Preemptively push local arguments for actor tasks (ray-project#1901) [tune] Allow fetching pinned objects from trainable functions (ray-project#1895) Multithreading refactor for ObjectManager. (ray-project#1911) Add slice functionality (ray-project#1832) [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894) Addresses missed comments from multichunk object transfer PR. (ray-project#1908) Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816) [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834) Fix UI issue for non-json-serializable task arguments. (ray-project#1892) Remove unnecessary calls to .hex() for object IDs. (ray-project#1910) Allow multiple raylets to be started on a single machine. (ray-project#1904) # Conflicts: # python/ray/rllib/__init__.py # python/ray/rllib/dqn/dqn.py
What do these changes do?
This extends the lineage cache to request notifications about tasks that are scheduled to execute at a remote node. This allows each node to evict tasks from its local lineage cache once a notification about the remote task's commit is received.