Inject SDK-side flattens while handling input/output coder mismatch in flattens.#34641
Merged
lostluck merged 5 commits intoapache:masterfrom Apr 16, 2025
Merged
Conversation
Contributor
|
Assigning reviewers. If you would like to opt out of this review, comment R: @jrmccluskey for label go. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
17 tasks
…the fix. The test is also included in the test suite of flink, samza and spark, but without transcoding until their corresponding FRs are resolved.
9d6ec0a to
369859b
Compare
lostluck
approved these changes
Apr 16, 2025
Contributor
lostluck
left a comment
There was a problem hiding this comment.
I like how clean this ended up. Great work.
| func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult { | ||
| if !h.config.SDKFlatten { | ||
| t.EnvironmentId = "" // force the flatten to be a runner transform due to configuration. | ||
| if !h.config.SDKFlatten && !strings.HasPrefix(tid, "ft_") { |
Contributor
There was a problem hiding this comment.
I'll note that there's no user serviceable way to do these configurations at the moment, and it really was a hard binary. It would be acceptable to remove the SDKFlatten option in favour of just a single approach that biases to runner flattens, but does the SDK flatten to get around these issues.
| stg.internalCols = internal | ||
| // Sort the keys of internal producers (from stageFacts.PcolProducers) | ||
| // to ensure deterministic order for stable tests. | ||
| sort.Strings(stg.internalCols) |
Contributor
There was a problem hiding this comment.
Good find! I thought I had everything deterministic already.
This was referenced Apr 18, 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Following the idea of identity transform (#32930 (comment)) and valuable discussions with @lostluck, I have developed a "simpler and more general fix" for Prism runner issues involving
Flatten. This approach works not only forFlatten->Flattenscenarios, but also for others likeGroupBy->Flatten.The key observations are:
Flattenonly passes data through from its input PCollections.Flattenoutput.Previous approaches (including #34602) attempted to fix this by by overwriting the upstream coders with the
Flattenoutput coder. This works in many scenarios, especially when the upstream transform is a SDK-side transform or anotherFlatten(where #34602 ensured coder propagation).However, modifying upstream PCollection coders can cause side effects.
groupby->flattenexample (org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2), changingGroupBy's coder has no effect on the actual encoded data, becauseGroupByalways generateK,Iterable<V>. Downstream transforms expecting data encoded with theFlatten's coder then fail during decoding.C = [A, B] | Flatten(), D = A | GroupByKey(). IfFlattenoverwrites the coder associated with PCollectionA, theGroupByKeyonAmay fail due to coder incompatibility.The proposed solution avoids the pitfalls of modifying upstream coders. Instead, it inserts an identity transform (specifically, an SDK-side Flatten) before the Runner
Flatten. This identity transform implicitly converts the input PCollection's elements to use the same output coder as the target Runner Flatten, ensuring the correctly encoded data emitted from it.fixes #32930
fixes #34643
The PR also fixes the flaky test of
Test_preprocessor_preProcessGraph/ignoreEmptyAndIdentityTransformunder https://github.com/apache/beam/actions/workflows/beam_PreCommit_Go.yml.