Fix optimize_blockwise bug for duplicate dependency names#8542

Merged
rjzamora merged 9 commits into dask:main from rjzamora:optimize-blockwise-fix
Jan 14, 2022

Conversation


@rjzamora rjzamora commented Jan 7, 2022

This PR adds a possible solution to #8535

It seems that the rewrite_blockwise phase of the current optimize_blockwise implementation can lose important information when a Blockwise layer depends on two other Blockwise layers with the same name (i.e. the same layer). For example, if matmul is used to multiply a matrix with itself, the same matrix projection may be used for both inputs to the binary operation (rather than the proper ij,jk projections). This will happen if the duplicate input is also a Blockwise layer (like X*2), because the original indices information in the matmul layer will be lost when the X*2 result is fused into the root subgraph.

This PR proposes that we avoid this problem by appending a "counting" label to duplicated dependency names when they are fused into the root subgraph. This ensures that operations like matmul(a, b) will always store a distinct key-value pair for a and b in the subgraph.
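To make the failure mode concrete, here is a toy illustration (plain Python, not dask's actual fusion code) of the information loss described above: keying fused dependencies by name alone means two uses of the same dependency with different index patterns overwrite each other.

```python
# Hypothetical sketch: fusing two uses of the same dependency name into one
# dict keyed by name alone silently drops one of the index projections.
fused = {}
for dep, ind in [("b", "ij"), ("b", "jk")]:
    fused[dep] = ind  # the second occurrence overwrites the first
print(fused)  # {'b': 'jk'} -- the "ij" projection has been lost
```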

cc @ian-r-rose

Collaborator

@gjoseph92 gjoseph92 left a comment


I don't understand blockwise optimization at all, so these comments are very much from the hip

new_axes = inputs[root].new_axes
concatenate = inputs[root].concatenate
dsk = dict(inputs[root].dsk)
seen = defaultdict(int)
Collaborator

Do we need to track this globally? Or could we start fresh at each layer we process (is this the for i, (dep, ind) in enumerate(indices) loop?)?

I had thought the problem was specifically when one layer used the same input name multiple times, but with different iteration orders (indices), not just that the same name occurred in the whole graph multiple times. So maybe this is too wide of a net? Or maybe it doesn't matter?

Collaborator

If this is the case, then maybe rather than a counter, we could actually just have use_dep = f"{dep}_{ind}", i.e. identify dependencies internally not just by unique name, but by unique combo of name and indexing pattern?
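A rough sketch of that suggestion (the helper name is illustrative, not dask's API): identify each fused dependency by its name combined with its index pattern.

```python
def index_labeled_dep(dep, ind):
    # Identify a fused dependency by name *and* indexing pattern, so
    # ("b", "ij") and ("b", "jk") map to distinct subgraph keys, while two
    # occurrences of ("b", "ij") collapse to the same key.
    return f"{dep}_{''.join(ind)}"

print(index_labeled_dep("b", "ij"))  # b_ij
print(index_labeled_dep("b", "jk"))  # b_jk
```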

Collaborator

I was wondering the same thing. Currently sitting in a REPL trying to see if I can make a name/index pair that should be combined with this, but is not.

Member Author

I agree that we can probably move all the logic into the same dep-processing step. This exact suggestion is one of the reasons the PR was still marked as a draft. We may also want to avoid "duplicating" the k-v pair if the name+index combination matches exactly.

Member Author

Actually, scratch my last comment... A single for i, (dep, ind) in enumerate(indices): loop does not actually cover the repeated dep case that we need. This outer while changed: loop actually constructs the fused subgraph in a depth-first way (not in a breadth-first layer-by-layer order). This is why dep may be the same for two distinct loops, and we want to make sure we do not use the first index used for that name for all subsequent occurrences.
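For reference, the "counting" label approach can be sketched like this (a simplified stand-in for the logic in the diff, not the real rewrite_blockwise code): a global counter ensures repeated dependency names stay distinct across the whole depth-first fusion pass.

```python
from collections import defaultdict

seen = defaultdict(int)

def counted_dep(dep):
    # The first occurrence keeps the bare name; later occurrences of the
    # same dependency name get a numeric suffix so each stores a distinct
    # key-value pair in the fused subgraph.
    use_dep = f"{dep}_{seen[dep]}" if seen[dep] else dep
    seen[dep] += 1
    return use_dep

labels = [counted_dep(d) for d in ["b", "b", "a", "b"]]
print(labels)  # ['b', 'b_1', 'a', 'b_2']
```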

@ian-r-rose
Collaborator

Thanks for this @rjzamora. I'm still digesting this, but I can confirm that it fixes the issue that prompted #8535, in addition to the minimal example I posted there.

Comment on lines +1368 to +1370
if _ind:
# We only append a "counting" label for
# `ind` cases after the first occurrence
Collaborator

Were you able to find a case where both the dep and the ind were matching and the previous implementation separated them? I was not, unfortunately.

Member Author

Hmm - I was thinking da.add would capture this case, but apparently it does not. In that case, the simpler defaultdict(int) solution may be preferable for now.

@rjzamora rjzamora marked this pull request as ready for review January 7, 2022 23:19
@rjzamora
Member Author

@crusaderky - Are all these recent "windows-latest, 3.7" failures related to #8549, or are there other issues? I see that all recently-opened PRs are failing this CI step.

@crusaderky
Collaborator

crusaderky commented Jan 10, 2022

This is #8549 :

FAILED dask/dataframe/tests/test_shuffle.py::test_set_index_consistent_divisions

This looks like a regression in this PR:

FAILED dask/tests/test_multiprocessing.py::test_unpicklable_args_generate_errors

@rjzamora
Member Author

This looks like a regression in this PR

This may be true, but I probably won't bother "debugging" the minor changes here until I see Windows/Python-3.7 passing in other PRs :)


# Change new_dsk key to match use_dep
if dep != use_dep and dep in new_dsk:
new_dsk[use_dep] = new_dsk.pop(dep)
Collaborator

Is it okay that we're changing the name of the key in the dsk? If dep == root, I could see this being a problem, because the resulting Blockwise's output key wouldn't be in its dsk.

It may be that it's impossible for this case to happen for root, but I think we should at least prove and assert that.

Member Author

We are only changing the name of an internal key that we are adding to the subgraph during fusion (note that dsk is the internal subgraph for the root blockwise layer). The name of the root layer we are fusing into does not change here. However, I see your concern and can add an explicit assertion that this is not happening.

Collaborator

I'm pretty sure that if dep == root and dep != use_dep, this will break the SubgraphCallable that ultimately gets produced, because the outkey (which is root) won't exist in the dsk.

Collaborator

Another observation: if dep == root, then by definition no other keys should depend on dep, so just leaving it in the dict under dep instead of use_dep should be fine. Basically this would be if dep != root and dep != use_dep and dep in new_dsk.
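That guarded rename can be sketched as follows (a minimal stand-in with illustrative names, not the actual PR diff): the root key is left untouched so the eventual SubgraphCallable keeps its outkey.

```python
def rename_dep(new_dsk, dep, use_dep, root):
    # Rename the fused dependency's key to its unique label, but never
    # rename the root key: the root is the subgraph's output, and renaming
    # it would leave the resulting SubgraphCallable without its outkey.
    if dep != root and dep != use_dep and dep in new_dsk:
        new_dsk[use_dep] = new_dsk.pop(dep)
    return new_dsk

renamed = rename_dep({"b": ("add", "_0", "_1")}, "b", "b_ij", root="c")
print(renamed)  # {'b_ij': ('add', '_0', '_1')}
```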

Comment on lines +1365 to +1366
use_dep = dep + f"_{seen[dep]}" if seen[dep] else dep
seen[dep] += 1
Collaborator

Suggested change
use_dep = dep + f"_{seen[dep]}" if seen[dep] else dep
seen[dep] += 1
use_dep = f"{dep}_{''.join(ind)}"

Still wondering whether we actually need the seen set, or whether we can just include the indices in the name in every case. As far as I understand, the key names in this dsk are an implementation detail of Blockwise and could be changed to whatever as long as the graph is still valid (is this correct??).

Making this change on your branch makes your test_optimize_blockwise_duplicate_dependency pass, along with all other Array tests besides dask/array/tests/test_atop.py::test_rewrite. The reason test_rewrite fails is that its cases include handwritten dasks of the expected output, so obviously the key names there no longer match. However, I don't know if those key names matter from a correctness standpoint, or if we could just adjust those tests. (The fact that they're not failing with this branch just indicates to me that none of those cases involve this situation of a repeated dep, so they're incomplete anyway.)

I'd guess the only case where the key names matter is with output, which (as I mentioned in the other comment) probably needs to be addressed here too.

Member Author

Right - We could certainly add an enumeration in every key that is added to the root subgraph during fusion. This should result in "correct" behavior, and changing the tests should be fine in my opinion. With that said, this approach won't really simplify the code much.

Collaborator

In this very complicated code, I'll take any simplification I can get :)

It also is slightly different behavior, and I'm not sure which one we want. In my version, two repeated dep names with the same (user-given?) indices will still trample each other. Maybe this is good, because they should be considered identical? Or maybe we want to consider every layer unique, even if the names and indices match. I'm really not sure.

@rjzamora
Member Author

@gjoseph92 - Thanks for the helpful review here! I revised the PR to align with the "append the index info every time" approach. It makes the PR larger, but the logic is a bit simpler. Let me know if you had something else in mind, or if you have further thoughts/suggestions :)

Collaborator

@gjoseph92 gjoseph92 left a comment


Nice, I feel like this is slightly easier to intuit. I'm still not sure my suggestion was the right approach though compared to what you originally did! I really don't know if matching dep-ind combos should be considered distinct or equivalent.

indices.append(index)
new_dsk = subs(inputs[dep].dsk, sub)

# Change new_dsk key to match local_dep
Collaborator

Just still wondering about this. Can we be guaranteed that the dsk contains no references to the key dep? I just don't know. In practice I think the graphs are totally formulaic, but in principle couldn't the dsk of a Blockwise be any arbitrary valid graph?

Would we want to do this renaming via subs or something just to be sure we're catching everything? In the same way we use subs because we're renaming the blockwise tokens?

Member Author

Just still wondering about this. Can we be guaranteed that the dsk contains no references to the key dep? I just don't know.

I'm definitely not sure I understand what you are asking here. Maybe you could share a hypothetical graph with the problem you are thinking of?

In practice I think the graphs are totally formulaic, but in principle couldn't the dsk of a Blockwise be any arbitrary valid graph?

I am pretty sure the answer is no. At this point dsk is not a SubgraphCallable yet, and so it's guaranteed to be pretty simple (i.e. {<layer-name>: (<function>, <one-or-more-blockwise-tokens>)}).

Would we want to do this renaming via subs or something just to be sure we're catching everything? In the same way we use subs because we're renaming the blockwise tokens?

When we rename blockwise tokens, we are modifying the values (and not the keys) in the dictionary. In this case, we are only changing a single key. I am not really understanding what would be missed here.

Collaborator

My worry was basically that the dsk could in theory include references to the name of a dependency inside its tasks:

dask/dask/blockwise.py

Lines 327 to 329 in 1a1abfd

dsk: dict
A small graph to apply per-output-block. May include keys from the
input indices.

Notice the second sentence. We would have just changed the name of that input, without changing its name within the tasks.

However, what I missed was that the dsk gets evaluated inside a SubgraphCallable, so its scope, essentially, is restricted. It's impossible for it to reference keys outside of itself.
And the other thing I missed is that that docstring is basically wrong: the dsk cannot reference the inputs as keys. It has to use special (undocumented) keys generated by blockwise_token.

Put together, those facts mean that the only keys it's possible for dsk to reference are _0, _1, etc. And those keys are already rewritten during the optimization process. If the dsk uses the names of one of its inputs literally, instead of the _N argument for it, it'll fail whether or not it gets optimized.

At this point dsk is not a SubgraphCallable yet, and so it's guaranteed to be pretty simple (i.e. {<layer-name>: (<function>, <one-or-more-blockwise-tokens>)})

I would not say dsk is guaranteed to be simple. (Counterexample: the Blockwise layer returned by rewrite_blockwise! Its graph is rather complicated.) If you happen to use the blockwise.blockwise function to make your Blockwise layer instance, then yes the graph will look like what you said, but the Blockwise class itself can accept any dsk of arbitrary complexity. There's no documentation or validation saying otherwise, so I'd say there's nothing in the contract about the dsk. This should be tightened up a lot: for example, the docs should say that the dsk must use blockwise tokens as its input keys, and that it must use the key corresponding to output as its result.
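The restricted scope described above can be illustrated with a tiny stand-in for dask's task evaluation (a deliberate simplification, not SubgraphCallable itself): only the positional token keys exist in the local scope, so the inner dsk cannot reach other layers by name.

```python
import operator

def eval_task(task, scope):
    # Tuples are (fn, *args); strings are looked up in the local scope.
    # Only the positional token keys (_0, _1, ...) live in that scope, so
    # a task referencing any other layer's name would simply not resolve.
    if isinstance(task, tuple):
        fn, *args = task
        return fn(*(eval_task(a, scope) for a in args))
    if isinstance(task, str) and task in scope:
        return scope[task]
    return task

dsk = {"out": (operator.add, "_0", "_1")}
print(eval_task(dsk["out"], {"_0": 1, "_1": 2}))  # 3
```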

Member Author

Ah - Okay. Thanks for clarifying your thoughts here. I think I agree with everything you said :)

[
[
(b, "jk", {b: (add, _0, _1)}, [(a, "jk"), (2, None)]),
(c, "ijk", {c: (add, _0, _1)}, [(b, "ij"), (b, "jk")]),
Collaborator

Should you also test repeating the same name-indices combination as an input? Like

(x, "ijk", {c: (add, _0, _1, _2)}, [(b, "ij"), (b, "jk"), (b, "ij")]),

or also

(y, "ijk", {c: (sub, _0, _1)}, [(b, "ij"), (b, "jk")]),

(so b_i_j and b_j_k show up as deps multiple times in the list of inputs)

Member Author

I added another test case, but I'm not sure if it covers the case you are thinking of here.

@gjoseph92 gjoseph92 mentioned this pull request Jan 12, 2022
@jrbourbeau jrbourbeau mentioned this pull request Jan 12, 2022
@rjzamora
Member Author

Thanks again for the reviews here @gjoseph92 - Sorry that I didn't see your review from yesterday until this evening!

I'm still not sure my suggestion was the right approach though compared to what you originally did! I really don't know if matching dep-ind combos should be considered distinct or equivalent.

I feel pretty confident that it makes sense to always encode the index along with the layer name when it is fused into the subgraph of another layer. Also, I think it is pretty clear that two instances of the same dep-ind combination should be considered equivalent within the context of the same subgraph (which is what we are considering here). The problem that motivated this PR was when two distinct dep-ind combinations were erroneously being treated as equivalent.
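That equivalence rule can be sketched with an illustrative helper (not dask code): identical (dep, ind) combinations collapse to a single subgraph key, while distinct combinations remain separate.

```python
def fused_keys(inputs):
    # Build the ordered, de-duplicated set of subgraph keys for a list of
    # (dep, ind) inputs, identifying each by name *and* index pattern.
    return list(dict.fromkeys(f"{dep}_{ind}" for dep, ind in inputs))

print(fused_keys([("b", "ij"), ("b", "jk"), ("b", "ij")]))  # ['b_ij', 'b_jk']
```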

Collaborator

@gjoseph92 gjoseph92 left a comment


Okay, I've played with this a lot and I'm confident this change doesn't introduce any new problems.

Also, I think it is pretty clear that two instances of the same dep-ind combination should be considered equivalent within the context of the same subgraph

Having thought about this more, I agree. The dep names should already be distinct identifiers, and the indices are normalized by the time they reach this point.


@gjoseph92
Collaborator

@jrbourbeau we think this is ready, might be nice to get in the release?

@rjzamora
Member Author

rerun tests

@jrbourbeau jrbourbeau changed the title to Fix optimize_blockwise bug for duplicate dependency names Jan 14, 2022
@jrbourbeau
Member

As this is a relatively important bug fix, and @gjoseph92 @rjzamora are confident about the changes in this PR, let's go ahead and include it. @rjzamora, would you like the honor of pressing the green button?

@rjzamora rjzamora merged commit 27cf9bb into dask:main Jan 14, 2022
@rjzamora rjzamora deleted the optimize-blockwise-fix branch January 14, 2022 17:22
@ian-r-rose
Collaborator

Thanks @rjzamora and @gjoseph92!

Successfully merging this pull request may close these issues.

Blockwise optimization bug when collections appear multiple times

5 participants