11import { randomUUID } from "node:crypto" ;
2+ import { MAX_BUFFERED_BYTES } from "./server-constants.js" ;
23import type { GatewayWsClient } from "./server/ws-types.js" ;
34
45export type NodeSession = {
@@ -72,6 +73,7 @@ type PingableSocket = {
7273const SERIALIZED_EVENT_PAYLOAD = Symbol ( "openclaw.serializedEventPayload" ) ;
7374const AUTHORIZED_SYSTEM_RUN_EVENT_GRACE_MS = 5 * 60 * 1000 ;
7475const WEBSOCKET_OPEN_READY_STATE = 1 ;
76+ const SLOW_CONSUMER_CLOSE_CODE = 1008 ;
7577
7678export type SerializedEventPayload = {
7779 readonly json : string ;
@@ -657,6 +659,9 @@ export class NodeRegistry {
657659 }
658660
659661 private sendEventInternal ( node : NodeSession , event : string , payload : unknown ) : boolean {
662+ if ( this . rejectSlowNodeSocket ( node ) ) {
663+ return false ;
664+ }
660665 try {
661666 node . client . socket . send (
662667 JSON . stringify ( {
@@ -683,6 +688,9 @@ export class NodeRegistry {
683688 ) {
684689 return false ;
685690 }
691+ if ( this . rejectSlowNodeSocket ( node ) ) {
692+ return false ;
693+ }
686694 try {
687695 const payloadFragment = payloadJSON ? `,"payload":${ payloadJSON . json } ` : "" ;
688696 node . client . socket . send (
@@ -697,4 +705,16 @@ export class NodeRegistry {
697705 private sendEventToSession ( node : NodeSession , event : string , payload : unknown ) : boolean {
698706 return this . sendEventInternal ( node , event , payload ) ;
699707 }
708+
709+ private rejectSlowNodeSocket ( node : NodeSession ) : boolean {
710+ if ( ! ( node . client . socket . bufferedAmount > MAX_BUFFERED_BYTES ) ) {
711+ return false ;
712+ }
713+ try {
714+ node . client . socket . close ( SLOW_CONSUMER_CLOSE_CODE , "slow consumer" ) ;
715+ } catch {
716+ /* ignore */
717+ }
718+ return true ;
719+ }
700720}
0 commit comments