Skip to content

Commit 66abb3a

Browse files
committed
rabbit_db_exchange: Always use a transaction to bump exchange serials
[Why] Initially, I thought a transaction would be too much for a simple counter. Therefore I use a regular `get`, plus a `put` with a condition on the tree node payload version to ensure the put would only succeed of the read value was still the one in the store. Unfortunately this technic is very inefficient with a lot of concurrency. That's because all processes will read the same value and will all commit a `put`, but only one of them will succeed. After that, all but on will retry the same read/modify/write with the same poor result. In the end, all processes are spamming `put` commands and they make very little progress, loading the store in the process. [How] This patch goes back to a simple transaction. This solves the contention issue.
1 parent d563c07 commit 66abb3a

1 file changed

Lines changed: 9 additions & 20 deletions

File tree

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -535,36 +535,25 @@ next_serial_in_mnesia_tx(XName) ->
535535
Serial.
536536

537537
next_serial_in_khepri(XName) ->
538-
%% Just storing the serial number is enough, no need to keep #exchange_serial{}
539-
Path = khepri_exchange_serial_path(XName),
540-
Ret1 = rabbit_khepri:adv_get(Path),
541-
case Ret1 of
542-
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
543-
UpdatePath =
544-
khepri_path:combine_with_conditions(
545-
Path, [#if_payload_version{version = Vsn}]),
546-
case rabbit_khepri:put(UpdatePath, Serial + 1, #{timeout => infinity}) of
547-
ok ->
548-
Serial;
549-
{error, {khepri, mismatching_node, _}} ->
550-
next_serial_in_khepri(XName)
551-
end;
552-
_ ->
553-
Serial = 1,
554-
ok = rabbit_khepri:put(Path, Serial + 1, #{timeout => infinity}),
555-
Serial
556-
end.
538+
Serial = rabbit_khepri:transaction(
539+
fun() ->
540+
next_serial_in_khepri_tx(XName)
541+
end, rw),
542+
Serial.
557543

558544
-spec next_serial_in_khepri_tx(Exchange) -> Serial when
559-
Exchange :: rabbit_types:exchange(),
545+
Exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
560546
Serial :: integer().
561547

562548
next_serial_in_khepri_tx(#exchange{name = XName}) ->
549+
next_serial_in_khepri_tx(XName);
550+
next_serial_in_khepri_tx(XName) ->
563551
Path = khepri_exchange_serial_path(XName),
564552
Serial = case khepri_tx:get(Path) of
565553
{ok, Serial0} -> Serial0;
566554
_ -> 1
567555
end,
556+
%% Just storing the serial number is enough, no need to keep #exchange_serial{}
568557
ok = khepri_tx:put(Path, Serial + 1),
569558
Serial.
570559

0 commit comments

Comments
 (0)