Async await not working when result is fetched from another node's object store #11649

@PidgeyBE

What is the problem?

  • ray 1.0.0

Reproduction (REQUIRED)

Ray remote actor tasks can be awaited. However, this only works reliably when the driver and the actor run on the same Ray node. Otherwise, await statements can hang indefinitely.

In the script below, the OTHER_NODE custom resource ensures that the actor is scheduled on a node different from the one running the driver script.

import os
import numpy as np
import ray
import asyncio

head_service = os.environ.get('RAY_HEAD_SERVICE_HOST')
head_port = os.environ.get('RAY_HEAD_SERVICE_PORT_REDIS_PRIMARY')
address = f"{head_service}:{head_port}" if head_service is not None else None
ray.init(address=address)

TOTAL_IMAGES = 10

@ray.remote(num_cpus=0,  resources={"OTHER_NODE": 1})
class ResultActor:
    def __init__(self):
        self.results = {}

    def predict(self, i, image):
        self.results[i] = image
    
    async def get_async_result(self, i):
        result = self.results.pop(i, None)
        while result is None:
            print(f"Awaiting {i}")
            result = self.results.pop(i, None)
            print(f"Done awaiting {i}")
            await asyncio.sleep(1)
        print(f"GOT RESULT FOR {i}")
        return result


result_actor = ResultActor.remote()

# SEND DATA TO ResultActor
for i in range(TOTAL_IMAGES):
    image = (np.random.random((192, 1080, 3)) * 255).astype(np.uint8)  # 192*1080*3 bytes, ~0.6 MB
    result_actor.predict.remote(i, image)

# Async get all data
async def get_all_results():
    futs = []
    fails = 0
    for i in range(TOTAL_IMAGES):
        fut = result_actor.get_async_result.remote(i)
        futs.append(fut)
        try:
            await asyncio.wait_for(fut, timeout=5)
            print(f"SUCCESS {i}")
        except:
            print(f"FAIL {i}")
            fails += 1

    # ALTERNATIVE: Await all at the same time
    # done, pending = await asyncio.wait(futs, timeout=5)
    # if len(pending):
    #     print(f"FAILED! {int(100.*len(pending)/TOTAL_IMAGES)}% tasks not succeeded!")

    print(f"FAIL RATIO: {int(100.*fails/TOTAL_IMAGES)}%")


loop = asyncio.new_event_loop()
loop.run_until_complete(get_all_results())

The output of this script is:

2020-10-27 16:31:23,203	INFO worker.py:634 -- Connecting to existing Ray cluster at address: 10.43.87.248:6379
(pid=883, ip=10.42.0.8) GOT RESULT FOR 0
FAIL 0
(pid=883, ip=10.42.0.8) GOT RESULT FOR 1
FAIL 1
SUCCESS 2
(pid=883, ip=10.42.0.8) GOT RESULT FOR 2
(pid=883, ip=10.42.0.8) GOT RESULT FOR 3
FAIL 3
(pid=883, ip=10.42.0.8) GOT RESULT FOR 4
FAIL 4
(pid=883, ip=10.42.0.8) GOT RESULT FOR 5
FAIL 5
(pid=883, ip=10.42.0.8) GOT RESULT FOR 6
FAIL 6
SUCCESS 7
(pid=883, ip=10.42.0.8) GOT RESULT FOR 7
(pid=883, ip=10.42.0.8) GOT RESULT FOR 8
FAIL 8
(pid=883, ip=10.42.0.8) GOT RESULT FOR 9
FAIL 9
FAIL RATIO: 80%

As shown, we get print statements from the actor returning data, but the main process does not receive the data it is awaiting.
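
Since the reproduction script catches every exception with a bare except, a FAIL line could in principle hide something other than a timeout. The following is a minimal sketch, using only asyncio and no Ray, of how the two cases could be told apart. The names classify, never_resolves and raises are hypothetical helpers introduced for illustration; never_resolves merely stands in for an await whose result never arrives.

```python
import asyncio


async def classify(awaitable, timeout):
    """Return 'ok', 'timeout', or 'error' for a single awaitable."""
    try:
        await asyncio.wait_for(awaitable, timeout=timeout)
        return "ok"
    except asyncio.TimeoutError:
        # The awaited result did not arrive within `timeout` seconds.
        return "timeout"
    except Exception:
        # Any other failure raised by the awaited task itself.
        return "error"


async def never_resolves():
    # Hypothetical stand-in for an await that never completes.
    await asyncio.Event().wait()


async def raises():
    # Hypothetical stand-in for a task that fails outright.
    raise RuntimeError("boom")


async def main():
    return [
        await classify(asyncio.sleep(0), 0.1),
        await classify(never_resolves(), 0.1),
        await classify(raises(), 0.1),
    ]


results = asyncio.run(main())
print(results)
```

In the reproduction script, the same distinction could be made by replacing the bare except with separate handlers for asyncio.TimeoutError and Exception.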

When the Actor is scheduled on the driver node, there is no issue.
When the transferred data is small (I assume <100 kB), there is also no issue.
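
To check the size observation more systematically, one could rerun the script with payloads of different sizes. The sketch below only computes image shapes for a few target sizes; it assumes the same width and channel count as the reproduction script, and the candidate sizes are arbitrary values chosen to bracket the assumed 100 kB figure. shape_for_bytes and nbytes are hypothetical helpers, not part of Ray or NumPy.

```python
WIDTH, CHANNELS = 1080, 3  # same as the reproduction script


def shape_for_bytes(target_bytes, width=WIDTH, channels=CHANNELS):
    """Largest uint8 image shape not exceeding target_bytes (at least one row)."""
    rows = max(1, target_bytes // (width * channels))
    return (rows, width, channels)


def nbytes(shape):
    """Size in bytes of a uint8 array with the given shape."""
    rows, width, channels = shape
    return rows * width * channels  # 1 byte per uint8 element


# Arbitrary candidate sizes; the last one matches the original (192, 1080, 3) image.
CANDIDATE_SIZES = [10_000, 50_000, 100_000, 200_000, 622_080]

shapes = {size: shape_for_bytes(size) for size in CANDIDATE_SIZES}
for size, shape in shapes.items():
    print(size, shape, nbytes(shape))
```

Each resulting shape can be substituted for (192, 1080, 3) in the np.random.random call of the script to see at which payload size the failures start.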

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.

Labels

  • P0: Issues that should be fixed in short order
  • bug: Something that is supposed to be working; but isn't
  • release-blocker: P0 Issue that blocks the release
