A simple pipeline of the form C = [A, B] | Flatten(), D = A | GroupByKey(), where PCollection A feeds both the Flatten and the GroupByKey, crashes the Prism runner. The error log:
0: panic in stage.Execute bundle processing goroutine: zero length key: stage-004 ref_PCollection_PCollection_3, stage: &{ID:stage-003 transforms:[ref_AppliedPTransform_side1-FlatMap-lambda-at-core-py-3970-_4 ref_AppliedPTransform_side1-Map-decode-_6] primaryInput:ref_PCollection_PCollection_1 outputs:[{Transform:ref_AppliedPTransform_side1-Map-decode-_6 Local:None Global:ref_PCollection_PCollection_3}] sideInputs:[] internalCols:[ref_PCollection_PCollection_2] envID:ref_Environment_default_environment_1 finalize:false stateful:false onWindowExpiration:{TransformID: TimerFamily:} hasTimers:[] processingTimeTimers:map[] stateTypeLen:map[] exe:<nil> inputTransformID:stage-003_source inputInfo:{GlobalID:ref_PCollection_PCollection_1 WindowCoder:0 WDec:0x10314e680 WEnc:0x10314e680 EDec:0x100f51320 KeyDec:<nil>} desc:0x140002dafc0 prepareSides:0x100f70bd0 SinkToPCollection:map[ref_AppliedPTransform_side1-Map-decode-_6_None:ref_PCollection_PCollection_3] OutputsToCoders:map[ref_PCollection_PCollection_3:{GlobalID:ref_PCollection_PCollection_3 WindowCoder:0 WDec:0x10314e680 WEnc:0x10314e680 EDec:0x100f51320 KeyDec:<nil>}] baseProgTick:{v:100000000}},stackTrace:
goroutine 47 [running]:
runtime/debug.Stack()
.../go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.darwin-arm64/src/runtime/debug/stack.go:26 +0x64
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.(*stage).Execute.func1()
.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/stage.go:118 +0x4c
panic({0x101c795a0?, 0x140009001a0?})
../go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.darwin-arm64/src/runtime/panic.go:792 +0xf0
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*aggregateStageKind).addPending(0x10314e680, 0x1400062d520, 0x140001decc0, {0x14000533c20, 0x1, 0x1})
.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1276 +0xac8
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*stageState).AddPending(0x1400062d520, 0x140001decc0, {0x14000533b80, 0x1, 0x1})
.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1250 +0xec
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*ElementManager).PersistBundle(0x140001decc0, {{0x140008082d0, 0x9}, {0x14000808397, 0x7}, 0x20c49ba5e353f7}, 0x140006f9590, {0x14000711830, 0x0, 0x0, ...}, ...)
.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:841 +0x11c4
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.(*stage).Execute(0x14000642480, {0x102169cb0, 0x140001f1c20}, 0x140003006c0, 0x14000178000, 0x14000176280, 0x140001decc0, {{0x140008082d0, 0x9}, {0x14000808397, ...}, ...})
.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/stage.go:333 +0x2324
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.executePipeline.func4()
.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:355 +0x104
golang.org/x/sync/errgroup.(*Group).Go.func1()
...g/go/pkg/mod/golang.org/x/sync@v0.13.0/errgroup/errgroup.go:79 +0xa8
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 84
.../go/pkg/mod/golang.org/x/sync@v0.13.0/errgroup/errgroup.go:76 +0xfc
What happened?
This is the failing case I discovered while working on #34641. It also demonstrates that the previous approach of overwriting the input PCollection coders of Flatten could be problematic.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components