Skip to content

Commit 71073a0

Browse files
authored
Reverts #35202 (#35306)
1 parent d50b4e4 commit 71073a0

2 files changed

Lines changed: 3 additions & 18 deletions

File tree

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,6 @@ type Config struct {
161161
// MaxBundleSize caps the number of elements permitted in a bundle.
162162
// 0 or less means this is ignored.
163163
MaxBundleSize int
164-
// Whether to use real-time clock as processing time
165-
EnableRTC bool
166164
}
167165

168166
// ElementManager handles elements, watermarks, and related errata to determine
@@ -2162,10 +2160,8 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) {
21622160
}
21632161

21642162
// "Test" mode -> advance to next processing time event if any, to allow execution.
2165-
if !em.config.EnableRTC {
2166-
if t, ok := em.processTimeEvents.Peek(); ok {
2167-
return t
2168-
}
2163+
if t, ok := em.processTimeEvents.Peek(); ok {
2164+
return t
21692165
}
21702166

21712167
// "Production" mode, always real time now.

sdks/go/pkg/beam/runners/prism/internal/execute.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -145,18 +145,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
145145
topo := prepro.preProcessGraph(comps, j)
146146
ts := comps.GetTransforms()
147147

148-
config := engine.Config{}
149-
m := j.PipelineOptions().AsMap()
150-
for _, exp := range m["beam:option:experiments:v1"].([]interface{}) {
151-
if expStr, ok := exp.(string); ok {
152-
if expStr == "prism_enable_rtc" {
153-
config.EnableRTC = true
154-
break // Found it, no need to check the rest of the slice
155-
}
156-
}
157-
}
158-
159-
em := engine.NewElementManager(config)
148+
em := engine.NewElementManager(engine.Config{})
160149

161150
// TODO move this loop and code into the preprocessor instead.
162151
stages := map[string]*stage{}

0 commit comments

Comments
 (0)