Refactor getting replication updates from database.#7636
Refactor getting replication updates from database.#7636erikjohnston merged 13 commits intodevelopfrom
Conversation
40c58c7 to
b6e35f2
Compare
synapse/handlers/typing.py
Outdated
| rows.append((serial, (room_id, list(typing)))) | ||
| rows.sort() | ||
| return rows[:limit] | ||
| return rows[:limit], current_id, False |
There was a problem hiding this comment.
doesn't this need more intelligence?
There was a problem hiding this comment.
Woops, for some reason i thought we weren't limiting for this
| ) | ||
| sql = """ | ||
| SELECT stream_id, user_id | ||
| FROM push_rules_stream |
There was a problem hiding this comment.
why are we stuffing all those columns into the table if we don't care about them... 😕
There was a problem hiding this comment.
Oh, hmm, it does look like they're not used anywhere else either. Though I don't really propose doing anything in this PR.
| """Get updates for backfill replication stream, including all new | ||
| backfilled events and events that have gone from being outliers to not. | ||
| """ |
There was a problem hiding this comment.
could you document the args and return format of the update functions? (Are the ids inclusive or exclusive?)
I know they are all much the same, but if I'm working on the storage layer, I don't want to have to go digging into how the replication layer works to know what a given function is supposed to do, and not having it written down explicitly is a good way for assumptions to be made and off-by-one errors to get in.
synapse/handlers/typing.py
Outdated
| rows.append((serial, (room_id, list(typing)))) | ||
| rows.sort() | ||
| return rows[:limit], current_id, False | ||
| return rows[:limit], current_id, len(rows) > limit |
There was a problem hiding this comment.
this is doing the wrong thing for the returned token when the limit is hit.
There was a problem hiding this comment.
I am such a crank 🤦
There was a problem hiding this comment.
MAYBE THIS TIME I'VE fIXeD IT?!!!??!1?
|
|
||
| Returns: | ||
| A tuple consisting of: the updates, the position of the rows | ||
| returned up to, and whether we returned fewer rows than exists |
There was a problem hiding this comment.
can you rephrase "the position of the rows returned up to"? it's somewhat unclear: inconsistent use of "position" and "token", inclusive or exclusive, etc.
"the last token included in the results", maybe?
There was a problem hiding this comment.
Hopefully I've clarified it. Unfortunately, technically, the returned token doesn't have to be the last token included in the results (since current_id could be greater due to a gap)
| async def get_all_new_backfill_event_rows( | ||
| self, instance_name: str, last_id: int, current_id: int, limit: int | ||
| ) -> Tuple[List[Tuple[int, list]], int, bool]: | ||
| """Get updates for backfill replication stream, including all new |
There was a problem hiding this comment.
this docstring is very helpful, but please can all the updated/new storage methods have one, not just this method?
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
* commit 'f6f7511a4': Refactor getting replication updates from database. (#7636)
The aim here is to make it easier to reason about when streams are limited and when they're not, by moving the logic into the database functions themselves. This should mean we can kill of
db_query_to_update_functionfunction.You might want to look at each commit individually.
This only does a subset of the streams, and simply merges the functionality from
db_query_to_update_functioninto each streams get update DB function:synapse/synapse/replication/tcp/streams/_base.py
Lines 201 to 218 in 6c1f7c7
c.f #7340