WIP - Optionally use offload executor in worker #4307
Conversation
Normally we have one separate thread pool executor for deserializing data
and another thread pool executor in a worker for execution.
Sometimes this presents a problem because some data types want to be
used in the thread in which they were created. One example of this is
TensorFlow graphs, but there are others.
One way to resolve this is to reuse the same executor in both
situations, and ensure that it has only one thread. This means that
execution and deserialization will block each other (not great) but that
user data will always be operated on in one thread only.
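The shared single-thread idea above can be sketched in plain Python (a minimal illustration, not Dask's actual implementation — `deserialize_and_run` and the pickle-based payload are made up for this example):

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

# One single-threaded executor shared by deserialization AND execution,
# so user objects are only ever touched by the thread that created them.
shared_executor = ThreadPoolExecutor(max_workers=1)

def _deserialize(payload: bytes):
    # Stand-in for the worker's deserialization step.
    return pickle.loads(payload)

def deserialize_and_run(payload: bytes):
    # Both steps happen on the same (and only) worker thread.
    func, args = _deserialize(payload)
    return func(*args)

payload = pickle.dumps((sum, ([1, 2, 3],)))
result = shared_executor.submit(deserialize_and_run, payload).result()
print(result)  # 6
```

With `max_workers=1`, deserialization and execution serialize behind each other (the drawback noted above), but thread-affine objects like TensorFlow graphs stay on one thread.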
This commit implements that, and drafts up a small test. However, it is
still broken because Dask can be clever in some situations and
deserialize directly on the event loop. This happens for a few reasons
today:
1. For small messages we deserialize on the event loop for performance
reasons. The user can control this with the
`distributed.comm.offload` configuration value.
I recommend a value of 1, meaning a single byte.
2. The scheduler-client comms intentionally do not offload today
(grep for `allow_offload=False`).
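The inline-vs-offload decision described above can be sketched generically (illustrative names only; `should_offload` and the hard-coded threshold are not Dask's actual code — the real threshold comes from the `distributed.comm.offload` config value):

```python
# Small frames are deserialized inline on the event loop; large frames go
# to the offload thread pool. Setting the threshold to 1 byte effectively
# offloads everything, which is what the recommendation above achieves.
OFFLOAD_THRESHOLD = 10_000_000  # bytes; stand-in for distributed.comm.offload

def should_offload(nbytes: int, allow_offload: bool = True) -> bool:
    # allow_offload=False mirrors the scheduler-client comms, which
    # intentionally never offload.
    return allow_offload and nbytes > OFFLOAD_THRESHOLD

print(should_offload(500))          # False: small message, stays on event loop
print(should_offload(50_000_000))   # True: large frame, offloaded
```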
Nice, this is really interesting. We also don't offload deserialization of tasks on the worker (`distributed/worker.py`, lines 3280 to 3294 at 3407aa3), so for this to work we'll need to add some additional offloading logic there.
distributed/worker.py (Outdated)

```python
function, args, kwargs = _deserialize(*ts.runspec)
# Offload deserializing large tasks
offload_threshold = get_offload_threshold()
if sizeof(ts.runspec) > offload_threshold:
```
I want to think about if there's a better way to do this. This may lead to a slowdown if we start submitting task deserialization to a busy offloading thread pool.
…ker-offload-executor
distributed/comm/utils.py (Outdated)

```python
if FRAME_OFFLOAD_THRESHOLD and allow_offload:
# Offload serializing large frames to improve event loop responsiveness.
offload_threshold = get_offload_threshold()
if offload_threshold and allow_offload:
```
Hrm, this may be expensive to do on every message. Thoughts?
It looks like this adds ~3 µs:

```python
In [1]: import dask

In [2]: from distributed.comm.utils import get_offload_threshold

In [3]: %timeit get_offload_threshold()
2.96 µs ± 44.7 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
```

This change was so we could set `distributed.comm.offload` at runtime (i.e. with `dask.config.set(distributed__comm__offload=1)`) instead of just at startup time. Though I agree that since this is run for every message, the increased overhead may not be worth the extra flexibility.
I reverted the addition of `get_offload_threshold` and we now just pull in `distributed.comm.offload` at startup time.
Otherwise I think that we open ourselves to some slowdown on the event loop and the possibility of deadlocks (maybe?).
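The trade-off being made here can be illustrated with a stand-in config dict in place of `dask.config` (illustrative names; neither function exists in distributed):

```python
# Startup-time caching avoids the ~3 µs per-message config lookup, but a
# runtime change to the config is no longer picked up.
config = {"offload-threshold": 10_000_000}  # stand-in for dask.config

# Read once at import time; per-message code uses the frozen constant.
OFFLOAD_THRESHOLD = config["offload-threshold"]

def should_offload_cached(nbytes: int) -> bool:
    return nbytes > OFFLOAD_THRESHOLD

def should_offload_dynamic(nbytes: int) -> bool:
    # Flexible, but pays a config lookup on every message.
    return nbytes > config["offload-threshold"]

config["offload-threshold"] = 1           # runtime reconfiguration...
print(should_offload_cached(100))         # False: cached value ignores the change
print(should_offload_dynamic(100))        # True: dynamic lookup sees it
```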
```python
function, args, kwargs = _deserialize(*ts.runspec)
# Offload deserializing large tasks
if sizeof(ts.runspec) > OFFLOAD_THRESHOLD:
    function, args, kwargs = await offload(_deserialize, *ts.runspec)
```
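The `await offload(...)` call can be sketched as a thin wrapper over `loop.run_in_executor` (a minimal stand-in; Dask's real helper lives in `distributed.utils` and differs in detail):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Dedicated offload pool: heavy work runs here so the event loop
# stays responsive.
_offload_executor = ThreadPoolExecutor(max_workers=1)

async def offload(fn, *args, **kwargs):
    # run_in_executor doesn't accept kwargs, so wrap the call in a lambda.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        _offload_executor, lambda: fn(*args, **kwargs)
    )

result = asyncio.run(offload(sum, [[1, 2], [3]], []))
print(result)  # [1, 2, 3]
```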
OK. This seems ok to me. Merging in.

cc @Carreau (who IIRC had a similar use case in the past)