Skip to content

Commit 8d56b30

Browse files
fix(anthropic): Set exception info on streaming span when applicable (#5683)
Consolidate span finishing logic in a new `_StreamSpanContext` context manager. Forward exception info from `_StreamSpanContext.__exit__()` to `Span.__exit__()`.
1 parent e103926 commit 8d56b30

File tree

2 files changed

+482
-74
lines changed

2 files changed

+482
-74
lines changed

sentry_sdk/integrations/anthropic.py

Lines changed: 56 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,55 @@ class _RecordedUsage:
8989
cache_read_input_tokens: "Optional[int]" = 0
9090

9191

92+
class _StreamSpanContext:
93+
"""
94+
Sets accumulated data on the stream's span and finishes the span on exit.
95+
Is a no-op if the stream has no span set, i.e., when the span has already been finished.
96+
"""
97+
98+
def __init__(
99+
self,
100+
stream: "Union[Stream, MessageStream, AsyncStream, AsyncMessageStream]",
101+
# Flag to avoid unreachable branches when the stream state is known to be initialized (stream._model, etc. are set).
102+
guaranteed_streaming_state: bool = False,
103+
) -> None:
104+
self._stream = stream
105+
self._guaranteed_streaming_state = guaranteed_streaming_state
106+
107+
def __enter__(self) -> "_StreamSpanContext":
108+
return self
109+
110+
def __exit__(
111+
self,
112+
exc_type: "Optional[type[BaseException]]",
113+
exc_val: "Optional[BaseException]",
114+
exc_tb: "Optional[Any]",
115+
) -> None:
116+
with capture_internal_exceptions():
117+
if not hasattr(self._stream, "_span"):
118+
return
119+
120+
if not self._guaranteed_streaming_state and not hasattr(
121+
self._stream, "_model"
122+
):
123+
self._stream._span.__exit__(exc_type, exc_val, exc_tb)
124+
del self._stream._span
125+
return
126+
127+
_set_streaming_output_data(
128+
span=self._stream._span,
129+
integration=self._stream._integration,
130+
model=self._stream._model,
131+
usage=self._stream._usage,
132+
content_blocks=self._stream._content_blocks,
133+
response_id=self._stream._response_id,
134+
finish_reason=self._stream._finish_reason,
135+
)
136+
137+
self._stream._span.__exit__(exc_type, exc_val, exc_tb)
138+
del self._stream._span
139+
140+
92141
class AnthropicIntegration(Integration):
93142
identifier = "anthropic"
94143
origin = f"auto.ai.{identifier}"
@@ -446,7 +495,7 @@ def _wrap_synchronous_message_iterator(
446495
Sets information received while iterating the response stream on the AI Client Span.
447496
Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
448497
"""
449-
try:
498+
with _StreamSpanContext(stream, guaranteed_streaming_state=True):
450499
for event in iterator:
451500
# Message and content types are aliases for corresponding Raw* types, introduced in
452501
# https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
@@ -466,19 +515,6 @@ def _wrap_synchronous_message_iterator(
466515

467516
_accumulate_event_data(stream, event)
468517
yield event
469-
finally:
470-
with capture_internal_exceptions():
471-
if hasattr(stream, "_span"):
472-
_finish_streaming_span(
473-
span=stream._span,
474-
integration=stream._integration,
475-
model=stream._model,
476-
usage=stream._usage,
477-
content_blocks=stream._content_blocks,
478-
response_id=stream._response_id,
479-
finish_reason=stream._finish_reason,
480-
)
481-
del stream._span
482518

483519

484520
async def _wrap_asynchronous_message_iterator(
@@ -489,7 +525,7 @@ async def _wrap_asynchronous_message_iterator(
489525
Sets information received while iterating the response stream on the AI Client Span.
490526
Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
491527
"""
492-
try:
528+
with _StreamSpanContext(stream, guaranteed_streaming_state=True):
493529
async for event in iterator:
494530
# Message and content types are aliases for corresponding Raw* types, introduced in
495531
# https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
@@ -509,19 +545,6 @@ async def _wrap_asynchronous_message_iterator(
509545

510546
_accumulate_event_data(stream, event)
511547
yield event
512-
finally:
513-
with capture_internal_exceptions():
514-
if hasattr(stream, "_span"):
515-
_finish_streaming_span(
516-
span=stream._span,
517-
integration=stream._integration,
518-
model=stream._model,
519-
usage=stream._usage,
520-
content_blocks=stream._content_blocks,
521-
response_id=stream._response_id,
522-
finish_reason=stream._finish_reason,
523-
)
524-
del stream._span
525548

526549

527550
def _set_output_data(
@@ -533,7 +556,6 @@ def _set_output_data(
533556
cache_read_input_tokens: "int | None",
534557
cache_write_input_tokens: "int | None",
535558
content_blocks: "list[Any]",
536-
finish_span: bool = False,
537559
response_id: "str | None" = None,
538560
finish_reason: "str | None" = None,
539561
) -> None:
@@ -577,9 +599,6 @@ def _set_output_data(
577599
input_tokens_cache_write=cache_write_input_tokens,
578600
)
579601

580-
if finish_span:
581-
span.__exit__(None, None, None)
582-
583602

584603
def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
585604
integration = kwargs.pop("integration")
@@ -658,10 +677,10 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
658677
cache_read_input_tokens=cache_read_input_tokens,
659678
cache_write_input_tokens=cache_write_input_tokens,
660679
content_blocks=content_blocks,
661-
finish_span=True,
662680
response_id=getattr(result, "id", None),
663681
finish_reason=getattr(result, "stop_reason", None),
664682
)
683+
span.__exit__(None, None, None)
665684
else:
666685
span.set_data("unknown_response", True)
667686
span.__exit__(None, None, None)
@@ -742,7 +761,7 @@ def _accumulate_event_data(
742761
stream._finish_reason = finish_reason
743762

744763

745-
def _finish_streaming_span(
764+
def _set_streaming_output_data(
746765
span: "Span",
747766
integration: "AnthropicIntegration",
748767
model: "Optional[str]",
@@ -752,7 +771,7 @@ def _finish_streaming_span(
752771
finish_reason: "Optional[str]",
753772
) -> None:
754773
"""
755-
Set output attributes on the AI Client Span and end the span.
774+
Set output attributes on the AI Client Span.
756775
"""
757776
# Anthropic's input_tokens excludes cached/cache_write tokens.
758777
# Normalize to total input tokens for correct cost calculations.
@@ -771,7 +790,6 @@ def _finish_streaming_span(
771790
cache_read_input_tokens=usage.cache_read_input_tokens,
772791
cache_write_input_tokens=usage.cache_write_input_tokens,
773792
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
774-
finish_span=True,
775793
response_id=response_id,
776794
finish_reason=finish_reason,
777795
)
@@ -785,27 +803,9 @@ def _wrap_close(
785803
"""
786804

787805
def close(self: "Union[Stream, MessageStream]") -> None:
788-
if not hasattr(self, "_span"):
789-
return f(self)
790-
791-
if not hasattr(self, "_model"):
792-
self._span.__exit__(None, None, None)
793-
del self._span
806+
with _StreamSpanContext(self):
794807
return f(self)
795808

796-
_finish_streaming_span(
797-
span=self._span,
798-
integration=self._integration,
799-
model=self._model,
800-
usage=self._usage,
801-
content_blocks=self._content_blocks,
802-
response_id=self._response_id,
803-
finish_reason=self._finish_reason,
804-
)
805-
del self._span
806-
807-
return f(self)
808-
809809
return close
810810

811811

@@ -855,27 +855,9 @@ def _wrap_async_close(
855855
"""
856856

857857
async def close(self: "AsyncStream") -> None:
858-
if not hasattr(self, "_span"):
859-
return await f(self)
860-
861-
if not hasattr(self, "_model"):
862-
self._span.__exit__(None, None, None)
863-
del self._span
858+
with _StreamSpanContext(self):
864859
return await f(self)
865860

866-
_finish_streaming_span(
867-
span=self._span,
868-
integration=self._integration,
869-
model=self._model,
870-
usage=self._usage,
871-
content_blocks=self._content_blocks,
872-
response_id=self._response_id,
873-
finish_reason=self._finish_reason,
874-
)
875-
del self._span
876-
877-
return await f(self)
878-
879861
return close
880862

881863

0 commit comments

Comments
 (0)