@@ -89,6 +89,55 @@ class _RecordedUsage:
8989 cache_read_input_tokens : "Optional[int]" = 0
9090
9191
class _StreamSpanContext:
    """
    Context manager that finalizes the span attached to a stream.

    On exit, sets accumulated streaming data on the stream's span and
    finishes the span. Is a no-op if the stream has no span set, i.e.,
    when the span has already been finished.
    """

    def __init__(
        self,
        stream: "Union[Stream, MessageStream, AsyncStream, AsyncMessageStream]",
        # Flag to avoid unreachable branches when the stream state is known
        # to be initialized (stream._model, etc. are set).
        guaranteed_streaming_state: bool = False,
    ) -> None:
        self._stream = stream
        self._guaranteed_streaming_state = guaranteed_streaming_state

    def __enter__(self) -> "_StreamSpanContext":
        return self

    def __exit__(
        self,
        exc_type: "Optional[type[BaseException]]",
        exc_val: "Optional[BaseException]",
        exc_tb: "Optional[Any]",
    ) -> None:
        with capture_internal_exceptions():
            if not hasattr(self._stream, "_span"):
                # Span already finished elsewhere — nothing to do.
                return

            try:
                # Only set output data when the streaming state was
                # initialized; otherwise the accumulator attributes
                # (_model, _usage, _content_blocks, ...) do not exist.
                if self._guaranteed_streaming_state or hasattr(
                    self._stream, "_model"
                ):
                    _set_streaming_output_data(
                        span=self._stream._span,
                        integration=self._stream._integration,
                        model=self._stream._model,
                        usage=self._stream._usage,
                        content_blocks=self._stream._content_blocks,
                        response_id=self._stream._response_id,
                        finish_reason=self._stream._finish_reason,
                    )
            finally:
                # Always finish and detach the span, even if setting the
                # output data raised above — otherwise the span would be
                # left open and a later close() would re-enter with stale
                # state.
                self._stream._span.__exit__(exc_type, exc_val, exc_tb)
                del self._stream._span
