iproto: make iproto resistant to misusage #10589
Merged
sergepetrenko merged 1 commit into tarantool:master on Nov 29, 2024
Merged
iproto: make iproto resistant to misusage #10589 — sergepetrenko merged 1 commit into tarantool:master
sergepetrenko merged 1 commit into tarantool:master
Conversation
5a16ad5 to
28bcae9
Compare
xuniq
approved these changes
Sep 20, 2024
changelogs/unreleased/gh_10155_make_iproto_resistant_to_misusage.md
Outdated
Show resolved
Hide resolved
28bcae9 to
9093697
Compare
sergepetrenko
previously approved these changes
Oct 7, 2024
Collaborator
There was a problem hiding this comment.
Thanks for the fix! I like that the solution turned out even simpler than the one we've discussed f2f previously.
Please solicit another review from @locker.
locker
reviewed
Oct 7, 2024
changelogs/unreleased/gh_10155_make_iproto_resistant_to_misusage.md
Outdated
Show resolved
Hide resolved
885a8ef to
127220b
Compare
9201611 to
bca5f46
Compare
Contributor
Author
|
diff (manually edited, so there may be inaccuracies) Detailsdiff --git a/src/box/errcode.h b/src/box/errcode.h
index 719bf6a06f..3a65a3f075 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -434,7 +434,6 @@ struct errcode_record {
_(ER_WAL_QUEUE_FULL, 287, "The WAL queue is full") \
_(ER_INVALID_VCLOCK, 288, "Invalid vclock", "value", STRING) \
_(ER_SYNC_QUEUE_FULL, 289, "The synchronous transaction queue is full") \
- _(ER_UNABLE_TO_PROCESS_REPLICATION, 290, "Replication request couldn't be processed, responses to previous requests were not read") \
TEST_ERROR_CODES(_) /** This one should be last. */
/*
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index a6ff5381ad..6995de383b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -845,8 +845,6 @@ struct iproto_connection
* This field is accesable only from iproto thread.
*/
struct mh_i64ptr_t *streams;
- /** Count of active requests that write something to obuf (gh-10155). */
- size_t active_writing_requests;
/**
* Kharon is used to implement box.session.push().
* When a new push is ready, tx uses kharon to notify
@@ +921 @@ struct iproto_connection
/** Set if connection is accepted in TX. */
bool is_established;
+ /** Number of iproto requests in flight. */
+ size_t requests;
@@ +987 @@ iproto_msg_delete(struct iproto_msg *msg)
{
+ assert(msg->connection->requests > 0);
+ msg->connection->requests--;
struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
@@ +1014 @@ iproto_msg_new(struct iproto_connection *con)
msg->fiber = NULL;
rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
+ con->requests++;
return msg;
@@ -1287,33 +1285,16 @@ iproto_connection_input_buffer(struct iproto_connection *con)
return new_ibuf;
}
-static struct obuf *
-iproto_connection_output_buffer(struct iproto_connection *con)
-{
- struct obuf_svp obuf_end = obuf_create_svp(con->wpos.obuf);
- struct obuf_svp *begin = &con->wpos.svp;
- if (con->wend.obuf != con->wpos.obuf) {
- /*
- * Flush the current buffer before
- * advancing to the next one.
- */
- if (begin->used == obuf_end.used) {
- con->wpos.obuf = con->wend.obuf;
- obuf_svp_reset(begin);
- }
- }
- return con->wpos.obuf;
-}
-
static inline bool
iproto_flushed(struct iproto_connection *con)
{
- struct obuf *obuf = iproto_connection_output_buffer(con);
+ struct obuf *obuf = con->wpos.obuf;
struct obuf_svp *begin = &con->wpos.svp;
- struct obuf_svp end = con->wend.svp;
+ struct obuf_svp *end = &con->wend.svp;
+
if (con->wend.obuf != obuf)
- end = obuf_create_svp(obuf);
- return (begin->used == end.used);
+ return (begin->used == obuf->used) && (end->used == 0);
+ return (begin->used == end->used);
}
/**
@@ -1410,15 +1391,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
con->input_msg_count[msg->p_ibuf == &con->ibuf[1]]++;
iproto_msg_prepare(msg, &pos, reqend);
- /**
- * We want to ensure that the replication request will not be
- * processed until all writing requests have been completely
- * flushed to obuf. It so happened that all requests that come
- * here, except for some replication ones, write something to
- * obuf.
- */
- if (!con->is_in_replication)
- con->active_writing_requests++;
if (iproto_msg_start_processing_in_stream(msg)) {
cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
n_requests++;
@@ -1580,25 +1552,46 @@ error:;
static int
iproto_flush(struct iproto_connection *con)
{
- struct obuf *obuf = iproto_connection_output_buffer(con);
+ struct obuf *obuf = con->wpos.obuf;
+ struct obuf_svp obuf_end = obuf_create_svp(obuf);
struct obuf_svp *begin = &con->wpos.svp;
- struct obuf_svp end = con->wend.svp;
- if (con->wend.obuf != obuf)
- end = obuf_create_svp(obuf);
-
- assert(begin->used <= end.used);
- if (begin->used == end.used) {
+ struct obuf_svp *end = &con->wend.svp;
+ if (con->wend.obuf != obuf) {
+ /*
+ * Flush the current buffer before
+ * advancing to the next one.
+ */
+ if (begin->used == obuf_end.used) {
+ obuf = con->wpos.obuf = con->wend.obuf;
+ obuf_svp_reset(begin);
+ } else {
+ end = &obuf_end;
+ }
+ }
+ if (begin->used == end->used) {
/* Nothing to do. */
return 1;
}
if (!con->can_write) {
/* Receiving end was closed. Discard the output. */
- *begin = end;
+ *begin = *end;
return 0;
}
+ assert(begin->used < end->used);
+
+ ERROR_INJECT(ERRINJ_IPROTO_FLUSH_DELAY, {
+ return IOSTREAM_WANT_WRITE;
+ });
+
struct iovec iov[SMALL_OBUF_IOV_MAX+1];
struct iovec *src = obuf->iov;
- int iovcnt = end.pos - begin->pos + 1;
+ int iovcnt = end->pos - begin->pos + 1;
/*
* iov[i].iov_len may be concurrently modified in tx thread,
* but only for the last position.
@@ -1606,14 +1599,14 @@ iproto_flush(struct iproto_connection *con)
memcpy(iov, src + begin->pos, iovcnt * sizeof(struct iovec));
sio_add_to_iov(iov, -begin->iov_len);
/* *Overwrite* iov_len of the last pos as it may be garbage. */
- iov[iovcnt - 1].iov_len = end.iov_len - begin->iov_len * (iovcnt == 1);
+ iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1);
ssize_t nwr = iostream_writev(&con->io, iov, iovcnt);
if (nwr >= 0) {
/* Count statistics */
rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr);
- if (begin->used + nwr == end.used) {
- *begin = end;
+ if (begin->used + nwr == end->used) {
+ *begin = *end;
return 0;
}
size_t offset = 0;
@@ -1622,7 +1615,7 @@ iproto_flush(struct iproto_connection *con)
begin->used += nwr; /* advance write position */
begin->iov_len = advance == 0 ? begin->iov_len + offset: offset;
begin->pos += advance;
- assert(begin->pos <= end.pos);
+ assert(begin->pos <= end->pos);
return IOSTREAM_WANT_WRITE;
} else if (nwr == IOSTREAM_ERROR) {
/*
@@ -1633,7 +1626,7 @@ iproto_flush(struct iproto_connection *con)
*/
diag_log();
con->can_write = false;
- *begin = end;
+ *begin = *end;
return 0;
}
return nwr;
@@ -1675,7 +1668,6 @@ iproto_connection_new(struct iproto_thread *iproto_thread)
struct iproto_connection *con = (struct iproto_connection *)
xmempool_alloc(&iproto_thread->iproto_connection_pool);
con->streams = mh_i64ptr_new();
- con->active_writing_requests = 0;
con->iproto_thread = iproto_thread;
con->input.data = con->output.data = con;
con->loop = loop();
@@ +1703 @@ iproto_connection_new(struct iproto_thread *iproto_thread)
con->tx.is_push_sent = false;
rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1);
+ con->requests = 0;
return con;
@@ -1840,23 +1832,6 @@ iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
type = msg->header.type;
stream_id = msg->header.stream_id;
- is_replication_request = msg->header.type == IPROTO_JOIN ||
- msg->header.type == IPROTO_FETCH_SNAPSHOT ||
- msg->header.type == IPROTO_REGISTER ||
- msg->header.type == IPROTO_SUBSCRIBE;
-
- if (is_replication_request) {
- /**
- * Before processing a replication request, ensure that all
- * writing requests have been completely flushed to obuf.
- */
- if (con->active_writing_requests > 0 || !iproto_flushed(con)) {
- diag_set(ClientError, ER_UNABLE_TO_PROCESS_REPLICATION);
- goto error;
- }
- con->is_in_replication = true;
- }
-
request_is_not_for_stream =
((type > IPROTO_TYPE_STAT_MAX &&
type != IPROTO_PING) || type == IPROTO_AUTH);
@@ -1866,17 +1841,34 @@ iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
type == IPROTO_ROLLBACK);
if (stream_id != 0 && request_is_not_for_stream) {
- assert(!con->is_in_replication);
diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
iproto_type_name(type));
goto error;
} else if (stream_id == 0 && request_is_only_for_stream) {
- assert(!con->is_in_replication);
diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
iproto_type_name(type));
goto error;
}
+ is_replication_request = msg->header.type == IPROTO_JOIN ||
+ msg->header.type == IPROTO_FETCH_SNAPSHOT ||
+ msg->header.type == IPROTO_REGISTER ||
+ msg->header.type == IPROTO_SUBSCRIBE;
+
+ if (is_replication_request) {
+ /**
+ * Before processing a replication request, ensure that all
+ * writing requests have been completely flushed to obuf.
+ */
+ if (mempool_count(&iproto_thread->iproto_msg_pool) > 1 ||
+ !iproto_flushed(con)) {
+ diag_set(ClientError, ER_PROTOCOL, "Can't process "
+ "join/subscribe while there are pending requests");
+ goto error;
+ }
+ con->is_in_replication = true;
+ }
+
handler = mh_i32_find(handlers, type, NULL);
if (handler != mh_end(handlers)) {
assert(!con->is_in_replication);
@@ -3264,8 +3256,6 @@ net_send_msg(struct cmsg *m)
struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
- if (!con->is_in_replication)
- con->active_writing_requests--;
iproto_msg_finish_processing_in_stream(msg);
if (msg->len != 0) {
/* Discard request (see iproto_enqueue_batch()). */
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index c512bba598..be84283c6a 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -102,6 +102,8 @@ struct errinj {
_(ERRINJ_IPROTO_DISABLE_ID, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_IPROTO_DISABLE_WATCH, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_IPROTO_FLIP_FEATURE, ERRINJ_INT, {.iparam = -1}) \
+ _(ERRINJ_IPROTO_FLUSH_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_IPROTO_SET_VERSION, ERRINJ_INT, {.iparam = -1}) \
_(ERRINJ_IPROTO_TX_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\
diff --git a/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua b/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua
index b712d98190..f06bc38501 100644
--- a/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua
+++ b/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua
@@ -120,7 +120,47 @@ g.test_iproto_crash_on_subscribe = function(g)
iproto_error_type(box.error.UNKNOWN_REQUEST_TYPE))
request_type = socket_read(g.s)[key.REQUEST_TYPE]
t.assert(request_type == type.OK or request_type ==
- iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+ iproto_error_type(box.error.PROTOCOL))
+end
+
+-- The test is designed to check the behavior when, at the time of receiving a
+-- replication request, the responses to all previous requests are written to
+-- obuf but not flushed to the socket.
+g.test_iproto_crash_on_subscribe_flush_delay = function(g)
+ t.tarantool.skip_if_not_debug()
+ local uuid = require('uuid').str()
+ local replicaset_uuid = g.server:eval('return box.info.replicaset.uuid')
+ -- You can't use luatest.server:exec here because it will block
+ write_eval(g.s, "box.error.injection.set('ERRINJ_IPROTO_FLUSH_DELAY', true)")
+ -- Several requests in a row to pretend to be a non-blocking client and
+ -- provoke iproto_connection_feed_input, so that subscribe is immediately
+ -- read
+ for _ = 1, 3 do
+ write_ok(g.s)
+ end
+ -- A short pause so that the packages do not end up in one batch and all
+ -- responses have time to be written in obuf
+ require('fiber').sleep(0.1)
+ write_subscribe(g.s, uuid, replicaset_uuid, true)
+ -- A short pause so that the tx thread has time to write the response to
+ -- the subscribe request to the socket
+ require('fiber').sleep(0.1)
+ g.server:exec(function()
+ box.error.injection.set('ERRINJ_IPROTO_FLUSH_DELAY', false)
+ end)
+ -- Read IPROTO_EVAL response
+ local request_type = socket_read(g.s)[key.REQUEST_TYPE]
+ t.assert(request_type == type.OK)
+ for _ = 1, 3 do
+ request_type = socket_read(g.s)[key.REQUEST_TYPE]
+ t.assert_equals(request_type,
+ iproto_error_type(box.error.UNKNOWN_REQUEST_TYPE))
+ end
+ request_type = socket_read(g.s)[key.REQUEST_TYPE]
+ t.assert(request_type == type.OK or request_type ==
+ iproto_error_type(box.error.PROTOCOL))
end
-- The case sends dummy OKs before subscribe
@@ -140,7 +180,7 @@ g.test_iproto_crash_on_subscribe_spam_ok = function(g)
end
request_type = socket_read(g.s)[key.REQUEST_TYPE]
t.assert(request_type == type.OK or request_type ==
- iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+ iproto_error_type(box.error.PROTOCOL))
end
-- The case simulates a situation where the user of anonymous replication
@@ -161,7 +201,7 @@ g.test_iproto_crash_fetch_snapshot_subscribe = function(g)
iproto_error_type(box.error.UNKNOWN_REQUEST_TYPE))
request_type = socket_read(g.s)[key.REQUEST_TYPE]
t.assert(request_type == type.OK or request_type ==
- iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+ iproto_error_type(box.error.PROTOCOL))
end
-- The same as above, but additionally forgot to pass is_anon option to
@@ -183,5 +223,5 @@ g.test_iproto_crash_fetch_snapshot_subscribe_not_anon = function(g)
request_type = socket_read(g.s)[key.REQUEST_TYPE]
t.assert(request_type ==
iproto_error_type(box.error.TOO_EARLY_SUBSCRIBE) or request_type ==
- iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+ iproto_error_type(box.error.PROTOCOL))
end
diff --git a/test/box/error.result b/test/box/error.result
index 12031f8fcb..506118065e 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -500,7 +500,6 @@ t;
| 287: box.error.WAL_QUEUE_FULL
| 288: box.error.INVALID_VCLOCK
| 289: box.error.SYNC_QUEUE_FULL
- | 290: box.error.UNABLE_TO_PROCESS_REPLICATION
| ...
test_run:cmd("setopt delimiter ''"); |
0307189 to
c5630ef
Compare
4ac12e1 to
30a0597
Compare
locker
reviewed
Nov 18, 2024
30a0597 to
44958bd
Compare
locker
approved these changes
Nov 28, 2024
Fixed an issue when the tarantool could be easily crashed using iproto incorrectly, not according to the protocol. Closes tarantool#10155 NO_DOC=bugfix
44958bd to
5147fcd
Compare
sergepetrenko
approved these changes
Nov 29, 2024
Collaborator
sergepetrenko
left a comment
There was a problem hiding this comment.
Thanks for the fix!
Collaborator
|
@Astronomax, please, prepare PRs with backports of this patch to 2.11 and 3.2 (3.3 wasn't released yet when I merged the patch, so backport to 3.3 isn't necessary). |
Contributor
Author
|
@sergepetrenko, prepared: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixed an issue when the tarantool could be easily crashed using iproto incorrectly, not according to the protocol.
The problem seems to be unsynchronized multi-threaded communication with the socket. Hence the large number of different backtraces and assertions. To reproduce this, instead of

IPROTO_OK you can use anything whose route looks like ->main->iproto (it looks like all messages are going this way now); for example, I checked this with IPROTO_PING. A bad execution scenario is shown in the picture. More specifically, the problem is that when
tx_process_replication is executed in main, it does not know anything about the c-messages that are queued at the "net_x" endpoint in iproto. In iproto_enqueue_batch we call ev_io_stop on both input/output watchers, which prevents the EV_READ/EV_WRITE events from firing (and also clears pending events), but this does not help against ev_feed_event, which is called in the cmsg callback queued in the iproto thread. Closes #10155
NO_DOC=bugfix