Skip to content

Conversation

@guybe7
Copy link
Collaborator

@guybe7 guybe7 commented Feb 16, 2022

Fix #6887
Deleting a stream while a client is blocked XREADGROUP should unblock the client.

The idea is that if a client is blocked via XREADGROUP is different from
any other blocking type in the sense that it depends on the existence of both
the key and the group. Even if the key is deleted and then revived with XADD
it won't help any clients blocked on XREADGROUP because the group no longer
exist, so they would fail with -NOGROUP anyway.
The conclusion is that it's better to unblock these clients (with error) upon
the deletion of the key, rather than waiting for the first XADD.

Other changes:

  1. Slightly optimize all serveClientsBlockedOn* functions by checking server.blocked_clients_by_type
  2. All serveClientsBlockedOn* functions now use a list iterator rather than looking at listFirst, relying on unblockClient to delete the head of the list. Before this commit, only serveClientsBlockedOnStreams used to work like that.
  3. bugfix: CLIENT UNBLOCK ERROR should work even if the command doesn't have a timeout_callback (only relevant to module commands)

Copy link
Member

@itamarhaber itamarhaber left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@oranagra
Copy link
Member

@yossigo there was an argument that maybe this problem isn't worth the complexity of fixing it, and maybe we should just document that it's a known limitation.
the reason is that applications could simply avoid deleting stream keys while they have a blocked cgroup client.

personally, looking at the fix, i'm not sure the complexity is that high, so i'm willing to take it, even if just so that we know we don't have an awkward edge case unresolved in the code.

please take a quick sim through the changes and say what you think.

Copy link
Collaborator

@yossigo yossigo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oranagra I think it's not ideal but also not that terrible. I think this is a kind of payment for consumer groups being a non-key. My only concern here would be additional cases we did not consider that may further complicate this solution.

@oranagra oranagra added the release-notes indication that this issue needs to be mentioned in the release notes label Feb 27, 2022
Co-authored-by: Yossi Gottlieb <yossigo@gmail.com>
@oranagra
Copy link
Member

oranagra commented Mar 2, 2022

@soloestoy maybe you wanna take a quick look to see if you can spot something we missed?

@soloestoy
Copy link
Contributor

soloestoy commented Mar 2, 2022

I didn't read the code, but seems you didn't handle XGROUP DESTROY and XGROUP DELCONSUMER , not sure if it's needed.

ignore above, you already fixed it in another PR #6905

@guybe7
Copy link
Collaborator Author

guybe7 commented Mar 2, 2022

@soloestoy serveClientsBlockedOnStreamKey creates the consumer if it doesn't exist (not sure if by design, but that's the way it is)
so even if the blocked consumer is deleted, the XADD that should have awakened that consumer will re-create it as a side effect (and it will unblock the client)

c1:

127.0.0.1:6379> XGROUP CREATE x g $ MKSTREAM
OK
127.0.0.1:6379> XREADGROUP group g Alice block 0 streams x >

c2:

127.0.0.1:6379> XGROUP DELCONSUMER x g Alice
(integer) 0
127.0.0.1:6379> XADD x * f v
"1646214278403-0"
127.0.0.1:6379> 

c1:

127.0.0.1:6379> XREADGROUP group g Alice block 0 streams x >
1) 1) "x"
   2) 1) 1) "1646214278403-0"
         2) 1) "f"
            2) "v"
(33.97s)
127.0.0.1:6379> 

