Skip to content

Commit 8c6da0d

Browse files
authored
fix(bigtable): Fix NoopMetricsProvider panic (#12709)
1 parent a366278 commit 8c6da0d

File tree

3 files changed: 203 additions and 143 deletions.

bigtable/bigtable.go

Lines changed: 24 additions & 132 deletions
Original file line number | Diff line number | Diff line change
@@ -36,8 +36,6 @@ import (
3636
"cloud.google.com/go/internal/trace"
3737
gax "github.com/googleapis/gax-go/v2"
3838
"github.com/googleapis/gax-go/v2/apierror"
39-
"go.opentelemetry.io/otel/attribute"
40-
"go.opentelemetry.io/otel/metric"
4139
"google.golang.org/api/option"
4240
"google.golang.org/api/option/internaloption"
4341
gtransport "google.golang.org/api/transport/grpc"
@@ -140,19 +138,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
140138
o = append(o, opts...)
141139

142140
// TODO(b/372244283): Remove after b/358175516 has been fixed
143-
asyncRefreshMetricAttrs := metricsTracerFactory.clientAttributes
144-
asyncRefreshMetricAttrs = append(asyncRefreshMetricAttrs,
145-
attribute.String(metricLabelKeyTag, "async_refresh_dry_run"),
146-
// Table, cluster and zone are unknown at this point
147-
// Use default values
148-
attribute.String(monitoredResLabelKeyTable, defaultTable),
149-
attribute.String(monitoredResLabelKeyCluster, defaultCluster),
150-
attribute.String(monitoredResLabelKeyZone, defaultZone),
151-
)
152-
o = append(o, internaloption.EnableAsyncRefreshDryRun(func() {
153-
metricsTracerFactory.debugTags.Add(context.Background(), 1,
154-
metric.WithAttributes(asyncRefreshMetricAttrs...))
155-
}))
141+
o = append(o, internaloption.EnableAsyncRefreshDryRun(metricsTracerFactory.newAsyncRefreshErrHandler()))
156142

157143
connPool, err := gtransport.DialPool(ctx, o...)
158144
if err != nil {
@@ -528,11 +514,11 @@ func (c *Client) prepareStatementWithMetadata(ctx context.Context, query string,
528514
defer func() { trace.EndSpan(ctx, err) }()
529515

530516
mt := c.newBuiltinMetricsTracer(ctx, "", false)
531-
defer recordOperationCompletion(mt)
517+
defer mt.recordOperationCompletion()
532518

533519
preparedStatement, err = c.prepareStatement(ctx, mt, query, paramTypes, opts...)
534520
statusCode, statusErr := convertToGrpcStatusErr(err)
535-
mt.currOp.setStatus(statusCode.String())
521+
mt.setCurrOpStatus(statusCode.String())
536522
return preparedStatement, statusErr
537523
}
538524

@@ -707,11 +693,11 @@ func (bs *BoundStatement) Execute(ctx context.Context, f func(ResultRow) bool, o
707693
defer func() { trace.EndSpan(ctx, err) }()
708694

709695
mt := bs.ps.c.newBuiltinMetricsTracer(ctx, "", true)
710-
defer recordOperationCompletion(mt)
696+
defer mt.recordOperationCompletion()
711697

712698
err = bs.execute(ctx, f, mt)
713699
statusCode, statusErr := convertToGrpcStatusErr(err)
714-
mt.currOp.setStatus(statusCode.String())
700+
mt.setCurrOpStatus(statusCode.String())
715701
return statusErr
716702
}
717703

@@ -965,11 +951,11 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
965951
defer func() { trace.EndSpan(ctx, err) }()
966952

967953
mt := t.newBuiltinMetricsTracer(ctx, true)
968-
defer recordOperationCompletion(mt)
954+
defer mt.recordOperationCompletion()
969955

970956
err = t.readRows(ctx, arg, f, mt, opts...)
971957
statusCode, statusErr := convertToGrpcStatusErr(err)
972-
mt.currOp.setStatus(statusCode.String())
958+
mt.setCurrOpStatus(statusCode.String())
973959
return statusErr
974960
}
975961

@@ -1082,7 +1068,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
10821068
appBlockingLatencyStart := time.Now()
10831069
continueReading := f(row)
10841070
numRowsRead++
1085-
mt.currOp.incrementAppBlockingLatency(convertToMs(time.Since(appBlockingLatencyStart)))
1071+
mt.incrementAppBlockingLatency(convertToMs(time.Since(appBlockingLatencyStart)))
10861072

10871073
if !continueReading {
10881074
// Cancel and drain stream.
@@ -1640,11 +1626,11 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
16401626
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
16411627
defer func() { trace.EndSpan(ctx, err) }()
16421628
mt := t.newBuiltinMetricsTracer(ctx, false)
1643-
defer recordOperationCompletion(mt)
1629+
defer mt.recordOperationCompletion()
16441630

16451631
err = t.apply(ctx, mt, row, m, opts...)
16461632
statusCode, statusErr := convertToGrpcStatusErr(err)
1647-
mt.currOp.setStatus(statusCode.String())
1633+
mt.setCurrOpStatus(statusCode.String())
16481634
return statusErr
16491635
}
16501636

@@ -1918,7 +1904,7 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
19181904
func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...ApplyOption) (err error) {
19191905
attrMap := make(map[string]interface{})
19201906
mt := t.newBuiltinMetricsTracer(ctx, true)
1921-
defer recordOperationCompletion(mt)
1907+
defer mt.recordOperationCompletion()
19221908

19231909
err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
19241910
attrMap["rowCount"] = len(group)
@@ -1939,7 +1925,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
19391925
}, t.c.retryOption)
19401926

