Skip to content

Commit d944830

Browse files
authored
fix(datastore): add retries to emulator (#14591)
**Summary** This PR refines the Datastore client's resilience logic to fix regressions identified during TAP testing, specifically regarding emulator startup races, Unix domain sockets, and IPv6 literals. It ensures that the resilience benefits for bug **[b/473841984](http://b/473841984)** are applied consistently to both production and emulator environments without breaking existing tests. **Why these changes are needed** 1. **Handling Emulator Startup Races**: Many tests (e.g., `exchange:datastore_test`) experience a "connection refused" error if an RPC is attempted immediately after starting the emulator process. By adding **`"waitForReady": true`** to the gRPC Service Config, the client will now block and wait for the emulator to finish binding its port instead of failing the RPC immediately. 2. **Preventing Infinite Hangs**: With `waitForReady` enabled, an unreachable host could cause a test to hang indefinitely. This PR adds a canonical **`"timeout": "60s"`** to the generic `methodConfig` to ensure RPCs eventually fail with a clear error. 3. **Robust Scheme Handling (Unix Sockets & IPv6)**: Internal test environments like `tin` often use **Unix domain sockets** (`unix:///`) or IPv6 literals (`[::1]`) for emulator addresses. Previous logic would mangle these into invalid targets (e.g., `passthrough:///unix:///...`), leading to "too many colons" errors or connection hangs. The updated logic defensively checks for existing valid schemes before applying the `passthrough:///` optimization. 4. **Idempotent Retries for `Commit`**: A catch-all configuration block is added to the Service Config to ensure that **`UNAVAILABLE`** errors are retried for all methods, including `Commit` (used by `PutMulti`), which was previously omitted. **Why PR #14411 was insufficient** While PR #14411 introduced Keepalives and expanded retries for idempotent calls, it had several gaps: * It only applied the new configuration to the **production** code path, leaving the emulator path without resilience logic or standardized retries. * By removing the manual 100ms retry loop without adding `waitForReady: true` to the Service Config, it removed the "wait" period needed for emulators to become ready. * The aggressive prepending of `passthrough:///` did not account for `unix:///` schemes or bracketed IPv6 literals, causing dialer errors in specialized test environments. (Even though the passthrough was not added in PR #14411, the current google3 imported code did not have this logic. While trying to import [cl/906545069](http://cl/906545069) latest code with psssthrough, this issue was discovered. ) **Related Bug** [b/473841984](http://b/473841984)
1 parent c5aaedc commit d944830

1 file changed

Lines changed: 47 additions & 22 deletions

File tree

datastore/datastore.go

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os"
2323
"reflect"
2424
"regexp"
25+
"strings"
2526
"time"
2627

2728
pb "cloud.google.com/go/datastore/apiv1/datastorepb"
@@ -46,22 +47,37 @@ const (
4647
// retryPolicy is based on https://github.com/googleapis/googleapis/blob/master/google/datastore/v1/datastore_grpc_service_config.json
4748
// but updated to include INTERNAL and RESOURCE_EXHAUSTED for the data plane.
4849
const retryPolicy = `{
49-
"methodConfig": [{
50-
"name": [
51-
{"service": "google.datastore.v1.Datastore", "method": "Lookup"},
52-
{"service": "google.datastore.v1.Datastore", "method": "RunQuery"},
53-
{"service": "google.datastore.v1.Datastore", "method": "RunAggregationQuery"},
54-
{"service": "google.datastore.v1.Datastore", "method": "ReserveIds"}
55-
],
56-
"timeout": "60s",
57-
"retryPolicy": {
58-
"maxAttempts": 5,
59-
"initialBackoff": "0.100s",
60-
"maxBackoff": "60s",
61-
"backoffMultiplier": 1.3,
62-
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED", "INTERNAL", "RESOURCE_EXHAUSTED"]
50+
"methodConfig": [
51+
{
52+
"name": [{"service": "google.datastore.v1.Datastore"}],
53+
"timeout": "60s",
54+
"waitForReady": true,
55+
"retryPolicy": {
56+
"maxAttempts": 5,
57+
"initialBackoff": "0.100s",
58+
"maxBackoff": "60s",
59+
"backoffMultiplier": 1.3,
60+
"retryableStatusCodes": ["UNAVAILABLE"]
61+
}
62+
},
63+
{
64+
"name": [
65+
{"service": "google.datastore.v1.Datastore", "method": "Lookup"},
66+
{"service": "google.datastore.v1.Datastore", "method": "RunQuery"},
67+
{"service": "google.datastore.v1.Datastore", "method": "RunAggregationQuery"},
68+
{"service": "google.datastore.v1.Datastore", "method": "ReserveIds"}
69+
],
70+
"timeout": "60s",
71+
"waitForReady": true,
72+
"retryPolicy": {
73+
"maxAttempts": 5,
74+
"initialBackoff": "0.100s",
75+
"maxBackoff": "60s",
76+
"backoffMultiplier": 1.3,
77+
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED", "INTERNAL", "RESOURCE_EXHAUSTED"]
78+
}
6379
}
64-
}]
80+
]
6581
}`
6682

6783
// ScopeDatastore grants permissions to view and/or manage datastore entities
@@ -126,15 +142,28 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
126142
// Call (*Client).Close() when done with the client.
127143
func NewClientWithDatabase(ctx context.Context, projectID, databaseID string, opts ...option.ClientOption) (*Client, error) {
128144
var o []option.ClientOption
145+
keepaliveParams := keepalive.ClientParameters{
146+
Time: 1 * time.Minute,
147+
Timeout: 20 * time.Second,
148+
PermitWithoutStream: true,
149+
}
150+
129151
// Environment variables for gcd emulator:
130152
// https://cloud.google.com/datastore/docs/tools/datastore-emulator
131153
// If the emulator is available, dial it without passing any credentials.
132154
if addr := os.Getenv("DATASTORE_EMULATOR_HOST"); addr != "" {
133-
addr = schemeRegexp.ReplaceAllString(addr, "")
155+
// If it's already a valid gRPC target with a non-HTTP scheme (like unix:///), leave it.
156+
// Otherwise, strip legacy schemes and force passthrough for performance.
157+
if !(strings.Contains(addr, "://") && !strings.HasPrefix(addr, "http")) {
158+
addr = schemeRegexp.ReplaceAllString(addr, "")
159+
addr = "passthrough:///" + addr
160+
}
134161
o = []option.ClientOption{
135-
option.WithEndpoint("passthrough:///" + addr),
162+
option.WithEndpoint(addr),
136163
option.WithoutAuthentication(),
137164
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
165+
option.WithGRPCDialOption(grpc.WithDefaultServiceConfig(retryPolicy)),
166+
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepaliveParams)),
138167
}
139168
if projectID == DetectProjectID {
140169
projectID, _ = detectProjectIDFn(ctx, opts...)
@@ -153,11 +182,7 @@ func NewClientWithDatabase(ctx context.Context, projectID, databaseID string, op
153182
// Add the Service Config for retries
154183
option.WithGRPCDialOption(grpc.WithDefaultServiceConfig(retryPolicy)),
155184
// Add Keepalives to prune zombie connections
156-
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
157-
Time: 1 * time.Minute,
158-
Timeout: 20 * time.Second,
159-
PermitWithoutStream: true,
160-
})),
185+
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepaliveParams)),
161186
}
162187
}
163188
// Warn if we see the legacy emulator environment variables.

0 commit comments

Comments
 (0)