
Add support for global sequence processing to the "ordered" extension in Java SDK#32540

Merged
damccorm merged 36 commits into apache:master from slilichenko:master on Oct 9, 2024

Conversation

Contributor

@slilichenko slilichenko commented Sep 23, 2024

Global sequence processing is used to ensure that events for a given key are processed only when it is guaranteed that all the elements for this particular key, up to that point in the global sequence, have been received.

Consider a PCollection which contains these event tuples (first element of the tuple is the global sequence number):
[1, key1, data], [2, key2, data], [3, key1, data], [4, key1, data], [5, key2, data], [7, key2, data]. Elements for key1 must be processed in the following order: 1, 3, 4. Elements for key2 must be processed in the following order: 2, 5. The event with sequence 7 can't be processed yet because sequence 6 is missing.
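The example above can be reproduced with a small plain-Java sketch. It does not use the Beam API or the extension's actual classes; `GlobalSequenceExample`, `Event`, and both helper methods are hypothetical names introduced only for illustration, and the sketch assumes the global sequence starts at 1.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class GlobalSequenceExample {
  /** Hypothetical event tuple: (global sequence number, key, data). */
  static final class Event {
    final long sequence;
    final String key;
    final String data;

    Event(long sequence, String key, String data) {
      this.sequence = sequence;
      this.key = key;
      this.data = data;
    }
  }

  /** Last sequence of the gap-free run starting at firstSequence (firstSequence - 1 if none). */
  static long lastContiguousSequence(List<Event> events, long firstSequence) {
    Set<Long> seen = new HashSet<>();
    for (Event e : events) {
      seen.add(e.sequence);
    }
    long last = firstSequence - 1;
    while (seen.contains(last + 1)) {
      last++;
    }
    return last;
  }

  /** Per key, the sequences that fall inside the contiguous range, in processing order. */
  static Map<String, List<Long>> processableByKey(List<Event> events, long firstSequence) {
    long last = lastContiguousSequence(events, firstSequence);
    List<Event> sorted = new ArrayList<>(events);
    sorted.sort(Comparator.comparingLong(e -> e.sequence));
    Map<String, List<Long>> result = new TreeMap<>();
    for (Event e : sorted) {
      if (e.sequence >= firstSequence && e.sequence <= last) {
        result.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.sequence);
      }
    }
    return result;
  }

  /** The six events from the description; sequence 6 is missing. */
  static List<Event> sampleEvents() {
    List<Event> events = new ArrayList<>();
    events.add(new Event(1, "key1", "data"));
    events.add(new Event(2, "key2", "data"));
    events.add(new Event(3, "key1", "data"));
    events.add(new Event(4, "key1", "data"));
    events.add(new Event(5, "key2", "data"));
    events.add(new Event(7, "key2", "data"));
    return events;
  }

  public static void main(String[] args) {
    List<Event> events = sampleEvents();
    System.out.println(lastContiguousSequence(events, 1)); // prints 5
    System.out.println(processableByKey(events, 1)); // prints {key1=[1, 3, 4], key2=[2, 5]}
  }
}
```

Sequence 7 is excluded for key2 only because of the gap at 6; if an event with sequence 6 arrived for any key, the contiguous range would extend to 7.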

The approach used to implement ordered processing in the presence of global sequencing:

  • Generate (periodically for streaming pipelines, once for batch) the side input which contains the maximum contiguous range of sequences across all keys and the maximum timestamp of events in that range.
  • The DoFns which process events use this side input to a) store the latest maximum range in the per-key processing state and b) set up an event-time timer to fire at the latest timestamp.
  • Save all the events for a particular key into the ordered list state (with some optimization exceptions). Because the events received in this DoFn are per key, they are not guaranteed to be contiguous and can't be processed right away.
  • Once the timer fires, there is a guarantee that all the events for a given key have been received up to the firing timestamp. The latest contiguous range stored in the processing state then limits which events in the ordered list state can be safely processed in a loop.
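The per-key part of the steps above can be sketched in plain Java as follows. This is a simplified simulation under stated assumptions, not the extension's implementation: `PerKeyBufferSketch` and its methods are hypothetical, a `TreeMap` stands in for Beam's ordered list state, a plain field stands in for the per-key processing state, and the timer is modelled as an explicit `onTimer()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class PerKeyBufferSketch {
  // Stand-in for the ordered list state: events for one key, sorted by global sequence.
  private final NavigableMap<Long, String> buffered = new TreeMap<>();
  // Stand-in for the processing state: upper bound of the latest known contiguous range.
  private long lastContiguousSequence = Long.MIN_VALUE;

  /** An event for this key arrives; it is buffered rather than processed right away. */
  void onEvent(long sequence, String data) {
    buffered.put(sequence, data);
  }

  /** A new value of the (simulated) side input arrives; keep the largest bound seen. */
  void onRangeUpdate(long newLastContiguousSequence) {
    lastContiguousSequence = Math.max(lastContiguousSequence, newLastContiguousSequence);
  }

  /** The (simulated) timer fires: drain, in order, every event inside the contiguous range. */
  List<String> onTimer() {
    NavigableMap<Long, String> ready = buffered.headMap(lastContiguousSequence, true);
    List<String> processed = new ArrayList<>(ready.values());
    ready.clear(); // headMap is a view, so this also removes the entries from the buffer
    return processed;
  }

  int bufferedCount() {
    return buffered.size();
  }

  public static void main(String[] args) {
    PerKeyBufferSketch key2 = new PerKeyBufferSketch();
    key2.onEvent(5, "event-5");
    key2.onEvent(7, "event-7");
    key2.onEvent(2, "event-2");
    System.out.println(key2.onTimer()); // prints []
    key2.onRangeUpdate(5); // global contiguous range currently ends at 5
    System.out.println(key2.onTimer()); // prints [event-2, event-5]
    System.out.println(key2.bufferedCount()); // prints 1
  }
}
```

Event 7 stays buffered until a later range update covers it, which mirrors how the contiguous range bounds what can be safely emitted for a key.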

This high level diagram illustrates the overall approach.

There are dedicated unit tests to cover both per-key and global sequence processing. Please refer to them to understand the details of use cases.

Note that the batch unit tests for global sequence processing are not run automatically. This is due to the apparent incorrectness of the DirectRunner implementation (it is supposed to block the event-processing DoFns until the side input has been calculated once). The batch processing tests were successfully run manually using DataflowRunner. Additional work will be needed to either fix the DirectRunner, switch to PrismRunner (when it supports all the primitives used in this transform), or enable the tests to run on DataflowRunner.
