Propagate contextvars to worker threads; catch warnings in 3.14t #12224

crusaderky merged 1 commit into dask:main from
Conversation
Force-pushed from 8f5aaf5 to 86cd653
```python
@pytest.mark.parametrize("num_workers", [None, 1])
def test_context_aware_warnings(num_workers):
```
3.14t CI run, where this test becomes meaningful, in #12223
Unit Test Results

0 tests ±0, 0 ✅ ±0, 0s ⏱️ ±0s

Results for commit f5db9d5. ± Comparison against base commit 825904a.

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
CI failures are unrelated.
@jacobtomlinson @jsignell does either of you have bandwidth to offer review?
jsignell left a comment:
I can't say I 100% understand all the pieces of this, but I read it through and tried it out. It works well and seems like a good improvement. Does it have any negative repercussions for Python < 3.14t?
Force-pushed from 2920f3a to f5db9d5
On 3.13 I'm observing an extra 5~10 μs / task worth of overhead:

```python
>>> import dask.array as da
>>> a = da.ones(5000, chunks=1).sum(split_every=2)
>>> len(a.dask)
15005
>>> _ = a.compute()  # thread pool warm-up
>>> %time a.compute()

# BEFORE
CPU times: user 1.87 s, sys: 244 ms, total: 2.11 s
Wall time: 1.85 s

# AFTER
CPU times: user 1.97 s, sys: 215 ms, total: 2.19 s
Wall time: 1.96 s
```

IMHO it's a reasonable price to pay to have consistent behaviour across all Python versions. @jsignell are you happy to merge?
I'm also observing no performance downgrade if I define 1000 contextvars globally:

```python
>>> import contextvars
>>> vars = [contextvars.ContextVar(f"c{i}") for i in range(1000)]
>>> for var in vars:
...     var.set(123)
```
Yeah, I am fine with merging.
Context
Python 3.14t introduces two substantial changes that impact Dask:
- `contextvars` are propagated from the parent thread to new threads that are being spawned. For thread pools, however, this means that a worker thread may already be running when the user calls `submit`, and thus won't get the change in context.
- `warnings.catch_warnings` is now backed by `contextvars`, whereas previously it was process-global. This also impacts `@pytest.mark.filterwarnings`. This change in CPython was necessary to allow catching warnings in a thread without interfering with other threads.
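The first change can be demonstrated with plain threads and no Dask at all. The sketch below uses only the stdlib; the variable names are illustrative, and the printed result depends on the interpreter version:

```python
import sys
import threading
import contextvars

# A contextvar set on the main thread before spawning a child thread.
var = contextvars.ContextVar("var", default="unset")
var.set("parent-value")

seen = []
t = threading.Thread(target=lambda: seen.append(var.get()))
t.start()
t.join()

# Before 3.14, a new thread starts with an empty context, so it sees the
# default. On 3.14t (or an opted-in 3.14) the thread inherits the parent's
# context and sees "parent-value".
print(seen[0])
```

This is exactly why warm pools are the tricky case: inheritance (where it exists) happens at thread spawn time, so a worker created earlier never sees values set later on the calling thread.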
It's worth noting that, for Dask, this behaviour is relevant also on GIL-enabled builds (think of divisions by zero in NumPy). On Python 3.14t, it is on by default. On 3.14, it is opt-in with
or with equivalent environment variables.
See upstream documentation:
Impact
The direct consequence for most Dask users is that catching warnings on the threaded scheduler becomes erratic on 3.14t, because `dask.threaded` keeps thread pools warm across calls to `get`/`compute`/`persist`. Consider this:
The above fails to catch the warning on 3.14t. If you comment out `a.compute()`, however, it will work again. The same will happen here:
`pytest` will fail with `RuntimeWarning`; `pytest -k test_2` will pass. In-depth discussion here: pytest-dev/pytest#14077
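The original snippets are elided in this excerpt, but the failure mode can be reproduced with a bare `ThreadPoolExecutor` and no Dask or pytest at all. A minimal stand-in sketch:

```python
import warnings
from concurrent.futures import ThreadPoolExecutor

def task():
    warnings.warn("boom", RuntimeWarning)

pool = ThreadPoolExecutor(max_workers=1)
pool.submit(task).result()  # warm-up: the worker thread now exists

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    pool.submit(task).result()

# On Python < 3.14 the warning filters are process-global, so `caught`
# holds the warning. On 3.14t, the warm worker's context was captured
# before catch_warnings() was entered, so `caught` may stay empty.
print(len(caught))
pool.shutdown()
```

Replacing `pool.submit(task)` with a Dask `compute()` on a warm threaded scheduler gives the same erratic behaviour described above.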
This PR
- `scheduler="threads"` (which is the default for `dask.array` and `dask.dataframe` when there is no `distributed.Client`) will now capture the contextvars on the thread calling `get`/`compute`/`persist` and replicate them in the worker threads. This happens on all Python versions and regardless of thread pool warmup.
- Previously, `warnings.catch_warnings` and `@pytest.mark.filterwarnings` would not catch anything if the thread pool was already warm (e.g. if another test had already called `get`/`compute`/`persist`).
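The mechanism can be sketched generically with the stdlib (a simplified illustration, not Dask's actual code): capture the caller's context with `contextvars.copy_context()` and run each task inside that copy via `Context.run`:

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

var = contextvars.ContextVar("var", default="unset")

def task():
    return var.get()

pool = ThreadPoolExecutor(max_workers=1)
pool.submit(task).result()  # warm up the pool before the var is set

var.set("set-on-main-thread")

# The warm worker's context predates var.set(), so a plain submit may
# still see "unset" even on 3.14t. Capturing the caller's context and
# running the task inside the copy makes the worker see the caller's
# values on every Python version.
# (A Context can only be entered by one thread at a time, so real code
# would take one copy per task rather than sharing this one.)
ctx = contextvars.copy_context()
result = pool.submit(ctx.run, task).result()
print(result)  # set-on-main-thread
pool.shutdown()
```

Because `copy_context()` snapshots the context at call time, this works identically whether the pool is cold or warm, which is what makes the behaviour consistent across Python versions.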