[Feature Request][Proposal][Go SDK] Composite State and Timers  #25894

@lostluck

Description

Apache Dev List Thread: https://lists.apache.org/thread/90yqjyc2g2kr2yj4gcl2xhd4zzwknn5x

The top post will remain evergreen and will be updated with suggestions or clarifications from the discussion.

Objective

Enable higher-level abstractions built around State and Timers that are not locked to a single DoFn implementation. These composites would be built from State and Timer primitives, or from other State and Timer composites. This would enable reusable patterns, either provided by Beam or developed by users themselves.

Background

As presently designed for Timers and implemented for State, users must use State and Timers directly. This means that patterns in the programming guide must be followed closely to achieve desired behavior from an executing pipeline.

Ideally, users would be fully aware of the consequences of the code they write, but mistakes can still happen. This is a particular risk with output timestamps, which can hold back the watermark. See https://beam.apache.org/documentation/programming-guide/#timer-output-timestamps for an example where this can go wrong.

Proposal

Instead of identifying a Timer through one interface method that returns the timer's Domain and a separate method that returns its ID, a single method can return a mapping from identifiers to domains: map[string]Domain. This gives the FnAPI all the same information in a consolidated form.

A composite would need to simply implement the same method to be considered a "timer".
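As a minimal sketch of the idea above (not the SDK's actual API), the single identification method could look like the following. The names Domain, Timer, ProcessingTimeTimer, EventTimeTimer, and Flush are all hypothetical stand-ins invented for illustration.

```go
package main

// Domain is a stand-in for the timer's time domain (assumption for this sketch).
type Domain int

const (
	EventTime Domain = iota
	ProcessingTime
)

// Timer is the single identification method: timer IDs mapped to their domains.
type Timer interface {
	Timers() map[string]Domain
}

// ProcessingTimeTimer is a hypothetical primitive timer with one family ID.
type ProcessingTimeTimer struct{ Family string }

func (t ProcessingTimeTimer) Timers() map[string]Domain {
	return map[string]Domain{t.Family: ProcessingTime}
}

// EventTimeTimer is a hypothetical primitive timer in the event-time domain.
type EventTimeTimer struct{ Family string }

func (t EventTimeTimer) Timers() map[string]Domain {
	return map[string]Domain{t.Family: EventTime}
}

// Flush is a hypothetical composite built from two primitive timers.
type Flush struct {
	Deadline  ProcessingTimeTimer
	Watermark EventTimeTimer
}

// Timers makes the composite a "timer" by reporting every primitive it owns.
func (f Flush) Timers() map[string]Domain {
	out := map[string]Domain{}
	for _, t := range []Timer{f.Deadline, f.Watermark} {
		for id, d := range t.Timers() {
			out[id] = d
		}
	}
	return out
}

// Compile-time check that the composite satisfies the same interface.
var _ Timer = Flush{}
```

Because the composite satisfies the same interface as a primitive, whatever inspects a DoFn's fields at construction time would not need to distinguish between the two.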

Standard helper methods for merging/consolidating timers would be needed to simplify the implementation of these composites.
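One possible shape for such a merging helper is sketched below. MergeTimers, staticTimer, and the local Domain and Timer types are hypothetical names for this sketch. Returning an error on a duplicate ID is a design assumption here, not something the proposal specifies.

```go
package main

import "fmt"

// Domain and Timer are local stand-ins for the proposed types (assumptions).
type Domain int

const (
	EventTime Domain = iota
	ProcessingTime
)

type Timer interface {
	Timers() map[string]Domain
}

// MergeTimers consolidates the ID-to-domain maps of several timers into one.
// It rejects duplicate IDs, since two timers sharing an ID would collide.
func MergeTimers(ts ...Timer) (map[string]Domain, error) {
	out := make(map[string]Domain)
	for _, t := range ts {
		for id, d := range t.Timers() {
			if _, dup := out[id]; dup {
				return nil, fmt.Errorf("duplicate timer ID %q", id)
			}
			out[id] = d
		}
	}
	return out, nil
}

// staticTimer is a hypothetical timer whose ID-to-domain map is fixed up front.
type staticTimer map[string]Domain

func (s staticTimer) Timers() map[string]Domain { return s }
```

A composite with several timers could then implement Timers by passing its fields to MergeTimers and handling the error once, rather than hand-merging maps in every composite.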

Actual implementations would then use the API of the primitive timers to build the desired behavior of the composites. The same approach can apply to State.

Example

The following is directly translated from the timer-output-timestamps Java example. This is the direct primitives approach.

type StatefulDoFn struct {
	ElementBag state.Bag[string]
	TimerTime  state.Value[int64]
	MinTime    state.Combining[int64, int64, int64]

	OutputState timers.ProcessingTime
}

func NewStateful() *StatefulDoFn {
	return &StatefulDoFn{
		ElementBag: state.MakeBagState[string]("elementBag"),
		TimerTime:  state.MakeValueState[int64]("timerTime"),
		MinTime: state.MakeCombiningState[int64, int64, int64]("minTimeInBag", func(a, b int64) int64 {
			if a < b {
				return a
			}
			return b
		}),

		OutputState: timers.InProcessingTime("outputState"),
	}
}

func (s *StatefulDoFn) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
	s.ElementBag.Add(sp, word)
	s.MinTime.Add(sp, int64(ts))

	toFire, ok, err := s.TimerTime.Read(sp)
	if err != nil {
		return err
	}
	if !ok {
		toFire = int64(mtime.Now().Add(1 * time.Minute))
	}
	minTime, _, err := s.MinTime.Read(sp)
	if err != nil {
		return err
	}

	s.OutputState.SetWithOpts(tp, mtime.Time(toFire).ToTime(), timers.Opts{Hold: mtime.Time(minTime).ToTime()})
	return s.TimerTime.Write(sp, toFire)
}

func (s *StatefulDoFn) OnTimer(...) error {
	// ... Extracting and doing something with the batch elided ...
}

To achieve the same batching pattern, every caller needs to replicate this code, even when only the ultimate downstream use differs slightly.

Composites could instead provide more intuitive, reusable APIs.

type StatefulDoFn struct {
	ElementBatch sntcomp.Batch[string]
}

func NewStateful() *StatefulDoFn {
	return &StatefulDoFn{
		ElementBatch: sntcomp.MakeBatch[string]("statefulBatch"),
	}
}

func (s *StatefulDoFn) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
	// The batching logic is encapsulated in a method on the composite.
	err := s.ElementBatch.Add(sp, tp, word, ts)
	if err != nil {
		return err
	}

	return nil
}

func (s *StatefulDoFn) OnTimer(timerID, timerTag string, ...) error {
	// ... Extracting and doing something with the batch elided ...
	batch, ok, err := s.ElementBatch.OnTimer(timerID, timerTag)
	if err != nil {
		return err
	}
	if ok {
		// Do something with the batch.
	}
	...
	return nil
}

The composite itself would live in the sntcomp package:

type Batch[T any] struct {
	Name       string
	ElementBag state.Bag[T]
	TimerTime  state.Value[int64]
	MinTime    state.Combining[int64, int64, int64]

	OutputState timers.ProcessingTime
}

func MakeBatch[T any](name string) Batch[T] {
	return Batch[T]{
		Name:       name,
		ElementBag: state.MakeBagState[T](name + "-elementBag"),
		TimerTime:  state.MakeValueState[int64](name + "-timerTime"),
		MinTime: state.MakeCombiningState[int64, int64, int64](name+"-minTimeInBag", func(a, b int64) int64 {
			if a < b {
				return a
			}
			return b
		}),

		OutputState: timers.InProcessingTime(name + "-outputState"),
	}
}

func (b *Batch[T]) Timers() map[string]timers.Domain {
	// Only one timer, so use its method directly.
	return b.OutputState.Timers()
}

func (b *Batch[T]) States() map[string]state.Type {
	// Several states need combining, so a helper does it.
	// state.Type contains the kind of state (Bag, Value, etc.) and the types of the Coders for the state.
	return state.MergeStates(b.ElementBag, b.TimerTime, b.MinTime)
}

func (b *Batch[T]) OnTimer(timerID, tag string) ([]T, bool, error) {
	// Validate the callback is for this batch, and if so, produce the elements.
}

func (b *Batch[T]) Clear(timerID, tag string) {
	// Clear the state as needed after processing.
}

Alternatively, we could use reflection on the instance to do the same, searching the fields for the interface rather than forcing a repeated listing. This makes the States implementation trivial.

func (b *Batch[T]) States() map[string]state.Type {
	return state.GatherStatesFromFields(b)
}

Implementing GatherStatesFromFields with reflection is reasonable, since it is used only at pipeline construction time. The same applies to implementing the Timers methods.
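A rough sketch of how such a reflection-based helper might work is shown below. It is not an existing Beam function. StateType, Stater, Bag, Value, and this simplified non-generic Batch are hypothetical stand-ins, and the sketch assumes the state fields are exported values whose States method has a value receiver.

```go
package main

import "reflect"

// StateType is a stand-in for the proposed state.Type (assumption).
type StateType struct{ Kind string }

// Stater is the hypothetical identification interface for states.
type Stater interface {
	States() map[string]StateType
}

// Bag and Value are hypothetical primitives, each reporting a single state.
type Bag struct{ Key string }

func (b Bag) States() map[string]StateType {
	return map[string]StateType{b.Key: {Kind: "bag"}}
}

type Value struct{ Key string }

func (v Value) States() map[string]StateType {
	return map[string]StateType{v.Key: {Kind: "value"}}
}

// Batch is a simplified composite with two state fields and one plain field.
type Batch struct {
	Elements  Bag
	TimerTime Value
	Name      string // not a state, so it is skipped
}

// GatherStatesFromFields walks the exported fields of a struct (or pointer to
// a struct) and merges the states of every field that implements Stater.
func GatherStatesFromFields(composite any) map[string]StateType {
	out := map[string]StateType{}
	v := reflect.ValueOf(composite)
	for v.Kind() == reflect.Pointer {
		if v.IsNil() {
			return out
		}
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return out
	}
	for i := 0; i < v.NumField(); i++ {
		f := v.Field(i)
		if !f.CanInterface() {
			continue // unexported fields cannot be inspected this way
		}
		if s, ok := f.Interface().(Stater); ok {
			for key, t := range s.States() {
				out[key] = t
			}
		}
	}
	return out
}
```

Nested composites fall out naturally in this sketch: a field that is itself a composite reports its own states through the same States method.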

Composites add more tools to the Beam Go tool chest. Built-in composites for common patterns can be documented appropriately, both for direct use and as examples for users developing their own composites from our primitives.

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
