Skip to content

Commit 5487855

Browse files
committed
refactor(gateway): share talk relay session lifecycle
1 parent 45f7aec commit 5487855

3 files changed

Lines changed: 91 additions & 60 deletions

File tree

src/gateway/talk-realtime-relay.ts

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import { randomUUID } from "node:crypto";
2-
import {
3-
asDateTimestampMs,
4-
resolveExpiresAtMsFromDurationMs,
5-
} from "@openclaw/normalization-core/number-coercion";
2+
import { resolveExpiresAtMsFromDurationMs } from "@openclaw/normalization-core/number-coercion";
63
import type { OpenClawConfig } from "../config/types.js";
74
import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js";
85
import {
@@ -45,6 +42,10 @@ import {
4542
} from "../talk/talk-session-controller.js";
4643
import { abortChatRunById } from "./chat-abort.js";
4744
import type { GatewayRequestContext } from "./server-methods/shared-types.js";
45+
import {
46+
closeExpiredTalkRelaySessions,
47+
requireActiveTalkRelaySession,
48+
} from "./talk-relay-session-lifecycle.js";
4849
import { forgetUnifiedTalkSession } from "./talk-session-registry.js";
4950

5051
const RELAY_SESSION_TTL_MS = 30 * 60 * 1000;
@@ -231,9 +232,7 @@ function broadcastToolResultToOwner(
231232
},
232233
): void {
233234
const payload =
234-
params.forced === true
235-
? { result: params.result, forced: true }
236-
: { result: params.result };
235+
params.forced === true ? { result: params.result, forced: true } : { result: params.result };
237236
broadcastToOwner(session.context, session.connId, {
238237
relaySessionId: session.id,
239238
type: "toolResult",
@@ -301,16 +300,11 @@ function closeRelaySession(session: RelaySession, reason: "completed" | "error")
301300
}
302301

303302
function pruneExpiredRelaySessions(nowMs = Date.now()): void {
304-
const validNowMs = asDateTimestampMs(nowMs);
305-
if (validNowMs === undefined) {
306-
return;
307-
}
308-
for (const session of relaySessions.values()) {
309-
const expiresAtMs = asDateTimestampMs(session.expiresAtMs);
310-
if (expiresAtMs === undefined || validNowMs > expiresAtMs) {
311-
closeRelaySession(session, "completed");
312-
}
313-
}
303+
closeExpiredTalkRelaySessions({
304+
sessions: relaySessions.values(),
305+
closeSession: (session) => closeRelaySession(session, "completed"),
306+
nowMs,
307+
});
314308
}
315309

316310
function countRelaySessionsForConn(connId: string): number {
@@ -724,22 +718,13 @@ function ensureRelayTurn(session: RelaySession): string {
724718
}
725719

726720
function getRelaySession(relaySessionId: string, connId: string): RelaySession {
727-
const session = relaySessions.get(relaySessionId);
728-
const nowMs = asDateTimestampMs(Date.now());
729-
const expiresAtMs = session ? asDateTimestampMs(session.expiresAtMs) : undefined;
730-
if (
731-
!session ||
732-
session.connId !== connId ||
733-
nowMs === undefined ||
734-
expiresAtMs === undefined ||
735-
nowMs > expiresAtMs
736-
) {
737-
if (session) {
738-
closeRelaySession(session, "completed");
739-
}
740-
throw new Error("Unknown realtime relay session");
741-
}
742-
return session;
721+
return requireActiveTalkRelaySession({
722+
sessions: relaySessions,
723+
sessionId: relaySessionId,
724+
connId,
725+
closeSession: (session) => closeRelaySession(session, "completed"),
726+
unknownSessionMessage: "Unknown realtime relay session",
727+
});
743728
}
744729

745730
export function sendTalkRealtimeRelayAudio(params: {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { asDateTimestampMs } from "@openclaw/normalization-core/number-coercion";
2+
3+
type TalkRelayLifecycleSession = {
4+
connId: string;
5+
expiresAtMs: number;
6+
};
7+
8+
type CloseTalkRelaySession<TSession extends TalkRelayLifecycleSession> = (
9+
session: TSession,
10+
) => void;
11+
12+
function isExpiredTalkRelaySession(
13+
session: TalkRelayLifecycleSession,
14+
validNowMs: number,
15+
): boolean {
16+
const expiresAtMs = asDateTimestampMs(session.expiresAtMs);
17+
return expiresAtMs === undefined || validNowMs > expiresAtMs;
18+
}
19+
20+
export function closeExpiredTalkRelaySessions<TSession extends TalkRelayLifecycleSession>(params: {
21+
sessions: Iterable<TSession>;
22+
closeSession: CloseTalkRelaySession<TSession>;
23+
nowMs?: number;
24+
}): void {
25+
const validNowMs = asDateTimestampMs(params.nowMs ?? Date.now());
26+
if (validNowMs === undefined) {
27+
return;
28+
}
29+
for (const session of params.sessions) {
30+
if (isExpiredTalkRelaySession(session, validNowMs)) {
31+
params.closeSession(session);
32+
}
33+
}
34+
}
35+
36+
export function requireActiveTalkRelaySession<TSession extends TalkRelayLifecycleSession>(params: {
37+
sessions: ReadonlyMap<string, TSession>;
38+
sessionId: string;
39+
connId: string;
40+
closeSession: CloseTalkRelaySession<TSession>;
41+
unknownSessionMessage: string;
42+
}): TSession {
43+
const session = params.sessions.get(params.sessionId);
44+
const nowMs = asDateTimestampMs(Date.now());
45+
if (
46+
!session ||
47+
session.connId !== params.connId ||
48+
nowMs === undefined ||
49+
isExpiredTalkRelaySession(session, nowMs)
50+
) {
51+
if (session) {
52+
params.closeSession(session);
53+
}
54+
throw new Error(params.unknownSessionMessage);
55+
}
56+
return session;
57+
}

src/gateway/talk-transcription-relay.ts

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { randomUUID } from "node:crypto";
22
import {
3-
asDateTimestampMs,
43
parseFiniteNumber as readFiniteNumber,
54
resolveExpiresAtMsFromDurationMs,
65
} from "@openclaw/normalization-core/number-coercion";
@@ -14,6 +13,10 @@ import {
1413
createTalkSessionController,
1514
} from "../talk/talk-session-controller.js";
1615
import type { GatewayRequestContext } from "./server-methods/shared-types.js";
16+
import {
17+
closeExpiredTalkRelaySessions,
18+
requireActiveTalkRelaySession,
19+
} from "./talk-relay-session-lifecycle.js";
1720

1821
const TRANSCRIPTION_SESSION_TTL_MS = 30 * 60 * 1000;
1922
const MAX_AUDIO_BASE64_BYTES = 512 * 1024;
@@ -181,16 +184,11 @@ function closeTranscriptionSession(
181184
}
182185

183186
function pruneExpiredTranscriptionSessions(nowMs = Date.now()): void {
184-
const validNowMs = asDateTimestampMs(nowMs);
185-
if (validNowMs === undefined) {
186-
return;
187-
}
188-
for (const session of transcriptionSessions.values()) {
189-
const expiresAtMs = asDateTimestampMs(session.expiresAtMs);
190-
if (expiresAtMs === undefined || validNowMs > expiresAtMs) {
191-
closeTranscriptionSession(session, "completed");
192-
}
193-
}
187+
closeExpiredTalkRelaySessions({
188+
sessions: transcriptionSessions.values(),
189+
closeSession: (session) => closeTranscriptionSession(session, "completed"),
190+
nowMs,
191+
});
194192
}
195193

196194
function countTranscriptionSessionsForConn(connId: string): number {
@@ -361,22 +359,13 @@ function getTranscriptionSession(
361359
transcriptionSessionId: string,
362360
connId: string,
363361
): TranscriptionRelaySession {
364-
const session = transcriptionSessions.get(transcriptionSessionId);
365-
const nowMs = asDateTimestampMs(Date.now());
366-
const expiresAtMs = session ? asDateTimestampMs(session.expiresAtMs) : undefined;
367-
if (
368-
!session ||
369-
session.connId !== connId ||
370-
nowMs === undefined ||
371-
expiresAtMs === undefined ||
372-
nowMs > expiresAtMs
373-
) {
374-
if (session) {
375-
closeTranscriptionSession(session, "completed");
376-
}
377-
throw new Error("Unknown transcription Talk session");
378-
}
379-
return session;
362+
return requireActiveTalkRelaySession({
363+
sessions: transcriptionSessions,
364+
sessionId: transcriptionSessionId,
365+
connId,
366+
closeSession: (session) => closeTranscriptionSession(session, "completed"),
367+
unknownSessionMessage: "Unknown transcription Talk session",
368+
});
380369
}
381370

382371
export function sendTalkTranscriptionRelayAudio(params: {

0 commit comments

Comments
 (0)