Skip to content

Commit 0fcd5fa

Browse files
guci314claude
andcommitted
feat(plugin): add rate-limit-circuit-breaker to prevent death loops in group chats
When multiple agents share a group chat with requireMention:false, a rate-limit error surfaced by one agent triggers other agents to respond, causing them to also hit rate limits in an infinite cascade. This plugin detects consecutive rate-limit error messages per room and suppresses them using a circuit breaker pattern with exponential backoff. Verified in production: a 5-agent Matrix group hit a 13+ minute death loop (146 surface_error events) before this fix. After deployment, zero surface_error events with the same rate limit conditions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ab9be8d commit 0fcd5fa

6 files changed

Lines changed: 625 additions & 0 deletions

File tree

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Rate Limit Circuit Breaker
2+
3+
Prevents death loops in multi-agent group chats (Matrix, Discord, etc.) when LLM API rate limits are hit.
4+
5+
## Problem
6+
7+
When multiple agents share a group chat with `requireMention: false`, a single rate-limit error can cascade into an infinite loop:
8+
9+
1. Agent A receives a message → calls LLM API → gets rate-limited (429)
10+
2. Gateway surfaces the error as a chat message: "API rate limit reached"
11+
3. Agent B sees this message → tries to respond → also gets rate-limited
12+
4. Repeat indefinitely
13+
14+
This loop is self-sustaining because the error messages themselves trigger more API calls.
15+
16+
## Solution
17+
18+
This plugin hooks into `message_sending` to detect and suppress repeated rate-limit error messages using a circuit breaker pattern:
19+
20+
- **CLOSED**: Normal operation. Counts consecutive rate-limit errors per room.
21+
- **OPEN**: After N consecutive errors (default: 3), suppresses further error messages for a cooldown period. Normal messages still flow through.
22+
- **HALF_OPEN**: After cooldown expires, allows one error message through as a retry probe. If the next message is normal (non-error), the circuit fully resets. If another error occurs, the circuit re-opens with doubled cooldown.
23+
24+
Exponential backoff ensures the cooldown grows (60s → 120s → 240s → ... up to 10 minutes) if the rate limit persists.
25+
26+
## Configuration
27+
28+
```json
29+
{
30+
"plugins": {
31+
"entries": {
32+
"rate-limit-circuit-breaker": {
33+
"enabled": true,
34+
"config": {
35+
"maxConsecutiveErrors": 3,
36+
"baseCooldownMs": 60000,
37+
"maxCooldownMs": 600000
38+
}
39+
}
40+
}
41+
}
42+
}
43+
```
44+
45+
| Option | Default | Description |
46+
|--------|---------|-------------|
47+
| `maxConsecutiveErrors` | 3 | Consecutive rate-limit errors before circuit opens |
48+
| `baseCooldownMs` | 60000 | Base cooldown (60s), doubles each trip |
49+
| `maxCooldownMs` | 600000 | Maximum cooldown cap (10 minutes) |
50+
51+
## Recommended companion config
52+
53+
For best results, also configure fallback models so rate limits trigger model failover before reaching `surface_error`:
54+
55+
```json
56+
{
57+
"agents": {
58+
"defaults": {
59+
"model": {
60+
"primary": "provider/model-a",
61+
"fallbacks": ["provider/model-b", "provider/model-c"]
62+
}
63+
}
64+
}
65+
}
66+
```
67+
68+
With fallback models, rate limits are handled by model switching (first line of defense). The circuit breaker acts as a second line of defense for when all models are exhausted.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Rate Limit Circuit Breaker Plugin for OpenClaw
3+
*
4+
* Prevents death loops in multi-agent group chats (Matrix, Discord, etc.)
5+
* where a rate-limit error surfaced by one agent triggers other agents to
6+
* respond, causing them to also hit rate limits in an infinite cascade.
7+
*
8+
* Mechanism:
9+
* 1. Hooks into `message_sending` to inspect every outgoing message
10+
* 2. Detects rate-limit/overload error messages by pattern matching
11+
* 3. Tracks consecutive rate-limit errors per room (channel + target)
12+
* 4. After N consecutive errors, opens the circuit breaker:
13+
* - Suppresses further error messages for a cooldown period
14+
* - Uses exponential backoff on repeated trips
15+
* 5. After cooldown, allows one retry (half-open state)
16+
* 6. On success (non-error message), fully resets the circuit
17+
*/
18+
19+
import { RateLimitCircuitBreaker } from "./src/circuit-breaker.js";
20+
21+
// Singleton — shared across all hooks for the lifetime of the gateway process
22+
let breaker: RateLimitCircuitBreaker | null = null;
23+
24+
// Periodic cleanup interval handle
25+
let cleanupInterval: ReturnType<typeof setInterval> | null = null;
26+
27+
export default function register(api: any) {
28+
const pluginConfig = api.pluginConfig ?? {};
29+
const logger = api.logger ?? { warn: console.warn, debug: undefined };
30+
31+
breaker = new RateLimitCircuitBreaker(
32+
{
33+
maxConsecutiveErrors: pluginConfig.maxConsecutiveErrors ?? 3,
34+
baseCooldownMs: pluginConfig.baseCooldownMs ?? 60_000,
35+
maxCooldownMs: pluginConfig.maxCooldownMs ?? 600_000,
36+
},
37+
{
38+
warn: (msg: string) => logger.warn(msg),
39+
debug: logger.debug ? (msg: string) => logger.debug!(msg) : undefined,
40+
},
41+
);
42+
43+
// --- message_sending hook: intercept outgoing messages ---
44+
api.on(
45+
"message_sending",
46+
(
47+
event: { to: string; content: string; metadata?: Record<string, unknown> },
48+
ctx: { channelId: string; accountId?: string; conversationId?: string },
49+
) => {
50+
if (!breaker || !event.content) return;
51+
52+
const channelId = ctx.channelId ?? (event.metadata?.channel as string) ?? "unknown";
53+
const to = event.to ?? "";
54+
55+
if (!to) return;
56+
57+
const suppress = breaker.shouldSuppress(channelId, to, event.content);
58+
if (suppress) {
59+
return { cancel: true };
60+
}
61+
// Allow the message through (no modification)
62+
return undefined;
63+
},
64+
{ priority: -100 }, // Run early so we can cancel before other hooks process
65+
);
66+
67+
// --- gateway_start hook: set up periodic cleanup ---
68+
api.on("gateway_start", () => {
69+
// Clean up stale circuit breaker entries every 30 minutes
70+
cleanupInterval = setInterval(() => {
71+
breaker?.cleanup(3_600_000); // 1 hour max age
72+
}, 30 * 60 * 1000);
73+
});
74+
75+
// --- gateway_stop hook: teardown ---
76+
api.on("gateway_stop", () => {
77+
if (cleanupInterval) {
78+
clearInterval(cleanupInterval);
79+
cleanupInterval = null;
80+
}
81+
});
82+
83+
// --- Register a gateway method for diagnostics ---
84+
api.registerGatewayMethod(
85+
"circuit-breaker-status",
86+
async (params: { channel?: string; to?: string }) => {
87+
if (!breaker) return { status: "not_initialized" };
88+
if (params.channel && params.to) {
89+
const state = breaker.getState(params.channel, params.to);
90+
return { room: `${params.channel}:${params.to}`, state: state ?? "no_data" };
91+
}
92+
return { status: "ok", message: "Pass channel and to params to query a specific room" };
93+
},
94+
);
95+
96+
logger.warn("[rate-limit-circuit-breaker] Plugin registered");
97+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"id": "rate-limit-circuit-breaker",
3+
"name": "Rate Limit Circuit Breaker",
4+
"description": "Prevents death loops in group chats when LLM API rate limits are hit. Suppresses repeated rate-limit error messages with exponential backoff to stop agents from triggering each other.",
5+
"configSchema": {
6+
"type": "object",
7+
"additionalProperties": false,
8+
"properties": {
9+
"maxConsecutiveErrors": {
10+
"type": "number",
11+
"description": "Number of consecutive rate-limit errors before the circuit breaker opens (suppresses further error messages). Default: 3",
12+
"default": 3
13+
},
14+
"baseCooldownMs": {
15+
"type": "number",
16+
"description": "Base cooldown in milliseconds after the circuit breaker opens. Doubles with each subsequent trip (exponential backoff). Default: 60000 (60 seconds)",
17+
"default": 60000
18+
},
19+
"maxCooldownMs": {
20+
"type": "number",
21+
"description": "Maximum cooldown in milliseconds (cap for exponential backoff). Default: 600000 (10 minutes)",
22+
"default": 600000
23+
}
24+
}
25+
}
26+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "@openclaw/rate-limit-circuit-breaker",
3+
"version": "2026.4.10",
4+
"description": "Prevents death loops in multi-agent group chats caused by LLM API rate limit errors",
5+
"type": "module",
6+
"dependencies": {},
7+
"devDependencies": {
8+
"@openclaw/plugin-sdk": "workspace:*",
9+
"openclaw": "workspace:*"
10+
},
11+
"peerDependencies": {
12+
"openclaw": ">=2026.3.0"
13+
},
14+
"exports": {
15+
".": "./index.ts"
16+
}
17+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import { describe, it, expect, beforeEach, vi } from "vitest";
2+
import {
3+
RateLimitCircuitBreaker,
4+
isRateLimitErrorMessage,
5+
isTransientErrorMessage,
6+
} from "./circuit-breaker.js";
7+
8+
describe("isRateLimitErrorMessage", () => {
9+
it("matches the standard OpenClaw rate limit message", () => {
10+
expect(isRateLimitErrorMessage("API rate limit reached. Please try again later.")).toBe(true);
11+
});
12+
13+
it("matches messages with emoji prefix", () => {
14+
expect(isRateLimitErrorMessage("\u26a0\ufe0f API rate limit reached. Please try again later.")).toBe(true);
15+
});
16+
17+
it("matches 429 status code references", () => {
18+
expect(isRateLimitErrorMessage("HTTP 429 Too Many Requests")).toBe(true);
19+
});
20+
21+
it("matches rate_limit_exceeded error type", () => {
22+
expect(isRateLimitErrorMessage('{"type":"rate_limit_exceeded"}')).toBe(true);
23+
});
24+
25+
it("matches overloaded messages", () => {
26+
expect(isRateLimitErrorMessage("The AI service is temporarily overloaded. Please try again in a moment.")).toBe(true);
27+
});
28+
29+
it("does not match normal messages", () => {
30+
expect(isRateLimitErrorMessage("Hello, how are you?")).toBe(false);
31+
expect(isRateLimitErrorMessage("The rate of improvement is excellent")).toBe(false);
32+
});
33+
});
34+
35+
describe("isTransientErrorMessage", () => {
36+
it("matches timeout messages", () => {
37+
expect(isTransientErrorMessage("LLM request timed out.")).toBe(true);
38+
});
39+
40+
it("matches rate limit messages (superset)", () => {
41+
expect(isTransientErrorMessage("API rate limit reached. Please try again later.")).toBe(true);
42+
});
43+
44+
it("does not match normal messages", () => {
45+
expect(isTransientErrorMessage("Here is your report.")).toBe(false);
46+
});
47+
});
48+
49+
describe("RateLimitCircuitBreaker", () => {
50+
let breaker: RateLimitCircuitBreaker;
51+
const channel = "matrix";
52+
const room = "!room123:server.org";
53+
const errorMsg = "\u26a0\ufe0f API rate limit reached. Please try again later.";
54+
const normalMsg = "Here is the analysis you requested.";
55+
56+
const warnSpy = vi.fn();
57+
const debugSpy = vi.fn();
58+
59+
beforeEach(() => {
60+
warnSpy.mockReset();
61+
debugSpy.mockReset();
62+
breaker = new RateLimitCircuitBreaker(
63+
{ maxConsecutiveErrors: 3, baseCooldownMs: 1000, maxCooldownMs: 8000 },
64+
{ warn: warnSpy, debug: debugSpy },
65+
);
66+
});
67+
68+
it("allows normal messages through", () => {
69+
expect(breaker.shouldSuppress(channel, room, normalMsg)).toBe(false);
70+
});
71+
72+
it("allows first N-1 rate limit errors through", () => {
73+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(false); // 1/3
74+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(false); // 2/3
75+
});
76+
77+
it("suppresses the Nth consecutive error (trips the circuit)", () => {
78+
breaker.shouldSuppress(channel, room, errorMsg); // 1
79+
breaker.shouldSuppress(channel, room, errorMsg); // 2
80+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(true); // 3 -> OPEN
81+
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("closed -> open"));
82+
});
83+
84+
it("suppresses subsequent errors while circuit is open", () => {
85+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
86+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(true);
87+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(true);
88+
});
89+
90+
it("allows normal messages while circuit is open (does not suppress non-error)", () => {
91+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
92+
// Non-error messages always go through
93+
expect(breaker.shouldSuppress(channel, room, normalMsg)).toBe(false);
94+
});
95+
96+
it("transitions to half_open after cooldown and allows one retry", () => {
97+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
98+
const state = breaker.getState(channel, room)!;
99+
// Simulate cooldown expiration by backdating openedAt
100+
state.openedAt = Date.now() - 2000; // 2s > 1s cooldown
101+
// Next error triggers half_open transition + is allowed through
102+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(false);
103+
expect(breaker.getState(channel, room)!.state).toBe("half_open");
104+
});
105+
106+
it("re-opens with doubled cooldown if retry fails in half_open", () => {
107+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
108+
const state = breaker.getState(channel, room)!;
109+
state.openedAt = Date.now() - 2000;
110+
breaker.shouldSuppress(channel, room, errorMsg); // -> half_open, allowed
111+
// Another error in half_open
112+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(true);
113+
const updatedState = breaker.getState(channel, room)!;
114+
expect(updatedState.state).toBe("open");
115+
expect(updatedState.cooldownMs).toBe(2000); // doubled from 1000
116+
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("half_open -> open"));
117+
});
118+
119+
it("resets to closed on success in half_open state", () => {
120+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
121+
const state = breaker.getState(channel, room)!;
122+
state.openedAt = Date.now() - 2000;
123+
breaker.shouldSuppress(channel, room, errorMsg); // -> half_open
124+
// Normal message = success
125+
expect(breaker.shouldSuppress(channel, room, normalMsg)).toBe(false);
126+
const updatedState = breaker.getState(channel, room)!;
127+
expect(updatedState.state).toBe("closed");
128+
expect(updatedState.consecutiveErrors).toBe(0);
129+
expect(updatedState.tripCount).toBe(0);
130+
});
131+
132+
it("resets error count on normal message in closed state", () => {
133+
breaker.shouldSuppress(channel, room, errorMsg); // 1
134+
breaker.shouldSuppress(channel, room, errorMsg); // 2
135+
breaker.shouldSuppress(channel, room, normalMsg); // reset
136+
breaker.shouldSuppress(channel, room, errorMsg); // 1 again
137+
breaker.shouldSuppress(channel, room, errorMsg); // 2 again
138+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(true); // 3 -> trips
139+
});
140+
141+
it("tracks rooms independently", () => {
142+
const room2 = "!room456:server.org";
143+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
144+
// room1 is open
145+
expect(breaker.shouldSuppress(channel, room, errorMsg)).toBe(true);
146+
// room2 is still closed
147+
expect(breaker.shouldSuppress(channel, room2, errorMsg)).toBe(false);
148+
});
149+
150+
it("caps cooldown at maxCooldownMs", () => {
151+
// Trip multiple times
152+
for (let trip = 0; trip < 10; trip++) {
153+
// Fill up consecutive errors
154+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
155+
// Now it's open. Expire the cooldown.
156+
const state = breaker.getState(channel, room)!;
157+
state.openedAt = Date.now() - state.cooldownMs - 100;
158+
breaker.shouldSuppress(channel, room, errorMsg); // -> half_open, allowed
159+
// Fail the retry
160+
breaker.shouldSuppress(channel, room, errorMsg); // -> open with doubled cooldown
161+
}
162+
const finalState = breaker.getState(channel, room)!;
163+
expect(finalState.cooldownMs).toBeLessThanOrEqual(8000);
164+
});
165+
166+
it("recordSuccess resets the circuit fully", () => {
167+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
168+
expect(breaker.getState(channel, room)!.state).toBe("open");
169+
breaker.recordSuccess(channel, room);
170+
expect(breaker.getState(channel, room)!.state).toBe("closed");
171+
expect(breaker.getState(channel, room)!.consecutiveErrors).toBe(0);
172+
});
173+
174+
it("cleanup removes stale entries", () => {
175+
for (let i = 0; i < 3; i++) breaker.shouldSuppress(channel, room, errorMsg);
176+
const state = breaker.getState(channel, room)!;
177+
state.openedAt = Date.now() - 7_200_000; // 2 hours ago
178+
breaker.cleanup(3_600_000); // 1 hour max age
179+
expect(breaker.getState(channel, room)).toBeUndefined();
180+
});
181+
});

0 commit comments

Comments
 (0)