-
Notifications
You must be signed in to change notification settings - Fork 24.4k
XREADGROUP: Unblock client if stream is deleted #10306
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
itamarhaber
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
@yossigo there was an argument that maybe this problem isn't worth the complexity of fixing it, and maybe we should just document that it's a known limitation. personally, looking at the fix, i'm not sure the complexity is that high, so i'm willing to take it, even if just so that we know we don't have an awkward edge case unresolved in the code. please take a quick sim through the changes and say what you think. |
yossigo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oranagra I think it's not ideal but also not that terrible. I think this is a kind of payment for consumer groups being a non-key. My only concern here would be additional cases we did not consider that may further complicate this solution.
Co-authored-by: Yossi Gottlieb <yossigo@gmail.com>
|
@soloestoy maybe you wanna take a quick look to see if you can spot something we missed? |
|
I didn't read the code, but seems you didn't handle ignore above, you already fixed it in another PR #6905 |
|
@soloestoy c1: c2: c1: |
| robj *value = dictGetVal(kde); | ||
| was_stream = value->type == OBJ_STREAM; | ||
| } | ||
| if (replaced_with) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need to check if the deleted stream is replaced with no-stream object:
- if the new object is not stream, it means the original key is overwritten, should awaken the blocked client.
- if the new object is stream, it may not have the specific group, should also awaken the blocked client to check if it needs to reply an
-NOGROUPerror.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- not necessarily. if the new type of the key isn't list/zset/stream/module, no one will call
signalKeyAsReadyand the client will remain blocked. - this statement is correct (but it's not related to this function, which only checks if the new key is not a stream)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean if the current stream is replaced with another db's stream, and the new stream doesn't have the original stream's group, we should also wake up the client in case it blocked forever.
no matter what new type it is we all need to call signalKeyAsReady, so we don't need to check the replaced_with dict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because the SWAPDB command on a stream key is like a transaction:
MULTI
DEL stream
XADD stream ...
EXEC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I forgot you call scanDatabaseForReadyKeys after db swapped, I get your point now.
Co-authored-by: zhaozhao.zz <276441700@qq.com>
The use case is a module that wants to implement a blocking command on a key that necessarily exists, and want to unblock the client in case the key is deleted (much like what we implemnted for XREADGROUP in redis#10306) blocked.c: 1. Both module and stream functions are called whether the key exists or not, and regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags in order to prevent a possible lazy-expire DEL to be mized with any command propagated by the preceding functions. db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key the was removed from the database or changed type. It is the responsibilty of the code in blocked.c to ignore or to act on deleted/type-changed keys. module.c, redismodule.h: 1. new API RM_SignalKeyAsReadyByDbId to be used when RedisModuleCtx is not provided. blockedonkey.c + tcl: 1. Added test of new capabilites (FSL.BPOPGT now requires the key to exist in order to work)
The use case is a module that wants to implement a blocking command on a key that necessarily exists and wants to unblock the client in case the key is deleted (much like what we implemented for XREADGROUP in #10306) New module API: * RedisModule_BlockClientOnKeysWithFlags Flags: * REDISMODULE_BLOCK_UNBLOCK_NONE * REDISMODULE_BLOCK_UNBLOCK_DELETED ### Detailed description of code changes blocked.c: 1. Both module and stream functions are called whether the key exists or not, regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags to prevent a possible lazy-expire DEL from being mixed with any command propagated by the preceding functions. 4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted. Minor optimizations (use dictAddRaw). 5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted. It will only signal if there's at least one client that awaits key deletion (to save calls to handleClientsBlockedOnKeys). Minor optimizations (use dictAddRaw) db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key that was removed from the database or changed type. It is the responsibility of the code in blocked.c to ignore or act on deleted/type-changed keys. 2. Use the new signalDeletedKeyAsReady where needed blockedonkey.c + tcl: 1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
…1310) The use case is a module that wants to implement a blocking command on a key that necessarily exists and wants to unblock the client in case the key is deleted (much like what we implemented for XREADGROUP in redis#10306) New module API: * RedisModule_BlockClientOnKeysWithFlags Flags: * REDISMODULE_BLOCK_UNBLOCK_NONE * REDISMODULE_BLOCK_UNBLOCK_DELETED ### Detailed description of code changes blocked.c: 1. Both module and stream functions are called whether the key exists or not, regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags to prevent a possible lazy-expire DEL from being mixed with any command propagated by the preceding functions. 4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted. Minor optimizations (use dictAddRaw). 5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted. It will only signal if there's at least one client that awaits key deletion (to save calls to handleClientsBlockedOnKeys). Minor optimizations (use dictAddRaw) db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key that was removed from the database or changed type. It is the responsibility of the code in blocked.c to ignore or act on deleted/type-changed keys. 2. Use the new signalDeletedKeyAsReady where needed blockedonkey.c + tcl: 1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
…1310) The use case is a module that wants to implement a blocking command on a key that necessarily exists and wants to unblock the client in case the key is deleted (much like what we implemented for XREADGROUP in redis#10306) New module API: * RedisModule_BlockClientOnKeysWithFlags Flags: * REDISMODULE_BLOCK_UNBLOCK_NONE * REDISMODULE_BLOCK_UNBLOCK_DELETED ### Detailed description of code changes blocked.c: 1. Both module and stream functions are called whether the key exists or not, regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags to prevent a possible lazy-expire DEL from being mixed with any command propagated by the preceding functions. 4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted. Minor optimizations (use dictAddRaw). 5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted. It will only signal if there's at least one client that awaits key deletion (to save calls to handleClientsBlockedOnKeys). Minor optimizations (use dictAddRaw) db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key that was removed from the database or changed type. It is the responsibility of the code in blocked.c to ignore or act on deleted/type-changed keys. 2. Use the new signalDeletedKeyAsReady where needed blockedonkey.c + tcl: 1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
Fix #6887
Deleting a stream while a client is blocked XREADGROUP should unblock the client.
The idea is that if a client is blocked via XREADGROUP is different from
any other blocking type in the sense that it depends on the existence of both
the key and the group. Even if the key is deleted and then revived with XADD
it won't help any clients blocked on XREADGROUP because the group no longer
exist, so they would fail with -NOGROUP anyway.
The conclusion is that it's better to unblock these clients (with error) upon
the deletion of the key, rather than waiting for the first XADD.
Other changes:
serveClientsBlockedOn*functions by checkingserver.blocked_clients_by_typeserveClientsBlockedOn*functions now use a list iterator rather than looking atlistFirst, relying onunblockClientto delete the head of the list. Before this commit, onlyserveClientsBlockedOnStreamsused to work like that.