robj *value = dictGetVal(kde);
was_stream = value->type == OBJ_STREAM;
}
if (replaced_with) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to check if the deleted stream is replaced with no-stream object:

  1. if the new object is not stream, it means the original key is overwritten, should awaken the blocked client.
  2. if the new object is stream, it may not have the specific group, should also awaken the blocked client to check if it needs to reply an -NOGROUP error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. not necessarily. if the new type of the key isn't list/zset/stream/module, no one will call signalKeyAsReady and the client will remain blocked.
  2. this statement is correct (but it's not related to this function, which only checks if the new key is not a stream)

Copy link
Contributor

@soloestoy soloestoy Mar 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean if the current stream is replaced with another db's stream, and the new stream doesn't have the original stream's group, we should also wake up the client in case it blocked forever.

no matter what new type it is we all need to call signalKeyAsReady, so we don't need to check the replaced_with dict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the SWAPDB command on a stream key is like a transaction:

MULTI
DEL stream
XADD stream ...
EXEC

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I forgot you call scanDatabaseForReadyKeys after db swapped, I get your point now.

Co-authored-by: zhaozhao.zz <276441700@qq.com>
@oranagra oranagra merged commit 2a29540 into redis:unstable Mar 8, 2022
@oranagra oranagra mentioned this pull request Apr 5, 2022
guybe7 added a commit to guybe7/redis that referenced this pull request Sep 22, 2022
The use case is a module that wants to implement a blocking command on a key
that necessarily exists, and want to unblock the client in case the key is
deleted (much like what we implemnted for XREADGROUP in redis#10306)

blocked.c:
1. Both module and stream functions are called whether the key exists or not,
   and regardless of its type.
   We do that in order to allow modules/stream to unblock the client in case the
   key is no longer present or has changed type (the behavior for streams didn't
   change, just code that moved into serveClientsBlockedOnStreamKey)
2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in
   order to propagate actions from moduleTryServeClientBlockedOnKey.
3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after
   lookupKeyReadWithFlags in order to prevent a possible lazy-expire DEL to be
   mized with any command propagated by the preceding functions.

db.c:
1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will
   signalKeyAsReady for any key the was removed from the database or changed
   type.
   It is the responsibilty of the code in blocked.c to ignore or to act on
   deleted/type-changed keys.

module.c, redismodule.h:
1. new API RM_SignalKeyAsReadyByDbId to be used when RedisModuleCtx is not
   provided.

blockedonkey.c + tcl:
1. Added test of new capabilites (FSL.BPOPGT now requires the key to exist in
   order to work)
oranagra pushed a commit that referenced this pull request Oct 18, 2022
The use case is a module that wants to implement a blocking command on a key that
necessarily exists and wants to unblock the client in case the key is deleted (much like
what we implemented for XREADGROUP in #10306)

New module API:
* RedisModule_BlockClientOnKeysWithFlags

Flags:
* REDISMODULE_BLOCK_UNBLOCK_NONE
* REDISMODULE_BLOCK_UNBLOCK_DELETED

### Detailed description of code changes

blocked.c:
1. Both module and stream functions are called whether the key exists or not, regardless of
  its type. We do that in order to allow modules/stream to unblock the client in case the key
  is no longer present or has changed type (the behavior for streams didn't change, just code
  that moved into serveClientsBlockedOnStreamKey)
2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate
  actions from moduleTryServeClientBlockedOnKey.
3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags
  to prevent a possible lazy-expire DEL from being mixed with any command propagated by the
  preceding functions.
4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted.
   Minor optimizations (use dictAddRaw).
5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted.
  It will only signal if there's at least one client that awaits key deletion (to save calls to
  handleClientsBlockedOnKeys).
  Minor optimizations (use dictAddRaw)

db.c:
1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady
  for any key that was removed from the database or changed type. It is the responsibility of the code
  in blocked.c to ignore or act on deleted/type-changed keys.
2. Use the new signalDeletedKeyAsReady where needed

blockedonkey.c + tcl:
1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
madolson pushed a commit to madolson/redis that referenced this pull request Apr 19, 2023
…1310)

The use case is a module that wants to implement a blocking command on a key that
necessarily exists and wants to unblock the client in case the key is deleted (much like
what we implemented for XREADGROUP in redis#10306)

New module API:
* RedisModule_BlockClientOnKeysWithFlags

Flags:
* REDISMODULE_BLOCK_UNBLOCK_NONE
* REDISMODULE_BLOCK_UNBLOCK_DELETED

### Detailed description of code changes

blocked.c:
1. Both module and stream functions are called whether the key exists or not, regardless of
  its type. We do that in order to allow modules/stream to unblock the client in case the key
  is no longer present or has changed type (the behavior for streams didn't change, just code
  that moved into serveClientsBlockedOnStreamKey)
2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate
  actions from moduleTryServeClientBlockedOnKey.
3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags
  to prevent a possible lazy-expire DEL from being mixed with any command propagated by the
  preceding functions.
4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted.
   Minor optimizations (use dictAddRaw).
5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted.
  It will only signal if there's at least one client that awaits key deletion (to save calls to
  handleClientsBlockedOnKeys).
  Minor optimizations (use dictAddRaw)

db.c:
1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady
  for any key that was removed from the database or changed type. It is the responsibility of the code
  in blocked.c to ignore or act on deleted/type-changed keys.
2. Use the new signalDeletedKeyAsReady where needed

blockedonkey.c + tcl:
1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
enjoy-binbin pushed a commit to enjoy-binbin/redis that referenced this pull request Jul 31, 2023
…1310)

The use case is a module that wants to implement a blocking command on a key that
necessarily exists and wants to unblock the client in case the key is deleted (much like
what we implemented for XREADGROUP in redis#10306)

New module API:
* RedisModule_BlockClientOnKeysWithFlags

Flags:
* REDISMODULE_BLOCK_UNBLOCK_NONE
* REDISMODULE_BLOCK_UNBLOCK_DELETED

### Detailed description of code changes

blocked.c:
1. Both module and stream functions are called whether the key exists or not, regardless of
  its type. We do that in order to allow modules/stream to unblock the client in case the key
  is no longer present or has changed type (the behavior for streams didn't change, just code
  that moved into serveClientsBlockedOnStreamKey)
2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate
  actions from moduleTryServeClientBlockedOnKey.
3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags
  to prevent a possible lazy-expire DEL from being mixed with any command propagated by the
  preceding functions.
4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted.
   Minor optimizations (use dictAddRaw).
5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted.
  It will only signal if there's at least one client that awaits key deletion (to save calls to
  handleClientsBlockedOnKeys).
  Minor optimizations (use dictAddRaw)

db.c:
1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady
  for any key that was removed from the database or changed type. It is the responsibility of the code
  in blocked.c to ignore or act on deleted/type-changed keys.
2. Use the new signalDeletedKeyAsReady where needed

blockedonkey.c + tcl:
1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-notes indication that this issue needs to be mentioned in the release notes

Projects

Archived in project

Development

Successfully merging this pull request may close these issues.

Deleting a stream while blocking with XREADGROUP

6 participants