Skip to content

Commit 525f40d

Browse files
pitrou1st1
authored andcommitted
bpo-31819: Add AbstractEventLoop.sock_recv_into() (#4051)
* bpo-31819: Add AbstractEventLoop.sock_recv_into() * Add NEWS * Add doc
1 parent ea2ef5d commit 525f40d

File tree

9 files changed

+266
-58
lines changed

9 files changed

+266
-58
lines changed

Doc/library/asyncio-eventloop.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,21 @@ Low-level socket operations
554554

555555
This method is a :ref:`coroutine <coroutine>`.
556556

557+
.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf)
558+
559+
Receive data from the socket. Modeled after blocking
560+
:meth:`socket.socket.recv_into` method.
561+
562+
The received data is written into *buf* (a writable buffer).
563+
The return value is the number of bytes written.
564+
565+
With :class:`SelectorEventLoop` event loop, the socket *sock* must be
566+
non-blocking.
567+
568+
This method is a :ref:`coroutine <coroutine>`.
569+
570+
.. versionadded:: 3.7
571+
557572
.. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data)
558573

559574
Send data to the socket. Modeled after blocking

Lib/asyncio/events.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,9 @@ def remove_writer(self, fd):
461461
def sock_recv(self, sock, nbytes):
462462
raise NotImplementedError
463463

464+
def sock_recv_into(self, sock, buf):
465+
raise NotImplementedError
466+
464467
def sock_sendall(self, sock, data):
465468
raise NotImplementedError
466469

Lib/asyncio/proactor_events.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,9 @@ def close(self):
439439
def sock_recv(self, sock, n):
440440
return self._proactor.recv(sock, n)
441441

442+
def sock_recv_into(self, sock, buf):
443+
return self._proactor.recv_into(sock, buf)
444+
442445
def sock_sendall(self, sock, data):
443446
return self._proactor.send(sock, data)
444447

Lib/asyncio/selector_events.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,41 @@ def _sock_recv(self, fut, registered, sock, n):
386386
else:
387387
fut.set_result(data)
388388

