Revert three commits related to supporting custom coder in reshuffle #33414
chamikaramj merged 1 commit into apache:master
- Fix custom coder not being used in Reshuffle (global window) (apache#33339)
- Fix custom coders not being used in Reshuffle (non global window) (apache#33363)
- Add missing to_type_hint to WindowedValueCoder (apache#33403)
LGTM. Thanks.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Failed tests are unrelated to the changes.
Just a thought: as this changes coders in some cases, should this be guarded by the update compatibility flag? https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L592
As the flag is defined in "StreamingOptions", was it originally designed for use in the streaming case?
Yes, it is designed for "streaming update" where you may have an in-progress aggregation in a shuffle when you do a pipeline update. Then you need the state to be compatible. |
And "state" here includes the in-flight encoded elements that were written by the pre-update version of the pipeline and will be read by the post-update code. This is irrelevant for batch pipelines, but may become relevant if a runner supports some kind of resume (from pause or failure) where the code might be updated.
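To make the in-flight problem concrete, here is a minimal sketch in plain Python. The "old" and "new" coders below are hypothetical stand-ins (pickle and JSON), not Beam classes; the only point is that bytes written by one coder generally cannot be read back by a different one.

```python
import json
import pickle


# Hypothetical "pre-update" coder: a stand-in for whatever the old pipeline used.
def old_encode(value):
    return pickle.dumps(value)


def old_decode(data):
    return pickle.loads(data)


# Hypothetical "post-update" coder: a stand-in for a newly selected custom coder.
def new_encode(value):
    return json.dumps(value).encode("utf-8")


def new_decode(data):
    return json.loads(data.decode("utf-8"))


# An element that was written to shuffle before the pipeline update.
in_flight = old_encode({"key": "a", "count": 3})


def read_after_update(data):
    """Try to read shuffled bytes with the post-update coder."""
    try:
        return new_decode(data)
    except ValueError as exc:  # covers UnicodeDecodeError and JSONDecodeError
        return "decode failed: " + type(exc).__name__


print(read_after_update(in_flight))
print(read_after_update(new_encode({"key": "a", "count": 3})))
```

Elements written after the update decode fine; only the ones already sitting in shuffle are at risk, which is why this matters mainly for streaming updates.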
I see. Thank you both for the clarification! Regarding the possibly breaking changes that could be introduced by re-applying the reverted changes, shall we add a new pipeline option rather than overloading this existing flag? Something like "use_legacy_reshuffle" would allow users to switch back to the previous reshuffle code path, where FastPrimitivesCoder is used internally regardless of the coders or type hints specified by customers.
I don't think we want to introduce a new flag. The point of the |
I am fine with using a flag like that to avoid adding more options, as I don't like having too many options to remember either. However, I cannot deny that both the naming and where it is defined are a little confusing to me. We are somehow overloading "update" to cover both streaming and batch in this context. For batch, customers may only want to "create" a pipeline with existing code that works as before. There is no "update" of the pipeline from their perspective, only an update of the Beam version. :)
For a batch pipeline, setting this flag is a workaround, and they should fix their type hints. (We should make that clear in the docs.)
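For illustration, the guard discussed in this thread could look roughly like the sketch below. It is plain Python and does not use any Beam API: `use_legacy_coder`, `parse_version`, and the `FIRST_VERSION_WITH_NEW_CODER` value are all hypothetical names and placeholders, assumed only to show the shape of a version-gated code path.

```python
# Placeholder value, NOT a real Beam release: the first version in which the
# new coder behavior would be the default.
FIRST_VERSION_WITH_NEW_CODER = (3, 0, 0)


def parse_version(text):
    """Turn a dotted version string such as '2.10.0' into a comparable tuple."""
    return tuple(int(part) for part in text.split("."))


def use_legacy_coder(compat_version):
    """Return True when the user asked for compatibility with an older release.

    `compat_version` is the value of a compatibility option, or None when the
    option is unset (in which case the new behavior applies).
    """
    if compat_version is None:
        return False
    return parse_version(compat_version) < FIRST_VERSION_WITH_NEW_CODER


for requested in (None, "2.9.0", "2.10.0", "3.0.0"):
    path = "legacy" if use_legacy_coder(requested) else "new"
    print(requested, "->", path)
```

Comparing parsed tuples rather than raw strings matters here, since "2.10.0" would otherwise sort before "2.9.0".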
These changes are causing some internal test failures, so we are reverting them for now.