140+
92141class AnthropicIntegration (Integration ):
93142 identifier = "anthropic"
94143 origin = f"auto.ai.{ identifier } "
@@ -446,7 +495,7 @@ def _wrap_synchronous_message_iterator(
446495 Sets information received while iterating the response stream on the AI Client Span.
447496 Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
448497 """
449- try :
498+ with _StreamSpanContext ( stream , guaranteed_streaming_state = True ) :
450499 for event in iterator :
451500 # Message and content types are aliases for corresponding Raw* types, introduced in
452501 # https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
@@ -466,19 +515,6 @@ def _wrap_synchronous_message_iterator(
466515
467516 _accumulate_event_data (stream , event )
468517 yield event
469- finally :
470- with capture_internal_exceptions ():
471- if hasattr (stream , "_span" ):
472- _finish_streaming_span (
473- span = stream ._span ,
474- integration = stream ._integration ,
475- model = stream ._model ,
476- usage = stream ._usage ,
477- content_blocks = stream ._content_blocks ,
478- response_id = stream ._response_id ,
479- finish_reason = stream ._finish_reason ,
480- )
481- del stream ._span
482518
483519
484520async def _wrap_asynchronous_message_iterator (
@@ -489,7 +525,7 @@ async def _wrap_asynchronous_message_iterator(
489525 Sets information received while iterating the response stream on the AI Client Span.
490526 Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
491527 """
492- try :
528+ with _StreamSpanContext ( stream , guaranteed_streaming_state = True ) :
493529 async for event in iterator :
494530 # Message and content types are aliases for corresponding Raw* types, introduced in
495531 # https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
@@ -509,19 +545,6 @@ async def _wrap_asynchronous_message_iterator(
509545
510546 _accumulate_event_data (stream , event )
511547 yield event
512- finally :
513- with capture_internal_exceptions ():
514- if hasattr (stream , "_span" ):
515- _finish_streaming_span (
516- span = stream ._span ,
517- integration = stream ._integration ,
518- model = stream ._model ,
519- usage = stream ._usage ,
520- content_blocks = stream ._content_blocks ,
521- response_id = stream ._response_id ,
522- finish_reason = stream ._finish_reason ,
523- )
524- del stream ._span
525548
526549
527550def _set_output_data (
@@ -533,7 +556,6 @@ def _set_output_data(
533556 cache_read_input_tokens : "int | None" ,
534557 cache_write_input_tokens : "int | None" ,
535558 content_blocks : "list[Any]" ,
536- finish_span : bool = False ,
537559 response_id : "str | None" = None ,
538560 finish_reason : "str | None" = None ,
539561) -> None :
@@ -577,9 +599,6 @@ def _set_output_data(
577599 input_tokens_cache_write = cache_write_input_tokens ,
578600 )
579601
580- if finish_span :
581- span .__exit__ (None , None , None )
582-
583602
584603def _sentry_patched_create_common (f : "Any" , * args : "Any" , ** kwargs : "Any" ) -> "Any" :
585604 integration = kwargs .pop ("integration" )
@@ -658,10 +677,10 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
658677 cache_read_input_tokens = cache_read_input_tokens ,
659678 cache_write_input_tokens = cache_write_input_tokens ,
660679 content_blocks = content_blocks ,
661- finish_span = True ,
662680 response_id = getattr (result , "id" , None ),
663681 finish_reason = getattr (result , "stop_reason" , None ),
664682 )
683+ span .__exit__ (None , None , None )
665684 else :
666685 span .set_data ("unknown_response" , True )
667686 span .__exit__ (None , None , None )
@@ -742,7 +761,7 @@ def _accumulate_event_data(
742761 stream ._finish_reason = finish_reason
743762
744763
745- def _finish_streaming_span (
764+ def _set_streaming_output_data (
746765 span : "Span" ,
747766 integration : "AnthropicIntegration" ,
748767 model : "Optional[str]" ,
@@ -752,7 +771,7 @@ def _finish_streaming_span(
752771 finish_reason : "Optional[str]" ,
753772) -> None :
754773 """
755- Set output attributes on the AI Client Span and end the span .
774+ Set output attributes on the AI Client Span.
756775 """
757776 # Anthropic's input_tokens excludes cached/cache_write tokens.
758777 # Normalize to total input tokens for correct cost calculations.
@@ -771,7 +790,6 @@ def _finish_streaming_span(
771790 cache_read_input_tokens = usage .cache_read_input_tokens ,
772791 cache_write_input_tokens = usage .cache_write_input_tokens ,
773792 content_blocks = [{"text" : "" .join (content_blocks ), "type" : "text" }],
774- finish_span = True ,
775793 response_id = response_id ,
776794 finish_reason = finish_reason ,
777795 )
@@ -785,27 +803,9 @@ def _wrap_close(
785803 """
786804
787805 def close (self : "Union[Stream, MessageStream]" ) -> None :
788- if not hasattr (self , "_span" ):
789- return f (self )
790-
791- if not hasattr (self , "_model" ):
792- self ._span .__exit__ (None , None , None )
793- del self ._span
806+ with _StreamSpanContext (self ):
794807 return f (self )
795808
796- _finish_streaming_span (
797- span = self ._span ,
798- integration = self ._integration ,
799- model = self ._model ,
800- usage = self ._usage ,
801- content_blocks = self ._content_blocks ,
802- response_id = self ._response_id ,
803- finish_reason = self ._finish_reason ,
804- )
805- del self ._span
806-
807- return f (self )
808-
809809 return close
810810
811811
@@ -855,27 +855,9 @@ def _wrap_async_close(
855855 """
856856
857857 async def close (self : "AsyncStream" ) -> None :
858- if not hasattr (self , "_span" ):
859- return await f (self )
860-
861- if not hasattr (self , "_model" ):
862- self ._span .__exit__ (None , None , None )
863- del self ._span
858+ with _StreamSpanContext (self ):
864859 return await f (self )
865860
866- _finish_streaming_span (
867- span = self ._span ,
868- integration = self ._integration ,
869- model = self ._model ,
870- usage = self ._usage ,
871- content_blocks = self ._content_blocks ,
872- response_id = self ._response_id ,
873- finish_reason = self ._finish_reason ,
874- )
875- del self ._span
876-
877- return await f (self )
878-
879861 return close
880862
881863