Skip to content

Commit 3d5401b

Browse files
committed
fix(gateway): log slow node send diagnostics
1 parent 664e05e commit 3d5401b

2 files changed

Lines changed: 31 additions & 3 deletions

File tree

src/gateway/node-registry.test.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { EventEmitter } from "node:events";
22
import { describe, expect, it, vi } from "vitest";
3+
import { onDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js";
34
import { NodeRegistry, serializeEventPayload } from "./node-registry.js";
45
import { MAX_BUFFERED_BYTES } from "./server-constants.js";
56
import type { GatewayWsClient } from "./server/ws-types.js";
@@ -508,6 +509,9 @@ describe("gateway/node-registry", () => {
508509
});
509510

510511
it("rejects raw event sends when the node socket buffer is saturated", () => {
512+
resetDiagnosticEventsForTest();
513+
const diagnosticEvents: unknown[] = [];
514+
const stopDiagnostics = onDiagnosticEvent((event) => diagnosticEvents.push(event));
511515
const registry = new NodeRegistry();
512516
const socket = {
513517
bufferedAmount: MAX_BUFFERED_BYTES + 1,
@@ -522,9 +526,26 @@ describe("gateway/node-registry", () => {
522526
);
523527
const payload = serializeEventPayload({ foo: "bar" });
524528

525-
expect(registry.sendEventRaw("node-1", "chat", payload)).toBe(false);
526-
expect(socket.send).not.toHaveBeenCalled();
527-
expect(socket.close).toHaveBeenCalledWith(1008, "slow consumer");
529+
try {
530+
expect(registry.sendEventRaw("node-1", "chat", payload)).toBe(false);
531+
expect(socket.send).not.toHaveBeenCalled();
532+
expect(socket.close).toHaveBeenCalledWith(1008, "slow consumer");
533+
expect(diagnosticEvents).toEqual(
534+
expect.arrayContaining([
535+
expect.objectContaining({
536+
type: "payload.large",
537+
action: "rejected",
538+
surface: "gateway.ws.outbound_buffer",
539+
bytes: MAX_BUFFERED_BYTES + 1,
540+
limitBytes: MAX_BUFFERED_BYTES,
541+
reason: "ws_send_buffer_close",
542+
}),
543+
]),
544+
);
545+
} finally {
546+
stopDiagnostics();
547+
resetDiagnosticEventsForTest();
548+
}
528549
});
529550

530551
it("refreshes effective live surface within the declared surface", () => {

src/gateway/node-registry.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { randomUUID } from "node:crypto";
2+
import { logRejectedLargePayload } from "../logging/diagnostic-payload.js";
23
import { MAX_BUFFERED_BYTES } from "./server-constants.js";
34
import type { GatewayWsClient } from "./server/ws-types.js";
45

@@ -710,6 +711,12 @@ export class NodeRegistry {
710711
if (!(node.client.socket.bufferedAmount > MAX_BUFFERED_BYTES)) {
711712
return false;
712713
}
714+
logRejectedLargePayload({
715+
surface: "gateway.ws.outbound_buffer",
716+
bytes: node.client.socket.bufferedAmount,
717+
limitBytes: MAX_BUFFERED_BYTES,
718+
reason: "ws_send_buffer_close",
719+
});
713720
try {
714721
node.client.socket.close(SLOW_CONSUMER_CLOSE_CODE, "slow consumer");
715722
} catch {

0 commit comments

Comments
 (0)