Skip to content

Commit 16af6a1

Browse files
authored
feat(spanner): Add E2E fallback to the spanner client. (#13518)
Adding E2E fallback will allow spanner clients to fallback to cloudpath when directpath is unavailable
1 parent 5aec566 commit 16af6a1

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

spanner/client.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,32 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
416416
return newClientWithConfig(ctx, database, config, nil, opts...)
417417
}
418418

419+
type fallbackWrapper struct {
420+
*grpcgcp.GCPFallback
421+
primaryConn gtransport.ConnPool
422+
fallbackConn gtransport.ConnPool
423+
}
424+
425+
// Conn returns nil because GCPFallback hides the underlying ClientConn.
426+
// The Spanner client handles this by using the interface methods (Invoke/NewStream).
427+
func (fw *fallbackWrapper) Conn() *grpc.ClientConn {
428+
return nil
429+
}
430+
431+
func (fw *fallbackWrapper) Num() int {
432+
return fw.primaryConn.Num()
433+
}
434+
435+
func (fw *fallbackWrapper) Close() error {
436+
fw.GCPFallback.Close()
437+
err1 := fw.primaryConn.Close()
438+
err2 := fw.fallbackConn.Close()
439+
if err1 != nil {
440+
return err1
441+
}
442+
return err2
443+
}
444+
419445
func newClientWithConfig(ctx context.Context, database string, config ClientConfig, gme *grpcgcp.GCPMultiEndpoint, opts ...option.ClientOption) (c *Client, err error) {
420446
// Validate database path.
421447
if err := validDatabaseName(database); err != nil {
@@ -496,6 +522,48 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
496522
// Use GCPMultiEndpoint if provided.
497523
pool = &gmeWrapper{gme}
498524
endpointClientOpts = append(endpointClientOpts, opts...)
525+
} else if isFallbackEnabled, _ := strconv.ParseBool(os.Getenv("GOOGLE_SPANNER_ENABLE_GCP_FALLBACK")); isFallbackEnabled && isDirectPathEnabled {
526+
var primaryConn gtransport.ConnPool
527+
var fallbackConn gtransport.ConnPool
528+
reqIDInjector := new(requestIDHeaderInjector)
529+
opts = append(opts,
530+
option.WithGRPCDialOption(grpc.WithChainStreamInterceptor(reqIDInjector.interceptStream)),
531+
option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(reqIDInjector.interceptUnary)),
532+
)
533+
allOpts := allClientOpts(config.NumChannels, config.Compression, opts...)
534+
endpointClientOpts = append(endpointClientOpts, allOpts...)
535+
primaryConn, err = gtransport.DialPool(ctx, allOpts...)
536+
if err != nil {
537+
return nil, err
538+
}
539+
540+
fallbackConnOpts := append(allOpts, internaloption.EnableDirectPath(false))
541+
fallbackConn, err = gtransport.DialPool(ctx, fallbackConnOpts...)
542+
if err != nil {
543+
primaryConn.Close()
544+
return nil, err
545+
}
546+
547+
if hasNumChannelsConfig && ((primaryConn.Num() != config.NumChannels) || (fallbackConn.Num() != config.NumChannels)) {
548+
primaryConn.Close()
549+
fallbackConn.Close()
550+
return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, primaryConn.Num()=%v, fallbackConn.Num()=%v", config.NumChannels, primaryConn.Num(), fallbackConn.Num())
551+
}
552+
553+
fbOpts := grpcgcp.NewGCPFallbackOptions()
554+
fbOpts.EnableFallback = true
555+
fbOpts.ErrorRateThreshold = 1
556+
fbOpts.MinFailedCalls = 1
557+
fbOpts.MeterProvider = config.OpenTelemetryMeterProvider
558+
559+
gcpFallback, err := grpcgcp.NewGCPFallback(ctx, primaryConn, fallbackConn, fbOpts)
560+
if err != nil {
561+
primaryConn.Close()
562+
fallbackConn.Close()
563+
return nil, err
564+
}
565+
566+
pool = &fallbackWrapper{gcpFallback, primaryConn, fallbackConn}
499567
} else {
500568
// Create gtransport ConnPool as usual if MultiEndpoint is not used.
501569
// gRPC options.

spanner/sessionclient.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ func (sc *sessionClient) nextClient() (spannerClient, error) {
275275
if _, ok := sc.connPool.(*gmeWrapper); ok {
276276
// Pass GCPMultiEndpoint as a pool.
277277
clientOpt = gtransport.WithConnPool(sc.connPool)
278+
} else if _, ok := sc.connPool.(*fallbackWrapper); ok {
279+
clientOpt = gtransport.WithConnPool(sc.connPool)
278280
} else {
279281
// Pick a grpc.ClientConn from a regular pool.
280282
conn := sc.connPool.Conn()

0 commit comments

Comments
 (0)