From 3a9657fc144c57546398cdb0203053e189c31f49 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Sat, 9 Jul 2022 08:47:43 +0000 Subject: [PATCH 1/3] allow multiple waiters --- Lib/asyncio/streams.py | 35 +++++++++---------- Lib/test/test_asyncio/test_streams.py | 19 ++++++++++ ...2-07-09-08-55-04.gh-issue-74116.0XwYC1.rst | 1 + 3 files changed, 37 insertions(+), 18 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 614b2cda60682ff..c21ef7447c93a8a 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -2,6 +2,7 @@ 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', 'open_connection', 'start_server') +import collections import socket import sys import weakref @@ -128,7 +129,7 @@ def __init__(self, loop=None): else: self._loop = loop self._paused = False - self._drain_waiter = None + self._drain_waiters = collections.deque() self._connection_lost = False def pause_writing(self): @@ -143,9 +144,8 @@ def resume_writing(self): if self._loop.get_debug(): logger.debug("%r resumes writing", self) - waiter = self._drain_waiter - if waiter is not None: - self._drain_waiter = None + while self._drain_waiters: + waiter = self._drain_waiters.popleft() if not waiter.done(): waiter.set_result(None) @@ -154,27 +154,26 @@ def connection_lost(self, exc): # Wake up the writer if currently paused. if not self._paused: return - waiter = self._drain_waiter - if waiter is None: - return - self._drain_waiter = None - if waiter.done(): - return - if exc is None: - waiter.set_result(None) - else: - waiter.set_exception(exc) + + while self._drain_waiters: + waiter = self._drain_waiters.popleft() + if not waiter.done(): + if exc is None: + waiter.set_result(None) + else: + waiter.set_exception(exc) async def _drain_helper(self): if self._connection_lost: raise ConnectionResetError('Connection lost') if not self._paused: return - waiter = self._drain_waiter - assert waiter is None or waiter.cancelled() waiter = self._loop.create_future() - self._drain_waiter = waiter - await waiter + self._drain_waiters.append(waiter) + try: + await waiter + finally: + self._drain_waiters.remove(waiter) def _get_close_waiter(self, stream): raise NotImplementedError diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 098a0da344d0fb0..0c49099bc499a58 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -864,6 +864,25 @@ def test_streamreaderprotocol_constructor_use_global_loop(self): self.assertEqual(cm.filename, __file__) self.assertIs(protocol._loop, self.loop) + def test_multiple_drain(self): + # See https://github.com/python/cpython/issues/74116 + drained = 0 + + async def drainer(stream): + nonlocal drained + await stream._drain_helper() + drained += 1 + + async def main(): + loop = asyncio.get_running_loop() + stream = asyncio.streams.FlowControlMixin(loop) + stream.pause_writing() + loop.call_later(0.1, stream.resume_writing) + await asyncio.gather(*[drainer(stream) for _ in range(10)]) + self.assertEqual(drained, 10) + + self.loop.run_until_complete(main()) + def test_drain_raises(self): # See http://bugs.python.org/issue25441 diff --git a/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst b/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst new file mode 100644 index 000000000000000..d7ef94303e4f51c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst @@ -0,0 +1 @@ +Fix :meth:`asyncio.StreamWriter.drain` to be awaited concurrently by multiple tasks. Patch by Kumar Aditya. From b14ab8a9eac687dad700c773704e2f67bc6ccd9e Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 1 Aug 2022 13:56:17 +0000 Subject: [PATCH 2/3] do not pop future --- Lib/asyncio/streams.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index c21ef7447c93a8a..0b46749b7c0a273 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -144,8 +144,7 @@ def resume_writing(self): if self._loop.get_debug(): logger.debug("%r resumes writing", self) - while self._drain_waiters: - waiter = self._drain_waiters.popleft() + for waiter in self._drain_waiters: if not waiter.done(): waiter.set_result(None) @@ -155,8 +154,7 @@ def connection_lost(self, exc): if not self._paused: return - while self._drain_waiters: - waiter = self._drain_waiters.popleft() + for waiter in self._drain_waiters: if not waiter.done(): if exc is None: waiter.set_result(None) From c5e526a4f737ca7ffd70e178509147db332b7b34 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 29 Aug 2022 23:26:31 +0530 Subject: [PATCH 3/3] nits --- Lib/asyncio/streams.py | 2 +- .../next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 0b46749b7c0a273..c4d837a1170819d 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -150,7 +150,7 @@ def resume_writing(self): def connection_lost(self, exc): self._connection_lost = True - # Wake up the writer if currently paused. + # Wake up the writer(s) if currently paused. if not self._paused: return diff --git a/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst b/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst index d7ef94303e4f51c..33782598745b783 100644 --- a/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst +++ b/Misc/NEWS.d/next/Library/2022-07-09-08-55-04.gh-issue-74116.0XwYC1.rst @@ -1 +1 @@ -Fix :meth:`asyncio.StreamWriter.drain` to be awaited concurrently by multiple tasks. Patch by Kumar Aditya. +Allow :meth:`asyncio.StreamWriter.drain` to be awaited concurrently by multiple tasks. Patch by Kumar Aditya.