@@ -20,11 +20,14 @@ import (
2020 "context"
2121 "strings"
2222 "sync/atomic"
23+ "time"
2324
2425 vkit "cloud.google.com/go/spanner/apiv1"
2526 "cloud.google.com/go/spanner/apiv1/spannerpb"
2627 "cloud.google.com/go/spanner/internal"
2728 "github.com/googleapis/gax-go/v2"
29+ "go.opentelemetry.io/otel/attribute"
30+ oteltrace "go.opentelemetry.io/otel/trace"
2831 "google.golang.org/api/option"
2932 "google.golang.org/grpc"
3033 "google.golang.org/grpc/peer"
@@ -171,7 +174,66 @@ func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.De
171174 return err
172175}
173176
177+ // setSpanAttributes dynamically sets span attributes based on the request type.
178+ func setSpanAttributes [T any ](span oteltrace.Span , req T ) {
179+ if ! span .IsRecording () {
180+ return
181+ }
182+
183+ var attrs []attribute.KeyValue
184+
185+ if r , ok := any (req ).(interface {
186+ GetRequestOptions () * spannerpb.RequestOptions
187+ }); ok {
188+ if tag := r .GetRequestOptions ().GetTransactionTag (); tag != "" {
189+ attrs = append (attrs , attribute .String ("transaction.tag" , tag ))
190+ }
191+ if tag := r .GetRequestOptions ().GetRequestTag (); tag != "" {
192+ attrs = append (attrs , attribute .String ("statement.tag" , tag ))
193+ }
194+ }
195+
196+ if r , ok := any (req ).(interface { GetSql () string }); ok {
197+ if sql := r .GetSql (); sql != "" {
198+ attrs = append (attrs , attribute .String ("db.statement" , sql ))
199+ }
200+ } else if r , ok := any (req ).(interface {
201+ GetStatements () []* spannerpb.ExecuteBatchDmlRequest_Statement
202+ }); ok {
203+ if stmts := r .GetStatements (); len (stmts ) > 0 {
204+ sqls := make ([]string , len (stmts ))
205+ for i , stmt := range stmts {
206+ sqls [i ] = stmt .GetSql ()
207+ }
208+ attrs = append (attrs , attribute .StringSlice ("db.statement" , sqls ))
209+ }
210+ }
211+
212+ if r , ok := any (req ).(interface { GetTable () string }); ok {
213+ if table := r .GetTable (); table != "" {
214+ attrs = append (attrs , attribute .String ("db.table" , table ))
215+ }
216+ }
217+
218+ span .SetAttributes (attrs ... )
219+ }
220+
221+ func setGFEAndAFESpanAttributes (span oteltrace.Span , latencyMap map [string ]time.Duration ) {
222+ if ! span .IsRecording () {
223+ return
224+ }
225+ for t , v := range latencyMap {
226+ if t == gfeTimingHeader || t == afeTimingHeader {
227+ span .SetAttributes (
228+ attribute .Float64 (t [:3 ]+ ".latency_ms" , float64 (v .Nanoseconds ())/ 1e6 ),
229+ )
230+ }
231+ }
232+ }
233+
174234func (g * grpcSpannerClient ) ExecuteSql (ctx context.Context , req * spannerpb.ExecuteSqlRequest , opts ... gax.CallOption ) (* spannerpb.ResultSet , error ) {
235+ span := oteltrace .SpanFromContext (ctx )
236+ setSpanAttributes (span , req )
175237 mt := g .newBuiltinMetricsTracer (ctx )
176238 defer recordOperationCompletion (mt )
177239 ctx = context .WithValue (ctx , metricsTracerKey , mt )
@@ -182,6 +244,8 @@ func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.Execu
182244}
183245
184246func (g * grpcSpannerClient ) ExecuteStreamingSql (ctx context.Context , req * spannerpb.ExecuteSqlRequest , opts ... gax.CallOption ) (spannerpb.Spanner_ExecuteStreamingSqlClient , error ) {
247+ span := oteltrace .SpanFromContext (ctx )
248+ setSpanAttributes (span , req )
185249 // Note: This method does not add g.optsWithNextRequestID to inject x-goog-spanner-request-id
186250 // as it is already manually added when creating Stream iterators for ExecuteStreamingSql.
187251 client , err := g .raw .ExecuteStreamingSql (peer .NewContext (ctx , & peer.Peer {}), req , opts ... )
@@ -191,13 +255,17 @@ func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spanne
191255 }
192256 if mt != nil && client != nil && mt .currOp .currAttempt != nil {
193257 md , _ := client .Header ()
194- mt .currOp .currAttempt .setServerTimingMetrics (parseServerTimingHeader (md ))
258+ latencyMap := parseServerTimingHeader (md )
259+ setGFEAndAFESpanAttributes (span , latencyMap )
260+ mt .currOp .currAttempt .setServerTimingMetrics (latencyMap )
195261 mt .currOp .currAttempt .setDirectPathUsed (client .Context ())
196262 }
197263 return client , err
198264}
199265
200266func (g * grpcSpannerClient ) ExecuteBatchDml (ctx context.Context , req * spannerpb.ExecuteBatchDmlRequest , opts ... gax.CallOption ) (* spannerpb.ExecuteBatchDmlResponse , error ) {
267+ span := oteltrace .SpanFromContext (ctx )
268+ setSpanAttributes (span , req )
201269 mt := g .newBuiltinMetricsTracer (ctx )
202270 defer recordOperationCompletion (mt )
203271 ctx = context .WithValue (ctx , metricsTracerKey , mt )
@@ -208,6 +276,8 @@ func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.
208276}
209277
210278func (g * grpcSpannerClient ) Read (ctx context.Context , req * spannerpb.ReadRequest , opts ... gax.CallOption ) (* spannerpb.ResultSet , error ) {
279+ span := oteltrace .SpanFromContext (ctx )
280+ setSpanAttributes (span , req )
211281 mt := g .newBuiltinMetricsTracer (ctx )
212282 defer recordOperationCompletion (mt )
213283 ctx = context .WithValue (ctx , metricsTracerKey , mt )
@@ -220,14 +290,18 @@ func (g *grpcSpannerClient) Read(ctx context.Context, req *spannerpb.ReadRequest
220290func (g * grpcSpannerClient ) StreamingRead (ctx context.Context , req * spannerpb.ReadRequest , opts ... gax.CallOption ) (spannerpb.Spanner_StreamingReadClient , error ) {
221291 // Note: This method does not add g.optsWithNextRequestID, as it is already
222292 // manually added when creating Stream iterators for StreamingRead.
293+ span := oteltrace .SpanFromContext (ctx )
294+ setSpanAttributes (span , req )
223295 client , err := g .raw .StreamingRead (peer .NewContext (ctx , & peer.Peer {}), req , opts ... )
224296 mt , ok := ctx .Value (metricsTracerKey ).(* builtinMetricsTracer )
225297 if ! ok {
226298 return client , err
227299 }
228300 if mt != nil && client != nil && mt .currOp .currAttempt != nil {
229301 md , _ := client .Header ()
230- mt .currOp .currAttempt .setServerTimingMetrics (parseServerTimingHeader (md ))
302+ latencyMap := parseServerTimingHeader (md )
303+ setGFEAndAFESpanAttributes (span , latencyMap )
304+ mt .currOp .currAttempt .setServerTimingMetrics (latencyMap )
231305 mt .currOp .currAttempt .setDirectPathUsed (client .Context ())
232306 }
233307 return client , err
@@ -244,6 +318,8 @@ func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb
244318}
245319
246320func (g * grpcSpannerClient ) Commit (ctx context.Context , req * spannerpb.CommitRequest , opts ... gax.CallOption ) (* spannerpb.CommitResponse , error ) {
321+ span := oteltrace .SpanFromContext (ctx )
322+ setSpanAttributes (span , req )
247323 mt := g .newBuiltinMetricsTracer (ctx )
248324 defer recordOperationCompletion (mt )
249325 ctx = context .WithValue (ctx , metricsTracerKey , mt )
@@ -264,6 +340,8 @@ func (g *grpcSpannerClient) Rollback(ctx context.Context, req *spannerpb.Rollbac
264340}
265341
266342func (g * grpcSpannerClient ) PartitionQuery (ctx context.Context , req * spannerpb.PartitionQueryRequest , opts ... gax.CallOption ) (* spannerpb.PartitionResponse , error ) {
343+ span := oteltrace .SpanFromContext (ctx )
344+ setSpanAttributes (span , req )
267345 mt := g .newBuiltinMetricsTracer (ctx )
268346 defer recordOperationCompletion (mt )
269347 ctx = context .WithValue (ctx , metricsTracerKey , mt )
@@ -274,6 +352,8 @@ func (g *grpcSpannerClient) PartitionQuery(ctx context.Context, req *spannerpb.P
274352}
275353
276354func (g * grpcSpannerClient ) PartitionRead (ctx context.Context , req * spannerpb.PartitionReadRequest , opts ... gax.CallOption ) (* spannerpb.PartitionResponse , error ) {
355+ span := oteltrace .SpanFromContext (ctx )
356+ setSpanAttributes (span , req )
277357 mt := g .newBuiltinMetricsTracer (ctx )
278358 defer recordOperationCompletion (mt )
279359 ctx = context .WithValue (ctx , metricsTracerKey , mt )
@@ -284,14 +364,18 @@ func (g *grpcSpannerClient) PartitionRead(ctx context.Context, req *spannerpb.Pa
284364}
285365
286366func (g * grpcSpannerClient ) BatchWrite (ctx context.Context , req * spannerpb.BatchWriteRequest , opts ... gax.CallOption ) (spannerpb.Spanner_BatchWriteClient , error ) {
367+ span := oteltrace .SpanFromContext (ctx )
368+ setSpanAttributes (span , req )
287369 client , err := g .raw .BatchWrite (peer .NewContext (ctx , & peer.Peer {}), req , g .optsWithNextRequestID (opts )... )
288370 mt , ok := ctx .Value (metricsTracerKey ).(* builtinMetricsTracer )
289371 if ! ok {
290372 return client , err
291373 }
292374 if mt != nil && client != nil && mt .currOp .currAttempt != nil {
293375 md , _ := client .Header ()
294- mt .currOp .currAttempt .setServerTimingMetrics (parseServerTimingHeader (md ))
376+ latencyMap := parseServerTimingHeader (md )
377+ setGFEAndAFESpanAttributes (span , latencyMap )
378+ mt .currOp .currAttempt .setServerTimingMetrics (latencyMap )
295379 mt .currOp .currAttempt .setDirectPathUsed (client .Context ())
296380 }
297381 return client , err
0 commit comments