Skip to content

Commit 607c3ec

Browse files
Introduce rabbitmq-stream equivalents of 'rabbitmq-queues {grow,shrink}'
1 parent dd96339 commit 607c3ec

9 files changed

Lines changed: 863 additions & 20 deletions

File tree

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
-export([restart_stream/3,
4646
add_replica/3,
4747
delete_replica/3,
48-
delete_all_replicas/1]).
48+
delete_all_replicas/1,
49+
grow/4]).
4950
-export([format_osiris_event/2]).
5051
-export([update_stream_conf/2]).
5152
-export([readers/1]).
@@ -1083,24 +1084,100 @@ delete_replica(VHost, Name, Node) ->
10831084
E
10841085
end.
10851086

1087+
%% Removes the replica hosted on `Node' from every stream queue that
%% rabbit_amqqueue:list_stream_queues_on/1 reports for that node.
%%
%% Returns one `{QName, Result}' pair per stream, in listing order, where
%% `Result' is whatever delete_replica_with_retry/2 produced for that stream.
-spec delete_all_replicas(node()) ->
    [{rabbit_amqqueue:name(),
      {ok, pos_integer()} | {error, pos_integer(), term()}}].
delete_all_replicas(Node) ->
    ?LOG_INFO("Asked to remove all stream replicas from node ~ts", [Node]),
    lists:map(fun(Stream) -> delete_replica_with_retry(Stream, Node) end,
              rabbit_amqqueue:list_stream_queues_on(Node)).
1094+
1095+
%% Removes the replica of stream queue `Q' that lives on `Node'.
%%
%% `Size' is the member count reported by get_nodes/1 before the removal.
%% Returns `{QName, {ok, Size - 1}}' on success and
%% `{QName, {error, Size, Reason}}' on failure (the size is unchanged then).
%%
%% A `cluster_change_not_permitted' reply from the coordinator is retried
%% exactly once after a 500ms pause; every other failure is reported
%% immediately. Failures are logged at warning level.
delete_replica_with_retry(Q, Node) ->
    QName = amqqueue:get_name(Q),
    #{name := StreamId} = amqqueue:get_type_state(Q),
    Size = length(get_nodes(Q)),
    ?LOG_INFO("~ts: removing replica on node ~w", [rabbit_misc:rs(QName), Node]),
    Outcome =
        case delete_replica_attempt(StreamId, Node) of
            retry ->
                ?LOG_INFO("~ts: cluster change not permitted, retrying in 500ms",
                          [rabbit_misc:rs(QName)]),
                timer:sleep(500),
                %% Second and final attempt: a repeated refusal is now
                %% an ordinary failure.
                case delete_replica_attempt(StreamId, Node) of
                    retry -> {error, cluster_change_not_permitted};
                    Final -> Final
                end;
            First ->
                First
        end,
    case Outcome of
        ok ->
            {QName, {ok, Size - 1}};
        {error, Err} ->
            ?LOG_WARNING("~ts: failed to remove replica on ~w: ~w",
                         [rabbit_misc:rs(QName), Node, Err]),
            {QName, {error, Size, Err}}
    end.

%% Performs a single coordinator call and normalises its reply to
%% `ok', `retry' (cluster change not permitted) or `{error, Reason}'.
delete_replica_attempt(StreamId, Node) ->
    case rabbit_stream_coordinator:delete_replica(StreamId, Node) of
        {ok, ok, _} ->
            ok;
        {ok, cluster_change_not_permitted, _} ->
            retry;
        {ok, Err, _} ->
            {error, Err};
        {error, _} = Error ->
            Error
    end.
