@@ -45,25 +45,32 @@ import (
4545// URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
4646const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
4747
48- // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
48+ // Options for harness.Main that affect execution of the harness, such as runner capabilities.
49+ type Options struct {
50+ RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
51+ StatusEndpoint string // Endpoint for worker status reporting.
52+ }
4953
50- // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
54+ // MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
5155// "pipeline-construction time" -- on each worker. It is a FnAPI client and
5256// ultimately responsible for correctly executing user code.
57+ //
58+ // Deprecated: Prefer MainWithOptions instead.
5359func Main (ctx context.Context , loggingEndpoint , controlEndpoint string ) error {
60+ return MainWithOptions (ctx , loggingEndpoint , controlEndpoint , Options {})
61+ }
62+
63+ // MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
64+ // "pipeline-construction time" -- on each worker. It is a FnAPI client and
65+ // ultimately responsible for correctly executing user code.
66+ //
67+ // Options are optional configurations for interfacing with the runner or similar.
68+ func MainWithOptions (ctx context.Context , loggingEndpoint , controlEndpoint string , opts Options ) error {
5469 hooks .DeserializeHooksFromOptions (ctx )
5570
56- // Extract environment variables. These are optional runner supported capabilities.
57- // Expected env variables:
58- // RUNNER_CAPABILITIES : list of runner supported capability urn.
59- // STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
60- statusEndpoint := os .Getenv ("STATUS_ENDPOINT" )
61- runnerCapabilities := strings .Split (os .Getenv ("RUNNER_CAPABILITIES" ), " " )
6271 rcMap := make (map [string ]bool )
63- if len (runnerCapabilities ) > 0 {
64- for _ , capability := range runnerCapabilities {
65- rcMap [capability ] = true
66- }
72+ for _ , capability := range opts .RunnerCapabilities {
73+ rcMap [capability ] = true
6774 }
6875
6976 // Pass in the logging endpoint for use w/the default remote logging hook.
@@ -151,8 +158,8 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
151158 }
152159
153160 // if the runner supports worker status api then expose SDK harness status
154- if statusEndpoint != "" {
155- statusHandler , err := newWorkerStatusHandler (ctx , statusEndpoint , ctrl .cache , func (statusInfo * strings.Builder ) { ctrl .metStoreToString (statusInfo ) })
161+ if opts . StatusEndpoint != "" {
162+ statusHandler , err := newWorkerStatusHandler (ctx , opts . StatusEndpoint , ctrl .cache , func (statusInfo * strings.Builder ) { ctrl .metStoreToString (statusInfo ) })
156163 if err != nil {
157164 log .Errorf (ctx , "error establishing connection to worker status API: %v" , err )
158165 } else {
0 commit comments