I've removed the WIP label. This seems decent enough for me, at least as a first pass. I suspect that we'll change the implementation of ActorFuture eventually, but I think that the user API seems stabilized. |
|
I've drafted a parameter server that uses these. I have a question: shouldn't
In [1]: from distributed import Client
In [2]: class Dummy:
   ...:     def __init__(self, value):
   ...:         self.value = value
   ...:     def get_value(self):
   ...:         return self.value
   ...:
In [3]: client = Client()
In [4]: futures = client.map(Dummy, [1, 2], actor=True)
In [5]: dummies = client.gather(futures)
In [6]: values = [dummy.get_value() for dummy in dummies]
In [7]: values
Out[7]: [<ActorFuture>, <ActorFuture>]
In [8]: client.gather(values)
Out[8]: [<ActorFuture>, <ActorFuture>]
I expected
In [6]: v = [client.submit(lambda x: x, i) for i in range(2)]
In [7]: v
Out[7]:
[<Future: status: finished, type: int, key: <lambda>-e0ea2bfe577f8de576a4698ab710d5ae>,
 <Future: status: finished, type: int, key: <lambda>-982d509e7b5d6f50f0a2ed3fb2838fcf>]
In [8]: client.gather(v)
Out[8]: [0, 1]
I agree that it would be nice to unify everything and maybe that will happen some day. But that isn't how things work now, and I don't think it's likely to be that way soon. I recommend just using the |
|
@jcrist can I ask for your review on this? |
jcrist
left a comment
I only gave this a cursory review - the actual implementation is a bit opaque to me.
As far as api/docs, I'm a bit confused by the barrier between Future and ActorFuture - when is it ok to mix them and when do they need to be separated?
Additionally, overloading the existing api seems odd to me, specifically in graph-level operations. compute(graph, actors=True) seems to indicate that only the end results are actors, not the intermediate values, but I could also interpret things the other way.
Do you see use cases for returning Actor objects from compute/persist? If not, I'd be inclined to not overload the existing methods, and only support something like client.new_actor(cls, *args, **kwargs), which may help make things clearer (at least for me).
distributed/actor.py
Outdated
def __dir__(self):
    o = set(dir(type(self)))
    o.update({attr for attr in dir(self._cls) if not attr.startswith('_')})
distributed/actor.py
Outdated
def __getattr__(self, key):
    if not hasattr(self._cls, key):
        raise AttributeError("%s does not have attribute %s" %
                             (type(self).__name__, key))
Should be able to just rely on getattr(self._cls, key) raising this error below.
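To illustrate the suggestion above, here is a minimal sketch of a proxy whose `__getattr__` drops the `hasattr` pre-check and lets `getattr` raise. `ActorProxy` and `Counter` are hypothetical stand-ins, not the classes in this PR, and the sketch simply returns the class attribute rather than building a remote call.

```python
class ActorProxy:
    """Hypothetical stand-in for the proxy under review (not the real class)."""

    def __init__(self, cls):
        self._cls = cls

    def __dir__(self):
        # Start from the proxy's own attributes, then add the public
        # names of the wrapped class.
        names = set(dir(type(self)))
        names.update(attr for attr in dir(self._cls) if not attr.startswith('_'))
        return sorted(names)

    def __getattr__(self, key):
        # No hasattr() pre-check: getattr raises AttributeError on its own
        # when the wrapped class lacks the attribute.
        return getattr(self._cls, key)


class Counter:
    """Hypothetical wrapped class used only for this example."""
    n = 0

    def increment(self):
        pass


proxy = ActorProxy(Counter)
print(proxy.n)                      # attribute found on the wrapped class
print('increment' in dir(proxy))    # public names are listed by __dir__
print(hasattr(proxy, 'missing'))    # AttributeError propagates from getattr
```

One trade-off: the error message then names the wrapped class instead of the proxy type, which may or may not be what users expect.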
distributed/client.py
Outdated
allow_other_workers = kwargs.pop('allow_other_workers', False)
actor = kwargs.pop('actor', False)
actors = kwargs.pop('actors', False)
actor = actor or actors
actor = kwargs.pop('actor', kwargs.pop('actors', False))
Why support both actor and actors here? I'd prefer only a single boolean flag between all functions to keep things consistent.
So actors would be general purpose, but it seemed a bit odd for submit
client.submit(Foo, actors=True)
It also seemed error prone to have one keyword for submit and one for map, so I just used both in both places. I agree that this is wonky though and am not surprised that it would not survive review. Do you have any suggestions? Use actors= everywhere, including submit?
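For reference while weighing the two spellings in this thread, the sketch below compares the nested `kwargs.pop` one-liner with the original `actor or actors` version. The helper names `pop_nested` and `pop_or` are hypothetical and exist only for this comparison. Both forms remove both keys from `kwargs`, because Python evaluates the inner `pop` eagerly as the default argument. They differ only when `actor=False` is passed explicitly together with `actors=True`.

```python
def pop_nested(**kwargs):
    # One-liner from the review: the inner pop runs first, so both keys
    # are always removed from kwargs.
    actor = kwargs.pop('actor', kwargs.pop('actors', False))
    return actor, kwargs


def pop_or(**kwargs):
    # Original three-line version: either keyword being truthy enables actors.
    actor = kwargs.pop('actor', False)
    actors = kwargs.pop('actors', False)
    return actor or actors, kwargs


for call in ({'actor': True}, {'actors': True}, {'actor': False, 'actors': True}):
    print(call, pop_nested(**call), pop_or(**call))
```

Settling on a single keyword everywhere would remove this edge case entirely.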
distributed/client.py
Outdated
fifo_timeout = kwargs.pop('fifo_timeout', '100ms')
actor = kwargs.pop('actor', False)
actors = kwargs.pop('actors', False)
actor = actor or actors
actor = kwargs.pop('actor', kwargs.pop('actors', False))
try:
    exception = protocol.pickle.loads(exception)
except Exception:
    exception = Exception(exception)
Sometimes exceptions don't come in as serialized bytes; sometimes they come in as just a string of an error message. This is the case when the worker produces a non-serializable Exception (happens sometimes) or when the scheduler needs to return an exception. This came up in testing in this PR. I think that long-term we probably need to have a better structure where we return a message that includes the exception and how the exception is represented. I would prefer not to handle that in this PR though.
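The fallback described above can be sketched in isolation as follows. This assumes the standard-library `pickle` in place of `protocol.pickle`, and `load_exception` is a hypothetical helper name, not a function in this PR.

```python
import pickle


def load_exception(payload):
    """Return an exception object from pickled bytes or a plain message."""
    try:
        # Normal case: the worker sent a pickled exception.
        exception = pickle.loads(payload)
    except Exception:
        # Fallback: the payload is a plain error message (or otherwise not
        # unpicklable), so wrap it in a generic Exception.
        exception = Exception(payload)
    return exception


from_bytes = load_exception(pickle.dumps(ValueError("bad input")))
from_text = load_exception("worker died")
print(type(from_bytes).__name__, from_bytes)
print(type(from_text).__name__, from_text)
```

A message format that records how the exception is encoded, as proposed in the comment, would make the broad `except Exception` unnecessary.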
distributed/scheduler.py
Outdated
for ts in ws.actors:
    if ts.state not in {'memory', 'processing'}:
        import pdb; pdb.set_trace()
I've just added a test that uses actors with compute. Yes, I think that this is a valid use case. There is also some maintenance cost to adding new future-creating methods like |
I'll try to add some documentation around this point. Thank you for raising it. |
|
When I ran through a benchmark with a pseudo-parameter-server workload I found that I was getting latencies in the 5-10ms range, which seems pretty high. This led to some work in profiling the scheduler and worker administrative threads (where I suspect most of the blame lies). Hopefully I can get this down in the future.
|
My expectation is that Dask will run somewhere around 1-2ms in the moderate future. We would probably have to look to some more serious changes to get below that (but that's certainly possible as well). |
140cfc2 to 1ff5c21
This allows Dask to manage remote stateful classes. This has a few advantages:
And a few drawbacks
Fixes #2109
Example
Performance
Current roundtrip latency is around a millisecond
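A round-trip figure like this one can be checked with a small timing harness. The sketch below is not the benchmark used in this PR: `measure_roundtrip_ms` is a hypothetical helper, and a single-thread `ThreadPoolExecutor` stands in for the remote call, so the number it prints says nothing about Dask itself. To measure a real actor method, pass a callable that invokes it and waits for the result.

```python
import statistics
import time
from concurrent.futures import ThreadPoolExecutor


def measure_roundtrip_ms(call, repeats=200):
    """Return the median wall-clock time of call() in milliseconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        call()  # submit the request and block until the reply arrives
        samples.append((time.perf_counter() - start) * 1000.0)
    return statistics.median(samples)


# Stand-in for a remote call: hand a trivial task to another thread and wait.
with ThreadPoolExecutor(max_workers=1) as pool:
    latency_ms = measure_roundtrip_ms(lambda: pool.submit(int).result())

print(f"median round trip: {latency_ms:.3f} ms")
```

Reporting the median rather than the mean keeps occasional scheduling hiccups from dominating the result.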