[Bug]: PaneInfo not populated in Go SDK #31153
Closed
What happened?
I'm attempting to use early triggering and PaneInfo to limit bundle sizes so that I stay under Dataflow's 80MB limit. However, PaneInfo does not appear to be populated correctly.
Runner: Dataflow
Beam Version: 2.55.1
Here's a test that I believe demonstrates the problem:
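```go
// Package name is assumed; the report did not include one.
package panes

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func init() {
	register.Function2x0(produceFn)
	register.Function4x0(getPanes)
	register.Emitter1[int]()
	// Also register the timestamped emitter used by produceFn.
	register.Emitter2[beam.EventTime, int]()
}

// produceFn emits ten elements, all stamped one minute after the current time.
func produceFn(_ []byte, emit func(beam.EventTime, int)) {
	baseT := mtime.Now()
	for i := 0; i < 10; i++ {
		emit(baseT.Add(time.Minute), i)
	}
}

func Produce(s beam.Scope) beam.PCollection {
	return beam.ParDo(s, produceFn, beam.Impulse(s))
}

// getPanes logs the PaneInfo seen for each element and emits its pane index.
func getPanes(ctx context.Context, pi typex.PaneInfo, _ int, emit func(int)) {
	log.Output(ctx, log.SevWarn, 0, fmt.Sprintf("got pane %+v", pi))
	emit(int(pi.Index))
}

func TestPanes(t *testing.T) {
	p, scp := beam.NewPipelineWithRoot()
	c := Produce(scp)
	// Fixed 5-minute windows, firing early after every 2 elements and again at the end of the window.
	windowed := beam.WindowInto(
		scp,
		window.NewFixedWindows(5*time.Minute),
		c,
		beam.Trigger(trigger.AfterEndOfWindow().
			EarlyFiring(
				trigger.Repeat(
					trigger.AfterCount(2),
				),
			),
		),
		beam.PanesDiscard(),
	)
	panes := beam.ParDo(scp, getPanes, windowed)
	// Re-window into the global window so passert sees all pane indices together.
	paneIdxs := beam.WindowInto(scp, window.NewGlobalWindows(), panes)
	passert.Count(scp, paneIdxs, "pane idxs", 10)
	passert.EqualsList(scp, paneIdxs, []int{0, 0, 1, 1, 2, 0, 0, 1, 1, 2})
	ptest.RunAndValidate(t, p)
}
```

Every log line is identical: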
```
got pane {Timing:0 IsFirst:false IsLast:false Index:0 NonSpeculativeIndex:0}
```
Even if the expected indices in my test are wrong (the test fails on the EqualsList assertion), I would expect the PaneInfo values to be internally consistent: at least one pane should have IsFirst:true and at least one should have IsLast:true.
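The consistency expectation above can be written as a small check. This is a minimal sketch, not Beam code: `paneInfo` and `checkPanes` are hypothetical names, the struct only mirrors the fields shown in the log line, and the check assumes one entry per firing for a single, closed window, listed in firing order. It verifies that Index counts firings from 0, that only the first pane has IsFirst, and that only the last pane has IsLast; it does not check Timing or NonSpeculativeIndex.

```go
package main

import "fmt"

// paneInfo is a hypothetical local stand-in mirroring the fields that
// typex.PaneInfo prints in the log output, so this sketch runs without Beam.
type paneInfo struct {
	Timing              int
	IsFirst, IsLast     bool
	Index               int64
	NonSpeculativeIndex int64
}

// checkPanes returns the first inconsistency found in the panes fired for a
// single window. Assumption: panes holds one entry per firing, in firing
// order, and the window is closed, so its final pane should be marked IsLast.
func checkPanes(panes []paneInfo) error {
	if len(panes) == 0 {
		return fmt.Errorf("no panes observed")
	}
	for i, p := range panes {
		// Pane indices count firings within the window: 0, 1, 2, ...
		if p.Index != int64(i) {
			return fmt.Errorf("pane %d: Index = %d, want %d", i, p.Index, i)
		}
		// Only the first firing should be flagged IsFirst.
		if wantFirst := i == 0; p.IsFirst != wantFirst {
			return fmt.Errorf("pane %d: IsFirst = %v, want %v", i, p.IsFirst, wantFirst)
		}
		// Only the final firing should be flagged IsLast.
		if wantLast := i == len(panes)-1; p.IsLast != wantLast {
			return fmt.Errorf("pane %d: IsLast = %v, want %v", i, p.IsLast, wantLast)
		}
	}
	return nil
}

func main() {
	// The value reported in the issue: every element saw this PaneInfo.
	reported := []paneInfo{{Timing: 0, IsFirst: false, IsLast: false, Index: 0, NonSpeculativeIndex: 0}}
	fmt.Println(checkPanes(reported)) // pane 0: IsFirst = false, want true

	// A hypothetical consistent sequence: two early panes, then a final one.
	consistent := []paneInfo{
		{IsFirst: true, Index: 0},
		{Index: 1},
		{IsLast: true, Index: 2},
	}
	fmt.Println(checkPanes(consistent)) // <nil>
}
```

A single pane that is both IsFirst and IsLast also passes, which matches a window that fired exactly once.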
Issue Priority
Priority: 2 (default)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner