Skip to content

Commit 1819e41

Browse files
authored
fix(gateway): preserve node reconnect state (#78351)
Preserve node registry ownership across same-node WebSocket reconnect races so stale old-socket closes cannot clear the replacement session or complete the wrong pending invoke. Thanks @samzong.
1 parent 9ef37d1 commit 1819e41

6 files changed

Lines changed: 171 additions & 17 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ Docs: https://docs.openclaw.ai
600600
- Managed proxy/security: classify raw socket callsites and proxy runtime mutations in boundary checks so new direct egress or unmanaged proxy-state changes cannot land without explicit review. (#77126) Thanks @jesse-merhi.
601601
- Channels/iMessage: surface the silent group-allowlist drop at default log level by emitting a one-time `warn` per account at monitor startup when `channels.imessage.groupPolicy: "allowlist"` is set without a `channels.imessage.groups` block, plus a one-time `warn` per `chat_id` when the runtime gate drops a specific group, naming the exact `channels.imessage.groups[...]` key to add to allow it. Fixes #78749. (#79190) Thanks @omarshahine.
602602
- WhatsApp: stop Gateway-originated outbound echoes from advancing inbound activity in `openclaw channels status`, so outbound self-sends no longer look like handled inbound messages. Fixes #79056. (#79057) Thanks @ai-hpc and @bittoby.
603+
- Gateway/nodes: preserve the live node registry session and invoke ownership when an older same-node WebSocket closes after reconnecting. (#78351) Thanks @samzong.
603604

604605
## 2026.5.3-1
605606

src/gateway/node-registry.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { describe, expect, it } from "vitest";
2+
import { NodeRegistry } from "./node-registry.js";
3+
import type { GatewayWsClient } from "./server/ws-types.js";
4+
5+
function makeClient(connId: string, nodeId: string, sent: string[] = []): GatewayWsClient {
6+
return {
7+
connId,
8+
usesSharedGatewayAuth: false,
9+
socket: {
10+
send(frame: unknown) {
11+
if (typeof frame === "string") {
12+
sent.push(frame);
13+
}
14+
},
15+
} as unknown as GatewayWsClient["socket"],
16+
connect: {
17+
client: { id: "openclaw-macos", version: "1.0.0", platform: "darwin", mode: "node" },
18+
device: {
19+
id: nodeId,
20+
publicKey: "public-key",
21+
signature: "signature",
22+
signedAt: 1,
23+
nonce: "nonce",
24+
},
25+
} as GatewayWsClient["connect"],
26+
};
27+
}
28+
29+
describe("gateway/node-registry", () => {
30+
it("keeps a reconnected node when the old connection unregisters", async () => {
31+
const registry = new NodeRegistry();
32+
const oldFrames: string[] = [];
33+
const newClient = makeClient("conn-new", "node-1");
34+
35+
registry.register(makeClient("conn-old", "node-1", oldFrames), {});
36+
const oldInvoke = registry.invoke({
37+
nodeId: "node-1",
38+
command: "system.run",
39+
timeoutMs: 1_000,
40+
});
41+
const oldDisconnected = oldInvoke.catch((err: unknown) => err);
42+
const oldRequest = JSON.parse(oldFrames[0] ?? "{}") as { payload?: { id?: string } };
43+
const newSession = registry.register(newClient, {});
44+
45+
expect(
46+
registry.handleInvokeResult({
47+
id: oldRequest.payload?.id ?? "",
48+
nodeId: "node-1",
49+
connId: "conn-new",
50+
ok: true,
51+
}),
52+
).toBe(false);
53+
expect(registry.unregister("conn-old")).toBeNull();
54+
expect(registry.get("node-1")).toBe(newSession);
55+
await expect(oldDisconnected).resolves.toBeInstanceOf(Error);
56+
});
57+
});

src/gateway/node-registry.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export type NodeSession = {
2424

2525
type PendingInvoke = {
2626
nodeId: string;
27+
connId: string;
2728
command: string;
2829
resolve: (value: NodeInvokeResult) => void;
2930
reject: (err: Error) => void;
@@ -88,16 +89,19 @@ export class NodeRegistry {
8889
return null;
8990
}
9091
this.nodesByConn.delete(connId);
91-
this.nodesById.delete(nodeId);
92+
const unregistersCurrentNode = this.nodesById.get(nodeId)?.connId === connId;
93+
if (unregistersCurrentNode) {
94+
this.nodesById.delete(nodeId);
95+
}
9296
for (const [id, pending] of this.pendingInvokes.entries()) {
93-
if (pending.nodeId !== nodeId) {
97+
if (pending.connId !== connId) {
9498
continue;
9599
}
96100
clearTimeout(pending.timer);
97101
pending.reject(new Error(`node disconnected (${pending.command})`));
98102
this.pendingInvokes.delete(id);
99103
}
100-
return nodeId;
104+
return unregistersCurrentNode ? nodeId : null;
101105
}
102106

103107
listConnected(): NodeSession[] {
@@ -150,6 +154,7 @@ export class NodeRegistry {
150154
}, timeoutMs);
151155
this.pendingInvokes.set(requestId, {
152156
nodeId: params.nodeId,
157+
connId: node.connId,
153158
command: params.command,
154159
resolve,
155160
reject,
@@ -161,6 +166,7 @@ export class NodeRegistry {
161166
handleInvokeResult(params: {
162167
id: string;
163168
nodeId: string;
169+
connId: string | undefined;
164170
ok: boolean;
165171
payload?: unknown;
166172
payloadJSON?: string | null;
@@ -170,7 +176,7 @@ export class NodeRegistry {
170176
if (!pending) {
171177
return false;
172178
}
173-
if (pending.nodeId !== params.nodeId) {
179+
if (pending.nodeId !== params.nodeId || pending.connId !== params.connId) {
174180
return false;
175181
}
176182
clearTimeout(pending.timer);

src/gateway/server-methods/nodes.handlers.invoke-result.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export const handleNodeInvokeResult: GatewayRequestHandler = async ({
5454
const ok = context.nodeRegistry.handleInvokeResult({
5555
id: p.id,
5656
nodeId: p.nodeId,
57+
connId: client?.connId,
5758
ok: p.ok,
5859
payload: p.payload,
5960
payloadJSON: p.payloadJSON ?? null,

src/gateway/server/ws-connection.test.ts

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,22 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
33
import type { WebSocketServer } from "ws";
44
import type { ResolvedGatewayAuth } from "../auth.js";
55

6-
const { attachGatewayWsMessageHandlerMock } = vi.hoisted(() => ({
7-
attachGatewayWsMessageHandlerMock: vi.fn(),
8-
}));
6+
const { attachGatewayWsMessageHandlerMock, broadcastPresenceSnapshotMock, upsertPresenceMock } =
7+
vi.hoisted(() => ({
8+
attachGatewayWsMessageHandlerMock: vi.fn(),
9+
broadcastPresenceSnapshotMock: vi.fn(),
10+
upsertPresenceMock: vi.fn(),
11+
}));
912

1013
vi.mock("./ws-connection/message-handler.js", () => ({
1114
attachGatewayWsMessageHandler: attachGatewayWsMessageHandlerMock,
1215
}));
16+
vi.mock("../../infra/system-presence.js", () => ({
17+
upsertPresence: upsertPresenceMock,
18+
}));
19+
vi.mock("./presence-events.js", () => ({
20+
broadcastPresenceSnapshot: broadcastPresenceSnapshotMock,
21+
}));
1322

1423
import { attachGatewayWsConnectionHandler } from "./ws-connection.js";
1524
import { resolveSharedGatewaySessionGeneration } from "./ws-shared-generation.js";
@@ -122,6 +131,8 @@ async function connectTestWs(
122131
describe("attachGatewayWsConnectionHandler", () => {
123132
beforeEach(() => {
124133
attachGatewayWsMessageHandlerMock.mockReset();
134+
broadcastPresenceSnapshotMock.mockReset();
135+
upsertPresenceMock.mockReset();
125136
});
126137

127138
afterEach(() => {
@@ -234,4 +245,78 @@ describe("attachGatewayWsConnectionHandler", () => {
234245
vi.advanceTimersByTime(25_000);
235246
expect(socket.ping).toHaveBeenCalledTimes(1);
236247
});
248+
249+
it("skips node presence disconnects for stale reconnected sockets", async () => {
250+
const listeners = new Map<string, (...args: unknown[]) => void>();
251+
const unregister = vi.fn(() => null);
252+
const wss = {
253+
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
254+
listeners.set(event, handler);
255+
}),
256+
} as unknown as WebSocketServer;
257+
const socket = Object.assign(new EventEmitter(), {
258+
_socket: {
259+
remoteAddress: "127.0.0.1",
260+
remotePort: 1234,
261+
localAddress: "127.0.0.1",
262+
localPort: 5678,
263+
},
264+
send: vi.fn(),
265+
close: vi.fn(),
266+
});
267+
const upgradeReq = {
268+
headers: { host: "127.0.0.1:19001" },
269+
socket: { localAddress: "127.0.0.1" },
270+
};
271+
272+
attachGatewayWsConnectionHandler({
273+
wss,
274+
clients: new Set(),
275+
preauthConnectionBudget: { release: vi.fn() } as never,
276+
port: 19001,
277+
resolvedAuth: createResolvedAuth("token"),
278+
gatewayMethods: [],
279+
events: [],
280+
refreshHealthSnapshot: vi.fn(),
281+
logGateway: createLogger() as never,
282+
logHealth: createLogger() as never,
283+
logWsControl: createLogger() as never,
284+
extraHandlers: {},
285+
broadcast: vi.fn(),
286+
buildRequestContext: () =>
287+
({
288+
unsubscribeAllSessionEvents: vi.fn(),
289+
nodeRegistry: { unregister },
290+
nodeUnsubscribeAll: vi.fn(),
291+
}) as never,
292+
});
293+
294+
const onConnection = listeners.get("connection");
295+
expect(onConnection).toBeTypeOf("function");
296+
onConnection?.(socket, upgradeReq);
297+
await waitForLazyMessageHandler();
298+
299+
const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as {
300+
setClient: (client: unknown) => boolean;
301+
};
302+
expect(
303+
passed.setClient({
304+
socket,
305+
connect: {
306+
role: "node",
307+
client: { id: "openclaw-macos", mode: "node" },
308+
device: { id: "node-1" },
309+
},
310+
connId: "conn-old",
311+
presenceKey: "node-1",
312+
usesSharedGatewayAuth: false,
313+
}),
314+
).toBe(true);
315+
316+
socket.emit("close", 1000, Buffer.from("stale"));
317+
318+
expect(unregister).toHaveBeenCalledTimes(1);
319+
expect(upsertPresenceMock).not.toHaveBeenCalled();
320+
expect(broadcastPresenceSnapshotMock).not.toHaveBeenCalled();
321+
});
237322
});

src/gateway/server/ws-connection.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -387,19 +387,23 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
387387
`webchat disconnected code=${code} reason=${logReason || "n/a"} conn=${connId}`,
388388
);
389389
}
390-
if (client?.presenceKey) {
391-
upsertPresence(client.presenceKey, { reason: "disconnect" });
392-
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
393-
}
394390
const context = buildRequestContext();
395391
context.unsubscribeAllSessionEvents(connId);
392+
let currentDisconnectedNodeId: string | null = null;
396393
if (client?.connect?.role === "node") {
397-
const nodeId = context.nodeRegistry.unregister(connId);
398-
if (nodeId) {
399-
removeRemoteNodeInfo(nodeId);
400-
context.nodeUnsubscribeAll(nodeId);
401-
clearNodeWakeState(nodeId);
402-
}
394+
currentDisconnectedNodeId = context.nodeRegistry.unregister(connId);
395+
}
396+
if (
397+
client?.presenceKey &&
398+
(client.connect.role !== "node" || currentDisconnectedNodeId !== null)
399+
) {
400+
upsertPresence(client.presenceKey, { reason: "disconnect" });
401+
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
402+
}
403+
if (currentDisconnectedNodeId) {
404+
removeRemoteNodeInfo(currentDisconnectedNodeId);
405+
context.nodeUnsubscribeAll(currentDisconnectedNodeId);
406+
clearNodeWakeState(currentDisconnectedNodeId);
403407
}
404408
logWs("out", "close", {
405409
connId,

0 commit comments

Comments
 (0)