19411927
statusCode, statusErr := convertToGrpcStatusErr(err)
1942-
mt.currOp.setStatus(statusCode.String())
1928+
mt.setCurrOpStatus(statusCode.String())
19431929
return statusErr
19441930
}
19451931

@@ -2080,11 +2066,11 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod
20802066
ctx = mergeOutgoingMetadata(ctx, t.md)
20812067

20822068
mt := t.newBuiltinMetricsTracer(ctx, false)
2083-
defer recordOperationCompletion(mt)
2069+
defer mt.recordOperationCompletion()
20842070

20852071
updatedRow, err := t.applyReadModifyWrite(ctx, mt, row, m)
20862072
statusCode, statusErr := convertToGrpcStatusErr(err)
2087-
mt.currOp.setStatus(statusCode.String())
2073+
mt.setCurrOpStatus(statusCode.String())
20882074
return updatedRow, statusErr
20892075
}
20902076

@@ -2161,11 +2147,11 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
21612147
ctx = mergeOutgoingMetadata(ctx, t.md)
21622148

21632149
mt := t.newBuiltinMetricsTracer(ctx, true)
2164-
defer recordOperationCompletion(mt)
2150+
defer mt.recordOperationCompletion()
21652151

21662152
rowKeys, err := t.sampleRowKeys(ctx, mt)
21672153
statusCode, statusErr := convertToGrpcStatusErr(err)
2168-
mt.currOp.setStatus(statusCode.String())
2154+
mt.setCurrOpStatus(statusCode.String())
21692155
return rowKeys, statusErr
21702156
}
21712157

@@ -2227,34 +2213,6 @@ func (c *Client) newBuiltinMetricsTracer(ctx context.Context, table string, isSt
22272213
return &mt
22282214
}
22292215

2230-
// recordOperationCompletion records as many operation specific metrics as it can
2231-
// Ignores error seen while creating metric attributes since metric can still
2232-
// be recorded with rest of the attributes
2233-
func recordOperationCompletion(mt *builtinMetricsTracer) {
2234-
if !mt.builtInEnabled {
2235-
return
2236-
}
2237-
2238-
// Calculate elapsed time
2239-
elapsedTimeMs := convertToMs(time.Since(mt.currOp.startTime))
2240-
2241-
// Record operation_latencies
2242-
opLatAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationLatencies)
2243-
mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributeSet(opLatAttrs))
2244-
2245-
// Record retry_count
2246-
retryCntAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount)
2247-
if mt.currOp.attemptCount > 1 {
2248-
// Only record when retry count is greater than 0 so the retry
2249-
// graph will be less confusing
2250-
mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributeSet(retryCntAttrs))
2251-
}
2252-
2253-
// Record application_latencies
2254-
appBlockingLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAppBlockingLatencies)
2255-
mt.instrumentAppBlockingLatencies.Record(mt.ctx, mt.currOp.appBlockingLatency, metric.WithAttributeSet(appBlockingLatAttrs))
2256-
}
2257-
22582216
// gaxInvokeWithRecorder:
22592217
// - wraps 'f' in a new function 'callWrapper' that:
22602218
// - updates tracer state and records built in attempt specific metrics
@@ -2267,82 +2225,16 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
22672225
attempTrailerMD := metadata.New(nil)
22682226
mt.setMethod(method)
22692227

2270-
var callWrapper func(context.Context, gax.CallSettings) error
2271-
if !mt.builtInEnabled {
2272-
callWrapper = func(ctx context.Context, callSettings gax.CallSettings) error {
2273-
// f makes calls to CBT service
2274-
return f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings)
2275-
}
2276-
} else {
2277-
callWrapper = func(ctx context.Context, callSettings gax.CallSettings) error {
2278-
// Increment number of attempts
2279-
mt.currOp.incrementAttemptCount()
2280-
2281-
mt.currOp.currAttempt = attemptTracer{}
2282-
2283-
// record start time
2284-
mt.currOp.currAttempt.setStartTime(time.Now())
2285-
2286-
// f makes calls to CBT service
2287-
err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings)
2288-
2289-
// Set attempt status
2290-
statusCode, _ := convertToGrpcStatusErr(err)
2291-
mt.currOp.currAttempt.setStatus(statusCode.String())
2228+
callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error {
2229+
mt.recordAttemptStart()
22922230

2293-
// Get location attributes from metadata and set it in tracer
2294-
// Ignore get location error since the metric can still be recorded with rest of the attributes
2295-
clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD)
2296-
mt.currOp.currAttempt.setClusterID(clusterID)
2297-
mt.currOp.currAttempt.setZoneID(zoneID)
2231+
// f makes calls to CBT service
2232+
err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings)
22982233

2299-
// Set server latency in tracer
2300-
serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD)
2301-
mt.currOp.currAttempt.setServerLatencyErr(serverLatencyErr)
2302-
mt.currOp.currAttempt.setServerLatency(serverLatency)
2303-
2304-
// Record attempt specific metrics
2305-
recordAttemptCompletion(mt)
2306-
return err
2307-
}
2308-
}
2309-
return gax.Invoke(ctx, callWrapper, opts...)
2310-
}
2311-
2312-
// recordAttemptCompletion records as many attempt specific metrics as it can
2313-
// Ignore errors seen while creating metric attributes since metric can still
2314-
// be recorded with rest of the attributes
2315-
func recordAttemptCompletion(mt *builtinMetricsTracer) {
2316-
if !mt.builtInEnabled {
2317-
return
2318-
}
2319-
2320-
// Calculate elapsed time
2321-
elapsedTime := convertToMs(time.Since(mt.currOp.currAttempt.startTime))
2322-
2323-
// Record attempt_latencies
2324-
attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
2325-
mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributeSet(attemptLatAttrs))
2326-
2327-
// Record server_latencies
2328-
serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies)
2329-
if mt.currOp.currAttempt.serverLatencyErr == nil {
2330-
mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributeSet(serverLatAttrs))
2234+
// Record attempt specific metrics
2235+
mt.recordAttemptCompletion(attemptHeaderMD, attempTrailerMD, err)
2236+
return err
23312237
}
23322238

2333-
// Record connectivity_error_count
2334-
connErrCountAttrs, _ := mt.toOtelMetricAttrs(metricNameConnErrCount)
2335-
// Determine if connection error should be incremented.
2336-
// A true connectivity error occurs only when we receive NO server-side signals.
2337-
// 1. Server latency (from server-timing header) is a signal, but absent in DirectPath.
2338-
// 2. Location (from x-goog-ext header) is a signal present in both paths.
2339-
// Therefore, we only count an error if BOTH signals are missing.
2340-
isServerLatencyEffectivelyEmpty := mt.currOp.currAttempt.serverLatencyErr != nil || mt.currOp.currAttempt.serverLatency == 0
2341-
isLocationEmpty := mt.currOp.currAttempt.clusterID == defaultCluster
2342-
if isServerLatencyEffectivelyEmpty && isLocationEmpty {
2343-
// This is a connectivity error: the request likely never reached Google's network.
2344-
mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributeSet(connErrCountAttrs))
2345-
} else {
2346-
mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributeSet(connErrCountAttrs))
2347-
}
2239+
return gax.Invoke(ctx, callWrapper, opts...)
23482240
}

bigtable/integration_test.go

Lines changed: 43 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -896,23 +896,54 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
896896
wg.Wait()
897897
}
898898

899-
func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
899+
func TestIntegration_NoopMetricsProvider(t *testing.T) {
900900
ctx := context.Background()
901+
testEnv, _, adminClient, _, tableName, cleanup, err := setupIntegration(ctx, t)
902+
if err != nil {
903+
t.Fatal(err)
904+
}
905+
defer cleanup()
901906

902-
// Reduce sampling period for faster test runs
903-
origSamplePeriod := defaultSamplePeriod
904-
defaultSamplePeriod = time.Minute
905-
defer func() {
906-
defaultSamplePeriod = origSamplePeriod
907-
}()
907+
if testing.Short() || !testEnv.Config().UseProd {
908+
t.Skip("Skip long running tests in short mode or non-prod environments")
909+
}
910+
911+
family := "export"
912+
if err := createColumnFamily(ctx, t, adminClient, tableName, family, nil); err != nil {
913+
t.Fatalf("Creating column family: %v", err)
914+
}
915+
916+
noopClient, err := testEnv.NewClientWithConfig(ClientConfig{MetricsProvider: NoopMetricsProvider{}})
917+
if err != nil {
918+
t.Fatalf("NewClientWithConfig: %v", err)
919+
}
908920

921+
noopTable := noopClient.Open(tableName)
922+
for i := 0; i < 10; i++ {
923+
mut := NewMutation()
924+
mut.Set(family, "col", 1000, []byte("test"))
925+
if err := noopTable.Apply(ctx, fmt.Sprintf("row-%v", i), mut); err != nil {
926+
t.Fatalf("Apply: %v", err)
927+
}
928+
}
929+
930+
err = noopTable.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
931+
return true
932+
}, RowFilter(ColumnFilter("col")))
933+
if err != nil {
934+
t.Fatalf("ReadRows: %v", err)
935+
}
936+
}
937+
938+
func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
909939
// record start time
910940
testStartTime := time.Now()
911941
tsListStart := &timestamppb.Timestamp{
912942
Seconds: testStartTime.Unix(),
913943
Nanos: int32(testStartTime.Nanosecond()),
914944
}
915945

946+
ctx := context.Background()
916947
testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
917948
if err != nil {
918949
t.Fatal(err)
@@ -935,6 +966,7 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
935966
t.Fatalf("Apply: %v", err)
936967
}
937968
}
969+
938970
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
939971
return true
940972
}, RowFilter(ColumnFilter("col")))
@@ -6126,6 +6158,9 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C
61266158
ctx, cancel := context.WithTimeout(ctx, timeout)
61276159
_ = cancel // ignore for test
61286160

6161+
// Reduce sampling period for faster test runs
6162+
origSamplePeriod := defaultSamplePeriod
6163+
defaultSamplePeriod = time.Minute
61296164
client, err := testEnv.NewClient()
61306165
if err != nil {
61316166
t.Logf("Error creating client: %v", err)
@@ -6185,6 +6220,7 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C
61856220
}
61866221

61876222
return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
6223+
defaultSamplePeriod = origSamplePeriod
61886224
if err := deleteTable(ctx, t, adminClient, tableName); err != nil {
61896225
t.Errorf("DeleteTable got error %v", err)
61906226
}

0 commit comments

Comments
 (0)