Skip to content

Commit d13c8b0

Browse files
committed
refactor: share Google Meet audio input loop
1 parent 7b3104f commit d13c8b0

1 file changed

Lines changed: 112 additions & 125 deletions

File tree

extensions/google-meet/src/realtime-node.ts

Lines changed: 112 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,85 @@ function normalizeGoogleMeetTtsPromptText(text: string | undefined): string | un
8181
return trimmed;
8282
}
8383

84+
function startGoogleMeetNodeAudioInputLoop(params: {
85+
runtime: PluginRuntime;
86+
nodeId: string;
87+
bridgeId: string;
88+
logger: RuntimeLogger;
89+
logPrefix: string;
90+
isStopped: () => boolean;
91+
stop: () => Promise<void>;
92+
isInputSuppressed: () => boolean;
93+
onAudio: (audio: Buffer) => void;
94+
}) {
95+
let lastInputAt: string | undefined;
96+
let lastInputBytes = 0;
97+
let suppressedInputBytes = 0;
98+
let lastSuppressedInputAt: string | undefined;
99+
let consecutiveInputErrors = 0;
100+
let lastInputError: string | undefined;
101+
102+
void (async () => {
103+
for (;;) {
104+
if (params.isStopped()) {
105+
break;
106+
}
107+
try {
108+
const raw = await params.runtime.nodes.invoke({
109+
nodeId: params.nodeId,
110+
command: "googlemeet.chrome",
111+
params: { action: "pullAudio", bridgeId: params.bridgeId, timeoutMs: 250 },
112+
timeoutMs: 2_000,
113+
});
114+
const result = asRecord(asRecord(raw).payload ?? raw);
115+
consecutiveInputErrors = 0;
116+
lastInputError = undefined;
117+
const base64 = readString(result.base64);
118+
if (base64) {
119+
const audio = Buffer.from(base64, "base64");
120+
if (params.isInputSuppressed()) {
121+
lastSuppressedInputAt = new Date().toISOString();
122+
suppressedInputBytes += audio.byteLength;
123+
continue;
124+
}
125+
lastInputAt = new Date().toISOString();
126+
lastInputBytes += audio.byteLength;
127+
params.onAudio(audio);
128+
}
129+
if (result.closed === true) {
130+
await params.stop();
131+
}
132+
} catch (error) {
133+
if (!params.isStopped()) {
134+
const message = formatErrorMessage(error);
135+
consecutiveInputErrors += 1;
136+
lastInputError = message;
137+
params.logger.warn(
138+
`[google-meet] ${params.logPrefix} audio input failed (${consecutiveInputErrors}/5): ${message}`,
139+
);
140+
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
141+
await params.stop();
142+
} else {
143+
await new Promise((resolve) => setTimeout(resolve, 250));
144+
}
145+
}
146+
}
147+
}
148+
})();
149+
150+
return {
151+
getHealth: () => ({
152+
audioInputActive: lastInputBytes > 0,
153+
lastInputAt,
154+
lastSuppressedInputAt,
155+
lastInputBytes,
156+
suppressedInputBytes,
157+
consecutiveInputErrors,
158+
lastInputError,
159+
}),
160+
};
161+
}
162+
84163
export async function startNodeAgentAudioBridge(params: {
85164
config: GoogleMeetConfig;
86165
fullConfig: OpenClawConfig;
@@ -95,16 +174,10 @@ export async function startNodeAgentAudioBridge(params: {
95174
let stopped = false;
96175
let sttSession: RealtimeTranscriptionSession | null = null;
97176
let realtimeReady = false;
98-
let lastInputAt: string | undefined;
99177
let lastOutputAt: string | undefined;
100-
let lastInputBytes = 0;
101178
const outputActivity = createRealtimeVoiceOutputActivityTracker();
102-
let suppressedInputBytes = 0;
103-
let lastSuppressedInputAt: string | undefined;
104179
let suppressInputUntil = 0;
105180
let lastOutputPlayableUntilMs = 0;
106-
let consecutiveInputErrors = 0;
107-
let lastInputError: string | undefined;
108181
const resolved = resolveGoogleMeetRealtimeTranscriptionProvider({
109182
config: params.config,
110183
fullConfig: params.fullConfig,
@@ -260,53 +333,19 @@ export async function startNodeAgentAudioBridge(params: {
260333
await sttSession.connect();
261334
realtimeReady = true;
262335

263-
void (async () => {
264-
for (;;) {
265-
if (stopped) {
266-
break;
267-
}
268-
try {
269-
const raw = await params.runtime.nodes.invoke({
270-
nodeId: params.nodeId,
271-
command: "googlemeet.chrome",
272-
params: { action: "pullAudio", bridgeId: params.bridgeId, timeoutMs: 250 },
273-
timeoutMs: 2_000,
274-
});
275-
const result = asRecord(asRecord(raw).payload ?? raw);
276-
consecutiveInputErrors = 0;
277-
lastInputError = undefined;
278-
const base64 = readString(result.base64);
279-
if (base64) {
280-
const audio = Buffer.from(base64, "base64");
281-
if (Date.now() < suppressInputUntil) {
282-
lastSuppressedInputAt = new Date().toISOString();
283-
suppressedInputBytes += audio.byteLength;
284-
continue;
285-
}
286-
lastInputAt = new Date().toISOString();
287-
lastInputBytes += audio.byteLength;
288-
sttSession?.sendAudio(convertGoogleMeetBridgeAudioForStt(audio, params.config));
289-
}
290-
if (result.closed === true) {
291-
await stop();
292-
}
293-
} catch (error) {
294-
if (!stopped) {
295-
const message = formatErrorMessage(error);
296-
consecutiveInputErrors += 1;
297-
lastInputError = message;
298-
params.logger.warn(
299-
`[google-meet] node agent audio input failed (${consecutiveInputErrors}/5): ${message}`,
300-
);
301-
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
302-
await stop();
303-
} else {
304-
await new Promise((resolve) => setTimeout(resolve, 250));
305-
}
306-
}
307-
}
308-
}
309-
})();
336+
const audioInputLoop = startGoogleMeetNodeAudioInputLoop({
337+
runtime: params.runtime,
338+
nodeId: params.nodeId,
339+
bridgeId: params.bridgeId,
340+
logger: params.logger,
341+
logPrefix: "node agent",
342+
isStopped: () => stopped,
343+
stop,
344+
isInputSuppressed: () => Date.now() < suppressInputUntil,
345+
onAudio: (audio) => {
346+
sttSession?.sendAudio(convertGoogleMeetBridgeAudioForStt(audio, params.config));
347+
},
348+
});
310349

311350
return {
312351
type: "node-command-pair",
@@ -317,17 +356,11 @@ export async function startNodeAgentAudioBridge(params: {
317356
getHealth: () => ({
318357
providerConnected: sttSession?.isConnected() ?? false,
319358
realtimeReady,
320-
audioInputActive: lastInputBytes > 0,
359+
...audioInputLoop.getHealth(),
321360
audioOutputActive: outputActivity.isActive(),
322-
lastInputAt,
323361
lastOutputAt,
324-
lastSuppressedInputAt,
325-
lastInputBytes,
326362
lastOutputBytes: outputActivity.snapshot().sinkAudioBytes,
327-
suppressedInputBytes,
328363
...getGoogleMeetRealtimeTranscriptHealth(transcript),
329-
consecutiveInputErrors,
330-
lastInputError,
331364
bridgeClosed: stopped,
332365
}),
333366
stop,
@@ -348,17 +381,11 @@ export async function startNodeRealtimeAudioBridge(params: {
348381
let stopped = false;
349382
let bridge: RealtimeVoiceBridgeSession | null = null;
350383
let realtimeReady = false;
351-
let lastInputAt: string | undefined;
352384
let lastOutputAt: string | undefined;
353385
let lastClearAt: string | undefined;
354-
let lastInputBytes = 0;
355386
const outputActivity = createRealtimeVoiceOutputActivityTracker();
356-
let suppressedInputBytes = 0;
357-
let lastSuppressedInputAt: string | undefined;
358387
let suppressInputUntil = 0;
359388
let lastOutputPlayableUntilMs = 0;
360-
let consecutiveInputErrors = 0;
361-
let lastInputError: string | undefined;
362389
let clearCount = 0;
363390
const resolved = resolveGoogleMeetRealtimeProvider({
364391
config: params.config,
@@ -695,58 +722,24 @@ export async function startNodeRealtimeAudioBridge(params: {
695722

696723
await bridge.connect();
697724

698-
void (async () => {
699-
for (;;) {
700-
if (stopped) {
701-
break;
702-
}
703-
try {
704-
const raw = await params.runtime.nodes.invoke({
705-
nodeId: params.nodeId,
706-
command: "googlemeet.chrome",
707-
params: { action: "pullAudio", bridgeId: params.bridgeId, timeoutMs: 250 },
708-
timeoutMs: 2_000,
709-
});
710-
const result = asRecord(asRecord(raw).payload ?? raw);
711-
consecutiveInputErrors = 0;
712-
lastInputError = undefined;
713-
const base64 = readString(result.base64);
714-
if (base64) {
715-
const audio = Buffer.from(base64, "base64");
716-
if (Date.now() < suppressInputUntil) {
717-
lastSuppressedInputAt = new Date().toISOString();
718-
suppressedInputBytes += audio.byteLength;
719-
continue;
720-
}
721-
lastInputAt = new Date().toISOString();
722-
lastInputBytes += audio.byteLength;
723-
emitTalkEvent({
724-
type: "input.audio.delta",
725-
turnId: ensureTalkTurn(),
726-
payload: { byteLength: audio.byteLength },
727-
});
728-
bridge?.sendAudio(audio);
729-
}
730-
if (result.closed === true) {
731-
await stop();
732-
}
733-
} catch (error) {
734-
if (!stopped) {
735-
const message = formatErrorMessage(error);
736-
consecutiveInputErrors += 1;
737-
lastInputError = message;
738-
params.logger.warn(
739-
`[google-meet] node audio input failed (${consecutiveInputErrors}/5): ${message}`,
740-
);
741-
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
742-
await stop();
743-
} else {
744-
await new Promise((resolve) => setTimeout(resolve, 250));
745-
}
746-
}
747-
}
748-
}
749-
})();
725+
const audioInputLoop = startGoogleMeetNodeAudioInputLoop({
726+
runtime: params.runtime,
727+
nodeId: params.nodeId,
728+
bridgeId: params.bridgeId,
729+
logger: params.logger,
730+
logPrefix: "node",
731+
isStopped: () => stopped,
732+
stop,
733+
isInputSuppressed: () => Date.now() < suppressInputUntil,
734+
onAudio: (audio) => {
735+
emitTalkEvent({
736+
type: "input.audio.delta",
737+
turnId: ensureTalkTurn(),
738+
payload: { byteLength: audio.byteLength },
739+
});
740+
bridge?.sendAudio(audio);
741+
},
742+
});
750743

751744
return {
752745
type: "node-command-pair",
@@ -759,20 +752,14 @@ export async function startNodeRealtimeAudioBridge(params: {
759752
getHealth: () => ({
760753
providerConnected: bridge?.bridge.isConnected() ?? false,
761754
realtimeReady,
762-
audioInputActive: lastInputBytes > 0,
755+
...audioInputLoop.getHealth(),
763756
audioOutputActive: outputActivity.isActive(),
764-
lastInputAt,
765757
lastOutputAt,
766-
lastSuppressedInputAt,
767758
lastClearAt,
768-
lastInputBytes,
769759
lastOutputBytes: outputActivity.snapshot().sinkAudioBytes,
770-
suppressedInputBytes,
771760
...getGoogleMeetRealtimeTranscriptHealth(transcript),
772761
...getGoogleMeetRealtimeEventHealth(realtimeEvents),
773762
recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents),
774-
consecutiveInputErrors,
775-
lastInputError,
776763
clearCount,
777764
bridgeClosed: stopped,
778765
}),

0 commit comments

Comments
 (0)