Skip to content

Commit 65a3307

Browse files
committed
EXEC always fails with EXECABORT and multi-state is cleared
In order to support the use of multi-exec in pipeline, it is important that MULTI and EXEC are never rejected and it is easy for the client to know if the connection is still in multi state. It was easy to make sure MULTI and DISCARD never fail (done by previous commits) since these only change the client state and don't do any actual change in the server, but EXEC is a different story. Since in the past, it was possible for clients to handle some EXEC errors and retry the EXEC, we now can't affort to return any error on EXEC other than EXECABORT, which now carries with it the real reason for the abort too. Other fixes in this commit: - Some checks that where performed at the time of queuing need to be re- validated when EXEC runs, for instance if the transaction contains writes commands, it needs to be aborted. there was one check that was already done in execCommand (-READONLY), but other checks where missing: -OOM, -MISCONF, -NOREPLICAS, -MASTERDOWN - When a command is rejected by processCommand it was rejected with addReply, which was not recognized as an error in case the bad command came from the master. this will enable to count or MONITOR these errors in the future. - make it easier for tests to create additional (non deferred) clients. - add tests for the fixes of this commit.
1 parent 2ebcd63 commit 65a3307

6 files changed

Lines changed: 204 additions & 91 deletions

File tree

src/multi.c

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void initClientMultiState(client *c) {
3636
c->mstate.commands = NULL;
3737
c->mstate.count = 0;
3838
c->mstate.cmd_flags = 0;
39+
c->mstate.cmd_inv_flags = 0;
3940
}
4041

4142
/* Release all the resources associated with MULTI/EXEC state */
@@ -76,6 +77,7 @@ void queueMultiCommand(client *c) {
7677
incrRefCount(mc->argv[j]);
7778
c->mstate.count++;
7879
c->mstate.cmd_flags |= c->cmd->flags;
80+
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
7981
}
8082

8183
void discardTransaction(client *c) {
@@ -122,6 +124,23 @@ void execCommandPropagateExec(client *c) {
122124
PROPAGATE_AOF|PROPAGATE_REPL);
123125
}
124126

