Skip to content

Commit 3dcb847

Browse files
ishymkoholtskinner
andauthored
fix: do not crash on SSE comment line (#636)
# Description The cause is florimondmanca/httpx-sse#35: using comments among events with `id` field causes `httpx-sse` to emit an event with empty data. Although according to [the standard (item 2)](https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage) empty `buffer` shouldn't produce an event, with the way how `httpx-sse` API is defined it may still be reasonable to emit an object with just `retry` field for instance so that consumer could handle it. [The standard](https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation) defines `retry` field handling outside of events dispatching, so in the context of this library it's up to the client (see [here](https://github.com/florimondmanca/httpx-sse?tab=readme-ov-file#handling-reconnections)). That being said, even if comment handling bug is fixed, it still makes sense to add this check against `data` field unconditionally without any TODOs. Tested by mocking one level lower to put `httpx-sse` under test as well as it's an integration issue. Fixes #540 --------- Co-authored-by: Holt Skinner <13262395+holtskinner@users.noreply.github.com>
1 parent 2698cc0 commit 3dcb847

4 files changed

Lines changed: 125 additions & 0 deletions

File tree

src/a2a/client/transports/jsonrpc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ async def send_message_streaming(
176176
try:
177177
event_source.response.raise_for_status()
178178
async for sse in event_source.aiter_sse():
179+
if not sse.data:
180+
continue
179181
response = SendStreamingMessageResponse.model_validate(
180182
json.loads(sse.data)
181183
)

src/a2a/client/transports/rest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ async def send_message_streaming(
154154
try:
155155
event_source.response.raise_for_status()
156156
async for sse in event_source.aiter_sse():
157+
if not sse.data:
158+
continue
157159
event = a2a_pb2.StreamResponse()
158160
Parse(sse.data, event)
159161
yield proto_utils.FromProto.stream_response(event)

tests/client/transports/test_jsonrpc_client.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import httpx
88
import pytest
9+
import respx
910

1011
from httpx_sse import EventSource, SSEError, ServerSentEvent
1112

@@ -466,6 +467,63 @@ async def test_send_message_streaming_success(
466467
== mock_stream_response_2.result.model_dump()
467468
)
468469

470+
# Repro of https://github.com/a2aproject/a2a-python/issues/540
471+
@pytest.mark.asyncio
472+
@respx.mock
473+
async def test_send_message_streaming_comment_success(
474+
self,
475+
mock_agent_card: MagicMock,
476+
):
477+
async with httpx.AsyncClient() as client:
478+
transport = JsonRpcTransport(
479+
httpx_client=client, agent_card=mock_agent_card
480+
)
481+
params = MessageSendParams(
482+
message=create_text_message_object(content='Hello stream')
483+
)
484+
mock_stream_response_1 = SendMessageSuccessResponse(
485+
id='stream_id_123',
486+
jsonrpc='2.0',
487+
result=create_text_message_object(
488+
content='First part', role=Role.agent
489+
),
490+
)
491+
mock_stream_response_2 = SendMessageSuccessResponse(
492+
id='stream_id_123',
493+
jsonrpc='2.0',
494+
result=create_text_message_object(
495+
content='Second part', role=Role.agent
496+
),
497+
)
498+
499+
sse_content = (
500+
'id: stream_id_1\n'
501+
f'data: {mock_stream_response_1.model_dump_json()}\n\n'
502+
': keep-alive\n\n'
503+
'id: stream_id_2\n'
504+
f'data: {mock_stream_response_2.model_dump_json()}\n\n'
505+
': keep-alive\n\n'
506+
)
507+
508+
respx.post(mock_agent_card.url).mock(
509+
return_value=httpx.Response(
510+
200,
511+
headers={'Content-Type': 'text/event-stream'},
512+
content=sse_content,
513+
)
514+
)
515+
516+
results = [
517+
item
518+
async for item in transport.send_message_streaming(
519+
request=params
520+
)
521+
]
522+
523+
assert len(results) == 2
524+
assert results[0] == mock_stream_response_1.result
525+
assert results[1] == mock_stream_response_2.result
526+
469527
@pytest.mark.asyncio
470528
async def test_send_request_http_status_error(
471529
self, mock_httpx_client: AsyncMock, mock_agent_card: MagicMock

tests/client/transports/test_rest_client.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,23 @@
33

44
import httpx
55
import pytest
6+
import respx
67

8+
from google.protobuf.json_format import MessageToJson
79
from httpx_sse import EventSource, ServerSentEvent
810

911
from a2a.client import create_text_message_object
1012
from a2a.client.errors import A2AClientHTTPError
1113
from a2a.client.transports.rest import RestTransport
1214
from a2a.extensions.common import HTTP_EXTENSION_HEADER
15+
from a2a.grpc import a2a_pb2
1316
from a2a.types import (
1417
AgentCapabilities,
1518
AgentCard,
1619
MessageSendParams,
20+
Role,
1721
)
22+
from a2a.utils import proto_utils
1823

1924

2025
@pytest.fixture
@@ -88,6 +93,64 @@ async def test_send_message_with_default_extensions(
8893
},
8994
)
9095

96+
# Repro of https://github.com/a2aproject/a2a-python/issues/540
97+
@pytest.mark.asyncio
98+
@respx.mock
99+
async def test_send_message_streaming_comment_success(
100+
self,
101+
mock_agent_card: MagicMock,
102+
):
103+
"""Test that SSE comments are ignored."""
104+
async with httpx.AsyncClient() as client:
105+
transport = RestTransport(
106+
httpx_client=client, agent_card=mock_agent_card
107+
)
108+
params = MessageSendParams(
109+
message=create_text_message_object(content='Hello stream')
110+
)
111+
112+
mock_stream_response_1 = a2a_pb2.StreamResponse(
113+
msg=proto_utils.ToProto.message(
114+
create_text_message_object(
115+
content='First part', role=Role.agent
116+
)
117+
)
118+
)
119+
mock_stream_response_2 = a2a_pb2.StreamResponse(
120+
msg=proto_utils.ToProto.message(
121+
create_text_message_object(
122+
content='Second part', role=Role.agent
123+
)
124+
)
125+
)
126+
127+
sse_content = (
128+
'id: stream_id_1\n'
129+
f'data: {MessageToJson(mock_stream_response_1, indent=None)}\n\n'
130+
': keep-alive\n\n'
131+
'id: stream_id_2\n'
132+
f'data: {MessageToJson(mock_stream_response_2, indent=None)}\n\n'
133+
': keep-alive\n\n'
134+
)
135+
136+
respx.post(
137+
f'{mock_agent_card.url.rstrip("/")}/v1/message:stream'
138+
).mock(
139+
return_value=httpx.Response(
140+
200,
141+
headers={'Content-Type': 'text/event-stream'},
142+
content=sse_content,
143+
)
144+
)
145+
146+
results = []
147+
async for item in transport.send_message_streaming(request=params):
148+
results.append(item)
149+
150+
assert len(results) == 2
151+
assert results[0].parts[0].root.text == 'First part'
152+
assert results[1].parts[0].root.text == 'Second part'
153+
91154
@pytest.mark.asyncio
92155
@patch('a2a.client.transports.rest.aconnect_sse')
93156
async def test_send_message_streaming_with_new_extensions(

0 commit comments

Comments
 (0)