Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 1b6f3d2

Browse files
feat: Add OpenTelemetry support for Subscribe Side (#1252)
1 parent 1ae49de commit 1b6f3d2

17 files changed

+2066
-30
lines changed

google/cloud/pubsub_v1/open_telemetry/context_propagation.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from opentelemetry.propagators.textmap import Setter
15+
from typing import Optional, List
16+
17+
from opentelemetry.propagators.textmap import Setter, Getter
1618

1719
from google.pubsub_v1 import PubsubMessage
1820

@@ -37,3 +39,17 @@ def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
3739
None
3840
"""
3941
carrier.attributes["googclient_" + key] = value
42+
43+
44+
class OpenTelemetryContextGetter(Getter):
45+
"""
46+
Used by Open Telemetry for context propagation.
47+
"""
48+
49+
def get(self, carrier: PubsubMessage, key: str) -> Optional[List[str]]:
50+
if ("googclient_" + key) not in carrier.attributes:
51+
return None
52+
return [carrier.attributes["googclient_" + key]]
53+
54+
def keys(self, carrier: PubsubMessage) -> List[str]:
55+
return list(map(str, carrier.attributes.keys()))
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Copyright 2024, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Optional, List
16+
from datetime import datetime
17+
18+
from opentelemetry import trace, context
19+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
20+
from opentelemetry.trace.propagation import set_span_in_context
21+
22+
from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
23+
OpenTelemetryContextGetter,
24+
)
25+
from google.pubsub_v1.types import PubsubMessage
26+
27+
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
28+
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
29+
30+
31+
class SubscribeOpenTelemetry:
32+
def __init__(self, message: PubsubMessage):
33+
self._message: PubsubMessage = message
34+
35+
# subscribe span will be initialized by the `start_subscribe_span`
36+
# method.
37+
self._subscribe_span: Optional[trace.Span] = None
38+
39+
# subscriber concurrency control span will be initialized by the
40+
# `start_subscribe_concurrency_control_span` method.
41+
self._concurrency_control_span: Optional[trace.Span] = None
42+
43+
# scheduler span will be initialized by the
44+
# `start_subscribe_scheduler_span` method.
45+
self._scheduler_span: Optional[trace.Span] = None
46+
47+
# This will be set by `start_subscribe_span` method and will be used
48+
# for other spans, such as process span.
49+
self._subscription_id: Optional[str] = None
50+
51+
# This will be set by `start_process_span` method.
52+
self._process_span: Optional[trace.Span] = None
53+
54+
# This will be set by `start_subscribe_span` method, if a publisher create span
55+
# context was extracted from trace propagation. And will be used by spans like
56+
# proces span to add links to the publisher create span.
57+
self._publisher_create_span_context: Optional[context.Context] = None
58+
59+
# This will be set by `start_subscribe_span` method and will be used
60+
# for other spans, such as modack span.
61+
self._project_id: Optional[str] = None
62+
63+
@property
64+
def subscription_id(self) -> Optional[str]:
65+
return self._subscription_id
66+
67+
@property
68+
def project_id(self) -> Optional[str]:
69+
return self._project_id
70+
71+
@property
72+
def subscribe_span(self) -> Optional[trace.Span]:
73+
return self._subscribe_span
74+
75+
def start_subscribe_span(
76+
self,
77+
subscription: str,
78+
exactly_once_enabled: bool,
79+
ack_id: str,
80+
delivery_attempt: int,
81+
) -> None:
82+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
83+
parent_span_context = TraceContextTextMapPropagator().extract(
84+
carrier=self._message,
85+
getter=OpenTelemetryContextGetter(),
86+
)
87+
self._publisher_create_span_context = parent_span_context
88+
split_subscription: List[str] = subscription.split("/")
89+
assert len(split_subscription) == 4
90+
subscription_short_name = split_subscription[3]
91+
self._project_id = split_subscription[1]
92+
self._subscription_id = subscription_short_name
93+
with tracer.start_as_current_span(
94+
name=f"{subscription_short_name} subscribe",
95+
context=parent_span_context if parent_span_context else None,
96+
kind=trace.SpanKind.CONSUMER,
97+
attributes={
98+
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
99+
"messaging.destination.name": subscription_short_name,
100+
"gcp.project_id": subscription.split("/")[1],
101+
"messaging.message.id": self._message.message_id,
102+
"messaging.message.body.size": len(self._message.data),
103+
"messaging.gcp_pubsub.message.ack_id": ack_id,
104+
"messaging.gcp_pubsub.message.ordering_key": self._message.ordering_key,
105+
"messaging.gcp_pubsub.message.exactly_once_delivery": exactly_once_enabled,
106+
"code.function": "_on_response",
107+
"messaging.gcp_pubsub.message.delivery_attempt": delivery_attempt,
108+
},
109+
end_on_exit=False,
110+
) as subscribe_span:
111+
self._subscribe_span = subscribe_span
112+
113+
def add_subscribe_span_event(self, event: str) -> None:
114+
assert self._subscribe_span is not None
115+
self._subscribe_span.add_event(
116+
name=event,
117+
attributes={
118+
"timestamp": str(datetime.now()),
119+
},
120+
)
121+
122+
def end_subscribe_span(self) -> None:
123+
assert self._subscribe_span is not None
124+
self._subscribe_span.end()
125+
126+
def set_subscribe_span_result(self, result: str) -> None:
127+
assert self._subscribe_span is not None
128+
self._subscribe_span.set_attribute(
129+
key="messaging.gcp_pubsub.result",
130+
value=result,
131+
)
132+
133+
def start_subscribe_concurrency_control_span(self) -> None:
134+
assert self._subscribe_span is not None
135+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
136+
with tracer.start_as_current_span(
137+
name="subscriber concurrency control",
138+
kind=trace.SpanKind.INTERNAL,
139+
context=set_span_in_context(self._subscribe_span),
140+
end_on_exit=False,
141+
) as concurrency_control_span:
142+
self._concurrency_control_span = concurrency_control_span
143+
144+
def end_subscribe_concurrency_control_span(self) -> None:
145+
assert self._concurrency_control_span is not None
146+
self._concurrency_control_span.end()
147+
148+
def start_subscribe_scheduler_span(self) -> None:
149+
assert self._subscribe_span is not None
150+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
151+
with tracer.start_as_current_span(
152+
name="subscriber scheduler",
153+
kind=trace.SpanKind.INTERNAL,
154+
context=set_span_in_context(self._subscribe_span),
155+
end_on_exit=False,
156+
) as scheduler_span:
157+
self._scheduler_span = scheduler_span
158+
159+
def end_subscribe_scheduler_span(self) -> None:
160+
assert self._scheduler_span is not None
161+
self._scheduler_span.end()
162+
163+
def start_process_span(self) -> None:
164+
assert self._subscribe_span is not None
165+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
166+
publish_create_span_link: Optional[trace.Link] = None
167+
if self._publisher_create_span_context:
168+
publish_create_span: trace.Span = trace.get_current_span(
169+
self._publisher_create_span_context
170+
)
171+
span_context: Optional[
172+
trace.SpanContext
173+
] = publish_create_span.get_span_context()
174+
publish_create_span_link = (
175+
trace.Link(span_context) if span_context else None
176+
)
177+
178+
with tracer.start_as_current_span(
179+
name=f"{self._subscription_id} process",
180+
attributes={
181+
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
182+
},
183+
kind=trace.SpanKind.INTERNAL,
184+
context=set_span_in_context(self._subscribe_span),
185+
links=[publish_create_span_link] if publish_create_span_link else None,
186+
end_on_exit=False,
187+
) as process_span:
188+
self._process_span = process_span
189+
190+
def end_process_span(self) -> None:
191+
assert self._process_span is not None
192+
self._process_span.end()
193+
194+
def add_process_span_event(self, event: str) -> None:
195+
assert self._process_span is not None
196+
self._process_span.add_event(
197+
name=event,
198+
attributes={
199+
"timestamp": str(datetime.now()),
200+
},
201+
)
202+
203+
204+
def start_modack_span(
205+
subscribe_span_links: List[trace.Link],
206+
subscription_id: Optional[str],
207+
message_count: int,
208+
deadline: float,
209+
project_id: Optional[str],
210+
code_function: str,
211+
receipt_modack: bool,
212+
) -> trace.Span:
213+
assert subscription_id is not None
214+
assert project_id is not None
215+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
216+
with tracer.start_as_current_span(
217+
name=f"{subscription_id} modack",
218+
attributes={
219+
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
220+
"messaging.batch.message_count": message_count,
221+
"messaging.gcp_pubsub.message.ack_deadline": deadline,
222+
"messaging.destination.name": subscription_id,
223+
"gcp.project_id": project_id,
224+
"messaging.operation.name": "modack",
225+
"code.function": code_function,
226+
"messaging.gcp_pubsub.is_receipt_modack": receipt_modack,
227+
},
228+
links=subscribe_span_links,
229+
kind=trace.SpanKind.CLIENT,
230+
end_on_exit=False,
231+
) as modack_span:
232+
return modack_span
233+
234+
235+
def start_ack_span(
236+
subscription_id: str,
237+
message_count: int,
238+
project_id: str,
239+
links: List[trace.Link],
240+
) -> trace.Span:
241+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
242+
with tracer.start_as_current_span(
243+
name=f"{subscription_id} ack",
244+
attributes={
245+
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
246+
"messaging.batch.message_count": message_count,
247+
"messaging.operation": "ack",
248+
"gcp.project_id": project_id,
249+
"messaging.destination.name": subscription_id,
250+
"code.function": "ack",
251+
},
252+
kind=trace.SpanKind.CLIENT,
253+
links=links,
254+
end_on_exit=False,
255+
) as ack_span:
256+
return ack_span
257+
258+
259+
def start_nack_span(
260+
subscription_id: str,
261+
message_count: int,
262+
project_id: str,
263+
links: List[trace.Link],
264+
) -> trace.Span:
265+
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
266+
with tracer.start_as_current_span(
267+
name=f"{subscription_id} nack",
268+
attributes={
269+
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
270+
"messaging.batch.message_count": message_count,
271+
"messaging.operation": "nack",
272+
"gcp.project_id": project_id,
273+
"messaging.destination.name": subscription_id,
274+
"code.function": "modify_ack_deadline",
275+
},
276+
kind=trace.SpanKind.CLIENT,
277+
links=links,
278+
end_on_exit=False,
279+
) as nack_span:
280+
return nack_span

0 commit comments

Comments
 (0)