127+
/* Aborts a transaction, with a specific error message.
128+
* The transaction is always aboarted with -EXECABORT so that the client knows
129+
* the server exited the multi state, but the actual reason for the abort is
130+
* included too. */
131+
void execCommandAbort(client *c, sds error) {
132+
discardTransaction(c);
133+
134+
if (error[0] == '-') error++;
135+
addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
136+
137+
/* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
138+
* already, and didn't send any of the queued commands, now we'll just send
139+
* EXEC so it is clear that the transaction is over. */
140+
if (listLength(server.monitors) && !server.loading)
141+
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
142+
}
143+
125144
void execCommand(client *c) {
126145
int j;
127146
robj **orig_argv;
@@ -135,15 +154,6 @@ void execCommand(client *c) {
135154
return;
136155
}
137156

138-
/* If we are in -BUSY state, flag the transaction and return the
139-
* -BUSY error, like Redis <= 5. This is a temporary fix, may be changed
140-
* ASAP, see issue #7353 on Github. */
141-
if (server.lua_timedout) {
142-
flagTransaction(c);
143-
addReply(c, shared.slowscripterr);
144-
return;
145-
}
146-
147157
/* Check if we need to abort the EXEC because:
148158
* 1) Some WATCHed key was touched.
149159
* 2) There was a previous error while queueing commands.
@@ -157,21 +167,6 @@ void execCommand(client *c) {
157167
goto handle_monitor;
158168
}
159169

160-
/* If there are write commands inside the transaction, and this is a read
161-
* only slave, we want to send an error. This happens when the transaction
162-
* was initiated when the instance was a master or a writable replica and
163-
* then the configuration changed (for example instance was turned into
164-
* a replica). */
165-
if (!server.loading && server.masterhost && server.repl_slave_ro &&
166-
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
167-
{
168-
addReplyError(c,
169-
"Transaction contains write commands but instance "
170-
"is now a read-only replica. EXEC aborted.");
171-
discardTransaction(c);
172-
goto handle_monitor;
173-
}
174-
175170
/* Exec all the queued commands */
176171
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
177172
orig_argv = c->argv;

src/networking.c

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -406,19 +406,23 @@ void addReplyError(client *c, const char *err) {
406406
addReplyErrorLength(c,err,strlen(err));
407407
}
408408

409+
/* See addReplyErrorLength.
410+
* Makes sure there are no newlines in the string, otherwise invalid protocol
411+
* is emitted. */
412+
void addReplyErrorSafe(client *c, char *s, size_t len) {
413+
size_t j;
414+
for (j = 0; j < len; j++) {
415+
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
416+
}
417+
addReplyErrorLength(c,s,sdslen(s));
418+
}
419+
409420
void addReplyErrorFormat(client *c, const char *fmt, ...) {
410-
size_t l, j;
411421
va_list ap;
412422
va_start(ap,fmt);
413423
sds s = sdscatvprintf(sdsempty(),fmt,ap);
414424
va_end(ap);
415-
/* Make sure there are no newlines in the string, otherwise invalid protocol
416-
* is emitted. */
417-
l = sdslen(s);
418-
for (j = 0; j < l; j++) {
419-
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
420-
}
421-
addReplyErrorLength(c,s,sdslen(s));
425+
addReplyErrorSafe(c, s, sdslen(s));
422426
sdsfree(s);
423427
}
424428

src/server.c

Lines changed: 58 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3398,6 +3398,34 @@ void call(client *c, int flags) {
33983398
server.stat_numcommands++;
33993399
}
34003400

3401+
/* Used when a command that is ready for execution needs to be rejected, due to
3402+
* varios pre-execution checks. it returns the appropriate error to the client.
3403+
* If there's a transaction is flags it as dirty, and if the command is EXEC,
3404+
* it aborts the transaction. */
3405+
void rejectCommand(client *c, robj *reply) {
3406+
flagTransaction(c);
3407+
if (c->cmd && c->cmd->proc == execCommand) {
3408+
execCommandAbort(c, reply->ptr);
3409+
} else {
3410+
/* using addReplyError* rather than addReply so that the error can be logged. */
3411+
addReplyErrorSafe(c, reply->ptr, sdslen(reply->ptr));
3412+
}
3413+
}
3414+
3415+
void rejectCommandFormat(client *c, const char *fmt, ...) {
3416+
flagTransaction(c);
3417+
va_list ap;
3418+
va_start(ap,fmt);
3419+
sds s = sdscatvprintf(sdsempty(),fmt,ap);
3420+
va_end(ap);
3421+
if (c->cmd && c->cmd->proc == execCommand) {
3422+
execCommandAbort(c, s);
3423+
} else {
3424+
addReplyErrorSafe(c, s, sdslen(s));
3425+
}
3426+
sdsfree(s);
3427+
}
3428+
34013429
/* If this function gets called we already read a whole
34023430
* command, arguments are in the client argv/argc fields.
34033431
* processCommand() execute the command or prepare the
@@ -3423,23 +3451,30 @@ int processCommand(client *c) {
34233451
* such as wrong arity, bad command name and so forth. */
34243452
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
34253453
if (!c->cmd) {
3426-
flagTransaction(c);
34273454
sds args = sdsempty();
34283455
int i;
34293456
for (i=1; i < c->argc && sdslen(args) < 128; i++)
34303457
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
3431-
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
3458+
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
34323459
(char*)c->argv[0]->ptr, args);
34333460
sdsfree(args);
34343461
return C_OK;
34353462
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
34363463
(c->argc < -c->cmd->arity)) {
3437-
flagTransaction(c);
3438-
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
3464+
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
34393465
c->cmd->name);
34403466
return C_OK;
34413467
}
34423468

