
[Ray Data] Async UDF actor leaks in map_batches #59033

@codingl2k1

Description

What happened + What you expected to happen

The async UDF actors are marked as DEAD in the Ray GCS, but their worker processes never exit. I expected each actor process to exit once the actor is DEAD, as it does on Ray 2.51.1. The stack dump of a leaked actor process is:

```
Thread 0x1EE46E0C0 (active): "MainThread"
    _shutdown (threading.py:1590)
Thread 0x32AC1B000 (idle): "PythonGCThread"
    wait (threading.py:327)
    wait (threading.py:629)
    run (ray/_private/gc_collect_manager.py:34)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
Thread 0x3E5FEF000 (active): "Thread-1 (run_loop)"
    run_loop (ray/data/_internal/planner/plan_udf_map_op.py:100)
    run (threading.py:982)
    _bootstrap_inner (threading.py:1045)
    _bootstrap (threading.py:1002)
```
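Output of the reproduction script on Ray 2.51.1, where the actor process exits after the actor is marked DEAD:

```
2.51.1
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}]
Actor dead.
Actor pid: 88093
Process not running.
Actor process exit.
```

Output on Ray 2.52.0, where the actor is marked DEAD but its process keeps running, so the script polls forever:

```
2.52.0
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}, {'id': 5}, {'id': 6}, {'id': 7}, {'id': 8}, {'id': 9}]
Actor dead.
Actor pid: 83686
True
True
True
True
True
True
True
True
...
```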
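A plausible reading of the stack dump, not confirmed against Ray's source: the main thread is stuck in `threading._shutdown`, which at interpreter exit joins every non-daemon thread, while the `run_loop` thread started from `plan_udf_map_op.py` is still running an event loop that nothing stops. The sketch below reproduces that pattern with plain `asyncio` and `threading`; the names `start_loop_thread`, `stop_loop_thread`, and `echo` are hypothetical and do not come from Ray. It also shows one way such a thread can be shut down (stop the loop with `call_soon_threadsafe`, then join the thread); whether a fix in Ray Data should do this or instead mark the thread as a daemon is an assumption.

```python
import asyncio
import threading


async def echo(batch):
    # Hypothetical stand-in for an async UDF such as TestMapActor.__call__.
    return batch


def start_loop_thread(daemon: bool) -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Run a fresh event loop forever on a background thread.

    With daemon=False the thread keeps the interpreter alive at exit:
    threading._shutdown() joins every non-daemon thread, and
    run_forever() never returns unless someone calls loop.stop().
    """
    loop = asyncio.new_event_loop()

    def run_loop() -> None:
        asyncio.set_event_loop(loop)
        loop.run_forever()  # Returns only after loop.stop() is called.
        loop.close()

    thread = threading.Thread(target=run_loop, name="run_loop", daemon=daemon)
    thread.start()
    return loop, thread


def stop_loop_thread(loop: asyncio.AbstractEventLoop, thread: threading.Thread,
                     timeout: float = 5.0) -> None:
    """Stop the loop from another thread, then wait for its thread to finish."""
    # loop.stop() is not thread-safe, so schedule it on the loop's own thread.
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout)


if __name__ == "__main__":
    loop, thread = start_loop_thread(daemon=False)
    # Submit a coroutine from the main thread, the way a sync caller
    # would drive an async UDF running on the loop thread.
    print(asyncio.run_coroutine_threadsafe(echo([0, 1, 2]), loop).result(timeout=5))
    # Without this call, the script hangs at exit in threading._shutdown.
    stop_loop_thread(loop, thread)
    print("loop thread alive:", thread.is_alive())
```

Run as a script, this prints `[0, 1, 2]` and `loop thread alive: False`, then exits. Removing the `stop_loop_thread` call leaves the non-daemon loop thread running, and the process hangs at exit with a main-thread frame in `threading._shutdown`, similar to the leaked actor above.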

Versions / Dependencies

Ray 2.52.0 (the leak does not occur on Ray 2.51.1)

Reproduction script

```python
import gc
import time

import psutil
import ray
import ray.data
import ray.util.state


class TestMapActor:
    # An async __call__: Ray Data runs it on an event loop in a
    # background thread (the "run_loop" thread in the stack dump).
    async def __call__(self, array):
        return array


def get_state():
    """Return the GCS state (e.g. "ALIVE" or "DEAD") of the single TestMapActor."""
    actors = ray.util.state.list_actors()
    states = [a.state for a in actors if "TestMapActor" in a.class_name]
    assert len(states) == 1
    return states[0]


def get_pid():
    """Return the OS process ID of the single TestMapActor."""
    actors = ray.util.state.list_actors()
    pids = [a.pid for a in actors if "TestMapActor" in a.class_name]
    assert len(pids) == 1
    return pids[0]


if __name__ == "__main__":
    print(ray.__version__)
    ds = ray.data.range(10)
    ds = ds.map_batches(TestMapActor, compute=ray.data.ActorPoolStrategy(size=1))
    print(ds.take_all())
    # Drop the dataset so Ray Data tears down the actor pool.
    del ds
    gc.collect()
    # Wait until the GCS no longer reports the actor as ALIVE.
    while get_state() == "ALIVE":
        time.sleep(1)
    print("Actor dead.")
    actor_pid = get_pid()
    print(f"Actor pid: {actor_pid}")
    # Poll the OS process; on 2.52.0 this loop never ends.
    while True:
        try:
            print(psutil.Process(actor_pid).is_running())
            time.sleep(1)
        except psutil.NoSuchProcess:
            print("Process not running.")
            break
    print("Actor process exit.")
```
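A variant of the final polling loop that gives up after a timeout, so the script reports the leak instead of printing `True` forever. This is a sketch, assuming `psutil.Process.wait(timeout=...)` behaves as documented (it raises `psutil.TimeoutExpired` while the process is still alive and also works for processes that are not children of the caller); the helper name `wait_for_exit` and the 30-second timeout in the usage note are hypothetical choices, not part of the original report.

```python
import psutil


def wait_for_exit(pid: int, timeout: float) -> bool:
    """Return True if process `pid` exits within `timeout` seconds, else False."""
    try:
        # For non-child processes, psutil polls until the PID disappears
        # or the timeout expires.
        psutil.Process(pid).wait(timeout=timeout)
    except psutil.NoSuchProcess:
        return True  # The process was already gone.
    except psutil.TimeoutExpired:
        return False  # Still running: the actor process leaked.
    return True
```

In the reproduction script, the `while True` loop could then be replaced by `assert wait_for_exit(actor_pid, timeout=30), "actor process leaked"`, which fails on 2.52.0 instead of hanging.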

Issue Severity

High: It blocks me from completing my task.

Metadata

Labels: bug, community-backlog, data, stability
