[Ray Data] Async UDF actor leaks in map_batches #59033
Closed
Labels: bug, community-backlog, data, stability
Description
What happened + What you expected to happen
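The async UDF actors are marked as DEAD in the Ray GCS, but their worker processes never exit. I expected each actor process to exit once the actor is reported DEAD. The stack of a leaked actor process (its main thread is stuck in threading._shutdown) is: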
Thread 0x1EE46E0C0 (active): "MainThread"
_shutdown (threading.py:1590)
Thread 0x32AC1B000 (idle): "PythonGCThread"
wait (threading.py:327)
wait (threading.py:629)
run (ray/_private/gc_collect_manager.py:34)
_bootstrap_inner (threading.py:1045)
_bootstrap (threading.py:1002)
Thread 0x3E5FEF000 (active): "Thread-1 (run_loop)"
run_loop (ray/data/_internal/planner/plan_udf_map_op.py:100)
run (threading.py:982)
_bootstrap_inner (threading.py:1045)
_bootstrap (threading.py:1002)

Output of the reproduction script on Ray 2.51.1 (the actor process exits as expected):
2.51.1
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}]
Actor dead.
Actor pid: 88093
Process not running.
Actor process exit.
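
Output on Ray 2.52.0 (the actor is reported dead, but its process keeps running and psutil's is_running() prints True indefinitely):
2.52.0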
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}]
Actor dead.
Actor pid: 83686
True
True
True
True
True
True
True
True
...
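The stack trace is consistent with a general CPython shutdown behavior: when the main thread finishes, threading._shutdown joins every non-daemon thread, so a non-daemon thread that runs an asyncio event loop forever keeps the process alive. The sketch below reproduces that pattern without Ray. It assumes, based only on the stack above and not on Ray's source, that the "Thread-1 (run_loop)" thread in plan_udf_map_op.py is a non-daemon thread running an event loop. CHILD, child_exits, and the "leak" / "daemon" / "stop" modes are hypothetical names used for illustration only; the actual fix in Ray may differ.

```python
import subprocess
import sys
import textwrap

# Hypothetical child program (not Ray code): starts an event loop on a
# background thread, then lets the main thread finish.
CHILD = textwrap.dedent("""
    import asyncio, sys, threading

    mode = sys.argv[1]
    loop = asyncio.new_event_loop()
    # Same shape as the hung actor: a thread whose target runs the loop forever.
    t = threading.Thread(target=loop.run_forever, name="run_loop",
                         daemon=(mode == "daemon"))
    t.start()
    if mode == "stop":
        # Ask the loop to stop from another thread; run_forever() returns,
        # the thread finishes, and interpreter shutdown can join it.
        loop.call_soon_threadsafe(loop.stop)
    # The main thread ends here; threading._shutdown joins non-daemon threads.
""")


def child_exits(mode: str, timeout: float = 5.0) -> bool:
    """Return True if the child interpreter exits within `timeout` seconds."""
    proc = subprocess.Popen([sys.executable, "-c", CHILD, mode])
    try:
        proc.wait(timeout=timeout)
        return True
    except subprocess.TimeoutExpired:
        # The "leaked process" case: kill and reap the hung child.
        proc.kill()
        proc.wait()
        return False


if __name__ == "__main__":
    for mode in ("leak", "daemon", "stop"):
        print(mode, "exits:", child_exits(mode))
```

On CPython, the "leak" case should time out (the same hang as the leaked actor), while marking the thread as a daemon or stopping the loop before exit should let the process terminate promptly.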
Versions / Dependencies
Ray 2.52.0 (the leak does not occur on Ray 2.51.1)
Reproduction script
import time
import psutil
import gc
import ray
import ray.data
import ray.util.state

class TestMapActor:
    # An async callable class; Ray Data runs it inside an actor pool.
    async def __call__(self, array):
        return array

def get_state():
    # Return the GCS state (e.g. "ALIVE", "DEAD") of the single TestMapActor actor.
    actors = ray.util.state.list_actors()
    states = [a.state for a in actors if "TestMapActor" in a.class_name]
    assert len(states) == 1
    return states[0]

def get_pid():
    # Return the OS pid of the single TestMapActor actor.
    actors = ray.util.state.list_actors()
    pids = [a.pid for a in actors if "TestMapActor" in a.class_name]
    assert len(pids) == 1
    return pids[0]

if __name__ == "__main__":
    print(ray.__version__)
    ds = ray.data.range(10)
    ds = ds.map_batches(TestMapActor, compute=ray.data.ActorPoolStrategy(size=1))
    print(ds.take_all())

    # Release the dataset and force a GC pass before checking the actor.
    del ds
    gc.collect()

    # Wait until the GCS no longer reports the actor as ALIVE.
    while get_state() == "ALIVE":
        time.sleep(1)
    print("Actor dead.")

    actor_pid = get_pid()
    print(f"Actor pid: {actor_pid}")

    # Poll the OS process: on 2.51.1 it disappears, on 2.52.0 it keeps running.
    while True:
        try:
            print(psutil.Process(actor_pid).is_running())
            time.sleep(1)
        except psutil.NoSuchProcess:
            print("Process not running.")
            break
    print("Actor process exit.")

Issue Severity
High: It blocks me from completing my task.