Introduce OutputBuilder in Java SDK #34902
Resolved (outdated) review thread on sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
Resolved review thread on ...va/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
waiting on author
R: @stankiewicz
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
I think, Radek, you have the most context and can critique the details where I have missed something. Especially in FnApiDoFnRunner, I see a couple of spots where I must have gotten impatient: wherever I decompose a WindowedValue and then put it back together, we will lose metadata (once it is introduced). I also realize I need some end-user tests of OutputBuilder. It is very thoroughly tested through its use inside the SDK, but there are no examples of using it in a DoFn in an actual pipeline.
I also need to check that the new style does not create noticeable allocation/GC overhead.
        PaneInfo paneInfo) {
      checkTimestamp(timestamp);
      outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo));
      builder(output).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output();
I just like it so much!
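To illustrate the shape of this change, here is a minimal sketch, assuming simplified types: SimpleValue, ValueReceiver and SketchBuilder are hypothetical stand-ins, not Beam's WindowedValue, WindowedValueReceiver or OutputBuilder, and timestamps, windows and panes are reduced to longs and strings. A positional factory call has to change at every call site whenever a field is added, while a builder call site names only the fields it sets, and output() hands the assembled value to a receiver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified windowed value (not Beam's WindowedValue).
final class SimpleValue<T> {
  final T value;
  final long timestampMillis;
  final List<String> windows;
  final String pane;

  SimpleValue(T value, long timestampMillis, List<String> windows, String pane) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
    this.pane = pane;
  }
}

// Hypothetical receiver, similar in spirit to a WindowedValueReceiver.
interface ValueReceiver<T> {
  void receive(SimpleValue<T> value);
}

// Fluent builder: each setter names the field it changes.
final class SketchBuilder<T> {
  private final ValueReceiver<T> receiver;
  private final T value;
  // Placeholder defaults used when a field is not set (arbitrary for this sketch).
  private long timestampMillis = 0L;
  private List<String> windows = Collections.singletonList("GlobalWindow");
  private String pane = "NO_FIRING";

  SketchBuilder(ValueReceiver<T> receiver, T value) {
    this.receiver = receiver;
    this.value = value;
  }

  SketchBuilder<T> setTimestamp(long millis) {
    this.timestampMillis = millis;
    return this;
  }

  SketchBuilder<T> setWindows(List<String> windows) {
    this.windows = windows;
    return this;
  }

  SketchBuilder<T> setPaneInfo(String pane) {
    this.pane = pane;
    return this;
  }

  // Assembles a value from the builder's current state and delivers it.
  void output() {
    receiver.receive(new SimpleValue<>(value, timestampMillis, windows, pane));
  }
}
```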
    /** Create a Builder that takes element metadata from the provided delegate. */
    public static <T> Builder<T> builder(WindowedValue<T> template) {
      return new Builder<T>()
This is great: if we start to expand with new fields, this is the place to propagate them.
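A minimal sketch of why a template-based builder is the right place to propagate new fields, assuming a hypothetical Elem type whose metadata field stands in for metadata added to WindowedValue later (Elem, ElemBuilder and Rebuild are illustrative names, not Beam classes). Rebuilding by hand from the fields the code knows about silently drops the new field, which is the FnApiDoFnRunner concern raised above, while the template builder copies every field in one place.

```java
// Hypothetical element type; "metadata" stands in for any newly added field.
final class Elem<T> {
  final T value;
  final long timestampMillis;
  final String window;
  final String metadata;

  Elem(T value, long timestampMillis, String window, String metadata) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.window = window;
    this.metadata = metadata;
  }

  // Older constructor that predates the metadata field; callers using it
  // silently get metadata == null.
  Elem(T value, long timestampMillis, String window) {
    this(value, timestampMillis, window, null);
  }
}

final class ElemBuilder<T> {
  private T value;
  private long timestampMillis;
  private String window;
  private String metadata;

  private ElemBuilder() {}

  // The single place that copies fields from the template. Adding a field to
  // Elem means updating only this method, not every call site.
  static <T> ElemBuilder<T> from(Elem<?> template) {
    ElemBuilder<T> b = new ElemBuilder<>();
    b.timestampMillis = template.timestampMillis;
    b.window = template.window;
    b.metadata = template.metadata;
    return b;
  }

  ElemBuilder<T> withValue(T value) {
    this.value = value;
    return this;
  }

  Elem<T> build() {
    return new Elem<>(value, timestampMillis, window, metadata);
  }
}

final class Rebuild {
  private Rebuild() {}

  // Anti-pattern: decompose into known fields and reassemble. Any field this
  // code does not know about (here: metadata) is lost.
  static <T> Elem<T> rebuildByHand(Elem<?> in, T newValue) {
    return new Elem<>(newValue, in.timestampMillis, in.window);
  }
}
```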
stankiewicz left a comment
Read all of it. This will massively help extend WindowedValue with new fields while avoiding the journey of extending each output method. Thanks!
Thank you! OK, it now looks like my attempts to make the distroless integration tests use the HEAD container may have caused other tests to stop using the HEAD container as well. I know the internal tests are all green, but I will check which of these tests are actually broken versus simply not configured to use the right container.
OK! All green, and I am going to merge.
        .withValue(value)
        .setReceiver(
            windowedValue -> {
              checkTimestamp(windowedValue.getTimestamp());
Hey @kennknowles, can you explain the reasoning behind checking the timestamp here? In Scio we have some DoFns that propagate the element timestamp, and this additional check breaks their current behavior. Not sure whether we've been doing the wrong thing.
My intention was only to maintain functionality. I am sure that I debugged a failing test around "invalid timestamps are correctly rejected" but I cannot recall exactly which one it is. I'm re-running some suites to find it and I'll report back here. I definitely don't want any currently-correct code to start failing!
Thanks! For context, one of the failing tests is for our AsyncDoFn, where we process elements by spinning off a set of futures, awaiting their respective responses, and outputting those elements back to their original windows with their original timestamps.
The corresponding failing test is here, giving the following exception:
[info] - should propagate element metadata *** FAILED *** (45 milliseconds)
[info] org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info] at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
[info] at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
[info] at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
[info] at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[info] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
[info] at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
[info] at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
[info] at com.spotify.scio.ScioContext.execute(ScioContext.scala:671)
[info] at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:658)
[info] at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:646)
[info] ...
[info] Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info] at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:462)
[info] at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
[info] at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
[info] at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info] at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
[info] at com.spotify.scio.transforms.BaseAsyncLookupDoFn.lambda$processElement$0(BaseAsyncLookupDoFn.java:190)
[info] at com.spotify.scio.transforms.BaseAsyncLookupDoFn.flush(BaseAsyncLookupDoFn.java:310)
[info] ...
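The rule enforced here can be restated as a small predicate. This is a sketch based only on the wording of the error message, not on SimpleDoFnRunner's source: TimestampCheck and isAllowed are hypothetical names, and timestamps are epoch milliseconds rather than Joda-Time Instants. With the values from the failing test (an output at 1 ms for a current input at 8 ms, with zero allowed skew), the output falls below the lower bound and is rejected.

```java
// Hypothetical restatement of the rule quoted in the error message.
final class TimestampCheck {
  // Upper bound printed in the message (294247-01-10T04:00:54.775Z) as epoch
  // millis: Long.MAX_VALUE microseconds truncated to milliseconds.
  static final long MAX_TIMESTAMP_MILLIS = Long.MAX_VALUE / 1000L;

  private TimestampCheck() {}

  // True if an output at outputMillis is allowed for a current input or timer
  // at inputMillis, given allowedSkewMillis (assumed non-negative).
  static boolean isAllowed(long outputMillis, long inputMillis, long allowedSkewMillis) {
    long lowerBound;
    try {
      lowerBound = Math.subtractExact(inputMillis, allowedSkewMillis);
    } catch (ArithmeticException overflow) {
      // Saturate: a skew too large to subtract means there is no lower bound.
      lowerBound = Long.MIN_VALUE;
    }
    return outputMillis >= lowerBound && outputMillis <= MAX_TIMESTAMP_MILLIS;
  }
}
```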
Interesting. This is actually a red herring. The error is not coming from this checkTimestamp. This is the portable / Dataflow v2 path.
The SimpleDoFnRunner is the v1 codepath and I see it now:
- The failure is thrown from https://github.com/apache/beam/pull/34902/files/2cff4cc48abb7be6e7f91d3ab76dd6235c87938e#diff-560313cd06556c758d162abea624cab2646038cdabf483cdad69d392bf720a0bL252
- The added check is in this block https://github.com/apache/beam/pull/34902/files/2cff4cc48abb7be6e7f91d3ab76dd6235c87938e#diff-560313cd06556c758d162abea624cab2646038cdabf483cdad69d392bf720a0bR502
I've updated #36822 to remove that check. On the other hand, the check looks correct. I wonder whether the problem is that elem is not captured by the lambda: it is read as a field later, after the current element has changed, so when the AsyncDoFn outputs, the timestamp is validated against a later element. SimpleDoFnRunner is not designed for concurrent or async use, so that could be the problem. A simple fix that would preserve safety is to capture the element in a local variable so that it ends up in the lambda's closure.
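The suspected bug and the suggested fix can be reproduced without Beam. In this hypothetical sketch (SketchRunner and its methods are illustrative, not SimpleDoFnRunner's API, and the skew is fixed at zero), the runner keeps the current element's timestamp in a mutable field. A validator lambda that reads the field is evaluated against whatever element is current when the delayed output finally happens, while one that first copies the field into a local variable keeps the timestamp of the element that created it. The timestamps mirror the stack trace: the output belongs to an element at 1 ms, but an element at 8 ms has started by the time it is flushed.

```java
import java.util.function.LongPredicate;

// Hypothetical, non-thread-safe runner that tracks the current element in a field.
final class SketchRunner {
  private long currentElementMillis;

  void startElement(long elementMillis) {
    currentElementMillis = elementMillis;
  }

  // Buggy: the lambda reads the field when it runs, not when it was created,
  // so a delayed (async) output is checked against a later element.
  LongPredicate validatorReadingField() {
    return outputMillis -> outputMillis >= currentElementMillis;
  }

  // Fixed: copy the field into a local first; the lambda's closure captures
  // the timestamp of the element that was current when it was created.
  LongPredicate validatorCapturingLocal() {
    final long elementMillis = currentElementMillis;
    return outputMillis -> outputMillis >= elementMillis;
  }
}
```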
Confirmed the test that fails: https://develocity.apache.org/s/unfvg6wymazji/tests/overview?outcome=FAILED
https://github.com/apache/beam/pull/36838/files might fix it, if you have a test environment where it is easy to pull this in and try it (or you can test against a snapshot once the nightlies run).
With the addition of extended metadata for elements, we need to make code, especially user code, more robust to added metadata fields. This pull request adds OutputBuilder, which should be used to build new values. Builders should always be obtained by pulling metadata from some context (for example, the element currently being processed) when possible.
Concretely, this pull request adds:
- OutputBuilder as a public interface extending WindowedValue.
- WindowedValues.builder(WindowedValueReceiver) to produce an OutputBuilder that will output to the provided receiver.
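One way to picture an OutputBuilder that extends WindowedValue is a builder whose getters expose its current state, seeded from a context value so metadata carries over unless explicitly overridden. This is a sketch under stated assumptions: Valued, ValuedOutputBuilder, ValuedReceiver, ImmutableValued and ReceiverBuilder are hypothetical, trimmed-down stand-ins for WindowedValue, OutputBuilder and WindowedValueReceiver, and only the value and timestamp are modeled.

```java
// Hypothetical, trimmed-down value interface (stand-in for WindowedValue).
interface Valued<T> {
  T getValue();

  long getTimestampMillis();
}

// Hypothetical receiver (stand-in for WindowedValueReceiver).
interface ValuedReceiver<T> {
  void receive(Valued<T> value);
}

// The builder is itself a Valued: its getters report the current state.
interface ValuedOutputBuilder<T> extends Valued<T> {
  ValuedOutputBuilder<T> setValue(T value);

  ValuedOutputBuilder<T> setTimestampMillis(long millis);

  void output();
}

final class ImmutableValued<T> implements Valued<T> {
  private final T value;
  private final long timestampMillis;

  ImmutableValued(T value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }

  @Override public T getValue() { return value; }

  @Override public long getTimestampMillis() { return timestampMillis; }
}

final class ReceiverBuilder<T> implements ValuedOutputBuilder<T> {
  private final ValuedReceiver<T> receiver;
  private T value;
  private long timestampMillis;

  private ReceiverBuilder(Valued<T> context, ValuedReceiver<T> receiver) {
    this.receiver = receiver;
    // Seed every field from the context so metadata carries over by default.
    this.value = context.getValue();
    this.timestampMillis = context.getTimestampMillis();
  }

  static <T> ReceiverBuilder<T> from(Valued<T> context, ValuedReceiver<T> receiver) {
    return new ReceiverBuilder<>(context, receiver);
  }

  @Override public T getValue() { return value; }

  @Override public long getTimestampMillis() { return timestampMillis; }

  @Override public ReceiverBuilder<T> setValue(T value) {
    this.value = value;
    return this;
  }

  @Override public ReceiverBuilder<T> setTimestampMillis(long millis) {
    this.timestampMillis = millis;
    return this;
  }

  // Deliver an immutable snapshot of the current state to the receiver.
  @Override public void output() {
    receiver.receive(new ImmutableValued<>(value, timestampMillis));
  }
}
```

Emitting an immutable snapshot from output() is a design choice of this sketch, so that setter calls made after output() cannot alter a value that was already delivered.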