[Data] map_batches on a threaded actor shows incorrect concurrency #55354
Closed
Labels: P1 (issue that should be fixed within a few weeks), bug (something that is supposed to be working, but isn't), community-backlog, data (Ray Data-related issues), performance, stability
Description
What happened + What you expected to happen
The map_batches function should respect the max_concurrency setting specified in the actor options. However, when max_concurrency > 1 is set, map_batches on a sync actor still shows no concurrency: all operations on the actor execute serially. map_batches on an async actor shows the expected concurrency.
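For reference, the expected semantics of max_concurrency=2 on a sync (threaded) actor resemble a two-worker thread pool, where a second call can enter while the first is still sleeping. This is a plain-Python analogy, not Ray code:

```python
import time
from concurrent.futures import ThreadPoolExecutor

events = []

def call(batch):
    # Mirrors the sync Actor.__call__ in the reproduction script below,
    # with a shorter sleep so the demo runs quickly.
    events.append("enter")
    time.sleep(0.2)
    events.append("exit")
    return batch

# Two workers, analogous to max_concurrency=2 on a threaded actor:
# both calls should enter before either exits.
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(call, [1, 2]))

print(events)  # expected interleaving: ['enter', 'enter', 'exit', 'exit']
```

This is the interleaving the threaded-actor test in the reproduction script observes directly, and the one map_batches on a sync actor fails to produce.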
The following reproduction code prints:
Multithreaded actor with max_concurrency=2: ['enter', 'enter', 'exit', 'exit']
Async actor with max_concurrency=2: ['enter', 'enter', 'exit', 'exit']
map_batches on a sync Actor with max_concurrency=2: ['enter', 'exit', 'enter', 'exit']
map_batches on an async Actor with max_concurrency=2: ['enter', 'enter', 'exit', 'exit']

You can see that map_batches on a sync Actor with max_concurrency=2 executes serially.
Versions / Dependencies
Ray 2.48.0
Reproduction script
import asyncio
import time

import ray
import pyarrow as pa


@ray.remote
class Recorder:
    def __init__(self):
        self._seq = []

    def clear(self):
        self._seq.clear()

    def append(self, seq):
        self._seq.append(seq)

    def get_seq(self):
        return self._seq


class Actor:
    def __init__(self, recorder):
        recorder.clear.remote()
        self._recorder = recorder

    def __call__(self, batch: pa.Table) -> pa.Table:
        self._recorder.append.remote("enter")
        time.sleep(1)
        self._recorder.append.remote("exit")
        return batch


class AsyncActor:
    def __init__(self, recorder):
        recorder.clear.remote()
        self._recorder = recorder

    async def __call__(self, batch: pa.Table) -> pa.Table:
        self._recorder.append.remote("enter")
        await asyncio.sleep(1)
        self._recorder.append.remote("exit")
        return batch


if __name__ == "__main__":
    ray.init(address="local")
    recorder = Recorder.remote()

    # Test multithreaded actor
    threaded_actor = ray.remote(Actor).options(max_concurrency=2).remote(recorder)
    threaded_actor.__call__.remote(None)
    ray.get(threaded_actor.__call__.remote(None))
    print(f"Multithreaded actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

    # Test async actor
    async_actor = ray.remote(AsyncActor).options(max_concurrency=2).remote(recorder)
    async_actor.__call__.remote(None)
    ray.get(async_actor.__call__.remote(None))
    print(f"Async actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

    texts = list(range(10))
    dataset = ray.data.from_items(texts)

    # Test map_batches on a sync actor with max_concurrency=2
    dataset.map_batches(
        Actor,
        fn_constructor_args=(recorder,),
        concurrency=1,
        batch_format="pyarrow",
        batch_size=5,
        max_concurrency=2,
    ).materialize()
    print(f"map_batches on a sync Actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

    # Test map_batches on an async actor with max_concurrency=2
    dataset.map_batches(
        AsyncActor,
        fn_constructor_args=(recorder,),
        concurrency=1,
        batch_format="pyarrow",
        batch_size=5,
        max_concurrency=2,
    ).materialize()
    print(f"map_batches on an async Actor with max_concurrency=2: {ray.get(recorder.get_seq.remote())}")

Issue Severity
None
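Since the async-actor path overlaps batches correctly, one possible interim workaround (an untested sketch, not part of this issue's report) is to wrap the blocking callable in an async class whose __call__ offloads to a thread, so that Ray Data takes the async path while the work itself stays synchronous. The core pattern, shown here without Ray so it is self-contained:

```python
import asyncio
import time


class SyncWork:
    """Stand-in for the blocking sync Actor.__call__ (hypothetical example)."""

    def __call__(self, batch):
        time.sleep(0.2)  # simulate blocking work
        return batch


class AsyncWrapper:
    """Expose the blocking call as a coroutine. In the map_batches case, this
    class would be passed in place of the sync Actor so the async code path
    is used; two pending batches can then overlap."""

    def __init__(self):
        self._fn = SyncWork()

    async def __call__(self, batch):
        # Offload the blocking call to a worker thread so the event loop
        # stays free to start the next batch.
        return await asyncio.to_thread(self._fn, batch)


async def demo():
    w = AsyncWrapper()
    start = time.perf_counter()
    # Two 0.2s calls run concurrently, so total time is ~0.2s, not ~0.4s.
    await asyncio.gather(w(1), w(2))
    return time.perf_counter() - start


elapsed = asyncio.run(demo())
```

Whether this wrapper interacts cleanly with fn_constructor_args and max_concurrency in Ray Data is an assumption that would need verifying; the underlying bug in the sync path should still be fixed.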