389+
def sock_recv_into(self, sock, buf):
390+
"""Receive data from the socket.
391+
392+
The received data is written into *buf* (a writable buffer).
393+
The return value is the number of bytes written.
394+
395+
This method is a coroutine.
396+
"""
397+
if self._debug and sock.gettimeout() != 0:
398+
raise ValueError("the socket must be non-blocking")
399+
fut = self.create_future()
400+
self._sock_recv_into(fut, False, sock, buf)
401+
return fut
402+
403+
def _sock_recv_into(self, fut, registered, sock, buf):
404+
# _sock_recv_into() can add itself as an I/O callback if the operation
405+
# can't be done immediately. Don't use it directly, call sock_recv_into().
406+
fd = sock.fileno()
407+
if registered:
408+
# Remove the callback early. It should be rare that the
409+
# selector says the fd is ready but the call still returns
410+
# EAGAIN, and I am willing to take a hit in that case in
411+
# order to simplify the common case.
412+
self.remove_reader(fd)
413+
if fut.cancelled():
414+
return
415+
try:
416+
nbytes = sock.recv_into(buf)
417+
except (BlockingIOError, InterruptedError):
418+
self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
419+
except Exception as exc:
420+
fut.set_exception(exc)
421+
else:
422+
fut.set_result(nbytes)
423+
389424
def sock_sendall(self, sock, data):
390425
"""Send data to the socket.
391426

Lib/asyncio/windows_events.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,28 @@ def finish_recv(trans, key, ov):
448448

449449
return self._register(ov, conn, finish_recv)
450450

451+
def recv_into(self, conn, buf, flags=0):
452+
self._register_with_iocp(conn)
453+
ov = _overlapped.Overlapped(NULL)
454+
try:
455+
if isinstance(conn, socket.socket):
456+
ov.WSARecvInto(conn.fileno(), buf, flags)
457+
else:
458+
ov.ReadFileInto(conn.fileno(), buf)
459+
except BrokenPipeError:
460+
return self._result(b'')
461+
462+
def finish_recv(trans, key, ov):
463+
try:
464+
return ov.getresult()
465+
except OSError as exc:
466+
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
467+
raise ConnectionResetError(*exc.args)
468+
else:
469+
raise
470+
471+
return self._register(ov, conn, finish_recv)
472+
451473
def send(self, conn, buf, flags=0):
452474
self._register_with_iocp(conn)
453475
ov = _overlapped.Overlapped(NULL)

Lib/test/test_asyncio/test_events.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,9 @@ def _basetest_sock_client_ops(self, httpd, sock):
425425
with self.assertRaises(ValueError):
426426
self.loop.run_until_complete(
427427
self.loop.sock_recv(sock, 1024))
428+
with self.assertRaises(ValueError):
429+
self.loop.run_until_complete(
430+
self.loop.sock_recv_into(sock, bytearray()))
428431
with self.assertRaises(ValueError):
429432
self.loop.run_until_complete(
430433
self.loop.sock_accept(sock))
@@ -443,16 +446,37 @@ def _basetest_sock_client_ops(self, httpd, sock):
443446
sock.close()
444447
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
445448

449+
def _basetest_sock_recv_into(self, httpd, sock):
450+
# same as _basetest_sock_client_ops, but using sock_recv_into
451+
sock.setblocking(False)
452+
self.loop.run_until_complete(
453+
self.loop.sock_connect(sock, httpd.address))
454+
self.loop.run_until_complete(
455+
self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
456+
data = bytearray(1024)
457+
with memoryview(data) as buf:
458+
nbytes = self.loop.run_until_complete(
459+
self.loop.sock_recv_into(sock, buf[:1024]))
460+
# consume data
461+
self.loop.run_until_complete(
462+
self.loop.sock_recv_into(sock, buf[nbytes:]))
463+
sock.close()
464+
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
465+
446466
def test_sock_client_ops(self):
447467
with test_utils.run_test_server() as httpd:
448468
sock = socket.socket()
449469
self._basetest_sock_client_ops(httpd, sock)
470+
sock = socket.socket()
471+
self._basetest_sock_recv_into(httpd, sock)
450472

451473
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
452474
def test_unix_sock_client_ops(self):
453475
with test_utils.run_test_unix_server() as httpd:
454476
sock = socket.socket(socket.AF_UNIX)
455477
self._basetest_sock_client_ops(httpd, sock)
478+
sock = socket.socket(socket.AF_UNIX)
479+
self._basetest_sock_recv_into(httpd, sock)
456480

457481
def test_sock_client_fail(self):
458482
# Make sure that we will get an unused port
@@ -2612,6 +2636,8 @@ def test_not_implemented(self):
26122636
NotImplementedError, loop.remove_writer, 1)
26132637
self.assertRaises(
26142638
NotImplementedError, loop.sock_recv, f, 10)
2639+
self.assertRaises(
2640+
NotImplementedError, loop.sock_recv_into, f, 10)
26152641
self.assertRaises(
26162642
NotImplementedError, loop.sock_sendall, f, 10)
26172643
self.assertRaises(

Lib/test/test_asyncio/test_proactor_events.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,11 @@ def test_sock_recv(self):
489489
self.loop.sock_recv(self.sock, 1024)
490490
self.proactor.recv.assert_called_with(self.sock, 1024)
491491

492+
def test_sock_recv_into(self):
493+
buf = bytearray(10)
494+
self.loop.sock_recv_into(self.sock, buf)
495+
self.proactor.recv_into.assert_called_with(self.sock, buf)
496+
492497
def test_sock_sendall(self):
493498
self.loop.sock_sendall(self.sock, b'data')
494499
self.proactor.send.assert_called_with(self.sock, b'data')
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add AbstractEventLoop.sock_recv_into().

0 commit comments

Comments
 (0)