Skip to content

Commit e85d706

Browse files
aseeringolavloite
andauthored
feat(spanner): add ClientContext support (#13775)
Add support for the new ClientContext field to Spanner's Go Client. This enables support for Secure Parameters, as well as other future features. --------- Co-authored-by: Knut Olav Løite <koloite@gmail.com>
1 parent b4a6f4c commit e85d706

File tree

5 files changed

+717
-24
lines changed

5 files changed

+717
-24
lines changed

spanner/batch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
158158
Index: index,
159159
Columns: columns,
160160
KeySet: kset,
161-
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
161+
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, "", mergeClientContext(t.txReadOnly.clientContext, readOptions.ClientContext)),
162162
DataBoostEnabled: readOptions.DataBoostEnabled,
163163
DirectedReadOptions: readOptions.DirectedReadOptions,
164164
}
@@ -225,7 +225,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
225225
Params: params,
226226
ParamTypes: paramTypes,
227227
QueryOptions: qOpts.Options,
228-
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
228+
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, "", mergeClientContext(t.txReadOnly.clientContext, qOpts.ClientContext)),
229229
DataBoostEnabled: qOpts.DataBoostEnabled,
230230
DirectedReadOptions: qOpts.DirectedReadOptions,
231231
}

spanner/client.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ type Client struct {
130130
dro *sppb.DirectedReadOptions
131131
otConfig *openTelemetryConfig
132132
metricsTracerFactory *builtinMetricsTracerFactory
133+
clientContext *sppb.RequestOptions_ClientContext
133134
}
134135

135136
// DatabaseName returns the full name of a database, e.g.,
@@ -368,6 +369,9 @@ type ClientConfig struct {
368369

369370
// Default: false
370371
IsExperimentalHost bool
372+
373+
// ClientContext is the default context for all requests made by the client.
374+
ClientContext *sppb.RequestOptions_ClientContext
371375
}
372376

373377
type openTelemetryConfig struct {
@@ -601,6 +605,7 @@ Multiplexed session enabled: true
601605
dro: config.DirectedReadOptions,
602606
otConfig: otConfig,
603607
metricsTracerFactory: metricsTracerFactory,
608+
clientContext: config.ClientContext,
604609
}
605610
return c, nil
606611
}
@@ -805,6 +810,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
805810
t.txReadOnly.qo.DirectedReadOptions = c.dro
806811
t.txReadOnly.ro.DirectedReadOptions = c.dro
807812
t.txReadOnly.ro.LockHint = sppb.ReadRequest_LOCK_HINT_UNSPECIFIED
813+
t.txReadOnly.clientContext = c.clientContext
808814
t.ct = c.ct
809815
t.otConfig = c.otConfig
810816
return t
@@ -832,6 +838,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
832838
t.txReadOnly.qo.DirectedReadOptions = c.dro
833839
t.txReadOnly.ro.DirectedReadOptions = c.dro
834840
t.txReadOnly.ro.LockHint = sppb.ReadRequest_LOCK_HINT_UNSPECIFIED
841+
t.txReadOnly.clientContext = c.clientContext
835842
t.ct = c.ct
836843
t.otConfig = c.otConfig
837844
return t
@@ -870,6 +877,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
870877
ReadOnly: buildTransactionOptionsReadOnly(tb, true),
871878
},
872879
},
880+
RequestOptions: createRequestOptions(sppb.RequestOptions_PRIORITY_UNSPECIFIED, "", "", c.clientContext),
873881
})
874882
if err != nil {
875883
return nil, ToSpannerError(err)
@@ -902,6 +910,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
902910
t.txReadOnly.qo.DirectedReadOptions = c.dro
903911
t.txReadOnly.ro.DirectedReadOptions = c.dro
904912
t.txReadOnly.ro.LockHint = sppb.ReadRequest_LOCK_HINT_UNSPECIFIED
913+
t.txReadOnly.clientContext = c.clientContext
905914
t.ct = c.ct
906915
t.otConfig = c.otConfig
907916
return t, nil
@@ -938,6 +947,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
938947
t.txReadOnly.qo.DirectedReadOptions = c.dro
939948
t.txReadOnly.ro.DirectedReadOptions = c.dro
940949
t.txReadOnly.ro.LockHint = sppb.ReadRequest_LOCK_HINT_UNSPECIFIED
950+
t.txReadOnly.clientContext = c.clientContext
941951
t.ct = c.ct
942952
t.otConfig = c.otConfig
943953
return t
@@ -1021,8 +1031,10 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
10211031
t.txReadOnly.qo = c.qo
10221032
t.txReadOnly.ro = c.ro
10231033
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
1034+
t.txReadOnly.clientContext = c.clientContext
10241035
t.wb = []*Mutation{}
10251036
t.txOpts = c.txo.merge(options)
1037+
t.txReadOnly.clientContext = mergeClientContext(c.clientContext, t.txOpts.ClientContext)
10261038
t.ct = c.ct
10271039
t.otConfig = c.otConfig
10281040
}
@@ -1165,7 +1177,7 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
11651177
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, CommitOptions: ao.commitOptions, IsolationLevel: ao.isolationLevel})
11661178
return resp.CommitTs, err
11671179
}
1168-
t := &writeOnlyTransaction{sm: c.sm, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, commitOptions: ao.commitOptions, isolationLevel: ao.isolationLevel}
1180+
t := &writeOnlyTransaction{sm: c.sm, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, commitOptions: ao.commitOptions, isolationLevel: ao.isolationLevel, clientContext: c.clientContext}
11691181
return t.applyAtLeastOnce(ctx, ms...)
11701182
}
11711183

@@ -1181,6 +1193,9 @@ type BatchWriteOptions struct {
11811193
// in this batch write request will not be recorded in allowed tracking
11821194
// change treams with DDL option allow_txn_exclusion=true.
11831195
ExcludeTxnFromChangeStreams bool
1196+
1197+
// ClientContext contains client-owned context information to be passed with the batch write request.
1198+
ClientContext *sppb.RequestOptions_ClientContext
11841199
}
11851200

11861201
// merge combines two BatchWriteOptions such that the input parameter will have higher
@@ -1190,6 +1205,7 @@ func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
11901205
TransactionTag: bwo.TransactionTag,
11911206
Priority: bwo.Priority,
11921207
ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams || opts.ExcludeTxnFromChangeStreams,
1208+
ClientContext: mergeClientContext(bwo.ClientContext, opts.ClientContext),
11931209
}
11941210
if opts.TransactionTag != "" {
11951211
merged.TransactionTag = opts.TransactionTag
@@ -1345,7 +1361,7 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
13451361
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
13461362
Session: sh.getID(),
13471363
MutationGroups: mgsPb,
1348-
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
1364+
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag, mergeClientContext(c.clientContext, opts.ClientContext)),
13491365
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
13501366
}, gax.WithGRPCOptions(grpc.Header(&md)))
13511367

0 commit comments

Comments
 (0)