Skip to content

Commit f343b81

Browse files
authored
Move provisioned options outside of harness.Main (#26476)
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
1 parent e143a44 commit f343b81

3 files changed

Lines changed: 54 additions & 16 deletions

File tree

sdks/go/pkg/beam/core/runtime/harness/harness.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,32 @@ import (
4545
// URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
4646
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
4747

48-
// TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
48+
// Options for harness.Main that affect execution of the harness, such as runner capabilities.
49+
type Options struct {
50+
RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
51+
StatusEndpoint string // Endpoint for worker status reporting.
52+
}
4953

50-
// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
54+
// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
5155
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
5256
// ultimately responsible for correctly executing user code.
57+
//
58+
// Deprecated: Prefer MainWithOptions instead.
5359
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
60+
return MainWithOptions(ctx, loggingEndpoint, controlEndpoint, Options{})
61+
}
62+
63+
// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
64+
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
65+
// ultimately responsible for correctly executing user code.
66+
//
67+
// Options are optional configurations for interfacing with the runner or similar.
68+
func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error {
5469
hooks.DeserializeHooksFromOptions(ctx)
5570

56-
// Extract environment variables. These are optional runner supported capabilities.
57-
// Expected env variables:
58-
// RUNNER_CAPABILITIES : list of runner supported capability urn.
59-
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
60-
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
61-
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
6271
rcMap := make(map[string]bool)
63-
if len(runnerCapabilities) > 0 {
64-
for _, capability := range runnerCapabilities {
65-
rcMap[capability] = true
66-
}
72+
for _, capability := range opts.RunnerCapabilities {
73+
rcMap[capability] = true
6774
}
6875

6976
// Pass in the logging endpoint for use w/the default remote logging hook.
@@ -151,8 +158,8 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
151158
}
152159

153160
// if the runner supports worker status api then expose SDK harness status
154-
if statusEndpoint != "" {
155-
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
161+
if opts.StatusEndpoint != "" {
162+
statusHandler, err := newWorkerStatusHandler(ctx, opts.StatusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
156163
if err != nil {
157164
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
158165
} else {

sdks/go/pkg/beam/core/runtime/harness/init/init.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"encoding/json"
2424
"flag"
25+
"strings"
2526
"time"
2627

2728
"fmt"
@@ -118,7 +119,18 @@ func hook() {
118119
if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported {
119120
fmt.Println("Error Setting Rlimit ", err)
120121
}
121-
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
122+
123+
// Extract environment variables. These are optional runner supported capabilities.
124+
// Expected env variables:
125+
// RUNNER_CAPABILITIES : list of runner supported capability urn.
126+
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
127+
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
128+
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
129+
options := harness.Options{
130+
StatusEndpoint: statusEndpoint,
131+
RunnerCapabilities: runnerCapabilities,
132+
}
133+
if err := harness.MainWithOptions(ctx, *loggingEndpoint, *controlEndpoint, options); err != nil {
122134
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
123135
switch ShutdownMode {
124136
case Terminate:

sdks/go/pkg/beam/runners/universal/extworker/extworker.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net"
2323
"sync"
2424

25+
"github.com/apache/beam/sdks/v2/go/container/tools"
2526
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
2627
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
2728
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
@@ -89,10 +90,28 @@ func (s *Loopback) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest
8990
ctx = grpcx.WriteWorkerID(s.root, req.GetWorkerId())
9091
ctx, s.workers[req.GetWorkerId()] = context.WithCancel(ctx)
9192

92-
go harness.Main(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl())
93+
opts := harnessOptions(ctx, req.GetProvisionEndpoint().GetUrl())
94+
95+
go harness.MainWithOptions(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl(), opts)
9396
return &fnpb.StartWorkerResponse{}, nil
9497
}
9598

99+
func harnessOptions(ctx context.Context, endpoint string) harness.Options {
100+
var opts harness.Options
101+
if endpoint == "" {
102+
return opts
103+
}
104+
info, err := tools.ProvisionInfo(ctx, endpoint)
105+
if err != nil {
106+
log.Infof(ctx, "error talking to provision service worker, using defaults:%v", err)
107+
return opts
108+
}
109+
110+
opts.StatusEndpoint = info.GetStatusEndpoint().GetUrl()
111+
opts.RunnerCapabilities = info.GetRunnerCapabilities()
112+
return opts
113+
}
114+
96115
// StopWorker terminates a worker harness, implementing BeamFnExternalWorkerPoolServer.StopWorker.
97116
func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) {
98117
log.Infof(ctx, "stopping worker %v", req.GetWorkerId())

0 commit comments

Comments
 (0)