From b9d20984c514b4632f7c613e4d6ab58b1776a939 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Thu, 4 Dec 2025 16:28:06 -0500 Subject: [PATCH] READY feat(bigtable): add metrics exporter in the client to export outstandingrpcs and perconnectionerror count (#3) * comments: * WIP * WIP * WIP * Fix review comments --- bigtable/bigtable.go | 4 +- bigtable/internal/option/option.go | 15 ++ bigtable/internal/transport/connpool.go | 67 ++++++ bigtable/internal/transport/connpool_test.go | 82 +++++++ .../internal/transport/metrics_reporter.go | 157 +++++++++++++ .../transport/metrics_reporter_test.go | 207 ++++++++++++++++++ bigtable/metrics.go | 4 + bigtable/otel_metrics.go | 18 +- 8 files changed, 544 insertions(+), 10 deletions(-) create mode 100644 bigtable/internal/transport/metrics_reporter.go create mode 100644 bigtable/internal/transport/metrics_reporter_test.go diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 62c709e73d00..ebcf701bbfb9 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -203,6 +203,8 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C btransport.WithInstanceName(fullInstanceName), btransport.WithAppProfile(config.AppProfile), btransport.WithFeatureFlagsMetadata(ffMD), + btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()), + btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider), ) if err != nil { @@ -212,7 +214,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C // Validate dynamic config early if enabled if config.EnableDynamicChannelPool { - if err := btransport.ValidateDynamicConfig(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize)); err != nil { + if err := btransport.ValidateDynamicConfig(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize), defaultBigtableConnPoolSize); err != nil { return nil, fmt.Errorf("invalid DynamicChannelPoolConfig: %w", err) } diff --git a/bigtable/internal/option/option.go b/bigtable/internal/option/option.go index c87ec097262a..9df4eb90d973 100644 --- a/bigtable/internal/option/option.go +++ b/bigtable/internal/option/option.go @@ -252,3 +252,18 @@ func DefaultDynamicChannelPoolConfig(initialConns int) DynamicChannelPoolConfig MaxRemoveConns: 2, // Only Cap for removals } } + +// MetricsReporterConfig for periodic reporting +// MetricsReporterConfig holds the parameters for metrics reporting. +type MetricsReporterConfig struct { + Enabled bool + ReportingInterval time.Duration +} + +// DefaultMetricsReporterConfig with defaults used. +func DefaultMetricsReporterConfig() MetricsReporterConfig { + return MetricsReporterConfig{ + Enabled: true, + ReportingInterval: 1 * time.Minute, + } +} diff --git a/bigtable/internal/transport/connpool.go b/bigtable/internal/transport/connpool.go index 20cf4f12aee0..332e7de19c98 100644 --- a/bigtable/internal/transport/connpool.go +++ b/bigtable/internal/transport/connpool.go @@ -49,6 +49,20 @@ const requestParamsHeader = "x-goog-request-params" // BigtableChannelPoolOption options for configurable type BigtableChannelPoolOption func(*BigtableChannelPool) +// connPoolStatsSupplier callback that returns a snapshot of connection pool statistics. +type connPoolStatsSupplier func() []connPoolStats + +// connPoolStats holds a snapshot of statistics for a single connection. +type connPoolStats struct { + OutstandingUnaryLoad int32 + OutstandingStreamingLoad int32 + ErrorCount int64 + IsALTSUsed bool + LBPolicy string +} + +var _ Monitor = (*MetricsReporter)(nil) + // WithAppProfile provides the appProfile func WithAppProfile(appProfile string) BigtableChannelPoolOption { return func(p *BigtableChannelPool) { @@ -134,6 +148,28 @@ func (bc *BigtableConn) Prime(ctx context.Context, fullInstanceName, appProfileI return nil } +// connPoolStatsSupplier returns a snapshot of the current connection pool statistics. +func (p *BigtableChannelPool) connPoolStatsSupplier() []connPoolStats { + conns := p.getConns() + if len(conns) == 0 { + return nil + } + + stats := make([]connPoolStats, len(conns)) + lbPolicy := p.strategy.String() + + for i, entry := range conns { + stats[i] = connPoolStats{ + OutstandingUnaryLoad: entry.unaryLoad.Load(), + OutstandingStreamingLoad: entry.streamingLoad.Load(), + ErrorCount: entry.errorCount.Swap(0), + IsALTSUsed: entry.isALTSUsed(), + LBPolicy: lbPolicy, + } + } + return stats +} + // NewBigtableConn creates a wrapped grpc Client Conn func NewBigtableConn(conn *grpc.ClientConn) *BigtableConn { bc := &BigtableConn{ @@ -242,6 +278,15 @@ type BigtableChannelPool struct { instanceName string featureFlagsMD metadata.MD meterProvider metric.MeterProvider + // configs + metricsConfig btopt.MetricsReporterConfig + + // background monitors + monitors []Monitor +} + +func WithMetricsReporterConfig(config btopt.MetricsReporterConfig) BigtableChannelPoolOption { + return func(p *BigtableChannelPool) { p.metricsConfig = config } } // getConns safely loads the current slice of connections. @@ -330,9 +375,27 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop } pool.conns.Store(&initialConns) + + btopt.Debugf(pool.logger, "bigtable_connpool: using load balancing strategy: %s\n", strategy) + + metricsReporter, err := NewMetricsReporter(pool.metricsConfig, pool.connPoolStatsSupplier, pool.logger, pool.meterProvider) + if err == nil { + // ignore + pool.monitors = append(pool.monitors, metricsReporter) + } else { + btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err) + } + pool.startMonitors() return pool, nil } +func (p *BigtableChannelPool) startMonitors() { + for _, m := range p.monitors { + btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m) + m.Start(p.poolCtx) + } +} + // Num returns the number of connections in the pool. func (p *BigtableChannelPool) Num() int { return len(p.getConns()) @@ -341,6 +404,10 @@ func (p *BigtableChannelPool) Num() int { // Close closes all connections in the pool. func (p *BigtableChannelPool) Close() error { p.poolCancel() // Cancel the context for background tasks + // Stop all monitors. + for _, m := range p.monitors { + m.Stop() + } conns := p.getConns() var errs multiError diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go index 9a79ef403aa9..c87675ce76f8 100644 --- a/bigtable/internal/transport/connpool_test.go +++ b/bigtable/internal/transport/connpool_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "net/url" + "reflect" "sort" "strings" "sync" @@ -1390,6 +1391,87 @@ func TestRemoveConnections(t *testing.T) { }) } +func TestConnPoolStatisticsVisitor(t *testing.T) { + ctx := context.Background() + poolSize := 3 + fake := &fakeService{} + addr := setupTestServer(t, fake) + dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } + + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + // Wait for connections to be established + time.Sleep(100 * time.Millisecond) + + conns := pool.getConns() + if len(conns) != poolSize { + t.Fatalf("Pool size mismatch: got %d, want %d", len(conns), poolSize) + } + + testData := []struct { + unary int32 + streaming int32 + errors int64 + isALTS bool + }{ + {unary: 5, streaming: 2, errors: 10, isALTS: false}, + {unary: 0, streaming: 1, errors: 0, isALTS: true}, + {unary: 3, streaming: 0, errors: 5, isALTS: false}, + } + + if len(testData) != poolSize { + t.Fatalf("Test data size: pool size mismatch: got %d, want %d", len(testData), poolSize) + } + + for i, data := range testData { + if i < len(conns) { + conns[i].unaryLoad.Store(data.unary) + conns[i].streamingLoad.Store(data.streaming) + conns[i].errorCount.Store(data.errors) + conns[i].conn.isALTSConn.Store(data.isALTS) + } + } + + // Get the snapshot + stats := pool.connPoolStatsSupplier() + + if len(stats) != poolSize { + t.Errorf("Snapshot size mismatch: got %d, want %d", len(stats), poolSize) + } + + expectedStats := make([]connPoolStats, poolSize) + for i, data := range testData { + expectedStats[i] = connPoolStats{ + OutstandingUnaryLoad: data.unary, + OutstandingStreamingLoad: data.streaming, + ErrorCount: data.errors, + IsALTSUsed: data.isALTS, + LBPolicy: btopt.RoundRobin.String(), + } + } + + sort.Slice(stats, func(i, j int) bool { return stats[i].OutstandingUnaryLoad < stats[j].OutstandingUnaryLoad }) + sort.Slice(expectedStats, func(i, j int) bool { + return expectedStats[i].OutstandingUnaryLoad < expectedStats[j].OutstandingUnaryLoad + }) + + if !reflect.DeepEqual(stats, expectedStats) { + t.Errorf("Snapshot data mismatch:\ngot: %v\nwant: %v", stats, expectedStats) + } + + // Verify error counts are reset + connsAfter := pool.getConns() + for i, entry := range connsAfter { + if entry.errorCount.Load() != 0 { + t.Errorf("entry[%d].errorCount was not reset: got %d, want 0", i, entry.errorCount.Load()) + } + } +} + // --- Benchmarks --- func createBenchmarkFake() *fakeService { diff --git a/bigtable/internal/transport/metrics_reporter.go b/bigtable/internal/transport/metrics_reporter.go new file mode 100644 index 000000000000..bcab93223c67 --- /dev/null +++ b/bigtable/internal/transport/metrics_reporter.go @@ -0,0 +1,157 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + btopt "cloud.google.com/go/bigtable/internal/option" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + // outstandingRPCsMetricName is the name for the outstanding RPCs histogram. + outstandingRPCsMetricName = "connection_pool/outstanding_rpcs" + // perConnErrorCountMetricName is the name for the per-connection error count histogram. + perConnErrorCountMetricName = "per_connection_error_count" + // clientMeterName is the prefix for metrics name + clientMeterName = "bigtable.googleapis.com/internal/client/" +) + +// MetricsReporter periodically collects and reports metrics for the connection pool. +type MetricsReporter struct { + config btopt.MetricsReporterConfig + connPoolStatsSupplier connPoolStatsSupplier + logger *log.Logger + ticker *time.Ticker + done chan struct{} + stopOnce sync.Once + + // OpenTelemetry metric instruments + meterProvider metric.MeterProvider + outstandingRPCsHistogram metric.Float64Histogram + perConnectionErrorCountHistogram metric.Float64Histogram +} + +// NewMetricsReporter starts a monitor to export periodic metrics +func NewMetricsReporter(config btopt.MetricsReporterConfig, connPoolStatsSupplier connPoolStatsSupplier, logger *log.Logger, mp metric.MeterProvider) (*MetricsReporter, error) { + mr := &MetricsReporter{ + config: config, + connPoolStatsSupplier: connPoolStatsSupplier, + logger: logger, + done: make(chan struct{}), + meterProvider: mp, + } + + if !config.Enabled { + // Stats Disabled, return error during ctor + return nil, fmt.Errorf("bigtable_connpool: MetricsReporterConfig.Enabled is false") + } + + if mp == nil { + // State: Stats Enabled but MeterProvider is nil, return error during ctor + return nil, fmt.Errorf("bigtable_connpool: MetricsReporterConfig.Enabled is true, but MeterProvider is nil") + } + + // create meter + meter := mp.Meter(clientMeterName) + var err error + + // fail if meter cannot be created. + mr.outstandingRPCsHistogram, err = meter.Float64Histogram(outstandingRPCsMetricName, metric.WithDescription("Distribution of outstanding RPCs per connection."), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("bigtable_connpool: failed to create %s histogram: %w", outstandingRPCsMetricName, err) + } + + mr.perConnectionErrorCountHistogram, err = meter.Float64Histogram(perConnErrorCountMetricName, metric.WithDescription("Distribution of errors per connection per minute."), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("bigtable_connpool: failed to create %s histogram: %w", perConnErrorCountMetricName, err) + } + + return mr, nil +} + +// Start starts the reporter. +func (mr *MetricsReporter) Start(ctx context.Context) { + // config.Enabled should ensure that all relevant thing (meterProvider and meter should exists) + if !mr.config.Enabled { + return + } + + mr.ticker = time.NewTicker(mr.config.ReportingInterval) + go func() { + defer mr.ticker.Stop() + for { + select { + case <-mr.ticker.C: + mr.snapshotAndRecordMetrics(ctx) + case <-mr.done: + return + case <-ctx.Done(): + return + } + } + }() +} + +// Stop stops the reporter gracefully +func (mr *MetricsReporter) Stop() { + mr.stopOnce.Do(func() { + if mr.config.Enabled { + close(mr.done) + } + }) +} + +// snapshotAndRecordMetrics collects and records metrics for the current state of the connection pool. +func (mr *MetricsReporter) snapshotAndRecordMetrics(ctx context.Context) { + stats := mr.connPoolStatsSupplier() + if len(stats) == 0 { + return + } + + for _, stat := range stats { + // Record per-connection error count for the interval + if mr.perConnectionErrorCountHistogram != nil { + mr.perConnectionErrorCountHistogram.Record(ctx, float64(stat.ErrorCount)) + } + transportType := "cloudpath" + if stat.IsALTSUsed { + transportType = "directpath" + } + + // Common attributes for this connection + baseAttrs := []attribute.KeyValue{ + attribute.String("transport_type", transportType), + attribute.String("lb_policy", stat.LBPolicy), + } + + if mr.outstandingRPCsHistogram != nil { + // Record distribution sample for unary load + unaryAttrs := attribute.NewSet(append(baseAttrs, attribute.Bool("streaming", false))...) + mr.outstandingRPCsHistogram.Record(ctx, float64(stat.OutstandingUnaryLoad), metric.WithAttributeSet(unaryAttrs)) + + // Record distribution sample for streaming load + streamingAttrs := attribute.NewSet(append(baseAttrs, attribute.Bool("streaming", true))...) + mr.outstandingRPCsHistogram.Record(ctx, float64(stat.OutstandingStreamingLoad), metric.WithAttributeSet(streamingAttrs)) + } + + } +} diff --git a/bigtable/internal/transport/metrics_reporter_test.go b/bigtable/internal/transport/metrics_reporter_test.go new file mode 100644 index 000000000000..ff39c228f737 --- /dev/null +++ b/bigtable/internal/transport/metrics_reporter_test.go @@ -0,0 +1,207 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + btopt "cloud.google.com/go/bigtable/internal/option" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/grpc" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +func findMetric(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + fmt.Print(m.Name) + if m.Name == name { + return m, true + } + } + } + return metricdata.Metrics{}, false +} + +func TestMetricsExporting(t *testing.T) { + ctx := context.Background() + fake := &fakeService{} + addr := setupTestServer(t, fake) + dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } + + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + + poolSize := 2 + strategy := btopt.RoundRobin + pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig())) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + // Wait for initial priming to settle + time.Sleep(100 * time.Millisecond) + + // Action 1: Successful Invoke on conn 0 + req := &testpb.SimpleRequest{Payload: &testpb.Payload{Body: []byte("hello")}} + res := &testpb.SimpleResponse{} + if err := pool.Invoke(ctx, "/grpc.testing.BenchmarkService/UnaryCall", req, res); err != nil { + t.Errorf("Invoke failed: %v", err) + } + + // Action 2: Failed Invoke on conn 1 + fake.serverErr = errors.New("simulated error") + pool.Invoke(ctx, "/grpc.testing.BenchmarkService/UnaryCall", req, res) // Error expected + fake.serverErr = nil + + // Action 3: Successful Stream on conn 0 + stream, err := pool.NewStream(ctx, &grpc.StreamDesc{StreamName: "StreamingCall"}, "/grpc.testing.BenchmarkService/StreamingCall") + if err != nil { + t.Fatalf("NewStream failed: %v", err) + } + stream.SendMsg(req) + stream.RecvMsg(res) + stream.CloseSend() + for { + if err := stream.RecvMsg(res); err != nil { + break + } + } + + // Action 4: Stream with SendMsg error on conn 1 + fake.streamSendErr = errors.New("simulated send error") + stream2, err := pool.NewStream(ctx, &grpc.StreamDesc{StreamName: "StreamingCall"}, "/grpc.testing.BenchmarkService/StreamingCall") + if err != nil { + t.Fatalf("NewStream 2 failed: %v", err) + } + stream2.SendMsg(req) // Expect error, triggers load decrement and error count + stream2.CloseSend() + for { + if err := stream2.RecvMsg(res); err != nil { + break + } + } + fake.streamSendErr = nil + + time.Sleep(20 * time.Millisecond) // Allow stream error handling to complete + + // Find the MetricsReporter and force metrics collection + var metricsReporter *MetricsReporter + for _, monitor := range pool.monitors { + if mr, ok := monitor.(*MetricsReporter); ok { + metricsReporter = mr + break + } + } + + // Force metrics collection + metricsReporter.snapshotAndRecordMetrics(ctx) + + rm := metricdata.ResourceMetrics{} + if err := reader.Collect(ctx, &rm); err != nil { + t.Fatalf("Failed to collect metrics: %v", err) + } + + // --- Check outstanding_rpcs --- + outstandingRPCs, ok := findMetric(rm, "connection_pool/outstanding_rpcs") + if !ok { + t.Fatalf("Metric connection_pool/outstanding_rpcs not found") + } + + hist, ok := outstandingRPCs.Data.(metricdata.Histogram[float64]) + if !ok { + t.Fatalf("Metric connection_pool/outstanding_rpcs is not a Histogram[float64]: %T", outstandingRPCs.Data) + } + + // Expected: poolSize * 2 data points (unary and streaming for each connection) + if len(hist.DataPoints) != 2 { + t.Errorf("Outstanding RPCs histogram has %d data points, want %d", len(hist.DataPoints), poolSize*2) + } + + var totalUnary, totalStreaming float64 + for _, dp := range hist.DataPoints { + attrMap := make(map[string]attribute.Value) + for _, kv := range dp.Attributes.ToSlice() { + attrMap[string(kv.Key)] = kv.Value + } + + if val, ok := attrMap["lb_policy"]; !ok || val.AsString() != strategy.String() { + t.Errorf("Missing or incorrect lb_policy attribute: want %v, got %v", strategy.String(), val) + } + if val, ok := attrMap["transport_type"]; !ok || val.AsString() != "cloudpath" { + t.Errorf("Missing or incorrect transport_type attribute: want 'cloudpath', got %v", val) + } + + streamingVal, ok := attrMap["streaming"] + if !ok { + t.Errorf("Missing streaming attribute") + continue + } + if streamingVal.Type() != attribute.BOOL { + t.Errorf("streaming attribute is not a BOOL: got %v", streamingVal.Type()) + continue + } + isStreaming := streamingVal.AsBool() + + if isStreaming { + totalStreaming += dp.Sum + } else { + totalUnary += dp.Sum + } + } + if totalUnary != 0 { + t.Errorf("Total Unary load sum is %f, want 0", totalUnary) + } + if totalStreaming != 0 { + t.Errorf("Total Streaming load sum is %f, want 0", totalStreaming) + } + + // --- Check per_connection_error_count --- + errorCount, ok := findMetric(rm, "per_connection_error_count") + if !ok { + t.Fatalf("Metric per_connection_error_count not found") + } + errorHist, ok := errorCount.Data.(metricdata.Histogram[float64]) + if !ok { + t.Fatalf("Metric per_connection_error_count is not a Histogram[float64]: %T", errorCount.Data) + } + + if len(errorHist.DataPoints) != 1 { + t.Errorf("Error Count histogram has %d data points, want %d", len(errorHist.DataPoints), poolSize) + } + + var totalErrorSum float64 + for _, dp := range errorHist.DataPoints { + totalErrorSum += dp.Sum + } + // Expected errors: 1 from Invoke, 1 from Stream SendMsg + if totalErrorSum != 2 { + t.Errorf("Total Error Count sum is %f, want 2", totalErrorSum) + } + + // Check if error counts on entries are reset + for _, entry := range pool.getConns() { + if entry.errorCount.Load() != 0 { + t.Errorf("entry.errorCount is %d after metric collection, want 0", entry.errorCount.Load()) + } + } +} diff --git a/bigtable/metrics.go b/bigtable/metrics.go index 738dc6372d20..b852252fb447 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -195,6 +195,9 @@ type builtinMetricsTracerFactory struct { // do not change across different function calls on client clientAttributes []attribute.KeyValue + // otelMeterProvider + otelMeterProvider metric.MeterProvider + operationLatencies metric.Float64Histogram serverLatencies metric.Float64Histogram attemptLatencies metric.Float64Histogram @@ -260,6 +263,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appP // the error from newOtelMetricsContext is silently ignored since metrics are not critical to client creation. if err == nil { tracerFactory.clientOpts = otelContext.clientOpts + tracerFactory.otelMeterProvider = otelContext.otelMeterProvider } tracerFactory.shutdown = func() { if otelContext != nil { diff --git a/bigtable/otel_metrics.go b/bigtable/otel_metrics.go index 2f150de0dcf5..b7b35b12de99 100644 --- a/bigtable/otel_metrics.go +++ b/bigtable/otel_metrics.go @@ -127,11 +127,11 @@ func newBigtableClientMonitoredResource(ctx context.Context, project, appProfile return smr, nil } -type metricsContext struct { +type otelMetricsContext struct { // client options passed to gRPC channels clientOpts []option.ClientOption // instance of metric reader used by gRPC client-side metrics - provider *metric.MeterProvider + otelMeterProvider *metric.MeterProvider // clean func to call when closing gRPC client close func() } @@ -149,7 +149,7 @@ type metricsConfig struct { resourceOpts []resource.Option // used by tests } -func newOtelMetricsContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { +func newOtelMetricsContext(ctx context.Context, cfg metricsConfig) (*otelMetricsContext, error) { var exporter metric.Exporter meterOpts := []metric.Option{} if cfg.customExporter == nil { @@ -187,9 +187,9 @@ func newOtelMetricsContext(ctx context.Context, cfg metricsConfig) (*metricsCont meterOpts = append(meterOpts, metric.WithReader( metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval)))) } - provider := metric.NewMeterProvider(meterOpts...) + otelMeterProvider := metric.NewMeterProvider(meterOpts...) mo := opentelemetry.MetricsOptions{ - MeterProvider: provider, + MeterProvider: otelMeterProvider, Metrics: stats.NewMetricSet( "grpc.client.attempt.duration", "grpc.lb.rls.default_target_picks", @@ -206,11 +206,11 @@ func newOtelMetricsContext(ctx context.Context, cfg metricsConfig) (*metricsCont option.WithGRPCDialOption( grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), } - return &metricsContext{ - clientOpts: opts, - provider: provider, + return &otelMetricsContext{ + clientOpts: opts, + otelMeterProvider: otelMeterProvider, close: func() { - provider.Shutdown(ctx) + otelMeterProvider.Shutdown(ctx) }, }, nil }