
Pydantic AI Stream support #19118

Merged
B-Step62 merged 9 commits into mlflow:master from joelrobin18:feat/pydantic_ai_stream_support on Dec 18, 2025

Conversation

@joelrobin18 (Collaborator) commented Nov 30, 2025

🛠 DevTools 🛠


Install mlflow from this PR

# mlflow
pip install git+https://github.com/mlflow/mlflow.git@refs/pull/19118/merge
# mlflow-skinny
pip install git+https://github.com/mlflow/mlflow.git@refs/pull/19118/merge#subdirectory=libs/skinny

For Databricks, use the following command:

%sh curl -LsSf https://raw.githubusercontent.com/mlflow/mlflow/HEAD/dev/install-skinny.sh | sh -s pull/19118/merge

Related Issues/PRs

Fixes #18999

What changes are proposed in this pull request?

This PR adds MLflow tracing (autologging) support for Pydantic AI streaming runs, e.g. Agent.run_stream and Agent.run_stream_sync.

How is this PR tested?

  • Existing unit/integration tests
  • New unit/integration tests
  • Manual tests

Does this PR require documentation update?

  • No. You can skip the rest of this section.
  • Yes. I've updated:
    • Examples
    • API references
    • Instructions

Release Notes

Is this a user-facing change?

  • No. You can skip the rest of this section.
  • Yes. Give a description of this change to be included in the release notes for MLflow users.

What component(s), interfaces, languages, and integrations does this PR affect?

Components

  • area/tracking: Tracking Service, tracking client APIs, autologging
  • area/models: MLmodel format, model serialization/deserialization, flavors
  • area/model-registry: Model Registry service, APIs, and the fluent client calls for Model Registry
  • area/scoring: MLflow Model server, model deployment tools, Spark UDFs
  • area/evaluation: MLflow model evaluation features, evaluation metrics, and evaluation workflows
  • area/gateway: MLflow AI Gateway client APIs, server, and third-party integrations
  • area/prompts: MLflow prompt engineering features, prompt templates, and prompt management
  • area/tracing: MLflow Tracing features, tracing APIs, and LLM tracing functionality
  • area/projects: MLproject format, project running backends
  • area/uiux: Front-end, user experience, plotting, JavaScript, JavaScript dev server
  • area/build: Build and test infrastructure for MLflow
  • area/docs: MLflow documentation pages

How should the PR be classified in the release notes? Choose one:

  • rn/none - No description will be included. The PR will be mentioned only by the PR number in the "Small Bugfixes and Documentation Updates" section
  • rn/breaking-change - The PR will be mentioned in the "Breaking Changes" section
  • rn/feature - A new user-facing feature worth mentioning in the release notes
  • rn/bug-fix - A user-facing bug fix worth mentioning in the release notes
  • rn/documentation - A user-facing documentation change worth mentioning in the release notes

Should this PR be included in the next patch release?

Yes should be selected for bug fixes, documentation updates, and other small changes. No should be selected for new features and larger changes. If you're unsure about the release classification of this PR, leave this unchecked to let the maintainers decide.

What is a minor/patch release?
  • Minor release: a release that increments the second part of the version number (e.g., 1.2.0 -> 1.3.0).
    Bug fixes, doc updates and new features usually go into minor releases.
  • Patch release: a release that increments the third part of the version number (e.g., 1.2.0 -> 1.2.1).
    Bug fixes and doc updates usually go into patch releases.
  • Yes (this PR will be cherry-picked and included in the next patch release)
  • No (this PR will be included in the next minor release)

Signed-off-by: joelrobin18 <joelrobin1818@gmail.com>
@github-actions bot (Contributor) commented Nov 30, 2025

Documentation preview for 11e2ed1 is available at:

Changed Pages (1)

More info
  • Ignore this comment if this PR does not change the documentation.
  • The preview is updated when a new commit is pushed to this PR.
  • This comment was created by this workflow run.
  • The documentation was built by this workflow run.

Signed-off-by: joelrobin18 <joelrobin1818@gmail.com>
@github-actions bot added the area/tracing (MLflow Tracing and its integrations) and rn/feature (Mention under Features in Changelogs) labels Nov 30, 2025

# run_stream_sync was added in pydantic-ai 1.10.0
if hasattr(Agent, "run_stream_sync"):
    agent_methods.append("run_stream_sync")
Collaborator:
Q: run_stream_sync just calls run_stream under the hood with the exact same inputs and outputs. I'm wondering if we only need to patch run_stream, and users can see whatever information they want to see. Having another run_stream_sync span with the same info might be redundant.

@joelrobin18 (author) replied:

Even though run_stream_sync simply calls run_stream under the hood, patching only run_stream is not enough. Because the async generator pauses at the first yield, the context manager never exits, which means spans are started but never properly closed. If we don't also patch run_stream_sync, its outputs won't be captured, resulting in incomplete trace data.

https://github.com/pydantic/pydantic-ai/blob/54a41d91600a4446cc3879ad399de6e2415efc96/pydantic_ai_slim/pydantic_ai/agent/abstract.py#L730

Code:

def run_stream_sync(self, ...):
    async def _consume_stream():
        async with self.run_stream(...) as stream_result:  # context manager starts here
            yield stream_result                            # generator pauses here

    async_result = _utils.get_event_loop().run_until_complete(anext(_consume_stream()))
    return result.StreamedRunResultSync(async_result)



@pytest.fixture
def test_model_agent():
Collaborator:
Can we use the existing simple_agent and agent_with_tool?

assert len(traces) == 1
spans = traces[0].data.spans

