Skip to content

Commit b33c29e

Browse files
authored
Cherry-pick #9081 into v1.81.x (#9102)
Original PR: #9081 RELEASE NOTES: * otel: Segregate client and server RPCInfo used for metrics and traces.
1 parent c45fae6 commit b33c29e

6 files changed

Lines changed: 188 additions & 36 deletions

File tree

stats/opentelemetry/client_metrics.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func getOrCreateCallInfo(ctx context.Context, cc *grpc.ClientConn, method string
7676
target: cc.CanonicalTarget(),
7777
method: determineMethod(method, opts...),
7878
}
79-
ctx = setCallInfo(ctx, ci)
79+
ctx = context.WithValue(ctx, callInfoKey{}, ci)
8080
}
8181
return ctx, ci
8282
}
@@ -157,17 +157,6 @@ func (h *clientMetricsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo
157157
// HandleConn exists to satisfy stats.Handler.
158158
func (h *clientMetricsHandler) HandleConn(context.Context, stats.ConnStats) {}
159159

160-
// getOrCreateRPCAttemptInfo retrieves or creates an rpc attemptInfo object
161-
// and ensures it is set in the context along with the rpcInfo.
162-
func getOrCreateRPCAttemptInfo(ctx context.Context) (context.Context, *attemptInfo) {
163-
ri := getRPCInfo(ctx)
164-
if ri != nil {
165-
return ctx, ri.ai
166-
}
167-
ri = &rpcInfo{ai: &attemptInfo{}}
168-
return setRPCInfo(ctx, ri), ri.ai
169-
}
170-
171160
// TagRPC implements per RPC attempt context management for metrics.
172161
func (h *clientMetricsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
173162
// Numerous stats handlers can be used for the same channel. The cluster
@@ -187,17 +176,18 @@ func (h *clientMetricsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
187176
}
188177
ctx = istats.SetLabels(ctx, labels)
189178
}
190-
ctx, ai := getOrCreateRPCAttemptInfo(ctx)
179+
ctx, ri := getOrCreateClientRPCInfo(ctx)
180+
ai := ri.ai
191181
ai.startTime = time.Now()
192182
ai.xdsLabels = labels.TelemetryLabels
193183
ai.method = removeLeadingSlash(info.FullMethodName)
194184

195-
return setRPCInfo(ctx, &rpcInfo{ai: ai})
185+
return ctx
196186
}
197187

198188
// HandleRPC handles per RPC stats implementation.
199189
func (h *clientMetricsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
200-
ri := getRPCInfo(ctx)
190+
ri := clientRPCInfo(ctx)
201191
if ri == nil {
202192
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
203193
return

stats/opentelemetry/client_tracing.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,14 @@ func (h *clientTracingHandler) HandleConn(context.Context, stats.ConnStats) {}
120120

121121
// TagRPC implements per RPC attempt context management for traces.
122122
func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
123-
ctx, ai := getOrCreateRPCAttemptInfo(ctx)
124-
ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay)
125-
return setRPCInfo(ctx, &rpcInfo{ai: ai})
123+
ctx, ri := getOrCreateClientRPCInfo(ctx)
124+
ctx, _ = h.traceTagRPC(ctx, ri.ai, info.NameResolutionDelay)
125+
return ctx
126126
}
127127

