[RFC] Define and document per-key ordering semantics for runners #15378

Merged
pabloem merged 9 commits into apache:master from pabloem:ordering-semantics
Dec 3, 2021

@pabloem
Member

@pabloem pabloem commented Aug 24, 2021

Various use cases can be implemented on top of Beam and its runners, but they need guarantees about the ordered transport of data between the execution stages/steps of a Beam pipeline.

This PR proposes the following:

  • Defining the concept of per-key ordered delivery.
  • Adding ValidatesRunner tests that allow us to verify runner behavior for per-key ordered delivery.
  • Adding a row to the Beam Capability Matrix where runner support for this is documented.

With this, Beam users will be able to run order-dependent workloads on Beam/Dataflow with the confidence they need.


Results from ValidatesRunner tests:


r: @apilloud
r: @kennknowles


@pabloem pabloem force-pushed the ordering-semantics branch 2 times, most recently from 77198a9 to 23d0955 Compare August 24, 2021 18:37
@pabloem pabloem force-pushed the ordering-semantics branch from 23d0955 to 68229bc Compare August 25, 2021 03:52
@pabloem
Member Author

pabloem commented Aug 25, 2021

Run Dataflow ValidatesRunner

@pabloem pabloem force-pushed the ordering-semantics branch from 4a368c3 to cb053bf Compare August 26, 2021 04:59
@pabloem
Member Author

pabloem commented Aug 26, 2021

Run Java Flink PortableValidatesRunner Streaming

@pabloem
Member Author

pabloem commented Aug 26, 2021

Run Java Samza PortableValidatesRunner

@pabloem
Member Author

pabloem commented Aug 26, 2021

@pabloem pabloem changed the title from "[WIP] Starting to define per-key ordering semantics for runners" to "[WIP] Define and document per-key ordering semantics for runners" Aug 26, 2021
@pabloem pabloem marked this pull request as ready for review August 26, 2021 22:02
@pabloem pabloem changed the title from "[WIP] Define and document per-key ordering semantics for runners" to "[RFC] Define and document per-key ordering semantics for runners" Aug 26, 2021
@apilloud
Member

@apilloud apilloud left a comment

This is documenting the existing state of the world, so LGTM.

I would lean towards making this a bit more explicit: have the direct runner break ordering by default and have some way for the user to indicate a pipeline requires it. We can then fail the pipeline on runners that don't support ordering if it is required.

matched = matched == null ? 0 : matched;
if (matched == -1) {
// When matched is set to -1, it means that we have met an error, and elements on this
// key are not matched anymore - thus we ignore all inputs.
Member

nit: Possibly make this explicit with a return as well?

Member Author

done

if (matched == -1) {
// When matched is set to -1, it means that we have met an error, and elements on this
// key are not matched anymore - thus we ignore all inputs.
} else if (matched < this.perKeyElements.size()
Member

It appears matched >= this.perKeyElements.size() should be its own "Got more elements than expected" error case and throw an exception.

Member Author

done.

matchedElements.write(-1);
receiver.output(KV.of(elm.getKey(), false));
} else {
assert this.perKeyElements.get(matched).equals(elm.getValue())
Member

nit: I don't think this assert is going to work, it is the case above?

Member Author

the case above is negated (!...?)

Member

Sorry if my comment wasn't clear. Here is what I'm thinking: the case above is matched < this.perKeyElements.size() && !..., and the assert triggers on !..., so the assert actually triggers on matched >= this.perKeyElements.size().

Member

After the latest update you should never be able to hit this assert, so you can leave it as is.

Member

Java assert is, I think, less preferable than a library that generates more elaborate error messages. Also, failures within a user DoFn may be swallowed by a runner. Can the tests be described in terms of a dead-letter output and PAssert?
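For readers following this thread, the checker logic under discussion can be sketched in plain Java without any Beam dependencies. This is only an illustration, not the DoFn from the PR: the class name `PerKeyOrderChecker`, the `Verdict` enum, and the use of a `HashMap` in place of Beam per-key state are all assumptions made for the sketch. It records a verdict per element instead of relying on `assert`, with an explicit early return once a key has failed and a separate "more elements than expected" case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java sketch of a per-key order check; not the PR's actual DoFn. */
final class PerKeyOrderChecker {
  /** Outcome of observing one element for a key. */
  enum Verdict { PENDING, COMPLETE, VIOLATION, IGNORED }

  private static final int FAILED = -1;

  private final List<Integer> expected;
  // Stands in for Beam per-key state: index of the next expected element for each key.
  private final Map<String, Integer> matchedByKey = new HashMap<>();

  PerKeyOrderChecker(List<Integer> expected) {
    this.expected = new ArrayList<>(expected);
  }

  Verdict observe(String key, int value) {
    int matched = matchedByKey.getOrDefault(key, 0);
    if (matched == FAILED) {
      // This key already failed; ignore further input with an explicit return.
      return Verdict.IGNORED;
    }
    if (matched >= expected.size()) {
      // Got more elements than expected: its own error case.
      matchedByKey.put(key, FAILED);
      return Verdict.VIOLATION;
    }
    if (!expected.get(matched).equals(value)) {
      // Element arrived out of order for this key.
      matchedByKey.put(key, FAILED);
      return Verdict.VIOLATION;
    }
    matched++;
    matchedByKey.put(key, matched);
    return matched == expected.size() ? Verdict.COMPLETE : Verdict.PENDING;
  }

  public static void main(String[] args) {
    PerKeyOrderChecker checker = new PerKeyOrderChecker(Arrays.asList(1, 2, 3));
    for (int v : new int[] {1, 2, 3, 4}) {
      System.out.println("k1 <- " + v + ": " + checker.observe("k1", v));
    }
  }
}
```

In a real pipeline the verdicts would presumably be emitted as an output (for example a dead-letter style PCollection) and checked with PAssert, as suggested above; that wiring is left out here.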

Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).stream()
.map(elm -> String.format("k%s", elm))
.collect(Collectors.toList());
PCollection<KV<String, Integer>> kvSeeds =
Member

Should we test types other than Integer?

Member Author

hmmm ... I could try to parameterize the test with different types, though IDK if the Parameterized JUnit runner integrates well with the ValidatesRunner framework. Do you have a suggestion? Should I try adding a more complex type instead of just ints?

Member

I don't think you'll be able to parameterize this; you'll need different test methods for each type.

I believe runners see these as byte[] today, so the behavior here actually depends on coders, and changes in the coder will change the ordering. (Future relational work will likely make some runners aware of the coder contents as well.) The byte coder is the trivial passthrough type, so Byte might actually be the best for a basic "does the runner produce deterministic ordering" test.

public void testSingleCallOrderingWithShuffle() {
// Here we test that the output of a single process call in a DoFn will be output in order
List<Integer> perKeyElements =
Lists.newArrayList(-8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8);
Member

Sorry for the late addition: You might want to test some more extreme values. VarIntCoder is variable sized, so around those size boundaries might be good to test.

* A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative

Member Author

I tried that. All good.
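To make the size boundaries mentioned above concrete, here is a small plain-Java sketch that computes how many bytes a variable-length integer encoding needs for a given int. It assumes the common base-128 varint layout with no sign extension, which is consistent with the quoted Javadoc ("between 1 and 5 bytes") but is not taken from this PR. The class and method names are hypothetical, and the sketch does not call VarIntCoder itself.

```java
/** Hypothetical helper: byte length of a base-128 varint for a 32-bit int. */
final class VarIntSizes {
  static int encodedLength(int v) {
    // Treat the int as unsigned 32-bit (no sign extension), so negatives use all 32 bits.
    long bits = v & 0xFFFFFFFFL;
    int length = 1;
    // Each encoded byte carries 7 payload bits.
    while (bits >= 0x80) {
      bits >>>= 7;
      length++;
    }
    return length;
  }

  public static void main(String[] args) {
    // Values on either side of each size boundary, plus the extremes.
    int[] samples = {
      0, 127, 128, 16383, 16384, 2097151, 2097152,
      268435455, 268435456, Integer.MAX_VALUE, -1, Integer.MIN_VALUE
    };
    for (int v : samples) {
      System.out.println(v + " -> " + encodedLength(v) + " byte(s)");
    }
  }
}
```

Under that assumption, values such as 127/128, 16383/16384, 2097151/2097152 and 268435455/268435456, together with negative numbers, are natural candidates for the per-key element list in an ordering test.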

Member

@kennknowles kennknowles left a comment

I think that per-key-path ordered delivery is subtle enough that it requires a pseudo-mathy treatment in a design doc. I might be behind on dev@ threads, but I want to check that this has happened. Tests are a useful executable "spec", but when we find a bug that wasn't covered by tests, we go back to the math docs to build the new correct test.


/**
* Category tag for validation tests which rely on a runner providing per-key ordering in between
* transforms in the same stage. Tests tagged with {@link UsesPerKeyOrderInStage} should be run for
Member

"Stage" is not a concept in the Beam model, unless you also define it elsewhere in this PR. (commenting as I go)

Member Author

renamed this to bundle. WDYT?

/**
* Category tag for validation tests which rely on a runner providing per-key ordering. Tests tagged
* with {@link UsesPerKeyOrderedDelivery} should be run for runners which support key-to-key
* ordering of elements across shuffle / stage boundaries.
Member

"shuffle" is also not a model concept. For example, runners can choose to move shuffles around, so the ordered delivery might change, unless you can demonstrate/require that allowable optimizations preserve the ordered delivery you are looking for.

Member Author

renamed to bundle and across bundle boundaries. WDYT?


@Test
- @Category(ValidatesRunner.class)
+ @Category({ValidatesRunner.class, UsesParDoLifecycle.class})
Member

Seems unrelated.

@pabloem
Member Author

pabloem commented Sep 16, 2021

Run Website_Stage_GCS PreCommit


We say that the Beam runner supports **key-ordered delivery** if it guarantees
that these two events will be observed downstream in the same order,
independently of the kind of transmission.
Contributor

Although it seems to be mentioned, I think it would be good to restate here explicitly that this holds if and only if the two PTransforms are directly connected, or at least there is no intermediate grouping transform in between. "A downstream PCollection" in the above paragraph might be interpreted as any downstream PCollection, which does not hold.

Member

I was thinking about this comment and it occurs to me that you have to have key-limited parallelism in the producer of course. I don't think this is required anywhere in the model (state/timers require key+window limited parallelism) and also not mentioned here.

Contributor

What I meant was that if we have a chain of transforms as follows:

A -> B -> C

if all these transforms are stateful transforms, then there is no guarantee for ordering of elements emitted from A arriving at C. The only exception would be when there is no change in key, because then we can prove that the ordering will be preserved due to transitivity. If key between A and B changes, then there is no guarantee for ordering at C (even if the key changes back to the same as emitted from A).
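The A -> B -> C argument can be illustrated with a toy simulation in plain Java. It is only a sketch: the `deliver` helper is hypothetical and models one hop as "one FIFO queue per key, drained one key at a time", with a `TreeMap` standing in for an arbitrary drain order. A real runner may interleave keys in any way; nothing here reflects how a specific runner is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical toy model of key-ordered delivery across one hop. */
final class RekeyOrderingDemo {
  /**
   * Delivers elements through one FIFO queue per key. Order within a key is
   * preserved; order across keys is whatever the drain order happens to be
   * (here: sorted by key, as one arbitrary choice).
   */
  static List<String> deliver(List<String> elements, Function<String, String> keyFn) {
    Map<String, List<String>> queues = new TreeMap<>();
    for (String e : elements) {
      queues.computeIfAbsent(keyFn.apply(e), k -> new ArrayList<>()).add(e);
    }
    List<String> out = new ArrayList<>();
    for (List<String> queue : queues.values()) {
      out.addAll(queue);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> emittedByA = Arrays.asList("z", "a", "m");

    // Key unchanged on both hops: ordering at C follows by transitivity.
    List<String> sameKey = deliver(deliver(emittedByA, e -> "k"), e -> "k");
    System.out.println("same key:  " + sameKey);

    // Key changes between A and B, then changes back between B and C.
    List<String> rekeyed = deliver(deliver(emittedByA, e -> e), e -> "k");
    System.out.println("rekeyed:   " + rekeyed);
  }
}
```

With the key held constant, C observes the elements in the order A emitted them. Once the intermediate hop uses a different key per element, the order seen at C depends on how the keys were interleaved, even though the final key is the same as the original one.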

Agree with both the above points. Should it explicitly say the following?

  • Key-ordered delivery is only guaranteed between immediately connected elements.
  • To enforce key ordering, both producer and consumer require key-limited parallelism.

Or are these statements not accurate?

Additionally, should it refer to stages or elements? When building a pipeline you don't necessarily know what stages will result from any fusion that occurs, so would you specify a requirement for key-ordered delivery between elements rather than stages?

Member Author

Hi all! Sorry about the delay, but I've finally tried to address these comments. Can you please take a look and LMK what you think?

@aaltay
Member

aaltay commented Oct 7, 2021

@pabloem - Could you respond to the open comments?

- class: dataflow
l1: "Partially"
l2:
l3: Dataflow performs different shuffling algorithms for batch and streaming. Dataflow guarantees key-ordered delivery in streaming, though not in batch.

Is there any reference that this is true? It is something I am trying to figure out at the moment.

Member Author

hm we have not stated this in Dataflow documentation (I'm working on that), but it's true for streaming.

Ok, I guess it makes sense to define the concept here first.

This may allow the runner to avoid serializing elements; instead, the runner
can just pass the elements in memory.

Passing elements between transforms that are running on the same worker is

These definitions are slightly different from the glossary ones for fusion and stage. Maybe just have a link in the last bullet point above saying runners might use a strategy called fusion, which combines elements into stages?

Member Author

done

Member Author

@pabloem pabloem left a comment

I've finally gotten around to addressing your comments. Can you please take another look?

@kennknowles @je-ik @hughack

I also wrote this doc to try to formalize a little more: https://docs.google.com/document/d/1_7WRJznXlOtWuVaHl_dpy8OZcx_M8BUmeWVA4G0-wEc/edit#

@hughack

hughack commented Nov 13, 2021

@pabloem - Yeah, looks good. The only comment would be around @kennknowles's point. I don't think "key-limited parallelism" is mentioned anywhere else in the Beam model. My understanding is that it does happen, despite not being explicitly documented anywhere I can see. I'd be interested in how Dataflow handles scaling the number of workers, and whether the guarantees still hold for any key that starts getting handled by a different worker.

Contributor

@je-ik je-ik left a comment

LGTM, but from a higher-level perspective, do we have any plans to support ordering regardless of whether the two PTransforms are directly connected?

@pabloem
Member Author

pabloem commented Nov 16, 2021

@kennknowles PTAL : )

@pabloem
Member Author

pabloem commented Nov 29, 2021

@kennknowles PTAL - if there are no other comments by the end of the week, I'll merge by lazy consensus.

@pabloem pabloem merged commit 862ece1 into apache:master Dec 3, 2021
@pabloem pabloem deleted the ordering-semantics branch December 3, 2021 17:52
@pabloem
Member Author

pabloem commented Dec 3, 2021

Moving forward as-is, but this is an area of active discussion, so please feel free to engage on this, and we can work to keep this up to date.

@lukecwik
Member

lukecwik commented Jan 1, 2022

#15378 broke Spark PVR as well: https://ci-beam.apache.org/job/beam_PreCommit_Java_PVR_Spark_Batch_Cron/5140/ (build 5139 passes)

This Jenkins Job was renamed and is still failing.

PVR tests sickbayed in #16411

@aaltay
Member

aaltay commented Jan 5, 2022

@pabloem @ibzib - Should this be reverted? (For reference https://issues.apache.org/jira/browse/BEAM-13522 is the tracker.)

@lukecwik
Member

lukecwik commented Jan 6, 2022

I don't think it should be reverted, but in the future we should take more care when adding new test categories, and verify them before merging.

@ibzib

ibzib commented Jan 6, 2022

I've proposed making the Flink/Spark ValidatesRunner tests precommits for runners/flink and runners/spark file changes respectively, as well as for sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/, since new VR tests are often added there without checking whether they pass on these runners. https://issues.apache.org/jira/browse/BEAM-13521

@aaltay
Copy link
Copy Markdown
Member

aaltay commented Jan 6, 2022

I agree with @lukecwik, and @ibzib's proposal sounds like a good idea to enforce that.

@ibzib - How do we proceed with your proposal? Are you planning to do that? Are you waiting for input on the proposal?

@ibzib

ibzib commented Jan 6, 2022

We've converted portable Flink VR to a precommit already, I'll follow up with other jobs later.

@aaltay
Member

aaltay commented Jan 14, 2022

> We've converted portable Flink VR to a precommit already, I'll follow up with other jobs later.

@ibzib, did the Spark VR also become a precommit?

@ibzib

ibzib commented Jan 14, 2022

> > We've converted portable Flink VR to a precommit already, I'll follow up with other jobs later.
>
> @ibzib, did the Spark VR also become a precommit?

See my comments on #16345

@aaltay
Member

aaltay commented Jan 15, 2022

> > > We've converted portable Flink VR to a precommit already, I'll follow up with other jobs later.
> >
> > @ibzib, did the Spark VR also become a precommit?
>
> See my comments on #16345

Ack. Thank you.
