Skip to content

Commit 664e05e

Browse files
committed
fix(gateway): reject slow node event sends
Signed-off-by: samzong <samzong.lu@gmail.com>
1 parent 79197b3 commit 664e05e

2 files changed

Lines changed: 41 additions & 0 deletions

File tree

src/gateway/node-registry.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { EventEmitter } from "node:events";
22
import { describe, expect, it, vi } from "vitest";
33
import { NodeRegistry, serializeEventPayload } from "./node-registry.js";
4+
import { MAX_BUFFERED_BYTES } from "./server-constants.js";
45
import type { GatewayWsClient } from "./server/ws-types.js";
56

67
function makeClient(
@@ -506,6 +507,26 @@ describe("gateway/node-registry", () => {
506507
]);
507508
});
508509

510+
it("rejects raw event sends when the node socket buffer is saturated", () => {
511+
const registry = new NodeRegistry();
512+
const socket = {
513+
bufferedAmount: MAX_BUFFERED_BYTES + 1,
514+
send: vi.fn(),
515+
close: vi.fn(),
516+
};
517+
registry.register(
518+
makeClient("conn-1", "node-1", [], {
519+
socket: socket as unknown as GatewayWsClient["socket"],
520+
}),
521+
{},
522+
);
523+
const payload = serializeEventPayload({ foo: "bar" });
524+
525+
expect(registry.sendEventRaw("node-1", "chat", payload)).toBe(false);
526+
expect(socket.send).not.toHaveBeenCalled();
527+
expect(socket.close).toHaveBeenCalledWith(1008, "slow consumer");
528+
});
529+
509530
it("refreshes effective live surface within the declared surface", () => {
510531
const registry = new NodeRegistry();
511532
const client = makeClient("conn-1", "node-1", [], {

src/gateway/node-registry.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { randomUUID } from "node:crypto";
2+
import { MAX_BUFFERED_BYTES } from "./server-constants.js";
23
import type { GatewayWsClient } from "./server/ws-types.js";
34

45
export type NodeSession = {
@@ -72,6 +73,7 @@ type PingableSocket = {
7273
const SERIALIZED_EVENT_PAYLOAD = Symbol("openclaw.serializedEventPayload");
7374
const AUTHORIZED_SYSTEM_RUN_EVENT_GRACE_MS = 5 * 60 * 1000;
7475
const WEBSOCKET_OPEN_READY_STATE = 1;
76+
const SLOW_CONSUMER_CLOSE_CODE = 1008;
7577

7678
export type SerializedEventPayload = {
7779
readonly json: string;
@@ -657,6 +659,9 @@ export class NodeRegistry {
657659
}
658660

659661
private sendEventInternal(node: NodeSession, event: string, payload: unknown): boolean {
662+
if (this.rejectSlowNodeSocket(node)) {
663+
return false;
664+
}
660665
try {
661666
node.client.socket.send(
662667
JSON.stringify({
@@ -683,6 +688,9 @@ export class NodeRegistry {
683688
) {
684689
return false;
685690
}
691+
if (this.rejectSlowNodeSocket(node)) {
692+
return false;
693+
}
686694
try {
687695
const payloadFragment = payloadJSON ? `,"payload":${payloadJSON.json}` : "";
688696
node.client.socket.send(
@@ -697,4 +705,16 @@ export class NodeRegistry {
697705
private sendEventToSession(node: NodeSession, event: string, payload: unknown): boolean {
698706
return this.sendEventInternal(node, event, payload);
699707
}
708+
709+
private rejectSlowNodeSocket(node: NodeSession): boolean {
710+
if (!(node.client.socket.bufferedAmount > MAX_BUFFERED_BYTES)) {
711+
return false;
712+
}
713+
try {
714+
node.client.socket.close(SLOW_CONSUMER_CLOSE_CODE, "slow consumer");
715+
} catch {
716+
/* ignore */
717+
}
718+
return true;
719+
}
700720
}

0 commit comments

Comments
 (0)