Warn if tasks are submitted with identical keys but different run_spec #8185

hendrikmakait merged 4 commits into dask:main

Conversation
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

27 files ±0   27 suites ±0   10h 1m 29s ⏱️ -9m 16s

For more details on these failures, see this check.

Results for commit 07974fd. ± Comparison against base commit 045dc64.
crusaderky
left a comment
I'm quite conflicted about this PR. I see a lot of pitfalls, and the only benefit would be to push the failure sooner in the process and make it more explicit.
I wonder if it wouldn't be simpler to just blindly retain the previous run_spec in case of resubmission?
Should we have a high-bandwidth brainstorming session about this?
distributed/scheduler.py
Outdated
```python
elif (
    # run_spec in the submitted graph may be None. This happens
    # when an already persisted future is part of the graph
    dsk.get(k) is not None
```
There was a problem hiding this comment.
Is it even possible that k is not in dsk, short of a malformed graph?
As it turns out, there are plenty of possibilities. Most notably, this is the case when there are persisted futures as part of the graph.
I'm growing more convinced about this approach by the minute. Our assumption that the key uniquely identifies the entire task (not just the data) is so strongly baked into the system that we cannot afford to slip. I will not be able to make time today for a high-bandwidth session, but I will also not be working on this today. If you are interested, I recommend checking out some of the remaining test failures. For instance, in P2P it appears that we have a genuine, potentially data-lossy bug in some situations because of this.
I agree that it's extremely unhealthy to let a task change its contents after it's been created.
Not too excited about this. We'd basically replace one inconsistency with another.
Why? Assuming the user doesn't manually submit different tasks with the same hand-crafted key (if they want to shoot themselves in the foot, it's their problem), it would nicely work around the issue of the optimizer submitting different DAGs that produce the same output?
I do not want to assume that there is only one possible way to achieve this corrupt state and guard against just that. When using the futures API, it is not that difficult to produce a state like this. Providing "hand-crafted" keys is part of our API and users are using it (e.g. #3965 and #7551; unrelated, but also problems because users are providing keys that break our assumptions). This is even the most natural way to scatter data. I also have no idea how future optimizers will behave. I don't believe that blindly keeping the first known run_spec is the right way; I don't like guessing intent. Besides... the code right now actually does this (albeit buggily)!
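To make the failure mode concrete, here is a minimal, self-contained sketch of how two graph submissions can reuse the same key with different task specs. This is not dask's actual code; `token()` is a hypothetical stand-in for `dask.base.tokenize`:

```python
import hashlib

def token(spec):
    # Stand-in for dask.base.tokenize: hash a stable representation.
    return hashlib.sha256(repr(spec).encode()).hexdigest()

graph_1 = {"x": (sum, [1, 2])}     # first submission
graph_2 = {"x": (sum, [1, 2, 3])}  # resubmission: same key, new spec

# The scheduler only sees the key "x" twice; comparing tokens of the
# run_spec is what reveals the collision.
collision = token(graph_1["x"]) != token(graph_2["x"])
print(collision)  # True
```

Nothing in the key itself distinguishes the two submissions, which is why the check in this PR has to compare the specs.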
force-pushed from cc4f46b to 2e302a1
Status update

This PR is blocked by
force-pushed from a70bbdd to cf5c294
```python
        marks=pytest.mark.xfail(reason="https://github.com/dask/dask/issues/9888"),
    ),
],
)
```
force-pushed from b6df5d4 to e9c59d5
Houston, we have a problem. The new check highlights behaviour in p2p shuffle where a piece of graph is eagerly sent up to the scheduler during graph definition time, then again with the same keys but different run_spec. To my understanding this is a separate issue from dask/dask#9888, as it doesn't go away with the config option. You can easily see it in:

```python
ddf = dd.from_pandas(df, npartitions=4)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    ddf = ddf.set_index("a").sort_values("b")
result = ddf.compute()
```

However, it trips the run_spec check: some keys are the same, but run_spec changes. This triggers a race condition where the scheduler didn't have the time to forget those keys yet. Example exception:

The use case won't fail in main because the key is always close to its released->forgotten transition when this happens (although in other cases it may be in memory; I'm not sure). To me, the hard failure is very healthy behaviour, but at the same time this means we can't merge this PR until after we fix shuffle to prevent the above.
Looking at the tokens you are posting, this still looks like dask/dask#9888. The config option you are referring to does not control blockwise fusion but rather low-level task fusion. Both are problematic but, interestingly, the low-level fusion is a little more robust. This would also mean that this goes away with dask-expr (although I'm sure there are cases where we can trigger a similar problem for arrays; one step at a time).
force-pushed from 2099447 to 50bed33
force-pushed from 50bed33 to 7591996
Closes dask/dask#10905
distributed/scheduler.py
Outdated
```python
    tok_lhs != tok_rhs or deps_lhs != deps_rhs
) and ts.group not in tgs_with_bad_run_spec:
    tgs_with_bad_run_spec.add(ts.group)
    logger.warning(
```
nit: Should we add a debug log regardless of whether ts.group is in tgs_with_bad_run_spec?
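As a sketch of the dedup pattern in the quoted diff: warn once per task group to avoid log spam, while a debug line could still record every offending key. The logger name and helper function below are hypothetical, not the scheduler's actual code:

```python
import logging

logger = logging.getLogger("scheduler-sketch")  # hypothetical logger name

tgs_with_bad_run_spec: set[str] = set()

def report_bad_run_spec(key: str, group: str) -> None:
    # Debug line fires for every offending key...
    logger.debug("Mismatched run_spec for key %s", key)
    # ...but the user-facing warning fires only once per task group.
    if group not in tgs_with_bad_run_spec:
        tgs_with_bad_run_spec.add(group)
        logger.warning("Detected different run_spec in task group %s", group)

report_bad_run_spec("x-1", "x")
report_bad_run_spec("x-2", "x")  # same group: debug only, no second warning
print(len(tgs_with_bad_run_spec))  # 1
```

Deduplicating on the task group rather than the key matters because a single bad resubmission typically affects every key in the group at once.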
I'm wondering if it would be useful to propagate this warning to the user who submitted the graph instead of potentially burying it in the scheduler logs. Do you have thoughts on this?
AFAIK, we have no means to propagate warnings to the client. We'd need to build the infrastructure for it first.
I wasn't entirely sure whether we had something that does this. Since that's not the case, forget what I said.
distributed/scheduler.py
Outdated
```python
tok_lhs: Any = tokenize(ts.run_spec, ensure_deterministic=True)
tok_rhs: Any = tokenize(dsk[k], ensure_deterministic=True)
```
Should we deal with the case where one of the two is deterministic but not the other?
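One option for the mixed case is to treat a non-deterministic token as "undecidable" and skip the comparison rather than guess. A hedged sketch, with a hypothetical stand-in `tokenize()` that raises when it cannot be deterministic (dask's real tokenize is far more elaborate):

```python
import hashlib

def tokenize(obj, ensure_deterministic=False):
    # Hypothetical stand-in: treat callables as non-deterministic input.
    if callable(obj) and ensure_deterministic:
        raise RuntimeError("cannot deterministically tokenize")
    return hashlib.sha256(repr(obj).encode()).hexdigest()

def specs_differ(lhs, rhs):
    try:
        return tokenize(lhs, ensure_deterministic=True) != tokenize(
            rhs, ensure_deterministic=True
        )
    except RuntimeError:
        return None  # undecidable: skip the warning rather than guess

print(specs_differ((1, 2), (1, 3)))  # True
print(specs_differ(print, (1, 2)))   # None
```

Returning a three-valued result makes the caller decide explicitly what to do when either side cannot be tokenized deterministically.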
force-pushed from 7591996 to 728b89e
force-pushed from 728b89e to cad1467
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
@hendrikmakait all comments have been addressed
force-pushed from f13dd0b to 07974fd
hendrikmakait
left a comment
Thanks, @crusaderky! LGTM, assuming CI is happy.
Since version 2024.2.1, I am getting the "Detected different run_spec" warning. Does this point to an issue in my code or to one in the dask.array.linalg code? Log:
Not an error, a warning. As a user, you could work around it by making sure you don't resubmit the same parts of your graph multiple times:

```python
a = ...  # dask collection
b = f(a)
a = a.persist()
b = b.persist()
```

The warning will disappear if you change it to:

```python
a = ...  # dask collection
a = a.persist()
b = f(a)
b = b.persist()
```
This mitigates the impact of dask/dask#9888 by raising an exception when we detect that the submitted run_spec differs from the already known run_spec.
I believe that to handle such a key collision gracefully, we'd need to store not only the key itself but also a hash/token of the run_spec, and pass this signature around everywhere. Alternatively, there was a proposal to use the TaskState object ID for this, but the required changes would be largely the same.
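A minimal sketch of that idea: identify a task by (key, token-of-run_spec) so that a resubmission with a different run_spec gets a distinct identity instead of silently colliding. All names here are hypothetical illustrations, not actual scheduler code:

```python
import hashlib

def spec_token(run_spec) -> str:
    # Stand-in for a deterministic tokenize of the run_spec.
    return hashlib.sha256(repr(run_spec).encode()).hexdigest()[:8]

# Task table keyed by the composite signature, not the bare key.
tasks: dict = {}

def register(key: str, run_spec):
    sig = (key, spec_token(run_spec))
    tasks.setdefault(sig, run_spec)
    return sig

s1 = register("x", ("inc", 1))
s2 = register("x", ("inc", 2))  # same key, different spec -> new identity
print(s1 != s2, len(tasks))  # True 2
```

The cost hinted at in the comment above is visible even in the sketch: every place that today passes a bare key around would have to carry the composite signature instead.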
A surprising number of tests are failing with this change, and I have to investigate more.