Skip to content

Commit cf06802

Browse files
authored
feat(bigquery/storage/managedwriter): introduce location routing header (#7663)
This allows for the backend to more efficient route traffic. Normally we'd extract this from the request, but location is not part of the write identifier.
1 parent cd36965 commit cf06802

3 files changed

Lines changed: 40 additions & 4 deletions

File tree

bigquery/storage/managedwriter/client.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/google/uuid"
2929
"github.com/googleapis/gax-go/v2"
3030
"google.golang.org/api/option"
31+
"google.golang.org/grpc/metadata"
3132
)
3233

3334
// DetectProjectID is a sentinel value that instructs NewClient to detect the
@@ -217,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
217218
}
218219

219220
// No existing pool available, create one for the location and add to shared pools.
220-
pool, err := c.createPool(ctx, nil, streamFunc)
221+
pool, err := c.createPool(ctx, loc, nil, streamFunc)
221222
if err != nil {
222223
return nil, err
223224
}
@@ -226,13 +227,17 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
226227
}
227228

228229
// createPool builds a connectionPool.
229-
func (c *Client) createPool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
230+
func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
230231
cCtx, cancel := context.WithCancel(ctx)
231232

232233
if c.cfg == nil {
233234
cancel()
234235
return nil, fmt.Errorf("missing client config")
235236
}
237+
if location != "" {
238+
// add location header to the retained pool context.
239+
cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location))
240+
}
236241
fcRequests := c.cfg.defaultInflightRequests
237242
fcBytes := c.cfg.defaultInflightBytes
238243
arOpts := c.cfg.defaultAppendRowsCallOptions
@@ -250,6 +255,7 @@ func (c *Client) createPool(ctx context.Context, settings *streamSettings, strea
250255

251256
pool := &connectionPool{
252257
id: newUUID(poolIDPrefix),
258+
location: location,
253259
ctx: cCtx,
254260
cancel: cancel,
255261
open: createOpenF(ctx, streamFunc),

bigquery/storage/managedwriter/client_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/googleapis/gax-go/v2"
2222
"google.golang.org/grpc"
23+
"google.golang.org/grpc/metadata"
2324
)
2425

2526
func TestTableParentFromStreamName(t *testing.T) {
@@ -53,6 +54,34 @@ func TestTableParentFromStreamName(t *testing.T) {
5354
}
5455
}
5556

57+
func TestCreatePool_Location(t *testing.T) {
58+
c := &Client{
59+
cfg: &writerClientConfig{},
60+
}
61+
pool, err := c.createPool(context.Background(), "foo", nil, nil)
62+
if err != nil {
63+
t.Fatalf("createPool: %v", err)
64+
}
65+
meta, ok := metadata.FromOutgoingContext(pool.ctx)
66+
if !ok {
67+
t.Fatalf("no metadata in outgoing context")
68+
}
69+
vals, ok := meta["x-goog-request-params"]
70+
if !ok {
71+
t.Fatalf("metadata key not present")
72+
}
73+
found := false
74+
for _, v := range vals {
75+
if v == "write_location=foo" {
76+
found = true
77+
break
78+
}
79+
}
80+
if !found {
81+
t.Fatal("expected location header not found")
82+
}
83+
}
84+
5685
// TestCreatePool tests the result of calling createPool with different combinations
5786
// of global configuration and per-writer configuration.
5887
func TestCreatePool(t *testing.T) {
@@ -126,7 +155,7 @@ func TestCreatePool(t *testing.T) {
126155
c := &Client{
127156
cfg: tc.cfg,
128157
}
129-
got, err := c.createPool(context.Background(), tc.settings, nil)
158+
got, err := c.createPool(context.Background(), "", tc.settings, nil)
130159
if err != nil {
131160
if !tc.wantErr {
132161
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)

bigquery/storage/managedwriter/connection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ var (
4343
// The pool retains references to connections, and maintains the mapping between writers
4444
// and connections.
4545
type connectionPool struct {
46-
id string
46+
id string
47+
location string // BQ region associated with this pool.
4748

4849
// the pool retains the long-lived context responsible for opening/maintaining bidi connections.
4950
ctx context.Context

0 commit comments

Comments
 (0)