Allow actors to call actors on the same worker#4225
Conversation
I tested manually with the following code, to show that blocking methods, async methods and properties all work:

```python
import dask.distributed

client = dask.distributed.Client(n_workers=1)

class Minimal:
    def __init__(self):
        self.value = 0

    def inc(self):
        self.value += 1
        return self.value

    async def ainc(self):
        self.value += 1
        return self.value

    @property
    def val(self):
        return self.value

class UsesMinimal:
    def do_inc(self, ac, N):
        return ac.inc().result()

    async def ado_inc(self, ac, N):
        return await ac.ainc()

    def do_val(self, ac):
        return ac.val

ac = client.submit(Minimal, actor=True).result()
ac2 = client.submit(UsesMinimal, actor=True, workers=[ac._address]).result()
```
Cool. I'm glad to see this come together so fast. Do you think it would be doable to include your manual test as a pytest test?
Yes, of course - I wasn't certain whether what I wrote made sense or not.
distributed/actor.py
Outdated
```python
attr = getattr(actor, key)

if iscoroutinefunction(attr):
    return lambda *args, **kwargs: attr(*args, **kwargs)
```
I'm confused by this. It seems like a no-op? Would `return attr` be equivalent?
That seems ... likely :). It must have been a little more complex at some point, as I trial-and-errored here.
I don't see anything that rings any warning bells, but all of my experience here is somewhat dated. And hey, "if it passes tests ... " :)
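As a standalone illustration (not code from this PR), the pass-through lambda forwards calls exactly like the bound method it wraps, although the two are not literally interchangeable: `iscoroutinefunction` no longer recognizes the wrapper.

```python
import asyncio
from inspect import iscoroutinefunction

class Minimal:
    async def ainc(self):
        return 1

attr = getattr(Minimal(), "ainc")
wrapped = lambda *args, **kwargs: attr(*args, **kwargs)

# Both produce coroutines with the same result when called...
assert asyncio.run(attr()) == asyncio.run(wrapped()) == 1

# ...but the wrapper itself is a plain function, so introspection differs.
assert iscoroutinefunction(attr)
assert not iscoroutinefunction(wrapped)
```

So `return attr` is behaviorally equivalent at the call site; the only observable difference is in introspection of the returned object.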
This took a little more work, because the new code path I made only applies to workers calling workers, not to normal submitted tasks calling workers (those go via the scheduler and are executed in the normal thread pool).
This passed on Travis at https://travis-ci.com/github/martindurant/distributed/builds/199375854, but the status is not updating here.
The following also fails - in fact, I think any exception in an actor might kill the worker. I'll put the fix in a new PR as soon as I have it:

```python
import pytest
from distributed import Client
from distributed.utils_test import cluster

def test_exception():
    class MyException(Exception):
        pass

    class Broken:
        def method(self):
            raise MyException

    with cluster(nworkers=2) as (cl, w):
        client = Client(cl["address"])
        ac = client.submit(Broken, actor=True).result()
        acfut = ac.method()
        with pytest.raises(MyException):
            acfut.result()
```
distributed/actor.py
Outdated
```python
if (
    self._worker
    and self._worker.address == self._address
    and threading.current_thread().name.startswith("Dask-Actor-Threads")
```
Typically we use thread-locals for this. You might want to look into the use of the `thread_state` variable in worker.py
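For reference, a minimal sketch of the thread-local pattern being suggested; the `thread_state` name follows worker.py, but this standalone version just uses a plain `threading.local` and an illustrative `run_actor_function` helper, neither of which is the actual distributed code.

```python
import threading

# A module-level thread-local, analogous to distributed's thread_state.
thread_state = threading.local()

def run_actor_function(func, *args, **kwargs):
    # Mark this thread as executing an actor call, then always clear the flag.
    thread_state.actor = True
    try:
        return func(*args, **kwargs)
    finally:
        thread_state.actor = False

def in_actor_thread():
    # getattr with a default handles threads that never set the flag.
    return getattr(thread_state, "actor", False)

assert not in_actor_thread()
assert run_actor_function(in_actor_thread)
assert not in_actor_thread()
```

Compared with matching on the thread name, this keeps the check independent of how the executor happens to name its threads.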
```python
thread_state.execution_state = execution_state
thread_state.key = key
thread_state.actor = True
```
Maybe we should set this to False just after calling the function?
The intent, I think, is to state "yes, this is an actor-specific thread" rather than "we happen to be running an actor function right now". This use is more similar to checking the thread name.
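To illustrate the distinction being drawn, here is a standalone sketch (not code from this PR) of the "thread identity" interpretation: the flag is set once when the dedicated actor thread starts, analogous to naming the thread "Dask-Actor-Threads", rather than being toggled around each call.

```python
import queue
import threading

thread_state = threading.local()

def actor_worker(task_queue):
    # Set once at thread start: "this is an actor thread", for its whole life.
    thread_state.actor = True
    while True:
        func = task_queue.get()
        if func is None:
            break
        func()

q = queue.Queue()
results = []
# The flag is visible from inside the actor thread...
q.put(lambda: results.append(getattr(thread_state, "actor", False)))
q.put(None)
t = threading.Thread(target=actor_worker, args=(q,))
t.start()
t.join()
assert results == [True]
# ...but not from the main thread, which never set it.
assert getattr(thread_state, "actor", False) is False
```

Under that reading, resetting the flag to False after each call would conflate "this thread belongs to an actor" with "an actor function is running right now".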
Fixes #4224