Skip to content

Commit adc25bb

Browse files
Prevent federation links from restarting during node shutdown
This also applies to plugin shutdown. With this guardrail in place, nodes with hundreds or thousands of federation links will avoid potentially significant shutdown delays caused by links being restarted while the node as a whole is preparing to shut down. Per discussion with @dcorbacho and @ansd.
1 parent 9351b23 commit adc25bb

9 files changed

Lines changed: 81 additions & 3 deletions

deps/rabbitmq_exchange_federation/src/rabbit_exchange_federation_app.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
-include_lib("kernel/include/logger.hrl").
1414

1515
-behaviour(application).
16-
-export([start/2, stop/1]).
16+
-export([start/2, prep_stop/1, stop/1]).
1717

1818
%% Dummy supervisor - see Ulf Wiger's comment at
1919
%% http://erlang.org/pipermail/erlang-questions/2010-April/050508.html
@@ -37,6 +37,10 @@ start(_Type, _StartArgs) ->
3737
#{link_module => rabbit_federation_exchange_link_sup_sup}}),
3838
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
3939

40+
prep_stop(State) ->
41+
rabbit_federation_app_state:mark_as_shutting_down(),
42+
State.
43+
4044
stop(_State) ->
4145
ets:delete(?FEDERATION_ETS, rabbitmq_exchange_federation),
4246
rabbit_federation_pg:stop_scope(?FEDERATION_PG_SCOPE),

deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ start_link(Args) ->
6666
gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]).
6767

6868
init({Upstream, XName}) ->
69+
case rabbit_federation_app_state:is_shutting_down() of
70+
true ->
71+
ignore;
72+
false ->
73+
init_link({Upstream, XName})
74+
end.
75+
76+
init_link({Upstream, XName}) ->
6977
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION,
7078
exchange => XName}),
7179
%% If we are starting up due to a policy change then it's possible

deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link_sup_sup.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ start_link() ->
3131
%% and other places, so we have to start it very early outside of the supervision tree.
3232
%% The scope is stopped in stop/1.
3333
_ = rabbit_federation_pg:start_scope(?FEDERATION_PG_SCOPE),
34+
rabbit_federation_app_state:reset_shutting_down_marker(),
3435
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
3536
?MODULE, []).
3637

deps/rabbitmq_federation_common/src/rabbit_federation_app_state.erl

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% Tracks transient application state for federation plugins.
9+
%% Used to prevent link restarts during node shutdown.
10+
-module(rabbit_federation_app_state).
11+
12+
-export([is_shutting_down/0, mark_as_shutting_down/0, reset_shutting_down_marker/0]).
13+
14+
-define(APP, rabbitmq_federation_common).
15+
16+
-spec is_shutting_down() -> boolean().
17+
is_shutting_down() ->
18+
application:get_env(?APP, shutting_down, false).
19+
20+
-spec mark_as_shutting_down() -> ok.
21+
mark_as_shutting_down() ->
22+
application:set_env(?APP, shutting_down, true).
23+
24+
-spec reset_shutting_down_marker() -> ok.
25+
reset_shutting_down_marker() ->
26+
application:unset_env(?APP, shutting_down).

deps/rabbitmq_federation_common/test/unit_SUITE.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ all() -> [
1717
obfuscate_upstream,
1818
obfuscate_upstream_params_network,
1919
obfuscate_upstream_params_network_with_char_list_password_value,
20-
obfuscate_upstream_params_direct
20+
obfuscate_upstream_params_direct,
21+
shutdown_flag_defaults_to_false,
22+
shutdown_flag_can_be_set,
23+
shutdown_flag_can_be_cleared
2124
].
2225

2326
init_per_suite(Config) ->
@@ -63,3 +66,23 @@ obfuscate_upstream_params_network_with_char_list_password_value(_Config) ->
6366
ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(UpstreamParams),
6467
?assertEqual(UpstreamParams, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)),
6568
ok.
69+
70+
shutdown_flag_defaults_to_false(_Config) ->
71+
application:unset_env(rabbitmq_federation_common, shutting_down),
72+
?assertEqual(false, rabbit_federation_app_state:is_shutting_down()),
73+
ok.
74+
75+
shutdown_flag_can_be_set(_Config) ->
76+
application:unset_env(rabbitmq_federation_common, shutting_down),
77+
?assertEqual(false, rabbit_federation_app_state:is_shutting_down()),
78+
ok = rabbit_federation_app_state:mark_as_shutting_down(),
79+
?assertEqual(true, rabbit_federation_app_state:is_shutting_down()),
80+
application:unset_env(rabbitmq_federation_common, shutting_down),
81+
ok.
82+
83+
shutdown_flag_can_be_cleared(_Config) ->
84+
ok = rabbit_federation_app_state:mark_as_shutting_down(),
85+
?assertEqual(true, rabbit_federation_app_state:is_shutting_down()),
86+
ok = rabbit_federation_app_state:reset_shutting_down_marker(),
87+
?assertEqual(false, rabbit_federation_app_state:is_shutting_down()),
88+
ok.

deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ q(QName) ->
5454
%%----------------------------------------------------------------------------
5555

5656
init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
57+
case rabbit_federation_app_state:is_shutting_down() of
58+
true ->
59+
ignore;
60+
false ->
61+
init_link({Upstream, Queue})
62+
end.
63+
64+
init_link({Upstream, Queue}) when ?is_amqqueue(Queue) ->
5765
QName = amqqueue:get_name(Queue),
5866
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION,
5967
queue => QName}),

deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link_sup_sup.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ start_link() ->
3030
%% and other places, so we have to start it very early outside of the supervision tree.
3131
%% The scope is stopped in stop/1.
3232
_ = rabbit_federation_pg:start_scope(?FEDERATION_PG_SCOPE),
33+
rabbit_federation_app_state:reset_shutting_down_marker(),
3334
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
3435
?MODULE, []).
3536

deps/rabbitmq_queue_federation/src/rabbit_queue_federation_app.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
-include("rabbit_queue_federation.hrl").
1313

1414
-behaviour(application).
15-
-export([start/2, stop/1]).
15+
-export([start/2, prep_stop/1, stop/1]).
1616

1717
%% Dummy supervisor - see Ulf Wiger's comment at
1818
%% http://erlang.org/pipermail/erlang-questions/2010-April/050508.html
@@ -36,6 +36,10 @@ start(_Type, _StartArgs) ->
3636
#{link_module => rabbit_federation_queue_link_sup_sup}}),
3737
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
3838

39+
prep_stop(State) ->
40+
rabbit_federation_app_state:mark_as_shutting_down(),
41+
State.
42+
3943
stop(_State) ->
4044
ets:delete(?FEDERATION_ETS, rabbitmq_queue_federation),
4145
rabbit_federation_pg:stop_scope(?FEDERATION_PG_SCOPE),

deps/rabbitmq_queue_federation/test/queue_SUITE.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ end_per_group(_, Config) ->
146146
rabbit_ct_client_helpers:teardown_steps() ++
147147
rabbit_ct_broker_helpers:teardown_steps()).
148148

149+
init_per_testcase(dynamic_plugin_stop_start = Testcase, Config) ->
150+
ct:timetrap({seconds, 90}),
151+
rabbit_ct_helpers:testcase_started(Config, Testcase);
149152
init_per_testcase(Testcase, Config) ->
150153
rabbit_ct_helpers:testcase_started(Config, Testcase).
151154

0 commit comments

Comments
 (0)