Skip to content

Commit 8f90da3

Browse files
authored
feat(bigtable): add client startup time metrics. (#13521)
TODO: on adding the transport type as we need to make Prime() rpc blocking.
1 parent 1cfd100 commit 8f90da3

File tree

7 files changed

+165
-26
lines changed

7 files changed

+165
-26
lines changed

bigtable/bigtable.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli
119119

120120
// NewClientWithConfig creates a new client with the given config.
121121
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
122+
clientCreationTimestamp := time.Now()
122123
metricsProvider := config.MetricsProvider
123124
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
124125
// Do not emit metrics when emulator is being used
@@ -199,6 +200,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
199200
}
200201
return btransport.NewBigtableConn(grpcConn), nil
201202
},
203+
clientCreationTimestamp,
202204
// options
203205
btransport.WithInstanceName(fullInstanceName),
204206
btransport.WithAppProfile(config.AppProfile),

bigtable/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module cloud.google.com/go/bigtable
22

33
go 1.24.0
44

5+
toolchain go1.25.5
6+
57
require (
68
cloud.google.com/go v0.123.0
79
cloud.google.com/go/iam v1.5.3

bigtable/internal/transport/connpool.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"time"
3232

3333
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
34+
"go.opentelemetry.io/otel/attribute"
3435
"go.opentelemetry.io/otel/metric"
3536
gtransport "google.golang.org/api/transport/grpc"
3637
"google.golang.org/grpc/credentials/alts"
@@ -344,7 +345,7 @@ func (p *BigtableChannelPool) getConns() []*connEntry {
344345
// NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func()
345346
// NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up.
346347
// We keep it consistent with the current channelpool behavior which is lazily initialized.
347-
func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
348+
func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), clientCreationTimestamp time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
348349
if connPoolSize <= 0 {
349350
return nil, fmt.Errorf("bigtable_connpool: connPoolSize must be positive")
350351
}
@@ -429,9 +430,39 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
429430
btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err)
430431
}
431432
pool.startMonitors()
433+
434+
// record the client startup time
435+
// TODO: currently Prime() is non-blocking, we will make Prime() blocking and infer the transport type here.
436+
transportType := "unknown"
437+
pool.recordClientStartUp(clientCreationTimestamp, transportType)
438+
432439
return pool, nil
433440
}
434441

442+
func (p *BigtableChannelPool) recordClientStartUp(clientCreationTimestamp time.Time, transportType string) {
443+
if p.meterProvider == nil {
444+
return
445+
}
446+
447+
meter := p.meterProvider.Meter(clientMeterName)
448+
// Define buckets for startup latency (in milliseconds)
449+
bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000}
450+
clientStartupTime, err := meter.Float64Histogram(
451+
"startup_time",
452+
metric.WithDescription("Total time for completion of logic of NewClientWithConfig"),
453+
metric.WithUnit("ms"),
454+
metric.WithExplicitBucketBoundaries(bucketBounds...),
455+
)
456+
457+
if err == nil {
458+
elapsedTime := float64(time.Since(clientCreationTimestamp).Milliseconds())
459+
clientStartupTime.Record(p.poolCtx, elapsedTime, metric.WithAttributes(
460+
attribute.String("transport_type", transportType),
461+
attribute.String("status", "OK"),
462+
))
463+
}
464+
}
465+
435466
func (p *BigtableChannelPool) startMonitors() {
436467
for _, m := range p.monitors {
437468
btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m)
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build synctest
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"testing"
22+
"testing/synctest"
23+
"time"
24+
25+
btopt "cloud.google.com/go/bigtable/internal/option"
26+
"go.opentelemetry.io/otel/attribute"
27+
"go.opentelemetry.io/otel/sdk/metric"
28+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
29+
)
30+
31+
func TestRecordClientStartUp(t *testing.T) {
32+
fake := &fakeService{}
33+
addr := setupTestServer(t, fake)
34+
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
35+
36+
synctest.Test(t, func(t *testing.T) {
37+
ctx := context.Background()
38+
reader := metric.NewManualReader()
39+
provider := metric.NewMeterProvider(metric.WithReader(reader))
40+
41+
poolSize := 1
42+
startTime := time.Now()
43+
sleepTimer := 500
44+
time.Sleep(time.Duration(sleepTimer) * time.Millisecond)
45+
46+
channelPoolOptions := append(poolOpts(), WithMeterProvider(provider))
47+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...)
48+
49+
if err != nil {
50+
t.Fatalf("NewBigtableChannelPool failed: %v", err)
51+
}
52+
53+
defer pool.Close()
54+
55+
// Collect metrics
56+
rm := metricdata.ResourceMetrics{}
57+
if err := reader.Collect(ctx, &rm); err != nil {
58+
t.Fatalf("Failed to collect metrics: %v", err)
59+
}
60+
61+
if len(rm.ScopeMetrics) == 0 {
62+
t.Fatalf("No scope metrics found")
63+
}
64+
sm := rm.ScopeMetrics[0]
65+
if sm.Scope.Name != clientMeterName {
66+
t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName)
67+
}
68+
69+
if len(sm.Metrics) == 0 {
70+
t.Fatalf("No metrics found")
71+
}
72+
m := sm.Metrics[0]
73+
74+
if m.Name != "startup_time" {
75+
t.Errorf("Metric name got %q, want %q", m.Name, "startup_time")
76+
}
77+
if m.Unit != "ms" {
78+
t.Errorf("Metric unit got %q, want %q", m.Unit, "ms")
79+
}
80+
81+
hist, ok := m.Data.(metricdata.Histogram[float64])
82+
if !ok {
83+
t.Fatalf("Metric data is not a Histogram: %T", m.Data)
84+
}
85+
86+
if len(hist.DataPoints) != 1 {
87+
t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints))
88+
}
89+
dp := hist.DataPoints[0]
90+
expectedAttrs := attribute.NewSet(
91+
attribute.String("transport_type", "unknown"),
92+
attribute.String("status", "OK"),
93+
)
94+
if !dp.Attributes.Equals(&expectedAttrs) {
95+
t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs)
96+
}
97+
if dp.Count != 1 {
98+
t.Errorf("Data point count got %d, want 1", dp.Count)
99+
}
100+
if dp.Sum != float64(sleepTimer) {
101+
t.Errorf("Expected %f, got %f", float64(sleepTimer), dp.Sum)
102+
}
103+
})
104+
}