128128
// HandleRPC handles per RPC tracing implementation.
129129
func (h *clientTracingHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
130-
ri := getRPCInfo(ctx)
130+
ri := clientRPCInfo(ctx)
131131
if ri == nil {
132132
logger.Error("ctx passed into client side tracing handler trace event handling has no client attempt data present")
133133
return

stats/opentelemetry/e2e_test.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2204,3 +2204,146 @@ func runDisconnectScenario(t *testing.T, name, wantLabel string, action func(*st
22042204
t.Fatalf("Metric verification failed for case %s: %v", name, err)
22052205
}
22062206
}
2207+
2208+
// TestRelayContextCollisionMetrics verifies that when an application acts as
2209+
// both a server and a client using the same context, the client metrics do not
2210+
// inherit or overwrite the server's telemetry metadata (e.g., grpc.method).
2211+
func (s) TestRelayContextCollisionMetrics(t *testing.T) {
2212+
backendMetricsOpts, _ := defaultMetricsOptions(t, nil)
2213+
backendServer := setupStubServer(t, backendMetricsOpts, nil)
2214+
backendServer.EmptyCallF = func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
2215+
return nil, status.Error(codes.Unimplemented, "EmptyCall not implemented")
2216+
}
2217+
defer backendServer.Stop()
2218+
2219+
relayMetricsOpts, relayMetricsReader := defaultMetricsOptions(t, nil)
2220+
otelOpts := opentelemetry.Options{MetricsOptions: *relayMetricsOpts}
2221+
2222+
relayServer := &stubserver.StubServer{
2223+
UnaryCallF: func(ctx context.Context, _ *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
2224+
relayCC, err := grpc.NewClient(
2225+
backendServer.Address,
2226+
grpc.WithTransportCredentials(insecure.NewCredentials()),
2227+
opentelemetry.DialOption(otelOpts),
2228+
)
2229+
if err != nil {
2230+
return nil, fmt.Errorf("failed to create relay client: %v", err)
2231+
}
2232+
defer relayCC.Close()
2233+
client := testpb.NewTestServiceClient(relayCC)
2234+
_, err = client.EmptyCall(ctx, &testpb.Empty{})
2235+
if status.Code(err) != codes.Unimplemented {
2236+
t.Errorf("Expected Unimplemented error, got: %v", err)
2237+
}
2238+
return &testpb.SimpleResponse{}, nil
2239+
},
2240+
}
2241+
if err := relayServer.Start([]grpc.ServerOption{opentelemetry.ServerOption(otelOpts)}, opentelemetry.DialOption(otelOpts)); err != nil {
2242+
t.Fatalf("Failed to start relay server: %v", err)
2243+
}
2244+
defer relayServer.Stop()
2245+
2246+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2247+
defer cancel()
2248+
2249+
if _, err := relayServer.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
2250+
t.Fatalf("Unexpected UnaryCall error: %v", err)
2251+
}
2252+
2253+
// Verify Server Metric Identity is retained.
2254+
if err := checkMetricWithMethod(ctx, relayMetricsReader, "grpc.server.call.started", "grpc.testing.TestService/UnaryCall"); err != nil {
2255+
t.Fatal(err)
2256+
}
2257+
2258+
// Verify Client Metric Identity correctly resolved to "grpc.testing.TestService/EmptyCall".
2259+
if err := checkMetricWithMethod(ctx, relayMetricsReader, "grpc.client.attempt.started", "grpc.testing.TestService/EmptyCall"); err != nil {
2260+
t.Fatal(err)
2261+
}
2262+
}
2263+
2264+
// TestRelayContextCollisionTracing verifies that span context is correctly
2265+
// propagated from incoming server requests to outgoing client requests without
2266+
// the client span accidentally adopting the server's identity or breaking the
2267+
// trace chain.
2268+
func (s) TestRelayContextCollisionTracing(t *testing.T) {
2269+
backendTraceOpts, _ := defaultTraceOptions(t)
2270+
backendServer := setupStubServer(t, nil, backendTraceOpts)
2271+
backendServer.EmptyCallF = func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
2272+
return nil, status.Error(codes.Unimplemented, "EmptyCall not implemented")
2273+
}
2274+
defer backendServer.Stop()
2275+
2276+
relayTraceOpts, relayTraceExporter := defaultTraceOptions(t)
2277+
otelOpts := opentelemetry.Options{TraceOptions: *relayTraceOpts}
2278+
2279+
relayServer := &stubserver.StubServer{
2280+
UnaryCallF: func(ctx context.Context, _ *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
2281+
relayCC, err := grpc.NewClient(
2282+
backendServer.Address,
2283+
grpc.WithTransportCredentials(insecure.NewCredentials()),
2284+
opentelemetry.DialOption(otelOpts),
2285+
)
2286+
if err != nil {
2287+
return nil, fmt.Errorf("failed to create relay client: %v", err)
2288+
}
2289+
defer relayCC.Close()
2290+
client := testpb.NewTestServiceClient(relayCC)
2291+
_, err = client.EmptyCall(ctx, &testpb.Empty{})
2292+
if status.Code(err) != codes.Unimplemented {
2293+
t.Errorf("Expected Unimplemented error, got: %v", err)
2294+
}
2295+
return &testpb.SimpleResponse{}, nil
2296+
},
2297+
}
2298+
if err := relayServer.Start([]grpc.ServerOption{opentelemetry.ServerOption(otelOpts)}, opentelemetry.DialOption(otelOpts)); err != nil {
2299+
t.Fatalf("Failed to start relay server: %v", err)
2300+
}
2301+
defer relayServer.Stop()
2302+
2303+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2304+
defer cancel()
2305+
2306+
_, _ = relayServer.Client.UnaryCall(ctx, &testpb.SimpleRequest{})
2307+
2308+
wantSpans := []traceSpanInfo{
2309+
{name: "Recv.", spanKind: "server"},
2310+
{name: "Sent.grpc.testing.TestService.EmptyCall", spanKind: "client"},
2311+
}
2312+
spans, err := waitForTraceSpans(ctx, relayTraceExporter, wantSpans)
2313+
if err != nil {
2314+
t.Fatalf("Failed to wait for spans: %v", err)
2315+
}
2316+
2317+
var srvTraceID, cliTraceID oteltrace.TraceID
2318+
for _, span := range spans {
2319+
if span.Name == "Recv." && span.SpanKind == oteltrace.SpanKindServer {
2320+
srvTraceID = span.SpanContext.TraceID()
2321+
}
2322+
if span.Name == "Sent.grpc.testing.TestService.EmptyCall" && span.SpanKind == oteltrace.SpanKindClient {
2323+
cliTraceID = span.SpanContext.TraceID()
2324+
}
2325+
}
2326+
if !srvTraceID.IsValid() || !cliTraceID.IsValid() {
2327+
t.Fatalf("Invalid trace IDs found. Server: %s, Client: %s", srvTraceID, cliTraceID)
2328+
}
2329+
2330+
if srvTraceID != cliTraceID {
2331+
t.Errorf("Trace continuity broken: Server TraceID %s != Client TraceID %s", srvTraceID, cliTraceID)
2332+
}
2333+
}
2334+
2335+
// checkMetricWithMethod verifies that a metric with the specified name contains
2336+
// a data point matching the target grpc.method. It does not poll.
2337+
func checkMetricWithMethod(ctx context.Context, reader *metric.ManualReader, metricName, method string) error {
2338+
metrics := metricsDataFromReader(ctx, reader)
2339+
if m, ok := metrics[metricName]; ok {
2340+
if sum, ok := m.Data.(metricdata.Sum[int64]); ok {
2341+
for _, dp := range sum.DataPoints {
2342+
if val, ok := dp.Attributes.Value("grpc.method"); ok && val.AsString() == method {
2343+
return nil
2344+
}
2345+
}
2346+
}
2347+
}
2348+
return fmt.Errorf("metric %q with method %q not found", metricName, method)
2349+
}

