make notification of signatures work with workers#6254
Conversation
richvdh
left a comment
There was a problem hiding this comment.
looks generally ok, though I think it should probably be a separate stream. I don't know if a separate stream id is required though. @erikjohnston do you have any thoughts on how this stuff is meant to work?
|
Yeah, you probably want it to be a separate stream, but can share stream ID generator. ( Adding a stream is a matter of creating a new stream, similar to https://github.com/matrix-org/synapse/blob/master/synapse/replication/tcp/streams/federation.py, and adding it to the stream map https://github.com/matrix-org/synapse/blob/master/synapse/replication/tcp/streams/__init__.py. Then in the worker stores that use the stream add code to |
| result["device_lists"] = self._device_list_id_gen.get_current_token() | ||
| result["user_signature"] = result[ | ||
| "device_lists" | ||
| ] = self._device_list_id_gen.get_current_token() |
There was a problem hiding this comment.
This is kind of ugly, but it's what black came up with. 🤷♂️
Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.
There was a problem hiding this comment.
Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.
this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.
| result["device_lists"] = self._device_list_id_gen.get_current_token() | ||
| result["user_signature"] = result[ | ||
| "device_lists" | ||
| ] = self._device_list_id_gen.get_current_token() |
There was a problem hiding this comment.
Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.
this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.
| def stream_positions(self): | ||
| result = super(SlavedDeviceStore, self).stream_positions() | ||
| result["device_lists"] = self._device_list_id_gen.get_current_token() | ||
| result["user_signature"] = result[ |
There was a problem hiding this comment.
I know the existing code isn't good for this, but I'd rather we used the symbolic constants (UserSignatureStream.NAME in this case) for the stream names, which makes it much easier to find the places the streams are used.
| self._device_list_id_gen.advance(token) | ||
| for row in rows: | ||
| self._invalidate_caches_for_devices(token, row.user_id, row.destination) | ||
| elif stream_name == "user_signature": |
There was a problem hiding this comment.
| elif stream_name == "user_signature": | |
| elif stream_name == UserSignatureStream.NAME: |
…kers_notify * commit '3b4216f96': clean up code a bit make user signatures a separate stream add changelog make notification of signatures work with workers
The UNION query is kind of ugly, but shows one way of fixing the issue. Basically, the function needs to return the rows from the user signature stream in addition to the device lists stream.
Alternatively, I could use a new stream ID generator instead of using the device stream ID generator, which would involve more code. Though I suspect that it would be the better option.