iproto: make iproto resistant to misusage #10589

Merged: sergepetrenko merged 1 commit into tarantool:master from Astronomax:iproto_misusage_championship on Nov 29, 2024

Conversation

Astronomax (Contributor) commented Sep 18, 2024

Fixes an issue where Tarantool could easily be crashed by using iproto incorrectly, that is, not according to the protocol.

The problem appears to be unsynchronized multi-threaded access to the socket, which explains the large number of different backtraces and assertion failures. To reproduce it, instead of IPROTO_OK you can use any request whose route looks like ->main->iproto (it seems all messages go this way now); for example, I checked this with IPROTO_PING. A bad execution scenario is shown in the picture:
[Image: iproto_misusage_championship]
More specifically, when tx_process_replication is executed in main, it knows nothing about the cmsg messages queued at the "net_x" endpoint in iproto. In iproto_enqueue_batch we call ev_io_stop on both the input and output watchers, which prevents the EV_READ/EV_WRITE events from firing (and also clears pending events), but this does not help against ev_feed_event, which is called in the cmsg callback queued in the iproto thread.
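
The ordering problem described above can be illustrated with a toy model. This is only a sketch: `toy_watcher` and the `toy_*` functions are hypothetical names invented for illustration, they are not libev or Tarantool code, and they only mimic the behavior stated in the description (stopping a watcher drops its pending event, while a fed event is queued whether or not the watcher is active).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy watcher: NOT libev, just enough state to show the ordering problem. */
struct toy_watcher {
	bool active;   /* started: I/O readiness may queue an event */
	bool pending;  /* an event is queued for the next loop iteration */
	int fired;     /* how many times the callback ran */
};

/* Models I/O readiness: only an active watcher gets an event queued. */
static void
toy_io_ready(struct toy_watcher *w)
{
	if (w->active)
		w->pending = true;
}

/* Models ev_io_stop(): deactivate and drop an already queued event. */
static void
toy_stop(struct toy_watcher *w)
{
	w->active = false;
	w->pending = false;
}

/* Models ev_feed_event(): queue an event regardless of the active flag. */
static void
toy_feed_event(struct toy_watcher *w)
{
	w->pending = true;
}

/* One loop iteration: run the callback if an event is queued. */
static void
toy_run_once(struct toy_watcher *w)
{
	if (w->pending) {
		w->pending = false;
		w->fired++;
	}
}
```

In this model, calling `toy_stop` and then `toy_io_ready` never runs the callback, but a later `toy_feed_event` (standing in for the cmsg callback queued in the iproto thread) still does. That is the window in which the stopped watchers offer no protection.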

Closes #10155

NO_DOC=bugfix

@Astronomax Astronomax requested a review from a team as a code owner September 18, 2024 16:22
coveralls commented Sep 18, 2024

Coverage Status: 87.364% (+0.04%) from 87.328% when pulling 5147fcd on Astronomax:iproto_misusage_championship into b4c226e on tarantool:master.

@Astronomax Astronomax force-pushed the iproto_misusage_championship branch 4 times, most recently from 5a16ad5 to 28bcae9 Compare September 19, 2024 06:52
sergepetrenko (Collaborator) previously approved these changes Oct 7, 2024 and left a comment:

Thanks for the fix! I like that the solution turned out even simpler than the one we discussed face to face earlier.

Please solicit another review from @locker.

@sergepetrenko sergepetrenko requested a review from locker October 7, 2024 12:54
@sergepetrenko sergepetrenko assigned locker and unassigned sergepetrenko Oct 7, 2024
@Astronomax Astronomax added the do not merge Not ready to be merged label Oct 20, 2024
@Astronomax Astronomax force-pushed the iproto_misusage_championship branch 9 times, most recently from 885a8ef to 127220b Compare November 1, 2024 18:28
@locker locker removed their assignment Nov 2, 2024
@locker locker dismissed sergepetrenko’s stale review November 2, 2024 10:15

The patch has been reworked.

@Astronomax Astronomax force-pushed the iproto_misusage_championship branch 2 times, most recently from 9201611 to bca5f46 Compare November 11, 2024 16:39
Astronomax (Contributor, Author) commented Nov 11, 2024

diff (manually edited, so there may be inaccuracies)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index 719bf6a06f..3a65a3f075 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -434,7 +434,6 @@ struct errcode_record {
 	_(ER_WAL_QUEUE_FULL, 287,		"The WAL queue is full") \
 	_(ER_INVALID_VCLOCK, 288,		"Invalid vclock", "value", STRING) \
 	_(ER_SYNC_QUEUE_FULL, 289,		"The synchronous transaction queue is full") \
-	_(ER_UNABLE_TO_PROCESS_REPLICATION, 290, "Replication request couldn't be processed, responses to previous requests were not read") \
 	TEST_ERROR_CODES(_) /** This one should be last. */
 
 /*
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index a6ff5381ad..6995de383b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -845,8 +845,6 @@ struct iproto_connection
 	 * This field is accesable only from iproto thread.
 	 */
 	struct mh_i64ptr_t *streams;
-	/** Count of active requests that write something to obuf (gh-10155). */
-	size_t active_writing_requests;
 	/**
 	 * Kharon is used to implement box.session.push().
 	 * When a new push is ready, tx uses kharon to notify
@@ +921 @@ struct iproto_connection
	/** Set if connection is accepted in TX. */
	bool is_established;
+	/** Number of iproto requests in flight. */
+	size_t requests;
@@ +987 @@ iproto_msg_delete(struct iproto_msg *msg)
{
+	assert(msg->connection->requests > 0);
+	msg->connection->requests--;
	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
@@ +1014 @@ iproto_msg_new(struct iproto_connection *con)
	msg->fiber = NULL;
	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
+	con->requests++;
	return msg;
@@ -1287,33 +1285,16 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
-static struct obuf *
-iproto_connection_output_buffer(struct iproto_connection *con)
-{
-	struct obuf_svp obuf_end = obuf_create_svp(con->wpos.obuf);
-	struct obuf_svp *begin = &con->wpos.svp;
-	if (con->wend.obuf != con->wpos.obuf) {
-		/*
-		 * Flush the current buffer before
-		 * advancing to the next one.
-		 */
-		if (begin->used == obuf_end.used) {
-			con->wpos.obuf = con->wend.obuf;
-			obuf_svp_reset(begin);
-		}
-	}
-	return con->wpos.obuf;
-}
-
 static inline bool
 iproto_flushed(struct iproto_connection *con)
 {
-	struct obuf *obuf = iproto_connection_output_buffer(con);
+	struct obuf *obuf = con->wpos.obuf;
 	struct obuf_svp *begin = &con->wpos.svp;
-	struct obuf_svp end = con->wend.svp;
+	struct obuf_svp *end = &con->wend.svp;
+
 	if (con->wend.obuf != obuf)
-		end = obuf_create_svp(obuf);
-	return (begin->used == end.used);
+		return (begin->used == obuf->used) && (end->used == 0);
+	return (begin->used == end->used);
 }
 
 /**
@@ -1410,15 +1391,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		con->input_msg_count[msg->p_ibuf == &con->ibuf[1]]++;
 
 		iproto_msg_prepare(msg, &pos, reqend);
-		/**
-		 * We want to ensure that the replication request will not be
-		 * processed until all writing requests have been completely
-		 * flushed to obuf. It so happened that all requests that come
-		 * here, except for some replication ones, write something to
-		 * obuf.
-		 */
-		if (!con->is_in_replication)
-			con->active_writing_requests++;
 		if (iproto_msg_start_processing_in_stream(msg)) {
 			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
 			n_requests++;
@@ -1580,25 +1552,46 @@ error:;
 static int
 iproto_flush(struct iproto_connection *con)
 {
-	struct obuf *obuf = iproto_connection_output_buffer(con);
+	struct obuf *obuf = con->wpos.obuf;
+	struct obuf_svp obuf_end = obuf_create_svp(obuf);
 	struct obuf_svp *begin = &con->wpos.svp;
-	struct obuf_svp end = con->wend.svp;
-	if (con->wend.obuf != obuf)
-		end = obuf_create_svp(obuf);
-
-	assert(begin->used <= end.used);
-	if (begin->used == end.used) {
+	struct obuf_svp *end = &con->wend.svp;
+	if (con->wend.obuf != obuf) {
+		/*
+		 * Flush the current buffer before
+		 * advancing to the next one.
+		 */
+		if (begin->used == obuf_end.used) {
+			obuf = con->wpos.obuf = con->wend.obuf;
+			obuf_svp_reset(begin);
+		} else {
+			end = &obuf_end;
+		}
+	}
+	if (begin->used == end->used) {
 		/* Nothing to do. */
 		return 1;
 	}
 	if (!con->can_write) {
 		/* Receiving end was closed. Discard the output. */
-		*begin = end;
+		*begin = *end;
 		return 0;
 	}
+	assert(begin->used < end->used);
+
+	ERROR_INJECT(ERRINJ_IPROTO_FLUSH_DELAY, {
+		return IOSTREAM_WANT_WRITE;
+	});
+
 	struct iovec iov[SMALL_OBUF_IOV_MAX+1];
 	struct iovec *src = obuf->iov;
-	int iovcnt = end.pos - begin->pos + 1;
+	int iovcnt = end->pos - begin->pos + 1;
 	/*
 	 * iov[i].iov_len may be concurrently modified in tx thread,
 	 * but only for the last position.
@@ -1606,14 +1599,14 @@ iproto_flush(struct iproto_connection *con)
 	memcpy(iov, src + begin->pos, iovcnt * sizeof(struct iovec));
 	sio_add_to_iov(iov, -begin->iov_len);
 	/* *Overwrite* iov_len of the last pos as it may be garbage. */
-	iov[iovcnt - 1].iov_len = end.iov_len - begin->iov_len * (iovcnt == 1);
+	iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1);
 
 	ssize_t nwr = iostream_writev(&con->io, iov, iovcnt);
 	if (nwr >= 0) {
 		/* Count statistics */
 		rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr);
-		if (begin->used + nwr == end.used) {
-			*begin = end;
+		if (begin->used + nwr == end->used) {
+			*begin = *end;
 			return 0;
 		}
 		size_t offset = 0;
@@ -1622,7 +1615,7 @@ iproto_flush(struct iproto_connection *con)
 		begin->used += nwr;             /* advance write position */
 		begin->iov_len = advance == 0 ? begin->iov_len + offset: offset;
 		begin->pos += advance;
-		assert(begin->pos <= end.pos);
+		assert(begin->pos <= end->pos);
 		return IOSTREAM_WANT_WRITE;
 	} else if (nwr == IOSTREAM_ERROR) {
 		/*
@@ -1633,7 +1626,7 @@ iproto_flush(struct iproto_connection *con)
 		 */
 		diag_log();
 		con->can_write = false;
-		*begin = end;
+		*begin = *end;
 		return 0;
 	}
 	return nwr;
@@ -1675,7 +1668,6 @@ iproto_connection_new(struct iproto_thread *iproto_thread)
 	struct iproto_connection *con = (struct iproto_connection *)
 		xmempool_alloc(&iproto_thread->iproto_connection_pool);
 	con->streams = mh_i64ptr_new();
-	con->active_writing_requests = 0;
 	con->iproto_thread = iproto_thread;
 	con->input.data = con->output.data = con;
 	con->loop = loop();
@@ +1703 @@ iproto_connection_new(struct iproto_thread *iproto_thread)
	con->tx.is_push_sent = false;
	rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1);
+	con->requests = 0;
	return con;
@@ -1840,23 +1832,6 @@ iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
 	type = msg->header.type;
 	stream_id = msg->header.stream_id;
 
-	is_replication_request = msg->header.type == IPROTO_JOIN ||
-				 msg->header.type == IPROTO_FETCH_SNAPSHOT ||
-				 msg->header.type == IPROTO_REGISTER ||
-				 msg->header.type == IPROTO_SUBSCRIBE;
-
-	if (is_replication_request) {
-		/**
-		 * Before processing a replication request, ensure that all
-		 * writing requests have been completely flushed to obuf.
-		 */
-		if (con->active_writing_requests > 0 || !iproto_flushed(con)) {
-			diag_set(ClientError, ER_UNABLE_TO_PROCESS_REPLICATION);
-			goto error;
-		}
-		con->is_in_replication = true;
-	}
-
 	request_is_not_for_stream =
 		((type > IPROTO_TYPE_STAT_MAX &&
 		 type != IPROTO_PING) || type == IPROTO_AUTH);
@@ -1866,17 +1841,34 @@ iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
 		 type == IPROTO_ROLLBACK);
 
 	if (stream_id != 0 && request_is_not_for_stream) {
-		assert(!con->is_in_replication);
 		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
 			 iproto_type_name(type));
 		goto error;
 	} else if (stream_id == 0 && request_is_only_for_stream) {
-		assert(!con->is_in_replication);
 		diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
 			 iproto_type_name(type));
 		goto error;
 	}
 
+	is_replication_request = msg->header.type == IPROTO_JOIN ||
+				 msg->header.type == IPROTO_FETCH_SNAPSHOT ||
+				 msg->header.type == IPROTO_REGISTER ||
+				 msg->header.type == IPROTO_SUBSCRIBE;
+
+	if (is_replication_request) {
+		/**
+		 * Before processing a replication request, ensure that all
+		 * writing requests have been completely flushed to obuf.
+		 */
+		if (mempool_count(&iproto_thread->iproto_msg_pool) > 1 ||
+		    !iproto_flushed(con)) {
+			diag_set(ClientError, ER_PROTOCOL, "Can't process "
+				 "join/subscribe while there are pending requests");
+			goto error;
+		}
+		con->is_in_replication = true;
+	}
+
 	handler = mh_i32_find(handlers, type, NULL);
 	if (handler != mh_end(handlers)) {
 		assert(!con->is_in_replication);
@@ -3264,8 +3256,6 @@ net_send_msg(struct cmsg *m)
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
 
-	if (!con->is_in_replication)
-		con->active_writing_requests--;
 	iproto_msg_finish_processing_in_stream(msg);
 	if (msg->len != 0) {
 		/* Discard request (see iproto_enqueue_batch()). */
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index c512bba598..be84283c6a 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -102,6 +102,8 @@ struct errinj {
 	_(ERRINJ_IPROTO_DISABLE_ID, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_IPROTO_DISABLE_WATCH, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_IPROTO_FLIP_FEATURE, ERRINJ_INT, {.iparam = -1}) \
+	_(ERRINJ_IPROTO_FLUSH_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_IPROTO_SET_VERSION, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_IPROTO_TX_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\
diff --git a/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua b/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua
index b712d98190..f06bc38501 100644
--- a/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua
+++ b/test/box-luatest/gh_10155_make_iproto_resistant_to_misusage_test.lua
@@ -120,7 +120,47 @@ g.test_iproto_crash_on_subscribe = function(g)
         iproto_error_type(box.error.UNKNOWN_REQUEST_TYPE))
     request_type = socket_read(g.s)[key.REQUEST_TYPE]
     t.assert(request_type == type.OK or request_type ==
-        iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+        iproto_error_type(box.error.PROTOCOL))
+end
+
+-- The test is designed to check the behavior when, at the time of receiving a
+-- replication request, the responses to all previous requests are written to
+-- obuf but not flushed to the socket
+g.test_iproto_crash_on_subscribe_flush_delay = function(g)
+    t.tarantool.skip_if_not_debug()
+    local uuid = require('uuid').str()
+    local replicaset_uuid = g.server:eval('return box.info.replicaset.uuid')
+    -- You can't use lustest.server:exec here because it will block
+    write_eval(g.s, "box.error.injection.set('ERRINJ_IPROTO_FLUSH_DELAY', true)")
+    -- Several requests in a row to pretend to be a non-blocking client and
+    -- provoke iproto_connection_feed_input, so that subscribe is immediately
+    -- read
+    for _ = 1, 3 do
+        write_ok(g.s)
+    end
+    -- A short pause so that the packages do not end up in one batch and all
+    -- responses have time to be written in obuf
+    require('fiber').sleep(0.1)
+    write_subscribe(g.s, uuid, replicaset_uuid, true)
+    -- A short pause so that the tx thread has time to write the response to
+    -- the subscribe request to the socket
+    require('fiber').sleep(0.1)
+    g.server:exec(function()
+        box.error.injection.set('ERRINJ_IPROTO_FLUSH_DELAY', false)
+    end)
+    -- Read IPROTO_EVAL response
+    local request_type = socket_read(g.s)[key.REQUEST_TYPE]
+    t.assert(request_type == type.OK)
+    for _ = 1, 3 do
+        request_type = socket_read(g.s)[key.REQUEST_TYPE]
+        t.assert_equals(request_type,
+            iproto_error_type(box.error.UNKNOWN_REQUEST_TYPE))
+    end
+    request_type = socket_read(g.s)[key.REQUEST_TYPE]
+    t.assert(request_type == type.OK or request_type ==
+        iproto_error_type(box.error.PROTOCOL))
 end
 
 -- The case sends dummy OKs before subscribe
@@ -140,7 +180,7 @@ g.test_iproto_crash_on_subscribe_spam_ok = function(g)
     end
     request_type = socket_read(g.s)[key.REQUEST_TYPE]
     t.assert(request_type == type.OK or request_type ==
-        iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+        iproto_error_type(box.error.PROTOCOL))
 end
 
 -- The case simulates a situation where the user of anonymous replication
@@ -161,7 +201,7 @@ g.test_iproto_crash_fetch_snapshot_subscribe = function(g)
         iproto_error_type(box.error.UNKNOWN_REQUEST_TYPE))
     request_type = socket_read(g.s)[key.REQUEST_TYPE]
     t.assert(request_type == type.OK or request_type ==
-        iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+        iproto_error_type(box.error.PROTOCOL))
 end
 
 -- The same as above, but additionally forgot to pass is_anon option to
@@ -183,5 +223,5 @@ g.test_iproto_crash_fetch_snapshot_subscribe_not_anon = function(g)
     request_type = socket_read(g.s)[key.REQUEST_TYPE]
     t.assert(request_type ==
         iproto_error_type(box.error.TOO_EARLY_SUBSCRIBE) or request_type ==
-        iproto_error_type(box.error.UNABLE_TO_PROCESS_REPLICATION))
+        iproto_error_type(box.error.PROTOCOL))
 end
diff --git a/test/box/error.result b/test/box/error.result
index 12031f8fcb..506118065e 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -500,7 +500,6 @@ t;
  |   287: box.error.WAL_QUEUE_FULL
  |   288: box.error.INVALID_VCLOCK
  |   289: box.error.SYNC_QUEUE_FULL
- |   290: box.error.UNABLE_TO_PROCESS_REPLICATION
  | ...
 
 test_run:cmd("setopt delimiter ''");
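
The guard introduced in `iproto_msg_prepare` can be read as: a replication request is accepted only when it is the sole request in flight and every earlier response has left the output buffer. Below is a minimal sketch of that idea under stated assumptions. `toy_conn` and the `toy_*` functions are hypothetical, the two-buffer `obuf` rotation is collapsed into two byte counters, and a per-connection counter is used for the in-flight check. It is not the merged code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for struct iproto_connection. */
struct toy_conn {
	size_t requests;        /* requests in flight (created, not deleted) */
	size_t written;         /* response bytes put into the output buffer */
	size_t flushed;         /* response bytes already sent to the socket */
	bool is_in_replication; /* set once a replication request is accepted */
};

/* A request was parsed and a message was allocated for it. */
static void
toy_request_begin(struct toy_conn *con)
{
	con->requests++;
}

/* A request finished: its response sits in the buffer, the message is freed. */
static void
toy_request_end(struct toy_conn *con, size_t response_len)
{
	assert(con->requests > 0);
	con->requests--;
	con->written += response_len;
}

/* Everything written so far reaches the socket. */
static void
toy_flush(struct toy_conn *con)
{
	con->flushed = con->written;
}

/* True if no response bytes are waiting in the output buffer. */
static bool
toy_flushed(const struct toy_conn *con)
{
	return con->flushed == con->written;
}

/*
 * Returns true if the connection may switch to replication mode.
 * The replication request itself is already counted in con->requests,
 * hence the comparison with 1 rather than 0 (mirroring "> 1" in the diff).
 */
static bool
toy_try_start_replication(struct toy_conn *con)
{
	if (con->requests > 1 || !toy_flushed(con))
		return false;
	con->is_in_replication = true;
	return true;
}
```

With a ping still in flight, or with its response written but not yet flushed, `toy_try_start_replication` returns false, which corresponds to the `ER_PROTOCOL` error path in the diff. It succeeds only after the flush, the situation the `ERRINJ_IPROTO_FLUSH_DELAY` test is designed to exercise.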

@Astronomax Astronomax force-pushed the iproto_misusage_championship branch 2 times, most recently from 0307189 to c5630ef Compare November 12, 2024 12:49
@Astronomax Astronomax requested a review from locker November 12, 2024 13:05
@Astronomax Astronomax force-pushed the iproto_misusage_championship branch 3 times, most recently from 4ac12e1 to 30a0597 Compare November 14, 2024 05:46
@locker locker requested a review from Gerold103 November 18, 2024 11:18
@Astronomax Astronomax force-pushed the iproto_misusage_championship branch from 30a0597 to 44958bd Compare November 26, 2024 08:28
@Astronomax Astronomax assigned Gerold103 and locker and unassigned Astronomax Nov 26, 2024
@Astronomax Astronomax requested a review from locker November 26, 2024 08:59
@locker locker assigned sergepetrenko and unassigned locker Nov 28, 2024
Fixed an issue where Tarantool could easily be crashed by using iproto
incorrectly, that is, not according to the protocol.

Closes tarantool#10155

NO_DOC=bugfix
@Astronomax Astronomax force-pushed the iproto_misusage_championship branch from 44958bd to 5147fcd Compare November 28, 2024 12:57
sergepetrenko (Collaborator) left a comment:

Thanks for the fix!

@sergepetrenko sergepetrenko removed the request for review from Gerold103 November 29, 2024 12:42
@sergepetrenko sergepetrenko added the full-ci Enables all tests for a pull request label Nov 29, 2024
@sergepetrenko sergepetrenko merged commit 45f42c7 into tarantool:master Nov 29, 2024
sergepetrenko (Collaborator) commented:

@Astronomax, please prepare PRs with backports of this patch to 2.11 and 3.2 (3.3 hadn't been released yet when I merged the patch, so a backport to 3.3 isn't necessary).

Astronomax (Contributor, Author) commented:

@sergepetrenko, prepared:
#10892
#10893

Labels

full-ci Enables all tests for a pull request

Development

Successfully merging this pull request may close these issues.

Tarantool is extremely easy to crash by IPROTO misusage

6 participants