Closed
Labels
- P0: Issues that should be fixed in short order
- bug: Something that is supposed to be working, but isn't
- release-blocker: P0 issue that blocks the release
Description
What is the problem?
- ray 1.0.0
Reproduction (REQUIRED)
Ray remote actor tasks can be awaited. However, this only works reliably when the driver and the actor run on the same Ray node. Otherwise, await statements can hang indefinitely.
In the script below, the OTHER_NODE custom resource indicates that this actor should be scheduled on a node different from the one running the driver script.
import os
import numpy as np
import ray
import asyncio

head_service = os.environ.get('RAY_HEAD_SERVICE_HOST')
head_port = os.environ.get('RAY_HEAD_SERVICE_PORT_REDIS_PRIMARY')
address = f"{head_service}:{head_port}" if head_service is not None else None
ray.init(address=address)

TOTAL_IMAGES = 10

@ray.remote(num_cpus=0, resources={"OTHER_NODE": 1})
class ResultActor:
    def __init__(self):
        self.results = {}

    def predict(self, i, image):
        self.results[i] = image

    async def get_async_result(self, i):
        result = self.results.pop(i, None)
        while result is None:
            print(f"Awaiting {i}")
            result = self.results.pop(i, None)
            print(f"Done awaiting {i}")
            await asyncio.sleep(1)
        print(f"GOT RESULT FOR {i}")
        return result

result_actor = ResultActor.remote()

# SEND DATA TO ResultActor
for i in range(TOTAL_IMAGES):
    image = (np.random.random((192, 1080, 3)) * 255).astype(np.uint8)  # ~ 0.5MB
    result_actor.predict.remote(i, image)

# Async get all data
async def get_all_results():
    futs = []
    fails = 0
    for i in range(TOTAL_IMAGES):
        fut = result_actor.get_async_result.remote(i)
        futs.append(fut)
        try:
            await asyncio.wait_for(fut, timeout=5)
            print(f"SUCCESS {i}")
        except:
            print(f"FAIL {i}")
            fails += 1
    # ALTERNATIVE: Await all at the same time
    # done, pending = await asyncio.wait(futs, timeout=5)
    # if len(pending):
    #     print(f"FAILED! {int(100.*len(pending)/TOTAL_IMAGES)}% tasks not succeeded!")
    print(f"FAIL RATIO: {int(100.*fails/TOTAL_IMAGES)}%")

loop = asyncio.new_event_loop()
loop.run_until_complete(get_all_results())
The output of this script is:
2020-10-27 16:31:23,203 INFO worker.py:634 -- Connecting to existing Ray cluster at address: 10.43.87.248:6379
(pid=883, ip=10.42.0.8) GOT RESULT FOR 0
FAIL 0
(pid=883, ip=10.42.0.8) GOT RESULT FOR 1
FAIL 1
SUCCESS 2
(pid=883, ip=10.42.0.8) GOT RESULT FOR 2
(pid=883, ip=10.42.0.8) GOT RESULT FOR 3
FAIL 3
(pid=883, ip=10.42.0.8) GOT RESULT FOR 4
FAIL 4
(pid=883, ip=10.42.0.8) GOT RESULT FOR 5
FAIL 5
(pid=883, ip=10.42.0.8) GOT RESULT FOR 6
FAIL 6
SUCCESS 7
(pid=883, ip=10.42.0.8) GOT RESULT FOR 7
(pid=883, ip=10.42.0.8) GOT RESULT FOR 8
FAIL 8
(pid=883, ip=10.42.0.8) GOT RESULT FOR 9
FAIL 9
FAIL RATIO: 80%
As shown, we get print statements from the actor returning data, but the main process does not receive the data it is awaiting.
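For readers who want to see how the SUCCESS/FAIL bookkeeping behaves independently of Ray, here is a minimal sketch using only the standard library. The coroutine `fake_result` and its delays are hypothetical stand-ins for `result_actor.get_async_result.remote(i)`; the sketch only illustrates that `asyncio.wait_for` raises `asyncio.TimeoutError` when the awaited result does not arrive in time, and it does not reproduce the bug itself.

```python
import asyncio


async def fake_result(i, delay):
    # Hypothetical stand-in for the remote actor call: returns after `delay` seconds.
    await asyncio.sleep(delay)
    return i


async def count_failures(delays, timeout):
    # Await each result in turn and count the ones that exceed the timeout.
    fails = 0
    for i, delay in enumerate(delays):
        try:
            await asyncio.wait_for(fake_result(i, delay), timeout=timeout)
            print(f"SUCCESS {i}")
        except asyncio.TimeoutError:
            print(f"FAIL {i}")
            fails += 1
    return fails


# Two results arrive immediately, two arrive later than the timeout allows.
delays = [0.0, 0.5, 0.0, 0.5]
fails = asyncio.run(count_failures(delays, timeout=0.1))
fail_ratio = int(100.0 * fails / len(delays))
print(f"FAIL RATIO: {fail_ratio}%")
```

In the report above, the actor prints "GOT RESULT FOR i", so the actor-side coroutine finished; the failure counted by the driver is the await on the returned value timing out.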
When the Actor is scheduled on the driver node, there is no issue.
When the transferred data is small (I assume <100kB), there is also no issue.
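To put the payload size in context, the arithmetic below computes the byte size of one image from the script. The 100 kB figure is the reporter's guess quoted above and is treated here purely as an assumption, not as a confirmed Ray limit.

```python
import math

# Shape and element size of one image in the reproduction script (np.uint8 is 1 byte).
shape = (192, 1080, 3)
itemsize = 1

payload_bytes = math.prod(shape) * itemsize

# Assumed threshold taken from the "<100kB" guess in the report.
ASSUMED_THRESHOLD_BYTES = 100 * 1000

exceeds = payload_bytes > ASSUMED_THRESHOLD_BYTES
print(f"{payload_bytes} bytes per image, exceeds assumed threshold: {exceeds}")
```

Each image is 622,080 bytes, roughly six times the assumed threshold, which is consistent with the observation that only the larger payloads fail.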
- I have verified my script runs in a clean environment and reproduces the issue.
- I have verified the issue also occurs with the latest wheels.