3469+
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
3470+
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
3471+
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
3472+
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
3473+
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
3474+
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
3475+
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
3476+
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
3477+
34433478
/* Check if the user is authenticated. This check is skipped in case
34443479
* the default user is flagged as "nopass" and is active. */
34453480
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
@@ -3449,8 +3484,7 @@ int processCommand(client *c) {
34493484
/* AUTH and HELLO and no auth modules are valid even in
34503485
* non-authenticated state. */
34513486
if (!(c->cmd->flags & CMD_NO_AUTH)) {
3452-
flagTransaction(c);
3453-
addReply(c,shared.noautherr);
3487+
rejectCommand(c,shared.noautherr);
34543488
return C_OK;
34553489
}
34563490
}
@@ -3461,13 +3495,12 @@ int processCommand(client *c) {
34613495
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
34623496
if (acl_retval != ACL_OK) {
34633497
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
3464-
flagTransaction(c);
34653498
if (acl_retval == ACL_DENIED_CMD)
3466-
addReplyErrorFormat(c,
3499+
rejectCommandFormat(c,
34673500
"-NOPERM this user has no permissions to run "
34683501
"the '%s' command or its subcommand", c->cmd->name);
34693502
else
3470-
addReplyErrorFormat(c,
3503+
rejectCommandFormat(c,
34713504
"-NOPERM this user has no permissions to access "
34723505
"one of the keys used as arguments");
34733506
return C_OK;
@@ -3515,13 +3548,11 @@ int processCommand(client *c) {
35153548
* is trying to execute is denied during OOM conditions or the client
35163549
* is in MULTI/EXEC context? Error. */
35173550
if (out_of_memory &&
3518-
(c->cmd->flags & CMD_DENYOOM ||
3551+
(is_denyoom_command ||
35193552
(c->flags & CLIENT_MULTI &&
3520-
c->cmd->proc != execCommand &&
35213553
c->cmd->proc != discardCommand)))
35223554
{
3523-
flagTransaction(c);
3524-
addReply(c, shared.oomerr);
3555+
rejectCommand(c, shared.oomerr);
35253556
return C_OK;
35263557
}
35273558

@@ -3542,17 +3573,14 @@ int processCommand(client *c) {
35423573
int deny_write_type = writeCommandsDeniedByDiskError();
35433574
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
35443575
server.masterhost == NULL &&
3545-
(c->cmd->flags & CMD_WRITE ||
3546-
c->cmd->proc == pingCommand))
3576+
(is_write_command ||c->cmd->proc == pingCommand))
35473577
{
3548-
flagTransaction(c);
35493578
if (deny_write_type == DISK_ERROR_TYPE_RDB)
3550-
addReply(c, shared.bgsaveerr);
3579+
rejectCommand(c, shared.bgsaveerr);
35513580
else
3552-
addReplySds(c,
3553-
sdscatprintf(sdsempty(),
3581+
rejectCommandFormat(c,
35543582
"-MISCONF Errors writing to the AOF file: %s\r\n",
3555-
strerror(server.aof_last_write_errno)));
3583+
strerror(server.aof_last_write_errno));
35563584
return C_OK;
35573585
}
35583586

@@ -3561,22 +3589,20 @@ int processCommand(client *c) {
35613589
if (server.masterhost == NULL &&
35623590
server.repl_min_slaves_to_write &&
35633591
server.repl_min_slaves_max_lag &&
3564-
c->cmd->flags & CMD_WRITE &&
3592+
is_write_command &&
35653593
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
35663594
{
3567-
flagTransaction(c);
3568-
addReply(c, shared.noreplicaserr);
3595+
rejectCommand(c, shared.noreplicaserr);
35693596
return C_OK;
35703597
}
35713598

35723599
/* Don't accept write commands if this is a read only slave. But
35733600
* accept write commands if this is our master. */
35743601
if (server.masterhost && server.repl_slave_ro &&
35753602
!(c->flags & CLIENT_MASTER) &&
3576-
c->cmd->flags & CMD_WRITE)
3603+
is_write_command)
35773604
{
3578-
flagTransaction(c);
3579-
addReply(c, shared.roslaveerr);
3605+
rejectCommand(c, shared.roslaveerr);
35803606
return C_OK;
35813607
}
35823608

@@ -3588,7 +3614,7 @@ int processCommand(client *c) {
35883614
c->cmd->proc != unsubscribeCommand &&
35893615
c->cmd->proc != psubscribeCommand &&
35903616
c->cmd->proc != punsubscribeCommand) {
3591-
addReplyErrorFormat(c,
3617+
rejectCommandFormat(c,
35923618
"Can't execute '%s': only (P)SUBSCRIBE / "
35933619
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
35943620
c->cmd->name);
@@ -3600,17 +3626,16 @@ int processCommand(client *c) {
36003626
* link with master. */
36013627
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
36023628
server.repl_serve_stale_data == 0 &&
3603-
!(c->cmd->flags & CMD_STALE))
3629+
is_denystale_command)
36043630
{
3605-
flagTransaction(c);
3606-
addReply(c, shared.masterdownerr);
3631+
rejectCommand(c, shared.masterdownerr);
36073632
return C_OK;
36083633
}
36093634

36103635
/* Loading DB? Return an error if the command has not the
36113636
* CMD_LOADING flag. */
3612-
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
3613-
addReply(c, shared.loadingerr);
3637+
if (server.loading && is_denyloading_command) {
3638+
rejectCommand(c, shared.loadingerr);
36143639
return C_OK;
36153640
}
36163641

@@ -3625,7 +3650,6 @@ int processCommand(client *c) {
36253650
c->cmd->proc != helloCommand &&
36263651
c->cmd->proc != replconfCommand &&
36273652
c->cmd->proc != multiCommand &&
3628-
c->cmd->proc != execCommand &&
36293653
c->cmd->proc != discardCommand &&
36303654
c->cmd->proc != watchCommand &&
36313655
c->cmd->proc != unwatchCommand &&
@@ -3636,8 +3660,7 @@ int processCommand(client *c) {
36363660
c->argc == 2 &&
36373661
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
36383662
{
3639-
flagTransaction(c);
3640-
addReply(c, shared.slowscripterr);
3663+
rejectCommand(c, shared.slowscripterr);
36413664
return C_OK;
36423665
}
36433666

src/server.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,9 @@ typedef struct multiState {
666666
int cmd_flags; /* The accumulated command flags OR-ed together.
667667
So if at least a command has a given flag, it
668668
will be set in this field. */
669+
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
670+
is possible to know if all the commands have a
671+
certain flag. */
669672
int minreplicas; /* MINREPLICAS for synchronous replication */
670673
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
671674
} multiState;
@@ -1626,6 +1629,7 @@ void addReplyBulkLongLong(client *c, long long ll);
16261629
void addReply(client *c, robj *obj);
16271630
void addReplySds(client *c, sds s);
16281631
void addReplyBulkSds(client *c, sds s);
1632+
void addReplyErrorSafe(client *c, char *s, size_t len);
16291633
void addReplyError(client *c, const char *err);
16301634
void addReplyStatus(client *c, const char *status);
16311635
void addReplyDouble(client *c, double d);
@@ -1724,6 +1728,7 @@ void touchWatchedKey(redisDb *db, robj *key);
17241728
void touchWatchedKeysOnFlush(int dbid);
17251729
void discardTransaction(client *c);
17261730
void flagTransaction(client *c);
1731+
void execCommandAbort(client *c, sds error);
17271732
void execCommandPropagateMulti(client *c);
17281733
void execCommandPropagateExec(client *c);
17291734

tests/test_helper.tcl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,21 @@ proc redis_deferring_client {args} {
196196
return $client
197197
}
198198

199+
proc redis_client {args} {
200+
set level 0
201+
if {[llength $args] > 0 && [string is integer [lindex $args 0]]} {
202+
set level [lindex $args 0]
203+
set args [lrange $args 1 end]
204+
}
205+
206+
# create client that defers reading reply
207+
set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls]
208+
209+
# select the right db and read the response (OK)
210+
$client select 9
211+
return $client
212+
}
213+
199214
# Provide easy access to INFO properties. Same semantic as "proc r".
200215
proc s {args} {
201216
set level 0

0 commit comments

Comments
 (0)