Skip to content

Commit 6524782

Browse files
[v1.x] fix: handle ClosedResourceError when transport closes mid-request (#2334)
Co-authored-by: Owen Devereaux <owendevereaux@users.noreply.github.com> Co-authored-by: Max Isbey <224885523+maxisbey@users.noreply.github.com>
1 parent 2e9897e commit 6524782

File tree

3 files changed

+208
-25
lines changed

3 files changed

+208
-25
lines changed

src/mcp/server/lowlevel/server.py

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -671,16 +671,23 @@ async def run(
671671
await stack.enter_async_context(task_support.run())
672672

673673
async with anyio.create_task_group() as tg:
674-
async for message in session.incoming_messages:
675-
logger.debug("Received message: %s", message)
676-
677-
tg.start_soon(
678-
self._handle_message,
679-
message,
680-
session,
681-
lifespan_context,
682-
raise_exceptions,
683-
)
674+
try:
675+
async for message in session.incoming_messages:
676+
logger.debug("Received message: %s", message)
677+
678+
tg.start_soon(
679+
self._handle_message,
680+
message,
681+
session,
682+
lifespan_context,
683+
raise_exceptions,
684+
)
685+
finally:
686+
# Transport closed: cancel in-flight handlers. Without this the
687+
# TG join waits for them, and when they eventually try to
688+
# respond they hit a closed write stream (the session's
689+
# _receive_loop closed it when the read stream ended).
690+
tg.cancel_scope.cancel()
684691

685692
async def _handle_message(
686693
self,
@@ -763,12 +770,18 @@ async def _handle_request(
763770
response = await handler(req)
764771
except McpError as err: # pragma: no cover
765772
response = err.error
766-
except anyio.get_cancelled_exc_class(): # pragma: no cover
767-
logger.info(
768-
"Request %s cancelled - duplicate response suppressed",
769-
message.request_id,
770-
)
771-
return
773+
except anyio.get_cancelled_exc_class():
774+
if message.cancelled:
775+
# Client sent CancelledNotification; responder.cancel() already
776+
# sent an error response, so skip the duplicate.
777+
logger.info(
778+
"Request %s cancelled - duplicate response suppressed",
779+
message.request_id,
780+
)
781+
return
782+
# Transport-close cancellation from the TG in run(); re-raise so the
783+
# TG swallows its own cancellation.
784+
raise
772785
except Exception as err: # pragma: no cover
773786
if raise_exceptions:
774787
raise err
@@ -777,16 +790,24 @@ async def _handle_request(
777790
# Reset the global state after we are done
778791
if token is not None: # pragma: no branch
779792
request_ctx.reset(token)
780-
781-
await message.respond(response)
782793
else: # pragma: no cover
783-
await message.respond(
784-
types.ErrorData(
785-
code=types.METHOD_NOT_FOUND,
786-
message="Method not found",
787-
)
794+
response = types.ErrorData(
795+
code=types.METHOD_NOT_FOUND,
796+
message="Method not found",
788797
)
789798

799+
try:
800+
await message.respond(response)
801+
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
802+
# Transport closed between handler unblocking and respond. Happens
803+
# when _receive_loop's finally wakes a handler blocked on
804+
# send_request: the handler runs to respond() before run()'s TG
805+
# cancel fires, but after the write stream closed. Closed if our
806+
# end closed (_receive_loop's async-with exit); Broken if the peer
807+
# end closed first (streamable_http terminate()).
808+
logger.debug("Response for %s dropped - transport closed", message.request_id)
809+
return
810+
790811
logger.debug("Response sent")
791812

792813
async def _handle_notification(self, notify: Any):

src/mcp/shared/session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def __exit__(
108108
) -> None:
109109
"""Exit the context manager, performing cleanup and notifying completion."""
110110
try:
111-
if self._completed: # pragma: no branch
111+
if self._completed:
112112
self._on_complete(self)
113113
finally:
114114
self._entered = False
@@ -445,7 +445,9 @@ async def _receive_loop(self) -> None:
445445
finally:
446446
# after the read stream is closed, we need to send errors
447447
# to any pending requests
448-
for id, stream in self._response_streams.items():
448+
# Snapshot: stream.send() wakes the waiter, whose finally pops
449+
# from _response_streams before the next __next__() call.
450+
for id, stream in list(self._response_streams.items()):
449451
error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
450452
try:
451453
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))

tests/server/test_cancel_handling.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,21 @@
99
from mcp.server.lowlevel.server import Server
1010
from mcp.shared.exceptions import McpError
1111
from mcp.shared.memory import create_connected_server_and_client_session
12+
from mcp.shared.message import SessionMessage
1213
from mcp.types import (
14+
LATEST_PROTOCOL_VERSION,
1315
CallToolRequest,
1416
CallToolRequestParams,
1517
CallToolResult,
1618
CancelledNotification,
1719
CancelledNotificationParams,
20+
ClientCapabilities,
1821
ClientNotification,
1922
ClientRequest,
23+
Implementation,
24+
InitializeRequestParams,
25+
JSONRPCNotification,
26+
JSONRPCRequest,
2027
Tool,
2128
)
2229

@@ -108,3 +115,156 @@ async def first_request():
108115
assert isinstance(content, types.TextContent)
109116
assert content.text == "Call number: 2"
110117
assert call_count == 2
118+
119+
120+
@pytest.mark.anyio
121+
async def test_server_cancels_in_flight_handlers_on_transport_close():
122+
"""When the transport closes mid-request, server.run() must cancel in-flight
123+
handlers rather than join on them.
124+
125+
Without the cancel, the task group waits for the handler, which then tries
126+
to respond through a write stream that _receive_loop already closed,
127+
raising ClosedResourceError and crashing server.run() with exit code 1.
128+
129+
This drives server.run() with raw memory streams because InMemoryTransport
130+
wraps it in its own finally-cancel (_memory.py) which masks the bug.
131+
"""
132+
handler_started = anyio.Event()
133+
handler_cancelled = anyio.Event()
134+
server_run_returned = anyio.Event()
135+
136+
server = Server("test")
137+
138+
@server.call_tool()
139+
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
140+
handler_started.set()
141+
try:
142+
await anyio.sleep_forever()
143+
finally:
144+
handler_cancelled.set()
145+
# unreachable: sleep_forever only exits via cancellation
146+
raise AssertionError # pragma: no cover
147+
148+
to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
149+
server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)
150+
151+
async def run_server():
152+
await server.run(server_read, server_write, server.create_initialization_options())
153+
server_run_returned.set()
154+
155+
init_req = JSONRPCRequest(
156+
jsonrpc="2.0",
157+
id=1,
158+
method="initialize",
159+
params=InitializeRequestParams(
160+
protocolVersion=LATEST_PROTOCOL_VERSION,
161+
capabilities=ClientCapabilities(),
162+
clientInfo=Implementation(name="test", version="1.0"),
163+
).model_dump(by_alias=True, mode="json", exclude_none=True),
164+
)
165+
initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
166+
call_req = JSONRPCRequest(
167+
jsonrpc="2.0",
168+
id=2,
169+
method="tools/call",
170+
params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"),
171+
)
172+
173+
with anyio.fail_after(5):
174+
async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
175+
tg.start_soon(run_server)
176+
177+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(init_req)))
178+
await from_server.receive() # init response
179+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(initialized)))
180+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(call_req)))
181+
182+
await handler_started.wait()
183+
184+
# Close the server's input stream — this is what stdin EOF does.
185+
# server.run()'s incoming_messages loop ends, finally-cancel fires,
186+
# handler gets CancelledError, server.run() returns.
187+
await to_server.aclose()
188+
189+
await server_run_returned.wait()
190+
191+
assert handler_cancelled.is_set()
192+
193+
194+
@pytest.mark.anyio
195+
async def test_server_handles_transport_close_with_pending_server_to_client_requests():
196+
"""When the transport closes while handlers are blocked on server→client
197+
requests (sampling, roots, elicitation), server.run() must still exit cleanly.
198+
199+
Two bugs covered:
200+
1. _receive_loop's finally iterates _response_streams with await checkpoints
201+
inside; the woken handler's send_request finally pops from that dict
202+
before the next __next__() — RuntimeError: dictionary changed size.
203+
2. The woken handler's McpError is caught in _handle_request, which falls
204+
through to respond() against a write stream _receive_loop already closed.
205+
"""
206+
handlers_started = 0
207+
both_started = anyio.Event()
208+
server_run_returned = anyio.Event()
209+
210+
server = Server("test")
211+
212+
@server.call_tool()
213+
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
214+
nonlocal handlers_started
215+
handlers_started += 1
216+
if handlers_started == 2:
217+
both_started.set()
218+
# Blocks on send_request waiting for a client response that never comes.
219+
# _receive_loop's finally will wake this with CONNECTION_CLOSED.
220+
await server.request_context.session.list_roots()
221+
raise AssertionError # pragma: no cover
222+
223+
to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
224+
server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)
225+
226+
async def run_server():
227+
await server.run(server_read, server_write, server.create_initialization_options())
228+
server_run_returned.set()
229+
230+
init_req = JSONRPCRequest(
231+
jsonrpc="2.0",
232+
id=1,
233+
method="initialize",
234+
params=InitializeRequestParams(
235+
protocolVersion=LATEST_PROTOCOL_VERSION,
236+
capabilities=ClientCapabilities(),
237+
clientInfo=Implementation(name="test", version="1.0"),
238+
).model_dump(by_alias=True, mode="json", exclude_none=True),
239+
)
240+
initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
241+
242+
with anyio.fail_after(5):
243+
async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
244+
tg.start_soon(run_server)
245+
246+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(init_req)))
247+
await from_server.receive() # init response
248+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(initialized)))
249+
250+
# Two tool calls → two handlers → two _response_streams entries.
251+
for rid in (2, 3):
252+
call_req = JSONRPCRequest(
253+
jsonrpc="2.0",
254+
id=rid,
255+
method="tools/call",
256+
params=CallToolRequestParams(name="t", arguments={}).model_dump(by_alias=True, mode="json"),
257+
)
258+
await to_server.send(SessionMessage(message=types.JSONRPCMessage(call_req)))
259+
260+
await both_started.wait()
261+
# Drain the two roots/list requests so send_request's _write_stream.send()
262+
# completes and both handlers are parked at response_stream_reader.receive().
263+
await from_server.receive()
264+
await from_server.receive()
265+
266+
await to_server.aclose()
267+
268+
# Without the fixes: RuntimeError (dict mutation) or ClosedResourceError
269+
# (respond after write-stream close) escapes run_server and this hangs.
270+
await server_run_returned.wait()

0 commit comments

Comments
 (0)