run_stream_span = next((s for s in spans if s.name == "Agent.run_stream"), None)
Collaborator:
Can we assert that the necessary spans, such as LLM spans, are included in the trace, as in the other test cases?


if _is_async_context_manager_factory(method):
    _patch_streaming_method(cls, method_name, patched_async_stream_call)
elif _returns_sync_streamed_result(method):
Collaborator:
I tested run_stream_sync locally with a simple agent, but it does not capture child spans. It might be related to how they run the async code inside a sync function?

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
response = agent.run_stream_sync('What is the capital of the UK?')
print(response.get_output())

@joelrobin18 (author) replied:

Yes, this is due to how run_stream is called inside run_stream_sync. run_stream is decorated with @asynccontextmanager. When called from run_stream_sync, the async generator pauses at yield stream_result, so run_stream's __aexit__ never executes, leaving spans without any parent trace. Thus child spans are not captured here.

Same reason as this: #19118 (comment)

Signed-off-by: joelrobin18 <joelrobin1818@gmail.com>
@joelrobin18 (author) commented:

Examples:

simple_agent = Agent("openai:gpt-4o", instrument=True)

response = simple_agent.run_stream_sync("Tell me a short joke")
for chunk in response.stream_text():
    print(chunk, end="", flush=True)
class CityInfo(BaseModel):
    name: str
    country: str
    population: int
    famous_landmark: str


structured_agent = Agent(
    "openai:gpt-4o",
    output_type=CityInfo,
    instrument=True,
)

response = structured_agent.run_stream_sync("Tell me about Paris")
result = response.get_output()
calculator_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a calculator assistant. Use the tools to perform calculations.",
    instrument=True,
)


@calculator_agent.tool
def add(ctx: RunContext[None], a: float, b: float) -> float:
    """Add two numbers together."""
    return a + b


@calculator_agent.tool
def multiply(ctx: RunContext[None], a: float, b: float) -> float:
    """Multiply two numbers together."""
    return a * b


@calculator_agent.tool
def subtract(ctx: RunContext[None], a: float, b: float) -> float:
    """Subtract b from a."""
    return a - b


response = calculator_agent.run_stream_sync(
    "Calculate (5 + 3) * 2 - 1. Show your work step by step."
)
for chunk in response.stream_text():
    print(chunk, end="", flush=True)
basic_agent = Agent("openai:gpt-4o", instrument=True)
response = basic_agent.run_stream_sync("What is 2 + 2?")
print(f"Direct output: {response.get_output()}")

Signed-off-by: joelrobin18 <joelrobin1818@gmail.com>
@joelrobin18 joelrobin18 requested a review from B-Step62 December 16, 2025 20:07
@joelrobin18 (author) commented:

simple_agent = Agent("openai:gpt-4o", instrument=True)

async with simple_agent.run_stream("Tell me a short joke") as response:
    async for chunk in response.stream_text(delta=True):
        print(chunk, end="", flush=True)
class CityInfo(BaseModel):
    name: str
    country: str
    population: int
    famous_landmark: str

structured_agent = Agent(
    "openai:gpt-4o",
    output_type=CityInfo,
    instrument=True,
)

async with structured_agent.run_stream("Tell me about Paris") as response:
    result = await response.get_output()
calculator_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a calculator assistant. Use the tools to perform calculations.",
    instrument=True,
)


@calculator_agent.tool
def add(ctx: RunContext[None], a: float, b: float) -> float:
    """Add two numbers together."""
    return a + b


@calculator_agent.tool
def multiply(ctx: RunContext[None], a: float, b: float) -> float:
    """Multiply two numbers together."""
    return a * b


@calculator_agent.tool
def subtract(ctx: RunContext[None], a: float, b: float) -> float:
    """Subtract b from a."""
    return a - b


print("Streaming calculation response:")
async with calculator_agent.run_stream(
    "Calculate (5 + 3) * 2 - 1. Show your work step by step."
) as response:
    async for chunk in response.stream_text(delta=True):
        print(chunk, end="", flush=True)

Signed-off-by: joelrobin18 <joelrobin1818@gmail.com>
@hayescode (Contributor) commented:

@joelrobin18 @B-Step62 thank you very much for the work on this! Testing with my implementation in Databricks MLflow (run_stream_events()) works perfectly! The only difference I see between this and my other MLflow implementation using the OpenAI Responses API is that the events aren't captured in the events tab. I'm not sure how big of a deal this is (I don't use them personally), or maybe it's because I'm using Databricks MLflow. For me, this PR delivers the feature request. Thanks again!

@B-Step62 (Collaborator) left a review:

LGTM!

@B-Step62 (Collaborator) replied:

> The only difference I see between this and my other MLflow implementation using the OpenAI Responses API is that the events aren't captured in the events tab.

Great point. Capturing streaming events in the authoring framework is a bit of an overkill, given that they propagate up the span hierarchy and are mostly the same as the token stream the LLM generates. As you mentioned, even the captured events in the LLM span might not be used by many people, so we may make this opt-in in the future to reduce trace size. For now, let's keep the Pydantic AI autologging behavior simple.

@B-Step62 B-Step62 added this pull request to the merge queue Dec 18, 2025
Merged via the queue into mlflow:master with commit 5c718f4 Dec 18, 2025
95 of 97 checks passed
WeichenXu123 pushed a commit to WeichenXu123/mlflow that referenced this pull request Dec 19, 2025
WeichenXu123 pushed a commit that referenced this pull request Dec 19, 2025

Labels

area/tracing (MLflow Tracing and its integrations), rn/feature (Mention under Features in Changelogs), v3.8.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FR] PydanticAI Tracing | Streaming Support

3 participants