Introduce OutputBuilder in Java SDK #34902

Merged
kennknowles merged 3 commits into apache:master from kennknowles:OutputBuilder
Sep 23, 2025
Conversation

@kennknowles
Member

@kennknowles kennknowles commented May 9, 2025

With the addition of extended metadata for elements, we need to make code, especially user code, more robust to added metadata fields. This pull request adds OutputBuilder, which should be used to build new values. Builders should always be obtained by pulling metadata from some context (for example, a currently-in-process element) when possible.

Concretely, this pull request

  • Adds OutputBuilder as a public interface extending WindowedValue.
  • Adds WindowedValues.builder(WindowedValueReceiver) to produce an OutputBuilder that will output to the provided receiver.
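
To illustrate the pattern described above, here is a minimal, self-contained sketch. It does not use Beam: `SketchElement`, `SketchOutputBuilder`, and `OutputBuilderSketch` are hypothetical stand-ins, and only the method names `withValue`, `setTimestamp`, and `output` echo names quoted in this pull request. The real signatures may differ. The point is that the builder takes its metadata from a context element and sends the result to a receiver, so call sites only override what they need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an element plus its metadata; NOT Beam's WindowedValue.
final class SketchElement<T> {
  final T value;
  final long timestampMillis;
  final String window;

  SketchElement(T value, long timestampMillis, String window) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.window = window;
  }
}

// Hypothetical stand-in for the builder; method names only echo those quoted in this PR.
final class SketchOutputBuilder<T> {
  private final Consumer<SketchElement<T>> receiver;
  private T value;
  private long timestampMillis;
  private String window;

  // Metadata is seeded from a context element so callers never assemble it from scratch.
  SketchOutputBuilder(SketchElement<?> context, Consumer<SketchElement<T>> receiver) {
    this.receiver = receiver;
    this.timestampMillis = context.timestampMillis;
    this.window = context.window;
  }

  SketchOutputBuilder<T> withValue(T newValue) {
    this.value = newValue;
    return this;
  }

  SketchOutputBuilder<T> setTimestamp(long newTimestampMillis) {
    this.timestampMillis = newTimestampMillis;
    return this;
  }

  // Hands the finished element to the receiver the builder was created with.
  void output() {
    receiver.accept(new SketchElement<>(value, timestampMillis, window));
  }
}

final class OutputBuilderSketch {
  static List<SketchElement<Integer>> run() {
    List<SketchElement<Integer>> out = new ArrayList<>();
    SketchElement<String> input = new SketchElement<>("hello", 8L, "window-A");
    // Inherit all metadata from the input element.
    new SketchOutputBuilder<Integer>(input, out::add).withValue(input.value.length()).output();
    // Override only the timestamp; the window still comes from the input element.
    new SketchOutputBuilder<Integer>(input, out::add).withValue(0).setTimestamp(10L).output();
    return out;
  }

  public static void main(String[] args) {
    for (SketchElement<Integer> e : run()) {
      System.out.println(e.value + " @" + e.timestampMillis + " in " + e.window);
    }
  }
}
```

In this sketch, adding another metadata field would only require changes inside the builder's constructor and `output()`, not at the call sites in `run()`.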

@kennknowles kennknowles changed the title from "DO NOT MERGE: demonstrating impact of OutputBuilder" to "DO NOT MERGE: demonstrating OutputBuilder in Java SDK" on May 9, 2025
@kennknowles kennknowles changed the title from "DO NOT MERGE: demonstrating OutputBuilder in Java SDK" to "Feedback requested: introduce OutputBuilder in Java SDK" on Jun 2, 2025
@kennknowles kennknowles force-pushed the OutputBuilder branch 3 times, most recently from b4f82b1 to 52afeda on June 20, 2025 20:12
@kennknowles kennknowles force-pushed the OutputBuilder branch 2 times, most recently from 4289ea0 to fe0763a on July 21, 2025 22:43
@derrickaw
Collaborator

waiting on author

@kennknowles
Member Author

R: @stankiewicz

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers".

@kennknowles
Member Author

Radek, I think you have the most context and can critique the details where I have missed something. In FnApiDoFnRunner especially, I see a couple of spots where I think I must have gotten impatient: wherever I decompose a WindowedValue and then put it back together, we will lose metadata (once it is introduced).

I realize I also need some end-user tests of the OutputBuilder. It is very thoroughly tested by its use inside the SDK, but there are no examples of use in a DoFn in an actual pipeline.
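
The metadata-loss concern above can be shown with a small sketch. Everything here is hypothetical: `MetaElement` is not Beam's WindowedValue, and `traceId` simply stands in for some metadata field added after older call sites were written. Rebuilding an element from its decomposed parts drops the newer field, while copying from the original as a template keeps it.

```java
// Hypothetical element type with one "newer" metadata field; NOT Beam's WindowedValue.
final class MetaElement<T> {
  final T value;
  final long timestampMillis;
  final String traceId; // stands in for metadata added after older call sites were written

  MetaElement(T value, long timestampMillis, String traceId) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.traceId = traceId;
  }

  // Older-style factory that predates traceId, so it cannot carry it.
  static <T> MetaElement<T> of(T value, long timestampMillis) {
    return new MetaElement<>(value, timestampMillis, null);
  }

  // Template-style copy: every metadata field is propagated in this one place.
  <U> MetaElement<U> withValue(U newValue) {
    return new MetaElement<>(newValue, timestampMillis, traceId);
  }
}

final class MetadataLossSketch {
  // Decompose into parts and rebuild: silently drops any field the factory does not know about.
  static MetaElement<Integer> decomposeAndRebuild(MetaElement<String> in) {
    return MetaElement.of(in.value.length(), in.timestampMillis);
  }

  // Start from the input as a template and replace only the value.
  static MetaElement<Integer> copyFromTemplate(MetaElement<String> in) {
    return in.withValue(in.value.length());
  }

  public static void main(String[] args) {
    MetaElement<String> in = new MetaElement<>("abc", 1L, "trace-1");
    System.out.println("rebuilt traceId:  " + decomposeAndRebuild(in).traceId);
    System.out.println("template traceId: " + copyFromTemplate(in).traceId);
  }
}
```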

@kennknowles
Member Author

I also need to check that the new style does not create noticeable allocation/GC overhead.

PaneInfo paneInfo) {
checkTimestamp(timestamp);
- outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo));
+ builder(output).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output();
Contributor

I just like it so much!


/** Create a Builder that takes element metadata from the provided delegate. */
public static <T> Builder<T> builder(WindowedValue<T> template) {
return new Builder<T>()
Contributor

This is great. So if we start to expand with new fields, this is the place to propagate them.

Contributor

@stankiewicz stankiewicz left a comment

Read all of it. This will massively help extend WindowedValue with new fields while avoiding the journey of extending each output method. Thanks!

@kennknowles
Member Author

Thank you!

OK, now it looks like my attempts to make the distroless integration tests use the HEAD container may have caused other tests to stop using the HEAD container. I know that the internal tests are all green, but I will check which of these tests are actually broken and which are just not configured to use the right container.

@kennknowles
Member Author

OK! All green and I am going to merge.

.withValue(value)
.setReceiver(
windowedValue -> {
checkTimestamp(windowedValue.getTimestamp());
Contributor

@kellen kellen Nov 14, 2025

Hey @kennknowles, can you explain the reasoning behind checking the timestamp here? In Scio we have some DoFns that propagate the element timestamp, and this additional check breaks their current behavior. Not sure whether we've been doing the wrong thing.

Member Author

My intention was only to maintain functionality. I am sure that I debugged a failing test around "invalid timestamps are correctly rejected" but I cannot recall exactly which one it is. I'm re-running some suites to find it and I'll report back here. I definitely don't want any currently-correct code to start failing!

Contributor

Thanks! For context, one of the failing tests is for our AsyncDoFn, where we process elements by spinning off a set of futures, awaiting their respective responses, and then outputting those elements back to their original windows with their original timestamps.

The corresponding failing test is here, giving the following exception:

[info] - should propagate element metadata *** FAILED *** (45 milliseconds)
[info]   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
[info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
[info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
[info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[info]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
[info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
[info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
[info]   at com.spotify.scio.ScioContext.execute(ScioContext.scala:671)
[info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:658)
[info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:646)
[info]   ...
[info]   Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:462)
[info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
[info]   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
[info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info]   at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
[info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.lambda$processElement$0(BaseAsyncLookupDoFn.java:190)
[info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.flush(BaseAsyncLookupDoFn.java:310)
[info]   ...
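
The rule stated in the exception message can be sketched as below. This is an illustration only, not the code in SimpleDoFnRunner: the class, method names, and the `MAX_TIMESTAMP_MILLIS` placeholder are assumptions, and timestamps are plain epoch milliseconds rather than Beam's timestamp types. It also assumes inputs small enough that the subtraction cannot overflow.

```java
final class TimestampCheckSketch {
  // Placeholder upper bound for illustration; the real limit is the one printed in the error message.
  static final long MAX_TIMESTAMP_MILLIS = Long.MAX_VALUE / 1000;

  // Rejects outputs earlier than (current input - allowed skew) or later than the upper bound.
  static void checkTimestamp(long outputMillis, long currentInputMillis, long allowedSkewMillis) {
    long earliestAllowed = currentInputMillis - allowedSkewMillis;
    if (outputMillis < earliestAllowed || outputMillis > MAX_TIMESTAMP_MILLIS) {
      throw new IllegalArgumentException(
          "Cannot output with timestamp " + outputMillis
              + "; earliest allowed is " + earliestAllowed
              + " and latest allowed is " + MAX_TIMESTAMP_MILLIS);
    }
  }

  // Convenience wrapper that reports the outcome instead of throwing.
  static boolean isAccepted(long outputMillis, long currentInputMillis, long allowedSkewMillis) {
    try {
      checkTimestamp(outputMillis, currentInputMillis, allowedSkewMillis);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Values from the trace above: output at 1 ms, current input at 8 ms, skew 0 ms.
    System.out.println(isAccepted(1L, 8L, 0L));
    // With a skew of 7 ms the same output would fall inside the allowed range.
    System.out.println(isAccepted(1L, 8L, 7L));
  }
}
```

With the values from the trace, the output timestamp (1 ms) is earlier than the current input (8 ms) minus a skew of 0 ms, which is why the check rejects it.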

Member Author

@kennknowles kennknowles Nov 17, 2025

Interesting. This is actually a red herring. The error is not coming from this checkTimestamp; the code you commented on is the portable / Dataflow v2 path.

The SimpleDoFnRunner is the v1 codepath and I see it now:

I've updated #36822 to remove that check. On the other hand, the check itself looks correct. I wonder if the problem is that elem is not captured by the lambda and is instead read as a field later, after the current element has changed: when the AsyncDoFn outputs, the timestamp would then be validated against a later element. SimpleDoFnRunner is not designed for concurrent or async use, so that could be the problem. A simple fix that would preserve safety would be to capture it in a local variable that ends up in the closure of the lambda.
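
The difference between reading a field later and capturing a local can be shown in plain Java. This is a sketch of the general language behavior, not the SimpleDoFnRunner code; `CaptureSketch` and its members are hypothetical names.

```java
import java.util.function.LongSupplier;

final class CaptureSketch {
  // Mutable "current element" state, as a runner that handles one element at a time might keep.
  private long currentTimestampMillis;

  // The lambda captures `this`, so it reads whatever the field holds when the lambda runs.
  LongSupplier readsFieldLater(long elementTimestampMillis) {
    this.currentTimestampMillis = elementTimestampMillis;
    return () -> this.currentTimestampMillis;
  }

  // The lambda closes over an effectively final local, so the value is fixed at creation time.
  LongSupplier capturesLocal(long elementTimestampMillis) {
    this.currentTimestampMillis = elementTimestampMillis;
    final long captured = elementTimestampMillis;
    return () -> captured;
  }

  public static void main(String[] args) {
    CaptureSketch runner = new CaptureSketch();
    LongSupplier viaField = runner.readsFieldLater(1L);
    LongSupplier viaLocal = runner.capturesLocal(1L);

    // A later element arrives before the deferred lambdas run.
    runner.readsFieldLater(8L);

    System.out.println("via field: " + viaField.getAsLong()); // sees the later element's timestamp
    System.out.println("via local: " + viaLocal.getAsLong()); // still sees the original timestamp
  }
}
```

If a deferred output validates against the field, it compares the old element's timestamp with the newer element's, which would match the 1 ms versus 8 ms mismatch in the trace.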

Member Author

https://github.com/apache/beam/pull/36838/files might fix it - if you have a test environment where it is easy to pull this in and try it (or test against a snapshot once the nightlies run)
