@@ -81,6 +81,85 @@ function normalizeGoogleMeetTtsPromptText(text: string | undefined): string | un
8181 return trimmed ;
8282}
8383
84+ function startGoogleMeetNodeAudioInputLoop ( params : {
85+ runtime : PluginRuntime ;
86+ nodeId : string ;
87+ bridgeId : string ;
88+ logger : RuntimeLogger ;
89+ logPrefix : string ;
90+ isStopped : ( ) => boolean ;
91+ stop : ( ) => Promise < void > ;
92+ isInputSuppressed : ( ) => boolean ;
93+ onAudio : ( audio : Buffer ) => void ;
94+ } ) {
95+ let lastInputAt : string | undefined ;
96+ let lastInputBytes = 0 ;
97+ let suppressedInputBytes = 0 ;
98+ let lastSuppressedInputAt : string | undefined ;
99+ let consecutiveInputErrors = 0 ;
100+ let lastInputError : string | undefined ;
101+
102+ void ( async ( ) => {
103+ for ( ; ; ) {
104+ if ( params . isStopped ( ) ) {
105+ break ;
106+ }
107+ try {
108+ const raw = await params . runtime . nodes . invoke ( {
109+ nodeId : params . nodeId ,
110+ command : "googlemeet.chrome" ,
111+ params : { action : "pullAudio" , bridgeId : params . bridgeId , timeoutMs : 250 } ,
112+ timeoutMs : 2_000 ,
113+ } ) ;
114+ const result = asRecord ( asRecord ( raw ) . payload ?? raw ) ;
115+ consecutiveInputErrors = 0 ;
116+ lastInputError = undefined ;
117+ const base64 = readString ( result . base64 ) ;
118+ if ( base64 ) {
119+ const audio = Buffer . from ( base64 , "base64" ) ;
120+ if ( params . isInputSuppressed ( ) ) {
121+ lastSuppressedInputAt = new Date ( ) . toISOString ( ) ;
122+ suppressedInputBytes += audio . byteLength ;
123+ continue ;
124+ }
125+ lastInputAt = new Date ( ) . toISOString ( ) ;
126+ lastInputBytes += audio . byteLength ;
127+ params . onAudio ( audio ) ;
128+ }
129+ if ( result . closed === true ) {
130+ await params . stop ( ) ;
131+ }
132+ } catch ( error ) {
133+ if ( ! params . isStopped ( ) ) {
134+ const message = formatErrorMessage ( error ) ;
135+ consecutiveInputErrors += 1 ;
136+ lastInputError = message ;
137+ params . logger . warn (
138+ `[google-meet] ${ params . logPrefix } audio input failed (${ consecutiveInputErrors } /5): ${ message } ` ,
139+ ) ;
140+ if ( consecutiveInputErrors >= 5 || / u n k n o w n b r i d g e I d | b r i d g e i s n o t o p e n / i. test ( message ) ) {
141+ await params . stop ( ) ;
142+ } else {
143+ await new Promise ( ( resolve ) => setTimeout ( resolve , 250 ) ) ;
144+ }
145+ }
146+ }
147+ }
148+ } ) ( ) ;
149+
150+ return {
151+ getHealth : ( ) => ( {
152+ audioInputActive : lastInputBytes > 0 ,
153+ lastInputAt,
154+ lastSuppressedInputAt,
155+ lastInputBytes,
156+ suppressedInputBytes,
157+ consecutiveInputErrors,
158+ lastInputError,
159+ } ) ,
160+ } ;
161+ }
162+
84163export async function startNodeAgentAudioBridge ( params : {
85164 config : GoogleMeetConfig ;
86165 fullConfig : OpenClawConfig ;
@@ -95,16 +174,10 @@ export async function startNodeAgentAudioBridge(params: {
95174 let stopped = false ;
96175 let sttSession : RealtimeTranscriptionSession | null = null ;
97176 let realtimeReady = false ;
98- let lastInputAt : string | undefined ;
99177 let lastOutputAt : string | undefined ;
100- let lastInputBytes = 0 ;
101178 const outputActivity = createRealtimeVoiceOutputActivityTracker ( ) ;
102- let suppressedInputBytes = 0 ;
103- let lastSuppressedInputAt : string | undefined ;
104179 let suppressInputUntil = 0 ;
105180 let lastOutputPlayableUntilMs = 0 ;
106- let consecutiveInputErrors = 0 ;
107- let lastInputError : string | undefined ;
108181 const resolved = resolveGoogleMeetRealtimeTranscriptionProvider ( {
109182 config : params . config ,
110183 fullConfig : params . fullConfig ,
@@ -260,53 +333,19 @@ export async function startNodeAgentAudioBridge(params: {
260333 await sttSession . connect ( ) ;
261334 realtimeReady = true ;
262335
263- void ( async ( ) => {
264- for ( ; ; ) {
265- if ( stopped ) {
266- break ;
267- }
268- try {
269- const raw = await params . runtime . nodes . invoke ( {
270- nodeId : params . nodeId ,
271- command : "googlemeet.chrome" ,
272- params : { action : "pullAudio" , bridgeId : params . bridgeId , timeoutMs : 250 } ,
273- timeoutMs : 2_000 ,
274- } ) ;
275- const result = asRecord ( asRecord ( raw ) . payload ?? raw ) ;
276- consecutiveInputErrors = 0 ;
277- lastInputError = undefined ;
278- const base64 = readString ( result . base64 ) ;
279- if ( base64 ) {
280- const audio = Buffer . from ( base64 , "base64" ) ;
281- if ( Date . now ( ) < suppressInputUntil ) {
282- lastSuppressedInputAt = new Date ( ) . toISOString ( ) ;
283- suppressedInputBytes += audio . byteLength ;
284- continue ;
285- }
286- lastInputAt = new Date ( ) . toISOString ( ) ;
287- lastInputBytes += audio . byteLength ;
288- sttSession ?. sendAudio ( convertGoogleMeetBridgeAudioForStt ( audio , params . config ) ) ;
289- }
290- if ( result . closed === true ) {
291- await stop ( ) ;
292- }
293- } catch ( error ) {
294- if ( ! stopped ) {
295- const message = formatErrorMessage ( error ) ;
296- consecutiveInputErrors += 1 ;
297- lastInputError = message ;
298- params . logger . warn (
299- `[google-meet] node agent audio input failed (${ consecutiveInputErrors } /5): ${ message } ` ,
300- ) ;
301- if ( consecutiveInputErrors >= 5 || / u n k n o w n b r i d g e I d | b r i d g e i s n o t o p e n / i. test ( message ) ) {
302- await stop ( ) ;
303- } else {
304- await new Promise ( ( resolve ) => setTimeout ( resolve , 250 ) ) ;
305- }
306- }
307- }
308- }
309- } ) ( ) ;
336+ const audioInputLoop = startGoogleMeetNodeAudioInputLoop ( {
337+ runtime : params . runtime ,
338+ nodeId : params . nodeId ,
339+ bridgeId : params . bridgeId ,
340+ logger : params . logger ,
341+ logPrefix : "node agent" ,
342+ isStopped : ( ) => stopped ,
343+ stop,
344+ isInputSuppressed : ( ) => Date . now ( ) < suppressInputUntil ,
345+ onAudio : ( audio ) => {
346+ sttSession ?. sendAudio ( convertGoogleMeetBridgeAudioForStt ( audio , params . config ) ) ;
347+ } ,
348+ } ) ;
310349
311350 return {
312351 type : "node-command-pair" ,
@@ -317,17 +356,11 @@ export async function startNodeAgentAudioBridge(params: {
317356 getHealth : ( ) => ( {
318357 providerConnected : sttSession ?. isConnected ( ) ?? false ,
319358 realtimeReady,
320- audioInputActive : lastInputBytes > 0 ,
359+ ... audioInputLoop . getHealth ( ) ,
321360 audioOutputActive : outputActivity . isActive ( ) ,
322- lastInputAt,
323361 lastOutputAt,
324- lastSuppressedInputAt,
325- lastInputBytes,
326362 lastOutputBytes : outputActivity . snapshot ( ) . sinkAudioBytes ,
327- suppressedInputBytes,
328363 ...getGoogleMeetRealtimeTranscriptHealth ( transcript ) ,
329- consecutiveInputErrors,
330- lastInputError,
331364 bridgeClosed : stopped ,
332365 } ) ,
333366 stop,
@@ -348,17 +381,11 @@ export async function startNodeRealtimeAudioBridge(params: {
348381 let stopped = false ;
349382 let bridge : RealtimeVoiceBridgeSession | null = null ;
350383 let realtimeReady = false ;
351- let lastInputAt : string | undefined ;
352384 let lastOutputAt : string | undefined ;
353385 let lastClearAt : string | undefined ;
354- let lastInputBytes = 0 ;
355386 const outputActivity = createRealtimeVoiceOutputActivityTracker ( ) ;
356- let suppressedInputBytes = 0 ;
357- let lastSuppressedInputAt : string | undefined ;
358387 let suppressInputUntil = 0 ;
359388 let lastOutputPlayableUntilMs = 0 ;
360- let consecutiveInputErrors = 0 ;
361- let lastInputError : string | undefined ;
362389 let clearCount = 0 ;
363390 const resolved = resolveGoogleMeetRealtimeProvider ( {
364391 config : params . config ,
@@ -695,58 +722,24 @@ export async function startNodeRealtimeAudioBridge(params: {
695722
696723 await bridge . connect ( ) ;
697724
698- void ( async ( ) => {
699- for ( ; ; ) {
700- if ( stopped ) {
701- break ;
702- }
703- try {
704- const raw = await params . runtime . nodes . invoke ( {
705- nodeId : params . nodeId ,
706- command : "googlemeet.chrome" ,
707- params : { action : "pullAudio" , bridgeId : params . bridgeId , timeoutMs : 250 } ,
708- timeoutMs : 2_000 ,
709- } ) ;
710- const result = asRecord ( asRecord ( raw ) . payload ?? raw ) ;
711- consecutiveInputErrors = 0 ;
712- lastInputError = undefined ;
713- const base64 = readString ( result . base64 ) ;
714- if ( base64 ) {
715- const audio = Buffer . from ( base64 , "base64" ) ;
716- if ( Date . now ( ) < suppressInputUntil ) {
717- lastSuppressedInputAt = new Date ( ) . toISOString ( ) ;
718- suppressedInputBytes += audio . byteLength ;
719- continue ;
720- }
721- lastInputAt = new Date ( ) . toISOString ( ) ;
722- lastInputBytes += audio . byteLength ;
723- emitTalkEvent ( {
724- type : "input.audio.delta" ,
725- turnId : ensureTalkTurn ( ) ,
726- payload : { byteLength : audio . byteLength } ,
727- } ) ;
728- bridge ?. sendAudio ( audio ) ;
729- }
730- if ( result . closed === true ) {
731- await stop ( ) ;
732- }
733- } catch ( error ) {
734- if ( ! stopped ) {
735- const message = formatErrorMessage ( error ) ;
736- consecutiveInputErrors += 1 ;
737- lastInputError = message ;
738- params . logger . warn (
739- `[google-meet] node audio input failed (${ consecutiveInputErrors } /5): ${ message } ` ,
740- ) ;
741- if ( consecutiveInputErrors >= 5 || / u n k n o w n b r i d g e I d | b r i d g e i s n o t o p e n / i. test ( message ) ) {
742- await stop ( ) ;
743- } else {
744- await new Promise ( ( resolve ) => setTimeout ( resolve , 250 ) ) ;
745- }
746- }
747- }
748- }
749- } ) ( ) ;
725+ const audioInputLoop = startGoogleMeetNodeAudioInputLoop ( {
726+ runtime : params . runtime ,
727+ nodeId : params . nodeId ,
728+ bridgeId : params . bridgeId ,
729+ logger : params . logger ,
730+ logPrefix : "node" ,
731+ isStopped : ( ) => stopped ,
732+ stop,
733+ isInputSuppressed : ( ) => Date . now ( ) < suppressInputUntil ,
734+ onAudio : ( audio ) => {
735+ emitTalkEvent ( {
736+ type : "input.audio.delta" ,
737+ turnId : ensureTalkTurn ( ) ,
738+ payload : { byteLength : audio . byteLength } ,
739+ } ) ;
740+ bridge ?. sendAudio ( audio ) ;
741+ } ,
742+ } ) ;
750743
751744 return {
752745 type : "node-command-pair" ,
@@ -759,20 +752,14 @@ export async function startNodeRealtimeAudioBridge(params: {
759752 getHealth : ( ) => ( {
760753 providerConnected : bridge ?. bridge . isConnected ( ) ?? false ,
761754 realtimeReady,
762- audioInputActive : lastInputBytes > 0 ,
755+ ... audioInputLoop . getHealth ( ) ,
763756 audioOutputActive : outputActivity . isActive ( ) ,
764- lastInputAt,
765757 lastOutputAt,
766- lastSuppressedInputAt,
767758 lastClearAt,
768- lastInputBytes,
769759 lastOutputBytes : outputActivity . snapshot ( ) . sinkAudioBytes ,
770- suppressedInputBytes,
771760 ...getGoogleMeetRealtimeTranscriptHealth ( transcript ) ,
772761 ...getGoogleMeetRealtimeEventHealth ( realtimeEvents ) ,
773762 recentTalkEvents : summarizeGoogleMeetTalkEvents ( recentTalkEvents ) ,
774- consecutiveInputErrors,
775- lastInputError,
776763 clearCount,
777764 bridgeClosed : stopped ,
778765 } ) ,
0 commit comments