[Feature Request][Proposal][Go SDK] Composite State and Timers #25894
Description
Apache Dev List Thread: https://lists.apache.org/thread/90yqjyc2g2kr2yj4gcl2xhd4zzwknn5x
Top post will remain evergreen, and be updated WRT suggestions or clarifications from discussion.
Objective
Enable higher-level abstractions built around State and Timers that are not locked to a single DoFn implementation. These composites would be built from State and Timer primitives, or from other state and timer composites. This would enable reusable patterns, either provided by Beam or developed by users themselves.
Background
As presently designed for Timers and implemented for State, users must use State and Timers directly. This means that patterns in the programming guide must be followed closely to achieve desired behavior from an executing pipeline.
Ideally, users would be fully aware of the consequences of the code they write, but mistakes still happen, particularly with output timestamps and holding back the watermark. See https://beam.apache.org/documentation/programming-guide/#timer-output-timestamps for an example where this can go wrong.
Proposal
Instead of identifying a Timer through one interface method that returns its time domain (Domain) and a separate method that returns its ID, a single method can return a mapping from identifiers to domains, map[string]Domain. This gives the FnAPI all the same information in a consolidated form.
A composite would need to simply implement the same method to be considered a "timer".
Standard helper methods for merging/consolidating timers would be needed to simplify the implementation of these composites.
Then actual implementations would use the API on the primitive timers to build the desired behavior of the composites. Similarly this can apply to State.
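To make the shape of this concrete, here is a minimal, self-contained sketch of the consolidated identification method and a merge helper. Everything in it is hypothetical and stands in for SDK types: Domain, Timer, primitive, MergeTimers, and flushAndExpire are illustrative names, not the Beam Go API.

```go
package main

import (
	"fmt"
	"sort"
)

// Domain is a stand-in for a timer's time domain (hypothetical, not the SDK type).
type Domain int

const (
	EventTime Domain = iota
	ProcessingTime
)

// Timer is the consolidated identification interface sketched in the proposal:
// a single method mapping timer IDs to their domains.
type Timer interface {
	Timers() map[string]Domain
}

// primitive models a single primitive timer (hypothetical).
type primitive struct {
	id     string
	domain Domain
}

func (p primitive) Timers() map[string]Domain {
	return map[string]Domain{p.id: p.domain}
}

// MergeTimers is a hypothetical helper that combines the ID-to-domain maps of
// several timers and rejects duplicate IDs.
func MergeTimers(ts ...Timer) (map[string]Domain, error) {
	out := map[string]Domain{}
	for _, t := range ts {
		for id, d := range t.Timers() {
			if _, dup := out[id]; dup {
				return nil, fmt.Errorf("duplicate timer ID %q", id)
			}
			out[id] = d
		}
	}
	return out, nil
}

// flushAndExpire is a hypothetical composite built from two primitive timers.
type flushAndExpire struct {
	flush, expire primitive
}

func newFlushAndExpire(name string) flushAndExpire {
	return flushAndExpire{
		flush:  primitive{id: name + "-flush", domain: ProcessingTime},
		expire: primitive{id: name + "-expire", domain: EventTime},
	}
}

// Timers makes the composite satisfy the same interface as a primitive timer.
func (c flushAndExpire) Timers() map[string]Domain {
	m, err := MergeTimers(c.flush, c.expire)
	if err != nil {
		// A duplicate ID is a construction-time programming error in this sketch.
		panic(err)
	}
	return m
}

func main() {
	c := newFlushAndExpire("batch")
	ids := make([]string, 0, 2)
	for id := range c.Timers() {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	fmt.Println(ids)
}
```

Because the composite and the primitive satisfy the same interface, composites can themselves be nested inside other composites and merged with the same helper.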
Example
The following is directly translated from the timer-output-timestamps Java example. This is the direct primitives approach.
type StatefulDoFn struct {
ElementBag state.Bag[string]
TimerTime state.Value[int64]
MinTime state.Combining[int64, int64, int64]
OutputState timers.ProcessingTime
}
func NewStateful() *StatefulDoFn {
return &StatefulDoFn{
ElementBag: state.MakeBagState[string]("elementBag"),
TimerTime: state.MakeValueState[int64]("timerTime"),
MinTime: state.MakeCombiningState[int64, int64, int64]("minTimeInBag", func(a, b int64) int64 {
if a < b {
return a
}
return b
}),
OutputState: timers.InProcessingTime("outputState"),
}
}
func (s *StatefulDoFn) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
s.ElementBag.Add(sp, word)
s.MinTime.Add(sp, int64(ts))
toFire, ok, err := s.TimerTime.Read(sp)
if err != nil {
return err
}
if !ok {
toFire = int64(mtime.Now().Add(1 * time.Minute))
}
minTime, _, err := s.MinTime.Read(sp)
if err != nil {
return err
}
s.OutputState.SetWithOpts(tp, mtime.Time(toFire).ToTime(), timers.Opts{Hold: mtime.Time(minTime).ToTime()})
s.TimerTime.Write(sp, toFire)
return nil
}
func (s *StatefulDoFn) OnTimer(...) error {
... Extracting and doing something with the batch elided ...
}
To achieve the same batching pattern, every caller needs to replicate this code, even when only the final downstream use differs slightly.
Instead, composites could enable more intuitive APIs and their reuse.
type Stateful struct {
ElementBatch sntcomp.Batch[string]
}
func NewStateful() *Stateful {
return &Stateful{
ElementBatch: sntcomp.MakeBatch[string]("statefulBatch"),
}
}
func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
// Logic is encapsulated in a method on the composite.
err := s.ElementBatch.Add(sp, tp, word, ts)
if err != nil {
return err
}
return nil
}
func (s *Stateful) OnTimer(timerID, timerTag string, ...) error {
... Extracting and doing something with the batch elided ...
batch, ok, err := s.ElementBatch.OnTimer(timerID, timerTag)
if err != nil {
return err
}
if ok {
// Do something with the batch.
}
...
return nil
}
The composite itself lives in the sntcomp package alongside the other composites.
type Batch[T any] struct {
Name string
ElementBag state.Bag[T]
TimerTime state.Value[int64]
MinTime state.Combining[int64, int64, int64]
OutputState timers.ProcessingTime
}
func MakeBatch[T any](name string) Batch[T] {
return Batch[T]{
Name: name,
ElementBag: state.MakeBagState[T](name + "-elementBag"),
TimerTime: state.MakeValueState[int64](name + "-timerTime"),
MinTime: state.MakeCombiningState[int64, int64, int64](name + "-minTimeInBag", func(a, b int64) int64 {
if a < b {
return a
}
return b
}),
OutputState: timers.InProcessingTime(name + "-outputState"),
}
}
func (b *Batch[T]) Timers() map[string]timers.Domain {
// Only one timer, so use its method directly.
return b.OutputState.Timers()
}
func (b *Batch[T]) States() map[string]state.Type {
// Several need combining, so we have a helper to do so.
// state.Type contains the Type (Bag, Value, etc) and the types of the Coders for the state.
return state.MergeStates(b.ElementBag, b.TimerTime, b.MinTime)
}
func (b *Batch[T]) OnTimer(timerID, tag string) ([]T, bool, error) {
// Validate the callback is for this batch, and if so, produce the elements.
}
func (b *Batch[T]) Clear(timerID, tag string) {
// Clear the state as needed after processing.
}
Alternatively, we could use reflection on the instance, searching its fields for the interface rather than forcing a repeated listing. This makes the States implementation trivial.
func (b *Batch[T]) States() map[string]state.Type {
return state.GatherStatesFromFields(b)
}
Implementing GatherStatesFromFields with reflection is reasonable since it runs only at pipeline construction time. The same approach applies to implementing the Timers method.
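As a rough illustration of the reflection approach, the sketch below walks the exported fields of a struct and merges the maps of every field that implements a States method. It is self-contained and uses stand-in types: StateType, Stater, Bag, Value, and Batch here are hypothetical placeholders, not the state package's real types, and a real implementation would also need to handle nil pointer fields and duplicate keys.

```go
package main

import (
	"fmt"
	"reflect"
)

// StateType is a placeholder for the proposal's state.Type (hypothetical).
type StateType struct{ Kind string }

// Stater is the identification interface searched for on each field (hypothetical).
type Stater interface {
	States() map[string]StateType
}

// Bag and Value are stand-ins for primitive state types.
type Bag struct{ Key string }

func (b Bag) States() map[string]StateType {
	return map[string]StateType{b.Key: {Kind: "bag"}}
}

type Value struct{ Key string }

func (v Value) States() map[string]StateType {
	return map[string]StateType{v.Key: {Kind: "value"}}
}

// GatherStatesFromFields merges the States maps of every exported field of a
// struct (or pointer to a struct) that implements Stater.
func GatherStatesFromFields(v any) map[string]StateType {
	out := map[string]StateType{}
	rv := reflect.ValueOf(v)
	for rv.Kind() == reflect.Ptr {
		if rv.IsNil() {
			return out
		}
		rv = rv.Elem()
	}
	if rv.Kind() != reflect.Struct {
		return out
	}
	for i := 0; i < rv.NumField(); i++ {
		f := rv.Field(i)
		if !f.CanInterface() {
			continue // unexported fields cannot be inspected this way
		}
		if s, ok := f.Interface().(Stater); ok {
			for k, t := range s.States() {
				out[k] = t
			}
		}
	}
	return out
}

// Batch is a simplified composite holding two primitive states.
type Batch struct {
	Name      string // not a Stater, so it is ignored
	Elements  Bag
	TimerTime Value
	hidden    Bag // unexported, so it is skipped
}

func main() {
	b := &Batch{
		Name:      "b",
		Elements:  Bag{Key: "b-elementBag"},
		TimerTime: Value{Key: "b-timerTime"},
		hidden:    Bag{Key: "b-hidden"},
	}
	for k, t := range GatherStatesFromFields(b) {
		fmt.Println(k, t.Kind)
	}
}
```

Skipping unexported fields is a design choice of this sketch; it mirrors the usual Go convention that reflection-driven frameworks only see exported struct fields.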
Composites add more tools to the Beam Go tool chest. Built-in composites for common patterns can be documented both for direct use and as models for users developing their own from our primitives.
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner