@@ -36,8 +36,6 @@ import (
3636 "cloud.google.com/go/internal/trace"
3737 gax "github.com/googleapis/gax-go/v2"
3838 "github.com/googleapis/gax-go/v2/apierror"
39- "go.opentelemetry.io/otel/attribute"
40- "go.opentelemetry.io/otel/metric"
4139 "google.golang.org/api/option"
4240 "google.golang.org/api/option/internaloption"
4341 gtransport "google.golang.org/api/transport/grpc"
@@ -140,19 +138,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
140138 o = append (o , opts ... )
141139
142140 // TODO(b/372244283): Remove after b/358175516 has been fixed
143- asyncRefreshMetricAttrs := metricsTracerFactory .clientAttributes
144- asyncRefreshMetricAttrs = append (asyncRefreshMetricAttrs ,
145- attribute .String (metricLabelKeyTag , "async_refresh_dry_run" ),
146- // Table, cluster and zone are unknown at this point
147- // Use default values
148- attribute .String (monitoredResLabelKeyTable , defaultTable ),
149- attribute .String (monitoredResLabelKeyCluster , defaultCluster ),
150- attribute .String (monitoredResLabelKeyZone , defaultZone ),
151- )
152- o = append (o , internaloption .EnableAsyncRefreshDryRun (func () {
153- metricsTracerFactory .debugTags .Add (context .Background (), 1 ,
154- metric .WithAttributes (asyncRefreshMetricAttrs ... ))
155- }))
141+ o = append (o , internaloption .EnableAsyncRefreshDryRun (metricsTracerFactory .newAsyncRefreshErrHandler ()))
156142
157143 connPool , err := gtransport .DialPool (ctx , o ... )
158144 if err != nil {
@@ -528,11 +514,11 @@ func (c *Client) prepareStatementWithMetadata(ctx context.Context, query string,
528514 defer func () { trace .EndSpan (ctx , err ) }()
529515
530516 mt := c .newBuiltinMetricsTracer (ctx , "" , false )
531- defer recordOperationCompletion (mt )
517+ defer mt . recordOperationCompletion ()
532518
533519 preparedStatement , err = c .prepareStatement (ctx , mt , query , paramTypes , opts ... )
534520 statusCode , statusErr := convertToGrpcStatusErr (err )
535- mt .currOp . setStatus (statusCode .String ())
521+ mt .setCurrOpStatus (statusCode .String ())
536522 return preparedStatement , statusErr
537523}
538524
@@ -707,11 +693,11 @@ func (bs *BoundStatement) Execute(ctx context.Context, f func(ResultRow) bool, o
707693 defer func () { trace .EndSpan (ctx , err ) }()
708694
709695 mt := bs .ps .c .newBuiltinMetricsTracer (ctx , "" , true )
710- defer recordOperationCompletion (mt )
696+ defer mt . recordOperationCompletion ()
711697
712698 err = bs .execute (ctx , f , mt )
713699 statusCode , statusErr := convertToGrpcStatusErr (err )
714- mt .currOp . setStatus (statusCode .String ())
700+ mt .setCurrOpStatus (statusCode .String ())
715701 return statusErr
716702}
717703
@@ -965,11 +951,11 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
965951 defer func () { trace .EndSpan (ctx , err ) }()
966952
967953 mt := t .newBuiltinMetricsTracer (ctx , true )
968- defer recordOperationCompletion (mt )
954+ defer mt . recordOperationCompletion ()
969955
970956 err = t .readRows (ctx , arg , f , mt , opts ... )
971957 statusCode , statusErr := convertToGrpcStatusErr (err )
972- mt .currOp . setStatus (statusCode .String ())
958+ mt .setCurrOpStatus (statusCode .String ())
973959 return statusErr
974960}
975961
@@ -1082,7 +1068,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
10821068 appBlockingLatencyStart := time .Now ()
10831069 continueReading := f (row )
10841070 numRowsRead ++
1085- mt .currOp . incrementAppBlockingLatency (convertToMs (time .Since (appBlockingLatencyStart )))
1071+ mt .incrementAppBlockingLatency (convertToMs (time .Since (appBlockingLatencyStart )))
10861072
10871073 if ! continueReading {
10881074 // Cancel and drain stream.
@@ -1640,11 +1626,11 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
16401626 ctx = trace .StartSpan (ctx , "cloud.google.com/go/bigtable/Apply" )
16411627 defer func () { trace .EndSpan (ctx , err ) }()
16421628 mt := t .newBuiltinMetricsTracer (ctx , false )
1643- defer recordOperationCompletion (mt )
1629+ defer mt . recordOperationCompletion ()
16441630
16451631 err = t .apply (ctx , mt , row , m , opts ... )
16461632 statusCode , statusErr := convertToGrpcStatusErr (err )
1647- mt .currOp . setStatus (statusCode .String ())
1633+ mt .setCurrOpStatus (statusCode .String ())
16481634 return statusErr
16491635}
16501636
@@ -1918,7 +1904,7 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
19181904func (t * Table ) applyGroup (ctx context.Context , group []* entryErr , opts ... ApplyOption ) (err error ) {
19191905 attrMap := make (map [string ]interface {})
19201906 mt := t .newBuiltinMetricsTracer (ctx , true )
1921- defer recordOperationCompletion (mt )
1907+ defer mt . recordOperationCompletion ()
19221908
19231909 err = gaxInvokeWithRecorder (ctx , mt , "MutateRows" , func (ctx context.Context , headerMD , trailerMD * metadata.MD , _ gax.CallSettings ) error {
19241910 attrMap ["rowCount" ] = len (group )
@@ -1939,7 +1925,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
19391925 }, t .c .retryOption )
19401926
19411927 statusCode , statusErr := convertToGrpcStatusErr (err )
1942- mt .currOp . setStatus (statusCode .String ())
1928+ mt .setCurrOpStatus (statusCode .String ())
19431929 return statusErr
19441930}
19451931
@@ -2080,11 +2066,11 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod
20802066 ctx = mergeOutgoingMetadata (ctx , t .md )
20812067
20822068 mt := t .newBuiltinMetricsTracer (ctx , false )
2083- defer recordOperationCompletion (mt )
2069+ defer mt . recordOperationCompletion ()
20842070
20852071 updatedRow , err := t .applyReadModifyWrite (ctx , mt , row , m )
20862072 statusCode , statusErr := convertToGrpcStatusErr (err )
2087- mt .currOp . setStatus (statusCode .String ())
2073+ mt .setCurrOpStatus (statusCode .String ())
20882074 return updatedRow , statusErr
20892075}
20902076
@@ -2161,11 +2147,11 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
21612147 ctx = mergeOutgoingMetadata (ctx , t .md )
21622148
21632149 mt := t .newBuiltinMetricsTracer (ctx , true )
2164- defer recordOperationCompletion (mt )
2150+ defer mt . recordOperationCompletion ()
21652151
21662152 rowKeys , err := t .sampleRowKeys (ctx , mt )
21672153 statusCode , statusErr := convertToGrpcStatusErr (err )
2168- mt .currOp . setStatus (statusCode .String ())
2154+ mt .setCurrOpStatus (statusCode .String ())
21692155 return rowKeys , statusErr
21702156}
21712157
@@ -2227,34 +2213,6 @@ func (c *Client) newBuiltinMetricsTracer(ctx context.Context, table string, isSt
22272213 return & mt
22282214}
22292215
2230- // recordOperationCompletion records as many operation specific metrics as it can
2231- // Ignores error seen while creating metric attributes since metric can still
2232- // be recorded with rest of the attributes
2233- func recordOperationCompletion (mt * builtinMetricsTracer ) {
2234- if ! mt .builtInEnabled {
2235- return
2236- }
2237-
2238- // Calculate elapsed time
2239- elapsedTimeMs := convertToMs (time .Since (mt .currOp .startTime ))
2240-
2241- // Record operation_latencies
2242- opLatAttrs , _ := mt .toOtelMetricAttrs (metricNameOperationLatencies )
2243- mt .instrumentOperationLatencies .Record (mt .ctx , elapsedTimeMs , metric .WithAttributeSet (opLatAttrs ))
2244-
2245- // Record retry_count
2246- retryCntAttrs , _ := mt .toOtelMetricAttrs (metricNameRetryCount )
2247- if mt .currOp .attemptCount > 1 {
2248- // Only record when retry count is greater than 0 so the retry
2249- // graph will be less confusing
2250- mt .instrumentRetryCount .Add (mt .ctx , mt .currOp .attemptCount - 1 , metric .WithAttributeSet (retryCntAttrs ))
2251- }
2252-
2253- // Record application_latencies
2254- appBlockingLatAttrs , _ := mt .toOtelMetricAttrs (metricNameAppBlockingLatencies )
2255- mt .instrumentAppBlockingLatencies .Record (mt .ctx , mt .currOp .appBlockingLatency , metric .WithAttributeSet (appBlockingLatAttrs ))
2256- }
2257-
22582216// gaxInvokeWithRecorder:
22592217// - wraps 'f' in a new function 'callWrapper' that:
22602218// - updates tracer state and records built in attempt specific metrics
@@ -2267,82 +2225,16 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
22672225 attempTrailerMD := metadata .New (nil )
22682226 mt .setMethod (method )
22692227
2270- var callWrapper func (context.Context , gax.CallSettings ) error
2271- if ! mt .builtInEnabled {
2272- callWrapper = func (ctx context.Context , callSettings gax.CallSettings ) error {
2273- // f makes calls to CBT service
2274- return f (ctx , & attemptHeaderMD , & attempTrailerMD , callSettings )
2275- }
2276- } else {
2277- callWrapper = func (ctx context.Context , callSettings gax.CallSettings ) error {
2278- // Increment number of attempts
2279- mt .currOp .incrementAttemptCount ()
2280-
2281- mt .currOp .currAttempt = attemptTracer {}
2282-
2283- // record start time
2284- mt .currOp .currAttempt .setStartTime (time .Now ())
2285-
2286- // f makes calls to CBT service
2287- err := f (ctx , & attemptHeaderMD , & attempTrailerMD , callSettings )
2288-
2289- // Set attempt status
2290- statusCode , _ := convertToGrpcStatusErr (err )
2291- mt .currOp .currAttempt .setStatus (statusCode .String ())
2228+ callWrapper := func (ctx context.Context , callSettings gax.CallSettings ) error {
2229+ mt .recordAttemptStart ()
22922230
2293- // Get location attributes from metadata and set it in tracer
2294- // Ignore get location error since the metric can still be recorded with rest of the attributes
2295- clusterID , zoneID , _ := extractLocation (attemptHeaderMD , attempTrailerMD )
2296- mt .currOp .currAttempt .setClusterID (clusterID )
2297- mt .currOp .currAttempt .setZoneID (zoneID )
2231+ // f makes calls to CBT service
2232+ err := f (ctx , & attemptHeaderMD , & attempTrailerMD , callSettings )
22982233
2299- // Set server latency in tracer
2300- serverLatency , serverLatencyErr := extractServerLatency (attemptHeaderMD , attempTrailerMD )
2301- mt .currOp .currAttempt .setServerLatencyErr (serverLatencyErr )
2302- mt .currOp .currAttempt .setServerLatency (serverLatency )
2303-
2304- // Record attempt specific metrics
2305- recordAttemptCompletion (mt )
2306- return err
2307- }
2308- }
2309- return gax .Invoke (ctx , callWrapper , opts ... )
2310- }
2311-
2312- // recordAttemptCompletion records as many attempt specific metrics as it can
2313- // Ignore errors seen while creating metric attributes since metric can still
2314- // be recorded with rest of the attributes
2315- func recordAttemptCompletion (mt * builtinMetricsTracer ) {
2316- if ! mt .builtInEnabled {
2317- return
2318- }
2319-
2320- // Calculate elapsed time
2321- elapsedTime := convertToMs (time .Since (mt .currOp .currAttempt .startTime ))
2322-
2323- // Record attempt_latencies
2324- attemptLatAttrs , _ := mt .toOtelMetricAttrs (metricNameAttemptLatencies )
2325- mt .instrumentAttemptLatencies .Record (mt .ctx , elapsedTime , metric .WithAttributeSet (attemptLatAttrs ))
2326-
2327- // Record server_latencies
2328- serverLatAttrs , _ := mt .toOtelMetricAttrs (metricNameServerLatencies )
2329- if mt .currOp .currAttempt .serverLatencyErr == nil {
2330- mt .instrumentServerLatencies .Record (mt .ctx , mt .currOp .currAttempt .serverLatency , metric .WithAttributeSet (serverLatAttrs ))
2234+ // Record attempt specific metrics
2235+ mt .recordAttemptCompletion (attemptHeaderMD , attempTrailerMD , err )
2236+ return err
23312237 }
23322238
2333- // Record connectivity_error_count
2334- connErrCountAttrs , _ := mt .toOtelMetricAttrs (metricNameConnErrCount )
2335- // Determine if connection error should be incremented.
2336- // A true connectivity error occurs only when we receive NO server-side signals.
2337- // 1. Server latency (from server-timing header) is a signal, but absent in DirectPath.
2338- // 2. Location (from x-goog-ext header) is a signal present in both paths.
2339- // Therefore, we only count an error if BOTH signals are missing.
2340- isServerLatencyEffectivelyEmpty := mt .currOp .currAttempt .serverLatencyErr != nil || mt .currOp .currAttempt .serverLatency == 0
2341- isLocationEmpty := mt .currOp .currAttempt .clusterID == defaultCluster
2342- if isServerLatencyEffectivelyEmpty && isLocationEmpty {
2343- // This is a connectivity error: the request likely never reached Google's network.
2344- mt .instrumentConnErrCount .Add (mt .ctx , 1 , metric .WithAttributeSet (connErrCountAttrs ))
2345- } else {
2346- mt .instrumentConnErrCount .Add (mt .ctx , 0 , metric .WithAttributeSet (connErrCountAttrs ))
2347- }
2239+ return gax .Invoke (ctx , callWrapper , opts ... )
23482240}
0 commit comments