Skip to content

Commit fa753a1

Browse files
Be more defensive against duplicate JSON keys when management stats are enabled
`rabbit_queue_type:format/2` now conditionally emits policy fields only when management stats are disabled: the stats augmentation code path provides them. 1. `format/2` now conditionally emits policy fields based on whether the management plugin stats are enabled 2. `combine/2` acts as the next layer of protection against duplicate keys: it drops the `format/2`-originating keys that are duplicates 3. `prepare_for_encoding/1` is the new name for `format_nulls/1` that reflects its expanded scope: another deduplication pass References #15182.
1 parent a8fb664 commit fa753a1

11 files changed

Lines changed: 91 additions & 39 deletions

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ stat(Q) ->
264264
{gen_server2, call, [stat, infinity]}).
265265

266266

267-
format(Q, _Ctx) when ?is_amqqueue(Q) ->
267+
format(Q, Ctx) when ?is_amqqueue(Q) ->
268268
State = case amqqueue:get_state(Q) of
269269
live ->
270270
running;
@@ -273,10 +273,18 @@ format(Q, _Ctx) when ?is_amqqueue(Q) ->
273273
end,
274274
[{type, rabbit_queue_type:short_alias_of(?MODULE)},
275275
{state, State},
276-
{node, node(amqqueue:get_pid(Q))},
277-
{policy, i(policy, Q)},
278-
{operator_policy, i(operator_policy, Q)},
279-
{effective_policy_definition, i(effective_policy_definition, Q)}].
276+
{node, node(amqqueue:get_pid(Q))}
277+
| format_policy_fields(Q, Ctx)].
278+
279+
format_policy_fields(Q, Ctx) ->
280+
case maps:get(management_stats_disabled, Ctx, true) of
281+
true ->
282+
[{policy, i(policy, Q)},
283+
{operator_policy, i(operator_policy, Q)},
284+
{effective_policy_definition, i(effective_policy_definition, Q)}];
285+
false ->
286+
[]
287+
end.
280288

281289
-spec init(amqqueue:amqqueue()) -> {ok, state()}.
282290
init(Q) when ?amqqueue_is_classic(Q) ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,11 +2154,19 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
21542154
{node, LeaderNode},
21552155
{members, Nodes},
21562156
{leader, LeaderNode},
2157-
{online, Online},
2158-
{policy, i(policy, Q)},
2159-
{operator_policy, i(operator_policy, Q)},
2160-
{effective_policy_definition, i(effective_policy_definition, Q)},
2161-
{delivery_limit, i(delivery_limit, Q)}].
2157+
{online, Online}
2158+
| format_policy_fields(Q, Ctx)].
2159+
2160+
format_policy_fields(Q, Ctx) ->
2161+
case maps:get(management_stats_disabled, Ctx, true) of
2162+
true ->
2163+
[{policy, i(policy, Q)},
2164+
{operator_policy, i(operator_policy, Q)},
2165+
{effective_policy_definition, i(effective_policy_definition, Q)},
2166+
{delivery_limit, i(delivery_limit, Q)}];
2167+
false ->
2168+
[]
2169+
end.
21622170

21632171
-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().
21642172

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,16 +303,22 @@ format(Q, Ctx) ->
303303
{leader, LeaderNode},
304304
{online, Online},
305305
{members, Nodes},
306-
{node, node(Pid)},
307-
{policy, i(policy, Q)},
308-
{operator_policy, i(operator_policy, Q)},
309-
{effective_policy_definition, i(effective_policy_definition, Q)}];
306+
{node, node(Pid)}
307+
| format_policy_fields(Q, Ctx)];
310308
_ ->
311309
[{type, rabbit_queue_type:short_alias_of(?MODULE)},
312-
{state, down},
313-
{policy, i(policy, Q)},
310+
{state, down}
311+
| format_policy_fields(Q, Ctx)]
312+
end.
313+
314+
format_policy_fields(Q, Ctx) ->
315+
case maps:get(management_stats_disabled, Ctx, true) of
316+
true ->
317+
[{policy, i(policy, Q)},
314318
{operator_policy, i(operator_policy, Q)},
315-
{effective_policy_definition, i(effective_policy_definition, Q)}]
319+
{effective_policy_definition, i(effective_policy_definition, Q)}];
320+
false ->
321+
[]
316322
end.
317323

318324
consume(Q, #{mode := {simple_prefetch, 0}}, _)

deps/rabbit_common/src/rabbit_json.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ encode(Term) ->
4646
encode(Term, Opts) ->
4747
%% Fixup for JSON encoding
4848
%% * Transforms any Funs into strings
49-
%% See rabbit_mgmt_format:format_nulls/1
49+
%% See rabbit_mgmt_format:prepare_for_encoding/1
5050
F = fun
5151
(V) when is_function(V) ->
5252
rabbit_data_coercion:to_binary(V);

deps/rabbitmq_management/src/rabbit_mgmt_db.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -665,10 +665,13 @@ node_stats(Ranges, Objs, Interval) ->
665665
end || Obj <- Objs].
666666

667667
combine(New, Old) ->
668+
NewKeys = [K || {K, _} <- New],
668669
case pget(state, Old) of
669-
unknown -> New ++ Old;
670-
live -> New ++ delete_keys([state, online], Old);
671-
_ -> lists:keydelete(state, 1, New) ++ Old
670+
unknown -> New ++ delete_keys(NewKeys, Old);
671+
live -> New ++ delete_keys([state, online | NewKeys], Old);
672+
_ ->
673+
NewKeysNoState = lists:delete(state, NewKeys),
674+
lists:keydelete(state, 1, New) ++ delete_keys(NewKeysNoState, Old)
672675
end.
673676