stats/opentelemetry/opentelemetry.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,6 @@ type callInfo struct {
183183

184184
type callInfoKey struct{}
185185

186-
func setCallInfo(ctx context.Context, ci *callInfo) context.Context {
187-
return context.WithValue(ctx, callInfoKey{}, ci)
188-
}
189-
190186
// getCallInfo returns the callInfo stored in the context, or nil
191187
// if there isn't one.
192188
func getCallInfo(ctx context.Context) *callInfo {
@@ -200,19 +196,41 @@ type rpcInfo struct {
200196
ai *attemptInfo
201197
}
202198

203-
type rpcInfoKey struct{}
199+
type clientRPCInfoKey struct{}
200+
type serverRPCInfoKey struct{}
204201

205-
func setRPCInfo(ctx context.Context, ri *rpcInfo) context.Context {
206-
return context.WithValue(ctx, rpcInfoKey{}, ri)
202+
// clientRPCInfo returns the rpcInfo stored in the context for client, or nil
203+
// if there isn't one.
204+
func clientRPCInfo(ctx context.Context) *rpcInfo {
205+
ri, _ := ctx.Value(clientRPCInfoKey{}).(*rpcInfo)
206+
return ri
207207
}
208208

209-
// getRPCInfo returns the rpcInfo stored in the context, or nil
209+
// serverRPCInfo returns the rpcInfo stored in the context for server, or nil
210210
// if there isn't one.
211-
func getRPCInfo(ctx context.Context) *rpcInfo {
212-
ri, _ := ctx.Value(rpcInfoKey{}).(*rpcInfo)
211+
func serverRPCInfo(ctx context.Context) *rpcInfo {
212+
ri, _ := ctx.Value(serverRPCInfoKey{}).(*rpcInfo)
213213
return ri
214214
}
215215

216+
func getOrCreateClientRPCInfo(ctx context.Context) (context.Context, *rpcInfo) {
217+
ri := clientRPCInfo(ctx)
218+
if ri != nil {
219+
return ctx, ri
220+
}
221+
ri = &rpcInfo{ai: &attemptInfo{}}
222+
return context.WithValue(ctx, clientRPCInfoKey{}, ri), ri
223+
}
224+
225+
func getOrCreateServerRPCInfo(ctx context.Context) (context.Context, *rpcInfo) {
226+
ri := serverRPCInfo(ctx)
227+
if ri != nil {
228+
return ctx, ri
229+
}
230+
ri = &rpcInfo{ai: &attemptInfo{}}
231+
return context.WithValue(ctx, serverRPCInfoKey{}, ri), ri
232+
}
233+
216234
func removeLeadingSlash(mn string) string {
217235
return strings.TrimLeft(mn, "/")
218236
}

stats/opentelemetry/server_metrics.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,16 +196,17 @@ func (h *serverMetricsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
196196
method = "other"
197197
}
198198
}
199-
ctx, ai := getOrCreateRPCAttemptInfo(ctx)
199+
ctx, ri := getOrCreateServerRPCInfo(ctx)
200+
ai := ri.ai
200201
ai.startTime = time.Now()
201202
ai.method = removeLeadingSlash(method)
202203

203-
return setRPCInfo(ctx, &rpcInfo{ai: ai})
204+
return ctx
204205
}
205206

206207
// HandleRPC handles per RPC stats implementation.
207208
func (h *serverMetricsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
208-
ri := getRPCInfo(ctx)
209+
ri := serverRPCInfo(ctx)
209210
if ri == nil {
210211
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
211212
return

stats/opentelemetry/server_tracing.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ func (h *serverTracingHandler) initializeTraces() {
4040

4141
// TagRPC implements per RPC attempt context management for traces.
4242
func (h *serverTracingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
43-
ctx, ai := getOrCreateRPCAttemptInfo(ctx)
44-
ctx, ai = h.traceTagRPC(ctx, ai)
45-
return setRPCInfo(ctx, &rpcInfo{ai: ai})
43+
ctx, ri := getOrCreateServerRPCInfo(ctx)
44+
ctx, _ = h.traceTagRPC(ctx, ri.ai)
45+
return ctx
4646
}
4747

4848
// traceTagRPC populates context with new span data using the TextMapPropagator
@@ -67,7 +67,7 @@ func (h *serverTracingHandler) traceTagRPC(ctx context.Context, ai *attemptInfo)
6767

6868
// HandleRPC handles per RPC tracing implementation.
6969
func (h *serverTracingHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
70-
ri := getRPCInfo(ctx)
70+
ri := serverRPCInfo(ctx)
7171
if ri == nil {
7272
logger.Error("ctx passed into server side tracing handler trace event handling has no server call data present")
7373
return

0 commit comments

Comments
 (0)