Skip to content

Commit 275ff56

Browse files
fix(storage): restore metadata operations timeout in gRPC (#14575)
fixes: Issue #14417 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 7f7c735 commit 275ff56

2 files changed

Lines changed: 75 additions & 1 deletion

File tree

storage/grpc_client.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (*grpcStor
156156
s := initSettings(opts...)
157157
s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
158158
// Disable all gax-level retries in favor of retry logic in the veneer client.
159-
s.gax = append(s.gax, gax.WithRetry(nil), gax.WithTimeout(0))
159+
s.gax = append(s.gax, gax.WithRetry(nil))
160160

161161
config := newStorageConfig(s.clientOption...)
162162
if config.readAPIWasSet {
@@ -187,10 +187,23 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (*grpcStor
187187
if err != nil {
188188
return nil, err
189189
}
190+
configureStreamingTimeouts(g)
190191
c.raw = g
191192
return c, nil
192193
}
193194

195+
// configureStreamingTimeouts explicitly overrides default call timeouts to 0 (unbounded)
196+
// for all generated payload streaming RPCs. This guarantees that long-running data reads
197+
// and writes are not prematurely aborted by default transport deadlines, while allowing
198+
// all transactional and metadata/unary operations to retain their safety deadlines.
199+
func configureStreamingTimeouts(g *gapic.Client) {
200+
g.CallOptions.ReadObject = append(g.CallOptions.ReadObject, gax.WithTimeout(0))
201+
g.CallOptions.WriteObject = append(g.CallOptions.WriteObject, gax.WithTimeout(0))
202+
g.CallOptions.BidiReadObject = append(g.CallOptions.BidiReadObject, gax.WithTimeout(0))
203+
g.CallOptions.BidiWriteObject = append(g.CallOptions.BidiWriteObject, gax.WithTimeout(0))
204+
g.CallOptions.CancelResumableWrite = append(g.CallOptions.CancelResumableWrite, gax.WithTimeout(0))
205+
}
206+
194207
func (c *grpcStorageClient) routingInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientInterceptor) {
195208
unary := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
196209
ctx, err := c.prepareDirectPathMetadata(ctx, cc.Target())

storage/grpc_client_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@ import (
2121
"fmt"
2222
"hash/crc32"
2323
"math/rand"
24+
"reflect"
2425
"strings"
2526
"testing"
2627
"time"
2728

2829
"cloud.google.com/go/storage/internal/apiv2/storagepb"
2930
"github.com/google/go-cmp/cmp"
31+
gax "github.com/googleapis/gax-go/v2"
32+
"google.golang.org/api/option"
3033
"google.golang.org/grpc"
3134
"google.golang.org/grpc/codes"
35+
"google.golang.org/grpc/credentials/insecure"
3236
"google.golang.org/grpc/mem"
3337
"google.golang.org/grpc/metadata"
3438
"google.golang.org/grpc/status"
@@ -566,3 +570,60 @@ func TestPrepareDirectPathMetadata_FeatureTracking(t *testing.T) {
566570
})
567571
}
568572
}
573+
574+
func TestNewGRPCStorageClient_NoGlobalTimeout(t *testing.T) {
575+
ctx := context.Background()
576+
client, err := newGRPCStorageClient(ctx,
577+
withClientOptions(
578+
option.WithoutAuthentication(),
579+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
580+
),
581+
)
582+
if err != nil {
583+
t.Fatalf("newGRPCStorageClient: %v", err)
584+
}
585+
defer client.Close()
586+
587+
// Verify that default/global settings do NOT contain any timeout override,
588+
// preventing indefinite hangs on metadata operations.
589+
for _, opt := range client.settings.gax {
590+
if strings.Contains(fmt.Sprintf("%T", opt), "timeoutOpt") {
591+
t.Errorf("expected default/global settings to not contain a timeout option, but found: %T (%v)", opt, opt)
592+
}
593+
}
594+
595+
// Helper to verify that a list of CallOptions has a timeout of 0s
596+
verifyTimeoutIsZero := func(method string, opts []gax.CallOption) {
597+
var cs gax.CallSettings
598+
// Apply a dummy non-zero timeout first
599+
gax.WithTimeout(1 * time.Hour).Resolve(&cs)
600+
601+
// Apply all options of interest
602+
for _, opt := range opts {
603+
opt.Resolve(&cs)
604+
}
605+
606+
val := reflect.ValueOf(cs)
607+
field := val.FieldByName("timeout")
608+
if !field.IsValid() {
609+
t.Errorf("method %q: gax.CallSettings structure changed, field 'timeout' not found", method)
610+
return
611+
}
612+
613+
timeout := time.Duration(field.Int())
614+
if timeout == 1*time.Hour {
615+
t.Errorf("method %q: expected explicit WithTimeout(0) to keep streams unbounded, but none was found", method)
616+
} else if timeout != 0 {
617+
t.Errorf("method %q: expected timeout of 0s, got %v", method, timeout)
618+
}
619+
}
620+
621+
// Verify that all generated streaming CallOptions explicitly contain WithTimeout(0)
622+
// to keep payload streams (Reads/Writes) completely timeout-free.
623+
opts := client.raw.CallOptions
624+
verifyTimeoutIsZero("ReadObject", opts.ReadObject)
625+
verifyTimeoutIsZero("WriteObject", opts.WriteObject)
626+
verifyTimeoutIsZero("BidiReadObject", opts.BidiReadObject)
627+
verifyTimeoutIsZero("BidiWriteObject", opts.BidiWriteObject)
628+
verifyTimeoutIsZero("CancelResumableWrite", opts.CancelResumableWrite)
629+
}

0 commit comments

Comments
 (0)