Ensure stream messages are always ordered #8059
Conversation

CI doesn't look sadder than usual. I'll follow up with a test and I think we can merge.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files +4, 20 suites +4, 10h 57m 56s ⏱️ +2h 37m 11s

For more details on these failures, see this check. Results for commit 40253fb. ± Comparison against base commit 31af5c0.

♻️ This comment has been updated with latest results.
```python
@gen_test()
async def test_messages_are_ordered_bsend():
    ledger = []

    async def async_handler(val):
        await asyncio.sleep(0.1)
        ledger.append(val)

    def sync_handler(val):
        ledger.append(val)

    async with Server(
        {},
        stream_handlers={
            "sync_handler": sync_handler,
            "async_handler": async_handler,
        },
    ) as s:
        await s.listen()
        comm = await connect(s.address)
        try:
            b = BatchedSend(interval=10)
            try:
                await comm.write({"op": "connection_stream"})
                b.start(comm)
                n = 100
                for ix in range(n):
                    if ix % 2:
                        b.send({"op": "sync_handler", "val": ix})
                    else:
                        b.send({"op": "async_handler", "val": ix})
                while not len(ledger) == n:
                    await asyncio.sleep(0.01)
                assert ledger == list(range(n))
            finally:
                await b.close()
        finally:
            await comm.close()
```
Incredible that we didn't have a test for this.
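To illustrate the property the test above checks, here is a minimal, self-contained sketch (the names `dispatch_inline`, `async_handler`, and `sync_handler` are illustrative, not distributed's actual API): if the dispatch loop awaits each handler inline before reading the next message, a slow async handler can never be overtaken by a later sync one, so the ledger stays ordered.

```python
import asyncio


async def dispatch_inline(messages, handlers):
    """Process messages strictly in order, awaiting async handlers inline."""
    for msg in messages:
        result = handlers[msg["op"]](msg["val"])
        if asyncio.iscoroutine(result):
            # Await the async handler before touching the next message.
            await result
        # Sync handlers have already run by the time we get here.


async def main():
    ledger = []

    async def async_handler(val):
        await asyncio.sleep(0.01)  # deliberately slow
        ledger.append(val)

    def sync_handler(val):
        ledger.append(val)

    handlers = {"async_handler": async_handler, "sync_handler": sync_handler}
    # Alternate slow async and fast sync handlers, as the test does.
    messages = [
        {"op": "sync_handler" if ix % 2 else "async_handler", "val": ix}
        for ix in range(10)
    ]
    await dispatch_inline(messages, handlers)
    assert ledger == list(range(10))  # order preserved despite slow handlers
    return ledger


print(asyncio.run(main()))
```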
```python
async def __aenter__(self):
    await self
    return self

async def __aexit__(self, *args):
    await self.close()
    return
```
This is an unrelated but pleasant addition to the CommPool.
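For context, a hedged sketch of the pattern those two methods enable (the `Pool` class and its startup logic here are hypothetical stand-ins, not CommPool's real implementation): `await self` in `__aenter__` runs startup, so `async with Pool() as p:` both starts the object on entry and closes it on exit.

```python
import asyncio


class Pool:
    """Hypothetical awaitable object that also works as an async context manager."""

    def __init__(self):
        self.started = False
        self.closed = False

    def __await__(self):
        async def _start():
            await asyncio.sleep(0)  # stand-in for real startup work
            self.started = True
            return self

        return _start().__await__()

    async def close(self):
        self.closed = True

    async def __aenter__(self):
        await self  # start on entry
        return self

    async def __aexit__(self, *args):
        await self.close()  # always close on exit


async def main():
    async with Pool() as p:
        assert p.started  # startup ran before the body
    assert p.closed  # close ran even though the body did nothing
    return "ok"


print(asyncio.run(main()))
```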
hendrikmakait left a comment
I like the reduced complexity this approach introduces. However, I can see this causing issues in real-world use cases because long-running handlers might delay others. Have you run benchmarks on this to see if anything pops up?
The possible benefits warrant giving this a try. So, this looks good to me with one suggestion for improving the tests before giving ✅.
Just to highlight this: this shouldn't cause issues; if anything, it should prevent them. I just wouldn't be surprised if some behavior implicitly relied on async handlers being non-blocking and executed in the background.
No, but reviewing the code base, we actually don't even have any async handlers right now, with the exception of handle_request_refresh_who_has, although this one should not be async...
Well, there is

Ah sorry, the above was wrong.

Thanks for the clarification. Let's drop

Sure... doesn't really matter
hendrikmakait left a comment
LGTM, assuming CI isn't unhappier than usual.
Some motivation for this is over here: #8049 (comment)
Essentially, the old behavior breaks ordering of operations as soon as a handler is async. With this change, we'd ensure ordering within a given BatchedStream regardless of whether the handlers are sync or async.
I think this is a much easier-to-reason-about behavior (but I have no idea where this will blow up...)
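To make the ordering argument concrete, here is an illustrative contrast (not distributed's actual dispatch code; `slow`, `fast`, and `run` are made-up names): when async handlers are fired off as background tasks, a later fast handler can jump ahead of an earlier slow one, while awaiting each handler inline keeps the original message order.

```python
import asyncio


async def run(background: bool):
    ledger = []

    async def slow(val):
        await asyncio.sleep(0.05)  # slow async handler
        ledger.append(val)

    async def fast(val):
        ledger.append(val)  # fast async handler, no awaits

    handlers = [slow, fast]
    tasks = []
    for ix in range(4):
        coro = handlers[ix % 2](ix)
        if background:
            # Old-style behavior: schedule the handler and move on.
            tasks.append(asyncio.ensure_future(coro))
        else:
            # New behavior: await inline, so messages are processed in order.
            await coro
    if tasks:
        await asyncio.gather(*tasks)
    return ledger


async def main():
    assert await run(background=False) == [0, 1, 2, 3]
    out_of_order = await run(background=True)
    assert out_of_order != [0, 1, 2, 3]  # fast handlers jumped ahead of slow ones
    return "ok"


print(asyncio.run(main()))
```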