1128+
1129+
%% Adds a replica on `Node' to every stream queue that qualifies.
%%
%% A queue qualifies when it is of this queue type, `Node' is not already
%% one of its members, its current membership satisfies `Strategy'
%% (see matches_strategy/2), and both its vhost and its resource name match
%% the regular expressions `VhostSpec' and `QueueSpec' (see is_match/2).
%%
%% If `Node' is not among rabbit_nodes:list_running(), nothing qualifies and
%% the result is `[]'. Otherwise returns one `{QName, Result}' pair per
%% qualifying stream, as produced by add_replica_with_retry/2.
-spec grow(node(), binary(), binary(), all | even) ->
    [{rabbit_amqqueue:name(),
      {ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy) ->
    ?LOG_INFO("Adding stream replicas on node ~ts", [Node]),
    %% The running-node check does not depend on the queue, so it is done
    %% once up front instead of once per queue inside the comprehension.
    case lists:member(Node, rabbit_nodes:list_running()) of
        false ->
            [];
        true ->
            Streams = [Q || Q <- rabbit_amqqueue:list(),
                            amqqueue:get_type(Q) =:= ?MODULE,
                            begin
                                Nodes = get_nodes(Q),
                                not lists:member(Node, Nodes) andalso
                                    matches_strategy(Strategy, Nodes)
                            end,
                            is_match(amqqueue:get_vhost(Q), VhostSpec),
                            is_match(get_resource_name(amqqueue:get_name(Q)),
                                     QueueSpec)],
            [add_replica_with_retry(Q, Node) || Q <- Streams]
    end.
1146+
1147+
%% Adds a replica of stream queue `Q' on `Node'.
%%
%% `Size' is the member count reported by get_nodes/1 before the addition.
%% Returns `{QName, {ok, Size + 1}}' on success and
%% `{QName, {error, Size, Reason}}' on failure (the size is unchanged then).
%%
%% An `{error, cluster_change_not_permitted}' reply is retried exactly once
%% after a 500ms pause; whatever the second attempt returns is final.
%% Failures are logged at warning level.
add_replica_with_retry(Q, Node) ->
    QName = amqqueue:get_name(Q),
    Size = length(get_nodes(Q)),
    ?LOG_INFO("~ts: adding replica on node ~w", [rabbit_misc:rs(QName), Node]),
    Outcome =
        case rabbit_stream_coordinator:add_replica(Q, Node) of
            {error, cluster_change_not_permitted} ->
                ?LOG_INFO("~ts: cluster change not permitted (another member is being added), retrying in 500ms",
                          [rabbit_misc:rs(QName)]),
                timer:sleep(500),
                rabbit_stream_coordinator:add_replica(Q, Node);
            FirstAttempt ->
                FirstAttempt
        end,
    %% Single place where the final reply is turned into a result tuple.
    case Outcome of
        ok ->
            {QName, {ok, Size + 1}};
        {error, Err} ->
            ?LOG_WARNING("~ts: failed to add replica on ~w: ~w",
                         [rabbit_misc:rs(QName), Node, Err]),
            {QName, {error, Size, Err}}
    end.
1171+
1172+
%% Decides whether a stream with the given member list should be grown.
%% `even' accepts only member lists of even length; `all' accepts any.
matches_strategy(even, Members) ->
    0 =:= length(Members) rem 2;
matches_strategy(all, _Members) ->
    true.
1175+
1176+
%% Returns `true' when regular expression `E' matches anywhere in `Subj'
%% according to re:run/2, and `false' when re:run/2 reports `nomatch'.
is_match(Subj, E) ->
    case re:run(Subj, E) of
        nomatch -> false;
        _Matched -> true
    end.
1178+
1179+
%% Extracts the `name' field of a #resource{} record.
get_resource_name(#resource{} = Resource) ->
    Resource#resource.name.
11041181

11051182
make_stream_conf(Q) ->
11061183
QName = amqqueue:get_name(Q),

deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ init_per_suite(Config) ->
3030
case rabbit_ct_helpers:is_mixed_versions() of
3131
false ->
3232
rabbit_ct_helpers:log_environment(),
33-
rabbit_ct_helpers:run_setup_steps(Config);
33+
Config1 = rabbit_ct_helpers:run_setup_steps(Config),
34+
rabbit_ct_helpers:ensure_rabbitmq_queues_cmd(Config1);
3435
_ ->
3536
{skip, "growing and shrinking cannot be done in mixed mode"}
3637
end.
@@ -53,8 +54,7 @@ end_per_group(tests, Config) ->
5354
rabbit_ct_broker_helpers:teardown_steps()).
5455

5556
%% Per-testcase setup: hands the config to rabbit_ct_helpers:testcase_started/2
%% and returns the config it gives back.
init_per_testcase(Testcase, Config0) ->
    Config1 = rabbit_ct_helpers:testcase_started(Config0, Testcase),
    Config1.
5858

5959
%% Per-testcase teardown: hands the config to
%% rabbit_ct_helpers:testcase_finished/2 and returns the config it gives back.
end_per_testcase(Testcase, Config0) ->
    Config1 = rabbit_ct_helpers:testcase_finished(Config0, Testcase),
    Config1.

0 commit comments

Comments
 (0)