[RFC] Define and document per-key ordering semantics for runners#15378
[RFC] Define and document per-key ordering semantics for runners#15378pabloem merged 9 commits intoapache:masterfrom
Conversation
77198a9 to
23d0955
Compare
23d0955 to
68229bc
Compare
|
Run Dataflow ValidatesRunner |
4a368c3 to
cb053bf
Compare
|
Run Java Flink PortableValidatesRunner Streaming |
|
Run Java Samza PortableValidatesRunner |
|
|
Generated website changes are here: https://apache-beam-website-pull-requests.storage.googleapis.com/15378/documentation/runners/capability-matrix/index.html |
apilloud
left a comment
There was a problem hiding this comment.
This documenting the existing state of the world so LGTM.
I would lean towards making this a bit more explicit: have the direct runner break ordering by default and have some way for the user to indicate a pipeline requires it. We can then fail the pipeline on runners that don't support ordering if it is required.
| matched = matched == null ? 0 : matched; | ||
| if (matched == -1) { | ||
| // When matched is set to -1, it means that we have met an error, and elements on this | ||
| // key are not matched anymore - thus we ignore all inputs. |
There was a problem hiding this comment.
nit: Possibly make this explicit with a return as well?
| if (matched == -1) { | ||
| // When matched is set to -1, it means that we have met an error, and elements on this | ||
| // key are not matched anymore - thus we ignore all inputs. | ||
| } else if (matched < this.perKeyElements.size() |
There was a problem hiding this comment.
It appears matched >= this.perKeyElements.size() should be its own "Got more elements than expected" error case and throw an exception.
| matchedElements.write(-1); | ||
| receiver.output(KV.of(elm.getKey(), false)); | ||
| } else { | ||
| assert this.perKeyElements.get(matched).equals(elm.getValue()) |
There was a problem hiding this comment.
nit: I don't think this assert is going to work, it is the case above?
There was a problem hiding this comment.
the case above is denied (!...?)
There was a problem hiding this comment.
Sorry if my comment wasn't clear, here is what I'm thinking: the case above is mached < this.perKeyElements.size() && !... the assert triggers on !..., so the assert actually triggers on matched >= this.perKeyElements.size().
There was a problem hiding this comment.
After the latest update you should never be able to hit this assert, you can leave it as is.
There was a problem hiding this comment.
Java assert is, I think, less preferable than a library that generates more elaborate error messages. Also, failures within a user DoFn may be swallowed by a runner. Can the tests be described in terms of a dead-letter output and PAssert?
| Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).stream() | ||
| .map(elm -> String.format("k%s", elm)) | ||
| .collect(Collectors.toList()); | ||
| PCollection<KV<String, Integer>> kvSeeds = |
There was a problem hiding this comment.
Should we test types other than Integer?
There was a problem hiding this comment.
hmmm ... I could try to parameterize the test with different types -though IDK if the Parameterized JUnit runner integrates well with the ValidatesRunner framework. Do you have a suggestion? Should I try adding a more complex type instead of just ints?
There was a problem hiding this comment.
I don't think you'll be able to parameterize this, you'll need different test methods for each type.
I believe runners see these as byte[] today, so the behavior here actually depends on coders, so changes in the coder will change the ordering. (Future relational work will likely make some runners aware of the coder contents as well.) The byte coder is the trivial passthrough type, so Byte might actually be the best for a basic "does the runner produce deterministic ordering" test.
| public void testSingleCallOrderingWithShuffle() { | ||
| // Here we test that the output of a single process call in a DoFn will be output in order | ||
| List<Integer> perKeyElements = | ||
| Lists.newArrayList(-8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8); |
There was a problem hiding this comment.
Sorry for the late addition: You might want to test some more extreme values. VarIntCoder is variable sized, so around those size boundaries might be good to test.
kennknowles
left a comment
There was a problem hiding this comment.
I think that per-key-path ordered delivery is subtle enough it requires a pseudo-mathy treatment in a design doc. I might be behind on dev@ threads but want to check that this has happened. Tests are a useful executable "spec" but when we find a bug that wasn't covered by tests, we go back to the math docs to build the new correct test.
|
|
||
| /** | ||
| * Category tag for validation tests which rely on a runner providing per-key ordering in between | ||
| * transforms in the same stage. Tests tagged with {@link UsesPerKeyOrderInStage} should be run for |
There was a problem hiding this comment.
"Stage" is not a concept in the Beam model, unless you also define it elsewhere in this PR. (commenting as I go)
There was a problem hiding this comment.
renamed this to bundle. WDYT?
| /** | ||
| * Category tag for validation tests which rely on a runner providing per-key ordering. Tests tagged | ||
| * with {@link UsesPerKeyOrderedDelivery} should be run for runners which support key-to-key | ||
| * ordering of elements across shuffle / stage boundaries. |
There was a problem hiding this comment.
"shuffle" is also not a model concept. For example runners can choose to move shuffles around so then the ordered delivery might change. Unless you can demonstrate/require that allowable optimizations preserve the ordered delivery you are looking for.
There was a problem hiding this comment.
renamed to bundle and across bundle boundaries. WDYT?
|
|
||
| @Test | ||
| @Category(ValidatesRunner.class) | ||
| @Category({ValidatesRunner.class, UsesParDoLifecycle.class}) |
| matchedElements.write(-1); | ||
| receiver.output(KV.of(elm.getKey(), false)); | ||
| } else { | ||
| assert this.perKeyElements.get(matched).equals(elm.getValue()) |
There was a problem hiding this comment.
Java assert is, I think, less preferable than a library that generates more elaborate error messages. Also, failures within a user DoFn may be swallowed by a runner. Can the tests be described in terms of a dead-letter output and PAssert?
|
Run Website_Stage_GCS PreCommit |
|
|
||
| We say that the Beam runner supports **key-ordered delivery** if it guarantees | ||
| that these two events will be observed downstream in the same order, | ||
| independently of the kind of transmission. |
There was a problem hiding this comment.
Although it seems to be mentioned I think it would be good to restate here explicitly, that this holds if and only if the two PTransforms are directly connected - or at least there is no intermediate grouping transform in between. "A downstream PCollection" in the above paragraph might be interpreted as any downstream PCollection, which does not hold.
There was a problem hiding this comment.
I was thinking about this comment and it occurs to me that you have to have key-limited parallelism in the producer of course. I don't think this is required anywhere in the model (state/timers require key+window limited parallelism) and also not mentioned here.
There was a problem hiding this comment.
What I meant was that if we we have a chain of transforms as follows:
A -> B -> C
if all these transforms are stateful transforms, then there is no guarantee for ordering of elements emitted from A arriving at C. The only exception would be when there is no change in key, because then we can prove that the ordering will be preserved due to transitivity. If key between A and B changes, then there is no guarantee for ordering at C (even if the key changes back to the same as emitted from A).
There was a problem hiding this comment.
Agree with both the above points. Should it explicitly say the following?
- Key-ordered deliver is only guaranteed between immediately connected elements.
- To enforce key ordering both producer and consumer require key limited parallelism.
Or are these statements not accurate?
There was a problem hiding this comment.
Additionally, should it refer to stages or elements? When building a pipeline you don't necessarily know what stages will result from any fusion that occurs, so would you specify a requirement for key-ordered delivery between elements rather than stages?
There was a problem hiding this comment.
Hi all! Sorry about the delay, but I've finally tried to address these comments. Can you please take a look and LMK what you think>
|
@pabloem - Could you respond to the open comments? |
| - class: dataflow | ||
| l1: "Partially" | ||
| l2: | ||
| l3: Dataflow performs different shuffling algorithms for batch and streaming. Dataflow guarantees key-ordered delivery in streaming, though not in batch. |
There was a problem hiding this comment.
Is there any reference that this is true? It is something i am trying to figure out at the moment.
There was a problem hiding this comment.
hm we have not stated this in Dataflow documentation (I'm working on that), but it's true for streaming.
There was a problem hiding this comment.
Ok, i guess it makes sense to define the concept here first.
| This may allow the runner to avoid serializing elements; instead, the runner | ||
| can just pass the elements in memory. | ||
|
|
||
| Passing elements between transforms that are running on the same worker is |
There was a problem hiding this comment.
I've finally gotten around to addressing your comments. Can you please take another look?
I also wrote this doc to try to formalize a little more: https://docs.google.com/document/d/1_7WRJznXlOtWuVaHl_dpy8OZcx_M8BUmeWVA4G0-wEc/edit#
|
@pabloem - Yeah looks good, the only comment would be around @kennknowles point. I don't think "key-limited parallelism" is mentioned anywhere else in the Beam model. My understanding is it does happen, despite not explicitly being documented anywhere i can see. I'd be interested how Dataflow handles scaling the number of workers and if the guarantees still hold for any key that starts getting handled by a different worker. |
je-ik
left a comment
There was a problem hiding this comment.
LGTM, but from higher level perspective, do we have any plans to support ordering regardless of the direct connection between the two PTransforms?
|
@kennknowles PTAL : ) |
|
@kennknowles PTAL - if no other comments by the end of the week, I'll merge form lazy consensus. |
|
moving forward as-is, but this is an area of active discussion, so please feel free to engage on this, and we cna work to keep this up to date. |
|
#15378 broke spark PVR as well: https://ci-beam.apache.org/job/beam_PreCommit_Java_PVR_Spark_Batch_Cron/5140/ (build 5139 passes) This Jenkins Job was renamed and is still failing. PVR tests sickbayed in #16411 |
|
@pabloem @ibzib - Should this be reverted? (For reference https://issues.apache.org/jira/browse/BEAM-13522 is the tracker.) |
|
I don't think it should be reverted but better care around adding new test categories without verifying them should be done in the future. |
|
I've proposed making Flink/Spark validates runner tests precommits for |
|
We've converted portable Flink VR to a precommit already, I'll follow up with other jobs later. |
@ibzib, did the Spark VR also become a precommit? |
There are various use cases that can be implemented on top of Beam and its runners, but that need certain guarantees regarding the ordered transport of data in between execution stages/steps of a Beam Pipeline.
This PR proposes the following:
ValidatesRunnertests that allow us to verify runner behavior for per-key ordered delivery.With this, Beam users will be able to enable various order-dependent workloads on Beam/Dataflow with confidence that they need.
Results form ValidatesRunner tests:
r: @apilloud
r: @kennknowles
Examples testing status on various runners
Post-Commit SDK/Transform Integration Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.