|
45 | 45 | -export([restart_stream/3, |
46 | 46 | add_replica/3, |
47 | 47 | delete_replica/3, |
48 | | - delete_all_replicas/1]). |
| 48 | + delete_all_replicas/1, |
| 49 | + grow/4]). |
49 | 50 | -export([format_osiris_event/2]). |
50 | 51 | -export([update_stream_conf/2]). |
51 | 52 | -export([readers/1]). |
@@ -1083,24 +1084,100 @@ delete_replica(VHost, Name, Node) -> |
1083 | 1084 | E |
1084 | 1085 | end. |
1085 | 1086 |
|
%% Removes the replica hosted on `Node' from every stream queue reported by
%% rabbit_amqqueue:list_stream_queues_on/1 for that node.
%%
%% Returns one `{QueueName, Outcome}' entry per stream, in the order the
%% streams were listed. `Outcome' is whatever delete_replica_with_retry/2
%% produced for that stream: `{ok, Size}' or `{error, Size, Reason}'.
-spec delete_all_replicas(node()) ->
    [{rabbit_amqqueue:name(),
      {ok, pos_integer()} | {error, pos_integer(), term()}}].
delete_all_replicas(Node) ->
    ?LOG_INFO("Asked to remove all stream replicas from node ~ts", [Node]),
    StreamQueues = rabbit_amqqueue:list_stream_queues_on(Node),
    lists:map(fun(StreamQ) ->
                      delete_replica_with_retry(StreamQ, Node)
              end,
              StreamQueues).
| 1094 | + |
%% Asks the stream coordinator to remove the replica of stream queue `Q'
%% that lives on `Node'.
%%
%% When the coordinator answers `cluster_change_not_permitted' the request is
%% repeated exactly once after a 500ms pause; the outcome of that second
%% attempt is final, whatever it is.
%%
%% Returns `{QName, {ok, Size - 1}}' on success and
%% `{QName, {error, Size, Reason}}' on failure, where `Size' is the number of
%% nodes get_nodes/1 reported for the queue before the removal was attempted.
delete_replica_with_retry(Q, Node) ->
    QName = amqqueue:get_name(Q),
    #{name := StreamId} = amqqueue:get_type_state(Q),
    Size = length(get_nodes(Q)),
    ?LOG_INFO("~ts: removing replica on node ~w", [rabbit_misc:rs(QName), Node]),
    case rabbit_stream_coordinator:delete_replica(StreamId, Node) of
        {ok, cluster_change_not_permitted, _} ->
            ?LOG_INFO("~ts: cluster change not permitted, retrying in 500ms",
                      [rabbit_misc:rs(QName)]),
            timer:sleep(500),
            Retried = rabbit_stream_coordinator:delete_replica(StreamId, Node),
            delete_replica_result(Retried, QName, Node, Size);
        FirstAttempt ->
            delete_replica_result(FirstAttempt, QName, Node, Size)
    end.

%% Translates a coordinator reply to a replica removal into the per-queue
%% result tuple, logging a warning for every non-successful reply.
%% Any reply shape other than the three below is unexpected and crashes.
delete_replica_result({ok, ok, _}, QName, _Node, Size) ->
    {QName, {ok, Size - 1}};
delete_replica_result({ok, Err, _}, QName, Node, Size) ->
    ?LOG_WARNING("~ts: failed to remove replica on ~w: ~w",
                 [rabbit_misc:rs(QName), Node, Err]),
    {QName, {error, Size, Err}};
delete_replica_result({error, Err}, QName, Node, Size) ->
    ?LOG_WARNING("~ts: failed to remove replica on ~w: ~w",
                 [rabbit_misc:rs(QName), Node, Err]),
    {QName, {error, Size, Err}}.
