Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,12 @@ async def _process_send(self, message_envelope: SendMessageEnvelope) -> None:
cancellation_token=message_envelope.cancellation_token,
message_id=message_envelope.message_id,
)
with MessageHandlerContext.populate_context(recipient_agent.id):
response = await recipient_agent.on_message(
message_envelope.message,
ctx=message_context,
)
with self._tracer_helper.trace_block("process", recipient_agent.id, parent=message_envelope.metadata):
with MessageHandlerContext.populate_context(recipient_agent.id):
response = await recipient_agent.on_message(
message_envelope.message,
ctx=message_context,
)
except CancelledError as e:
if not message_envelope.future.cancelled():
message_envelope.future.set_exception(e)
Expand Down Expand Up @@ -503,7 +504,7 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No
agent = await self._get_agent(agent_id)

async def _on_message(agent: Agent, message_context: MessageContext) -> Any:
with self._tracer_helper.trace_block("process", agent.id, parent=None):
with self._tracer_helper.trace_block("process", agent.id, parent=message_envelope.metadata):
with MessageHandlerContext.populate_context(agent.id):
try:
return await agent.on_message(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dataclasses import dataclass
from typing import Dict, Mapping, Optional
from typing import Dict, Mapping, Optional, Sequence

from opentelemetry.context import Context
from opentelemetry.propagate import extract
from opentelemetry.trace import Link, get_current_span
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


Expand All @@ -12,6 +13,7 @@ class EnvelopeMetadata:

traceparent: Optional[str] = None
tracestate: Optional[str] = None
links: Optional[Sequence[Link]] = None


def _get_carrier_for_envelope_metadata(envelope_metadata: EnvelopeMetadata) -> Dict[str, str]:
Expand Down Expand Up @@ -95,3 +97,31 @@ def get_telemetry_context(metadata: TelemetryMetadataContainer) -> Context:
return extract(_get_carrier_for_remote_call_metadata(metadata))
else:
raise ValueError(f"Unknown metadata type: {type(metadata)}")


def get_telemetry_links(
metadata: TelemetryMetadataContainer,
) -> Optional[Sequence[Link]]:
"""
Retrieves the telemetry links from the given metadata.

Args:
metadata (Optional[EnvelopeMetadata]): The metadata containing the telemetry links.

Returns:
Optional[Sequence[Link]]: The telemetry links extracted from the metadata, or None if there are no links.
"""
if metadata is None:
return None
elif isinstance(metadata, EnvelopeMetadata):
context = extract(_get_carrier_for_envelope_metadata(metadata))
elif hasattr(metadata, "__getitem__"):
context = extract(_get_carrier_for_remote_call_metadata(metadata))
else:
return None
# Retrieve the extracted SpanContext from the context.
linked_span = get_current_span(context)
# Use the linked span to get the SpanContext.
span_context = linked_span.get_span_context()
# Create a Link object using the SpanContext.
return [Link(span_context)]
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import contextlib
from typing import Dict, Generic, Iterator, Optional, Sequence
from typing import Dict, Generic, Iterator, Optional

from opentelemetry.trace import Link, NoOpTracerProvider, Span, SpanKind, TracerProvider
from opentelemetry.trace import NoOpTracerProvider, Span, SpanKind, TracerProvider
from opentelemetry.util import types

from ._propagation import TelemetryMetadataContainer, get_telemetry_context
from ._propagation import TelemetryMetadataContainer, get_telemetry_links
from ._tracing_config import Destination, ExtraAttributes, Operation, TracingConfig


Expand Down Expand Up @@ -37,7 +37,6 @@ def trace_block(
extraAttributes: ExtraAttributes | None = None,
kind: Optional[SpanKind] = None,
attributes: Optional[types.Attributes] = None,
links: Optional[Sequence[Link]] = None,
start_time: Optional[int] = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
Expand All @@ -55,7 +54,6 @@ def trace_block(
kind (SpanKind, optional): The kind of span. If not provided, it maps to PRODUCER or CONSUMER depending on the operation.
extraAttributes (ExtraAttributes, optional): Additional defined attributes for the span. Defaults to None.
attributes (Optional[types.Attributes], optional): Additional non-defined attributes for the span. Defaults to None.
links (Optional[Sequence[Link]], optional): Links to other spans. Defaults to None.
start_time (Optional[int], optional): The start time of the span. Defaults to None.
record_exception (bool, optional): Whether to record exceptions. Defaults to True.
set_status_on_exception (bool, optional): Whether to set the status on exception. Defaults to True.
Expand All @@ -67,7 +65,9 @@ def trace_block(
"""
span_name = self.instrumentation_builder_config.get_span_name(operation, destination)
span_kind = kind or self.instrumentation_builder_config.get_span_kind(operation)
context = get_telemetry_context(parent) if parent else None
# context = get_telemetry_context(parent) if parent else None
context = None # TODO: we may need to remove other code for using custom context.
links = get_telemetry_links(parent) if parent else None
attributes_with_defaults: Dict[str, types.AttributeValue] = {}
for key, value in (attributes or {}).items():
attributes_with_defaults[key] = value
Expand Down