bigtable/internal/transport/connpool_test.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestNewBigtableChannelPoolEdgeCases(t *testing.T) {
156156

157157
for _, tc := range tests {
158158
t.Run(tc.name, func(t *testing.T) {
159-
pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, poolOpts()...)
159+
pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, time.Now(), poolOpts()...)
160160
if tc.wantErr {
161161
if err == nil {
162162
t.Errorf("NewBigtableChannelPool(%d) succeeded, want error containing %q", tc.size, tc.errMatch)
@@ -331,7 +331,7 @@ func TestPoolInvoke(t *testing.T) {
331331
addr := setupTestServer(t, fake)
332332
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
333333

334-
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
334+
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
335335
if err != nil {
336336
t.Fatalf("Failed to create pool: %v", err)
337337
}
@@ -403,7 +403,7 @@ func TestPoolNewStream(t *testing.T) {
403403
addr := setupTestServer(t, fake)
404404
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
405405

406-
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
406+
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
407407
if err != nil {
408408
t.Fatalf("Failed to create pool: %v", err)
409409
}
@@ -467,7 +467,7 @@ func TestPoolNewStream(t *testing.T) {
467467
fake := &fakeService{}
468468
addr := setupTestServer(t, fake)
469469
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
470-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
470+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
471471
if err != nil {
472472
t.Fatalf("Failed to create pool: %v", err)
473473
}
@@ -518,7 +518,7 @@ func TestNewBigtableChannelPool(t *testing.T) {
518518
addr := setupTestServer(t, fake)
519519
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
520520

521-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
521+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
522522
if err != nil {
523523
t.Fatalf("NewBigtableChannelPool failed: %v", err)
524524
}
@@ -565,7 +565,7 @@ func TestNewBigtableChannelPool(t *testing.T) {
565565
return dialBigtableserver(addr)
566566
}
567567

568-
_, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
568+
_, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
569569
if err == nil {
570570
t.Errorf("NewBigtableChannelPool should have failed due to dial error")
571571
}
@@ -671,7 +671,7 @@ func TestCachingStreamDecrement(t *testing.T) {
671671
addr := setupTestServer(t, fake)
672672
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
673673

674-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
674+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
675675
if err != nil {
676676
t.Fatalf("Failed to create pool: %v", err)
677677
}
@@ -775,7 +775,7 @@ func TestMultipleStreamsSingleConn(t *testing.T) {
775775
addr := setupTestServer(t, fake)
776776
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
777777

778-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
778+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
779779
if err != nil {
780780
t.Fatalf("Failed to create pool: %v", err)
781781
}
@@ -850,7 +850,7 @@ func TestPoolClose(t *testing.T) {
850850
fake := &fakeService{}
851851
addr := setupTestServer(t, fake)
852852
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
853-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
853+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
854854
if err != nil {
855855
t.Fatalf("Failed to create pool: %v", err)
856856
}
@@ -873,7 +873,7 @@ func TestGracefulDraining(t *testing.T) {
873873
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
874874

875875
t.Run("DrainingOnReplaceConnection", func(t *testing.T) {
876-
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
876+
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
877877
if err != nil {
878878
t.Fatalf("Failed to create pool: %v", err)
879879
}
@@ -941,7 +941,7 @@ func TestGracefulDraining(t *testing.T) {
941941
})
942942

943943
t.Run("SelectionSkipsDrainingConns", func(t *testing.T) {
944-
pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, poolOpts()...)
944+
pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
945945
if err != nil {
946946
t.Fatalf("Failed to create pool: %v", err)
947947
}
@@ -978,7 +978,7 @@ func TestGracefulDraining(t *testing.T) {
978978
maxDrainingTimeout = 100 * time.Millisecond
979979
defer func() { maxDrainingTimeout = originalTimeout }()
980980

981-
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
981+
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
982982
if err != nil {
983983
t.Fatalf("Failed to create pool: %v", err)
984984
}
@@ -1037,7 +1037,7 @@ func TestReplaceConnection(t *testing.T) {
10371037
mu.Unlock()
10381038
atomic.StoreInt32(&dialCount, 0)
10391039

1040-
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
1040+
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
10411041
if err != nil {
10421042
t.Fatalf("Failed to create pool: %v", err)
10431043
}
@@ -1071,7 +1071,7 @@ func TestReplaceConnection(t *testing.T) {
10711071
mu.Unlock()
10721072
atomic.StoreInt32(&dialCount, 0)
10731073

1074-
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
1074+
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
10751075
if err != nil {
10761076
t.Fatalf("Failed to create pool: %v", err)
10771077
}
@@ -1100,7 +1100,7 @@ func TestReplaceConnection(t *testing.T) {
11001100
mu.Unlock()
11011101
atomic.StoreInt32(&dialCount, 0)
11021102

1103-
poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
1103+
poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
11041104
if err != nil {
11051105
t.Fatalf("Failed to create poolCancelled: %v", err)
11061106
}
@@ -1130,7 +1130,7 @@ func TestReplaceConnection(t *testing.T) {
11301130
fake.setPingErr(pingErr)
11311131
atomic.StoreInt32(&dialCount, 0)
11321132

1133-
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
1133+
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
11341134
if err != nil {
11351135
t.Fatalf("Failed to create pool: %v", err)
11361136
}
@@ -1250,7 +1250,7 @@ func TestAddConnections(t *testing.T) {
12501250
innerCtx, cancel := context.WithCancel(ctx)
12511251
defer cancel()
12521252

1253-
pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, poolOpts()...)
1253+
pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, time.Now(), poolOpts()...)
12541254
if err != nil {
12551255
t.Fatalf("Failed to create pool: %v", err)
12561256
}
@@ -1353,7 +1353,7 @@ func TestRemoveConnections(t *testing.T) {
13531353

13541354
for _, tc := range tests {
13551355
t.Run(tc.name, func(t *testing.T) {
1356-
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...)
1356+
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
13571357
if err != nil {
13581358
t.Fatalf("Failed to create pool: %v", err)
13591359
}
@@ -1375,7 +1375,7 @@ func TestRemoveConnections(t *testing.T) {
13751375

13761376
t.Run("VerifyOldestIsRemoved", func(t *testing.T) {
13771377
poolSize := 5
1378-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
1378+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
13791379
if err != nil {
13801380
t.Fatalf("Failed to create pool: %v", err)
13811381
}
@@ -1449,7 +1449,7 @@ func TestConnPoolStatisticsVisitor(t *testing.T) {
14491449
addr := setupTestServer(t, fake)
14501450
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
14511451

1452-
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
1452+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
14531453
if err != nil {
14541454
t.Fatalf("Failed to create pool: %v", err)
14551455
}
@@ -1537,7 +1537,7 @@ func setupBenchmarkPool(b *testing.B, strategy btopt.LoadBalancingStrategy, pool
15371537
}
15381538

15391539
ctx := context.Background()
1540-
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
1540+
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
15411541
if err != nil {
15421542
b.Fatalf("Failed to create pool: %v", err)
15431543
}

0 commit comments

Comments
 (0)