674677
delete_keys(Keys, List) ->

deps/rabbitmq_management/src/rabbit_mgmt_oauth_bootstrap.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ validate_auth_mechanism({Type, <<"basic">>}, _AuthSettings) ->
103103
validate_auth_mechanism({_, _}, _AuthSettings) -> {error, unknown_auth_mechanism}.
104104

105105
set_oauth_settings(AuthSettings) ->
106-
JsonAuthSettings = rabbit_json:encode(rabbit_mgmt_format:format_nulls(AuthSettings)),
106+
JsonAuthSettings = rabbit_json:encode(rabbit_mgmt_format:prepare_for_encoding(AuthSettings)),
107107
["set_oauth_settings(", JsonAuthSettings, ");"].
108108

109109
set_token_auth(AuthSettings, Req0) ->

deps/rabbitmq_management/src/rabbit_mgmt_util.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ reply0(Facts, ReqData, Context) ->
267267
{<<"application">>, <<"bert">>, _} ->
268268
{term_to_binary(Facts), ReqData1, Context};
269269
_ ->
270-
{rabbit_json:encode(rabbit_mgmt_format:format_nulls(Facts)),
270+
{rabbit_json:encode(rabbit_mgmt_format:prepare_for_encoding(Facts)),
271271
ReqData1, Context}
272272
end
273273
catch exit:{json_encode, E} ->

deps/rabbitmq_management/src/rabbit_mgmt_wm_queue.erl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,19 @@ resource_exists(ReqData, Context) ->
3939

4040
to_json(ReqData, Context) ->
4141
try
42-
case rabbit_mgmt_util:disable_stats(ReqData) of
42+
StatsDisabled = rabbit_mgmt_util:disable_stats(ReqData),
43+
case StatsDisabled of
4344
false ->
4445
[Q] = rabbit_mgmt_db:augment_queues(
45-
[queue(ReqData)], rabbit_mgmt_util:range_ceil(ReqData),
46+
[queue(ReqData, StatsDisabled)],
47+
rabbit_mgmt_util:range_ceil(ReqData),
4648
full),
4749
Payload = rabbit_mgmt_format:clean_consumer_details(
4850
rabbit_mgmt_format:strip_pids(Q)),
4951
rabbit_mgmt_util:reply(ensure_defaults(Payload), ReqData, Context);
5052
true ->
5153
Q = case rabbit_mgmt_util:enable_queue_totals(ReqData) of
52-
false -> queue(ReqData);
54+
false -> queue(ReqData, StatsDisabled);
5355
true -> queue_with_totals(ReqData)
5456
end,
5557
rabbit_mgmt_util:reply(
@@ -108,15 +110,20 @@ ensure_defaults(Payload0) ->
108110
end.
109111

110112
queue(ReqData) ->
113+
queue(ReqData, rabbit_mgmt_util:disable_stats(ReqData)).
114+
115+
queue(ReqData, StatsDisabled) ->
111116
case rabbit_mgmt_util:vhost(ReqData) of
112117
not_found -> not_found;
113-
VHost -> queue(VHost, rabbit_mgmt_util:id(queue, ReqData))
118+
VHost ->
119+
Ctx = #{management_stats_disabled => StatsDisabled},
120+
queue(VHost, rabbit_mgmt_util:id(queue, ReqData), Ctx)
114121
end.
115122

116-
queue(VHost, QName) ->
123+
queue(VHost, QName, Ctx) ->
117124
Name = rabbit_misc:r(VHost, queue, QName),
118125
case rabbit_amqqueue:lookup(Name) of
119-
{ok, Q} -> rabbit_mgmt_format:queue(Q);
126+
{ok, Q} -> rabbit_mgmt_format:queue(Q, Ctx);
120127
{error, not_found} -> not_found
121128
end.
122129

deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,11 @@ basic(ReqData) ->
7676
%% a cluster wide query with a reasonably long (10s) timeout.
7777
%% TODO: replace with faster approximate function
7878
Running = rabbit_nodes:list_running(),
79-
Ctx = #{running_nodes => Running},
79+
StatsDisabled = rabbit_mgmt_util:disable_stats(ReqData),
80+
Ctx = #{running_nodes => Running,
81+
management_stats_disabled => StatsDisabled},
8082
FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end,
81-
case rabbit_mgmt_util:disable_stats(ReqData) of
83+
case StatsDisabled of
8284
false ->
8385
list_queues(ReqData, Running, FmtQ, FmtQ);
8486
true ->

deps/rabbitmq_management/src/rabbit_mgmt_wm_user_queues.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ basic(ReqData) ->
7070
%% a cluster wide query with a reasonably long (10s) timeout.
7171
%% TODO: replace with faster approximate function
7272
Running = rabbit_nodes:list_running(),
73-
Ctx = #{running_nodes => Running},
73+
StatsDisabled = rabbit_mgmt_util:disable_stats(ReqData),
74+
Ctx = #{running_nodes => Running,
75+
management_stats_disabled => StatsDisabled},
7476
FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end,
7577
User = rabbit_mgmt_util:id(user, ReqData),
7678
list_queues(ReqData, Running, FmtQ, FmtQ, User).

0 commit comments

Comments
 (0)