# Running under sqlite, Synapse incorrectly populates the to-device messages current stream ID #16681

## Description
Spotted by @kegsay, diagnosed by me.
Synapse writes to the device inbox and outbox tables here:
`synapse/synapse/storage/databases/main/deviceinbox.py`, lines 804 to 814 in `3e8531d`:

```python
async with self._device_inbox_id_gen.get_next() as stream_id:
    now_ms = self._clock.time_msec()
    await self.db_pool.runInteraction(
        "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
    )

    for user_id in local_messages_by_user_then_device.keys():
        self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
    for destination in remote_messages_by_destination.keys():
        self._device_federation_outbox_stream_cache.entity_has_changed(
            destination, stream_id
        )
```
`synapse/synapse/storage/databases/main/deviceinbox.py`, lines 741 to 802 in `3e8531d`:

```python
def add_messages_txn(
    txn: LoggingTransaction, now_ms: int, stream_id: int
) -> None:
    # Add the local messages directly to the local inbox.
    self._add_messages_to_local_device_inbox_txn(
        txn, stream_id, local_messages_by_user_then_device
    )

    # Add the remote messages to the federation outbox.
    # We'll send them to a remote server when we next send a
    # federation transaction to that destination.
    self.db_pool.simple_insert_many_txn(
        txn,
        table="device_federation_outbox",
        keys=(
            "destination",
            "stream_id",
            "queued_ts",
            "messages_json",
            "instance_name",
        ),
        values=[
            (
                destination,
                stream_id,
                now_ms,
                json_encoder.encode(edu),
                self._instance_name,
            )
            for destination, edu in remote_messages_by_destination.items()
        ],
    )

    for destination, edu in remote_messages_by_destination.items():
        if issue9533_logger.isEnabledFor(logging.DEBUG):
            issue9533_logger.debug(
                "Queued outgoing to-device messages with "
                "stream_id %i, EDU message_id %s, type %s for %s: %s",
                stream_id,
                edu["message_id"],
                edu["type"],
                destination,
                [
                    f"{user_id}/{device_id} (msgid "
                    f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
                    for (user_id, messages_by_device) in edu["messages"].items()
                    for (device_id, msg) in messages_by_device.items()
                ],
            )

        for user_id, messages_by_device in edu["messages"].items():
            for device_id, msg in messages_by_device.items():
                with start_active_span("store_outgoing_to_device_message"):
                    set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
                    set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
                    set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
                    set_tag(
                        SynapseTags.TO_DEVICE_MSGID,
                        msg.get(EventContentFields.TO_DEVICE_MSGID),
                    )
```
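To make the allocation pattern concrete, here is a toy model (hypothetical names, not Synapse's real `StreamIdGenerator` API or schema): a single stream ID is handed out per batch and stamped on both tables, so a batch containing only remote messages advances the stream without ever writing to the inbox.

```python
import itertools


class ToyStreamIdGen:
    """Toy monotonic ID allocator; NOT Synapse's real StreamIdGenerator."""

    def __init__(self, start: int) -> None:
        self._counter = itertools.count(start + 1)

    def get_next(self) -> int:
        return next(self._counter)


device_inbox: list = []              # stream_ids of local messages
device_federation_outbox: list = []  # stream_ids of remote messages


def add_messages(gen, local_messages, remote_destinations):
    # One stream_id per batch, shared by both tables (as in the real code).
    stream_id = gen.get_next()
    for _ in local_messages:
        device_inbox.append(stream_id)
    for _ in remote_destinations:
        device_federation_outbox.append(stream_id)
    return stream_id


gen = ToyStreamIdGen(start=0)
add_messages(gen, ["m1"], [])                  # local only: inbox gets 1
add_messages(gen, [], ["remote.example.com"])  # remote only: outbox gets 2

print(max(device_inbox))               # 1
print(max(device_federation_outbox))   # 2 -- the inbox never saw this ID
```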
Suppose that:

1. Synapse calls `add_messages_txn` with 0 local events (for the inbox) and at least 1 remote event (for the outbox). This is stored normally in the DB.
2. The event fails to send, or otherwise does not get its chance to be sent out to the destination.
3. Synapse shuts down.

At this point, `SELECT MAX(stream_id) FROM device_inbox` is strictly smaller than `SELECT MAX(stream_id) FROM device_federation_outbox` (the latter is the stream ID used in step 1).
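The divergence can be sketched directly in sqlite with heavily simplified stand-in schemas (the column sets here are assumptions for illustration, not Synapse's real DDL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Heavily simplified stand-ins for Synapse's tables.
    CREATE TABLE device_inbox (stream_id INTEGER, message_json TEXT);
    CREATE TABLE device_federation_outbox (stream_id INTEGER, messages_json TEXT);
    """
)

# Step 1: a batch with 0 local events and 1 remote event, written with stream_id 5.
# Only the outbox is touched.
conn.execute("INSERT INTO device_federation_outbox VALUES (5, '{}')")

inbox_max = conn.execute("SELECT MAX(stream_id) FROM device_inbox").fetchone()[0]
outbox_max = conn.execute(
    "SELECT MAX(stream_id) FROM device_federation_outbox"
).fetchone()[0]

print(inbox_max, outbox_max)  # None 5 -- the inbox maximum lags behind the outbox
```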
Now Synapse restarts. On sqlite, it populates the to-device stream using
`synapse/synapse/storage/databases/main/deviceinbox.py`, lines 104 to 106 in `3e8531d`:

```python
self._device_inbox_id_gen = StreamIdGenerator(
    db_conn, hs.get_replication_notifier(), "device_inbox", "stream_id"
)
```
which will end up calling
`synapse/synapse/storage/util/id_generators.py`, lines 78 to 93 in `91587d4`:

```python
def _load_current_id(
    db_conn: LoggingDatabaseConnection, table: str, column: str, step: int = 1
) -> int:
    cur = db_conn.cursor(txn_name="_load_current_id")
    if step == 1:
        cur.execute("SELECT MAX(%s) FROM %s" % (column, table))
    else:
        cur.execute("SELECT MIN(%s) FROM %s" % (column, table))
    result = cur.fetchone()
    assert result is not None

    (val,) = result
    cur.close()
    current_id = int(val) if val else step

    res = (max if step > 0 else min)(current_id, step)
    logger.info("Initialising stream generator for %s(%s): %i", table, column, res)
    return res
```
i.e. it runs `SELECT MAX(stream_id) FROM device_inbox` — the first, strictly smaller, of the two queries above. This means the stream_id has jumped backwards: the generator restarts below the highest stream ID already written to `device_federation_outbox`.
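A minimal simulation of that reload — mirroring the `MAX()`-and-fallback logic of `_load_current_id` over plain lists instead of a database — shows the regenerated current ID landing below the last ID that was actually handed out:

```python
# stream_ids present on disk at shutdown: the inbox saw up to 4,
# but the last allocated id (5) only ever reached the outbox.
device_inbox_ids = [1, 2, 3, 4]
device_federation_outbox_ids = [5]


def load_current_id(ids: list, step: int = 1) -> int:
    # Mirrors _load_current_id's MAX()-based reload: an empty/NULL
    # result falls back to `step`.
    val = max(ids, default=None)
    current_id = int(val) if val else step
    return max(current_id, step)


current = load_current_id(device_inbox_ids)
print(current)  # 4 -- but stream_id 5 was already used in the outbox

# The stream has jumped backwards relative to the outbox:
assert current < max(device_federation_outbox_ids)
```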
I don't think this is necessarily the end of the world, since it looks like the tables don't expect the stream ID to be unique (i.e. they're fine with it being reused). But it is surprising and makes debugging more confusing.
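For what it's worth, sqlite silently accepts the reuse, assuming (as the real schema appears to allow) no uniqueness constraint on `stream_id` — a quick sketch with a simplified stand-in table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed simplification: stream_id carries no UNIQUE constraint.
conn.execute(
    "CREATE TABLE device_federation_outbox (stream_id INTEGER, messages_json TEXT)"
)
conn.execute("INSERT INTO device_federation_outbox VALUES (5, '{}')")  # pre-restart row
conn.execute("INSERT INTO device_federation_outbox VALUES (5, '{}')")  # post-restart reuse

rows = conn.execute(
    "SELECT COUNT(*) FROM device_federation_outbox WHERE stream_id = 5"
).fetchone()[0]
print(rows)  # 2 -- the duplicate is accepted without error
```

So nothing crashes, but two unrelated batches now share a stream ID, which is exactly the kind of thing that makes log-based debugging confusing.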