[Data] map_batches on a threaded actor shows incorrect concurrency #55354

@codingl2k1

What happened + What you expected to happen

map_batches should respect the max_concurrency setting specified in the actor options. However, when I set max_concurrency > 1, map_batches on a sync (threaded) actor still runs without any concurrency: all calls on the actor are executed serially. map_batches on an async actor gives the expected concurrency.

The following reproduction code prints:

Multithreaded actor with max_concurrency=2: ['enter', 'enter', 'exit', 'exit']
Async actor with max_concurrency=2: ['enter', 'enter', 'exit', 'exit']
map_batches on a sync Actor with max_concurrency=2: ['enter', 'exit', 'enter', 'exit']
map_batches on an async Actor with max_concurrency=2: ['enter', 'enter', 'exit', 'exit']

The third line shows that map_batches on a sync Actor with max_concurrency=2 is executed serially: each call exits before the next one enters.
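To make the reading of these sequences explicit, here is a minimal sketch of a checker that computes the peak number of overlapping calls from a recorded enter/exit list. The helper name max_in_flight is hypothetical and not part of Ray; it only assumes the "enter"/"exit" event strings used by the reproduction script.

```python
from typing import List


def max_in_flight(events: List[str]) -> int:
    """Return the peak number of calls that were running at the same time.

    Hypothetical helper (not a Ray API): `events` is a list of "enter" and
    "exit" strings in the order the Recorder received them.
    """
    in_flight = 0
    peak = 0
    for event in events:
        if event == "enter":
            in_flight += 1
            peak = max(peak, in_flight)
        elif event == "exit":
            in_flight -= 1
        else:
            raise ValueError(f"unexpected event: {event!r}")
    return peak


# Sequences copied from the output above.
serial = ["enter", "exit", "enter", "exit"]
concurrent = ["enter", "enter", "exit", "exit"]

print(max_in_flight(serial))      # 1: calls never overlap
print(max_in_flight(concurrent))  # 2: both calls ran at once
```

A peak of 1 with max_concurrency=2 is the symptom reported here; a peak of 2 is the expected behavior.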

Versions / Dependencies

Ray 2.48.0

Reproduction script

import asyncio
import time
import ray
import pyarrow as pa


@ray.remote
class Recorder:
    def __init__(self):
        self._seq = []

    def clear(self):
        self._seq.clear()

    def append(self, seq):
        self._seq.append(seq)

    def get_seq(self):
        return self._seq


class Actor:
    def __init__(self, recorder):
        recorder.clear.remote()
        self._recorder = recorder

    def __call__(self, batch: pa.Table) -> pa.Table:
        self._recorder.append.remote("enter")
        time.sleep(1)
        self._recorder.append.remote("exit")
        return batch


class AsyncActor:
    def __init__(self, recorder):
        recorder.clear.remote()
        self._recorder = recorder

    async def __call__(self, batch: pa.Table) -> pa.Table:
        self._recorder.append.remote("enter")
        await asyncio.sleep(1)
        self._recorder.append.remote("exit")
        return batch


if __name__ == "__main__":
    ray.init(address="local")
    recorder = Recorder.remote()

    # Test multithreaded actor
    threaded_actor = ray.remote(Actor).options(max_concurrency=2).remote(recorder)
    threaded_actor.__call__.remote(None)
    ray.get(threaded_actor.__call__.remote(None))
    print(f"Multithreaded actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

    # Test async actor
    async_actor = ray.remote(AsyncActor).options(max_concurrency=2).remote(recorder)
    async_actor.__call__.remote(None)
    ray.get(async_actor.__call__.remote(None))
    print(f"Async actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

    texts = list(range(10))
    dataset = ray.data.from_items(texts)

    # Test map_batches on a sync actor with max_concurrency=2
    dataset.map_batches(Actor, fn_constructor_args=(recorder,),
                        concurrency=1,
                        batch_format="pyarrow", batch_size=5,
                        max_concurrency=2).materialize()
    print(f"map_batches on a sync Actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

    # Test map_batches on an async actor with max_concurrency=2
    dataset.map_batches(AsyncActor, fn_constructor_args=(recorder,),
                        concurrency=1,
                        batch_format="pyarrow", batch_size=5,
                        max_concurrency=2).materialize()
    print(f"map_batches on an async Actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

Issue Severity

None

Metadata

Labels

P1: Issue that should be fixed within a few weeks
bug: Something that is supposed to be working; but isn't
community-backlog
data: Ray Data-related issues
performance
stability

Projects

Status

Done