| 1128 | + |
%% Adds a replica on `Node' to every stream queue that qualifies.
%%
%% A queue qualifies when all of the following hold:
%%   * its type is this module,
%%   * `Node' is not already among the nodes returned by get_nodes/1,
%%   * its current node list satisfies `Strategy' (see matches_strategy/2),
%%   * its vhost matches the regular expression `VhostSpec',
%%   * its resource name matches the regular expression `QueueSpec'.
%%
%% If `Node' is not in rabbit_nodes:list_running() nothing can qualify, so
%% the function returns `[]' without listing or inspecting any queue.
%%
%% Returns one `{QueueName, Outcome}' entry per qualifying queue, as produced
%% by add_replica_with_retry/2.
-spec grow(node(), binary(), binary(), all | even) ->
    [{rabbit_amqqueue:name(),
      {ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy) ->
    ?LOG_INFO("Adding stream replicas on node ~ts", [Node]),
    %% The running check does not depend on the queue, so it is decided once
    %% up front instead of once per queue inside the filter.
    case lists:member(Node, rabbit_nodes:list_running()) of
        false ->
            [];
        true ->
            Streams = [Q || Q <- rabbit_amqqueue:list(),
                            amqqueue:get_type(Q) =:= ?MODULE,
                            begin
                                Nodes = get_nodes(Q),
                                not lists:member(Node, Nodes) andalso
                                    matches_strategy(Strategy, Nodes)
                            end,
                            is_match(amqqueue:get_vhost(Q), VhostSpec),
                            is_match(get_resource_name(amqqueue:get_name(Q)),
                                     QueueSpec)],
            [add_replica_with_retry(Q, Node) || Q <- Streams]
    end.
| 1146 | + |
%% Asks the stream coordinator to add a replica of stream queue `Q' on
%% `Node'.
%%
%% When the coordinator answers `{error, cluster_change_not_permitted}' the
%% request is repeated exactly once after a 500ms pause; the outcome of that
%% second attempt is final, whatever it is.
%%
%% Returns `{QName, {ok, Size + 1}}' on success and
%% `{QName, {error, Size, Reason}}' on failure, where `Size' is the number of
%% nodes get_nodes/1 reported for the queue before the addition was attempted.
add_replica_with_retry(Q, Node) ->
    QName = amqqueue:get_name(Q),
    Size = length(get_nodes(Q)),
    ?LOG_INFO("~ts: adding replica on node ~w", [rabbit_misc:rs(QName), Node]),
    case rabbit_stream_coordinator:add_replica(Q, Node) of
        {error, cluster_change_not_permitted} ->
            ?LOG_INFO("~ts: cluster change not permitted (another member is being added), retrying in 500ms",
                      [rabbit_misc:rs(QName)]),
            timer:sleep(500),
            Retried = rabbit_stream_coordinator:add_replica(Q, Node),
            add_replica_result(Retried, QName, Node, Size);
        FirstAttempt ->
            add_replica_result(FirstAttempt, QName, Node, Size)
    end.

%% Translates a coordinator reply to a replica addition into the per-queue
%% result tuple, logging a warning when the reply is an error.
%% Any reply shape other than the two below is unexpected and crashes.
add_replica_result(ok, QName, _Node, Size) ->
    {QName, {ok, Size + 1}};
add_replica_result({error, Err}, QName, Node, Size) ->
    ?LOG_WARNING("~ts: failed to add replica on ~w: ~w",
                 [rabbit_misc:rs(QName), Node, Err]),
    {QName, {error, Size, Err}}.
| 1171 | + |
%% Decides whether a stream with the given member list should be grown under
%% the requested strategy:
%%   `all'  - always true, the member list is not inspected;
%%   `even' - true only when the number of members is even.
matches_strategy(all, _Members) ->
    true;
matches_strategy(even, Members) ->
    MemberCount = length(Members),
    MemberCount rem 2 =:= 0.
| 1175 | + |
%% Returns `true' when the regular expression `E' matches anywhere in
%% `Subj', `false' otherwise.
%%
%% Only the yes/no answer is needed, so capturing is switched off: re:run/3
%% then returns the bare atom `match' or `nomatch' instead of building a
%% list of captured positions that would be thrown away.
is_match(Subj, E) ->
    nomatch =/= re:run(Subj, E, [{capture, none}]).
| 1178 | + |
%% Extracts the `name' field from a `#resource{}' record.
%% Any other term fails the head match and crashes with function_clause.
get_resource_name(#resource{} = Resource) ->
    Resource#resource.name.
1104 | 1181 |
|
1105 | 1182 | make_stream_conf(Q) -> |
1106 | 1183 | QName = amqqueue:get_name(Q), |
|
0 commit comments