@@ -17,7 +17,7 @@ private func makeRealtimeAudioTapBlock(
1717 inputSampleRate: inputSampleRate,
1818 targetSampleRate: targetSampleRate)
1919 guard !encoded. isEmpty else { return }
20- let timestampMs = ProcessInfo . processInfo. systemUptime * 1000
20+ let timestampMs = ( ProcessInfo . processInfo. systemUptime * 1000 ) . rounded ( )
2121 let rms = RealtimeTalkRelaySession . rmsLevel ( buffer: buffer)
2222 onAudio ( encoded, timestampMs, rms)
2323 }
@@ -125,15 +125,24 @@ final class RealtimeTalkRelaySession {
125125 private var eventTask : Task < Void , Never > ?
126126 private var outputTask : Task < Void , Never > ?
127127 private var outputContinuation : AsyncThrowingStream < Data , Error > . Continuation ?
128+ private var outputIdleTask : Task < Void , Never > ?
129+ private var outputSessionId = 0
130+ private var pendingOutputChunks : [ Data ] = [ ]
131+ private var pendingOutputDone = false
128132 private var audioSender : RealtimeAudioSender ?
129133 private var isClosed = false
130134 private var isOutputPlaying = false
131135 private var outputStartedAtMs : Double ?
136+ private var outputPlaybackExpectedEndMs : Double = 0
132137 private var lastBargeInAtMs : Double = 0
133138 private var micLogFrameCount = 0
134139 private var micLogByteCount = 0
135140 private var micLogMaxRms : Float = 0
136141 private var lastMicLogAtMs : Double = 0
142+ private var suppressedEchoFrameCount = 0
143+ private var suppressedEchoByteCount = 0
144+ private var suppressedEchoMaxRms : Float = 0
145+ private var lastSuppressedEchoLogAtMs : Double = 0
137146 private var outputAudioChunkCount = 0
138147 private var outputAudioByteCount = 0
139148
@@ -168,7 +177,6 @@ final class RealtimeTalkRelaySession {
168177 let eventStream = await self . gateway. subscribeServerEvents ( bufferingNewest: 200 )
169178 self . startEventPump ( stream: eventStream)
170179 self . configureAudioContract ( result. audio)
171- self . startOutputPlayback ( )
172180 try self . startMicrophonePump ( )
173181 self . onStatus ( " Listening (Realtime) " )
174182 } catch {
@@ -219,7 +227,6 @@ final class RealtimeTalkRelaySession {
219227
220228 func cancelOutput( reason: String = " user " ) {
221229 self . stopOutputPlayback ( )
222- self . startOutputPlayback ( )
223230 guard let relaySessionId else { return }
224231 Task { [ gateway] in
225232 let payload : [ String : Any ] = [
@@ -306,12 +313,18 @@ final class RealtimeTalkRelaySession {
306313 let data = Data ( base64Encoded: base64)
307314 else { return }
308315 self . recordOutputAudioChunk ( byteCount: data. count)
309- self . markOutputAudioStarted ( nowMs: ProcessInfo . processInfo. systemUptime * 1000 )
316+ self . markOutputAudioStarted ( byteCount : data . count , nowMs: ProcessInfo . processInfo. systemUptime * 1000 )
310317 self . onSpeakingChanged ( true )
318+ if self . outputContinuation == nil , self . outputTask != nil {
319+ self . pendingOutputChunks. append ( data)
320+ return
321+ }
322+ self . ensureOutputPlaybackStarted ( )
311323 self . outputContinuation? . yield ( data)
324+ case " audioDone " :
325+ self . finishOutputPlaybackStream ( )
312326 case " clear " :
313327 self . stopOutputPlayback ( )
314- self . startOutputPlayback ( )
315328 case " transcript " :
316329 self . handleTranscriptEvent ( payload)
317330 case " toolCall " :
@@ -337,11 +350,16 @@ final class RealtimeTalkRelaySession {
337350 " talk realtime audio: chunks= \( self . outputAudioChunkCount) bytes= \( self . outputAudioByteCount) " )
338351 }
339352
/// Records that a chunk of output audio arrived and extends the playback-end estimate.
///
/// - Parameters:
///   - byteCount: Size of the received audio chunk in bytes.
///   - nowMs: Current time in milliseconds (callers pass `systemUptime * 1000`).
private func markOutputAudioStarted(byteCount: Int, nowMs: Double) {
    let wasIdle = !self.isOutputPlaying
    if wasIdle {
        // First chunk of a new utterance: anchor both the start time and the estimate at "now".
        self.outputStartedAtMs = nowMs
        self.outputPlaybackExpectedEndMs = nowMs
    }
    self.isOutputPlaying = true

    // Convert bytes to time using Int16-sized samples; the lower bound of 1 avoids dividing by zero.
    let bytesPerSample = Double(MemoryLayout<Int16>.size)
    let bytesPerSecond = max(1, self.outputSampleRateHz * bytesPerSample)
    let chunkDurationMs = (Double(byteCount) / bytesPerSecond) * 1000

    // New audio queues behind whatever is still expected to play, but never starts in the past.
    let playbackBaseMs = max(nowMs, self.outputPlaybackExpectedEndMs)
    let expectedEndMs = playbackBaseMs + chunkDurationMs
    self.outputPlaybackExpectedEndMs = expectedEndMs
    self.scheduleOutputPlaybackIdle(expectedEndMs: expectedEndMs)
}
346364
347365 private func handleInputLevelDuringOutput( _ rms: Float , timestampMs: Double ) {
@@ -537,14 +555,25 @@ final class RealtimeTalkRelaySession {
537555 { [ weak self, audioSender = self . audioSender] encoded, timestampMs, rms in
538556 guard let audioSender else { return }
539557 Task {
540- await MainActor . run { [ weak self] in
541- self ? . recordMicrophoneFrame ( byteCount: encoded. count, rms: rms, timestampMs: timestampMs)
542- }
543- if rms >= Self . bargeInRmsThreshold {
544- await MainActor . run { [ weak self] in
545- self ? . handleInputLevelDuringOutput ( rms, timestampMs: timestampMs)
558+ let shouldSend = await MainActor . run { [ weak self] in
559+ guard let self, !self . isClosed else { return false }
560+ self . recordMicrophoneFrame ( byteCount: encoded. count, rms: rms, timestampMs: timestampMs)
561+ self . refreshOutputPlaybackState ( timestampMs: timestampMs)
562+ if self . isOutputPlaying {
563+ if self . shouldSuppressMicrophoneDuringOutput ( ) {
564+ self . recordSuppressedOutputEchoFrame (
565+ byteCount: encoded. count,
566+ rms: rms,
567+ timestampMs: timestampMs)
568+ return false
569+ }
570+ if rms >= Self . bargeInRmsThreshold {
571+ self . handleInputLevelDuringOutput ( rms, timestampMs: timestampMs)
572+ }
546573 }
574+ return true
547575 }
576+ guard shouldSend else { return }
548577 guard let message = await audioSender. send ( encoded, timestampMs: timestampMs) else { return }
549578 await MainActor . run { [ weak self] in
550579 guard let self, !self . isClosed else { return }
@@ -561,6 +590,13 @@ final class RealtimeTalkRelaySession {
561590 try self . audioEngine. start ( )
562591 }
563592
/// Reports whether microphone frames should be dropped while output audio is playing.
///
/// Built-in speaker output bleeds into the microphone even in voiceChat mode; keep the
/// realtime provider from treating its own speech as user input. Headsets keep barge-in.
///
/// - Returns: `true` when any output port of the current audio route is the built-in speaker.
private func shouldSuppressMicrophoneDuringOutput() -> Bool {
    let routeOutputs = AVAudioSession.sharedInstance().currentRoute.outputs
    for port in routeOutputs where port.portType == .builtInSpeaker {
        return true
    }
    return false
}
599+
564600 private func recordMicrophoneFrame( byteCount: Int , rms: Float , timestampMs: Double ) {
565601 guard !self . isClosed else { return }
566602 self . micLogFrameCount += 1
@@ -576,42 +612,127 @@ final class RealtimeTalkRelaySession {
576612 self . micLogMaxRms = 0
577613 }
578614
/// Accumulates statistics for a microphone frame that was dropped during output playback and
/// emits a diagnostics line once at least 1000 ms have passed since the previous one.
///
/// - Parameters:
///   - byteCount: Size of the dropped frame in bytes.
///   - rms: RMS level of the dropped frame.
///   - timestampMs: Frame timestamp in milliseconds, used to throttle logging.
private func recordSuppressedOutputEchoFrame(byteCount: Int, rms: Float, timestampMs: Double) {
    self.suppressedEchoFrameCount += 1
    self.suppressedEchoByteCount += byteCount
    self.suppressedEchoMaxRms = max(self.suppressedEchoMaxRms, rms)

    // Throttle: only report (and reset the counters) when a full second has elapsed.
    let elapsedSinceLogMs = timestampMs - self.lastSuppressedEchoLogAtMs
    guard elapsedSinceLogMs >= 1000 else { return }
    self.lastSuppressedEchoLogAtMs = timestampMs

    let maxRms = String(format: " %.4f ", Double(self.suppressedEchoMaxRms))
    let summary = " talk realtime mic suppressed during output: "
        + " buffers= \(self.suppressedEchoFrameCount) "
        + " bytes= \(self.suppressedEchoByteCount) maxRms= \(maxRms) "
    GatewayDiagnostics.log(summary)

    // Start a fresh reporting window.
    self.suppressedEchoFrameCount = 0
    self.suppressedEchoByteCount = 0
    self.suppressedEchoMaxRms = 0
}
630+
/// Stops microphone capture: removes the tap on input bus 0, then stops the audio engine.
private func stopMicrophonePump() {
    let engine = self.audioEngine
    // Detach the tap first, then halt the engine.
    engine.inputNode.removeTap(onBus: 0)
    engine.stop()
}
583635
/// Starts an output playback stream and its player task if none is active.
///
/// Does nothing when a continuation or a playback task already exists. Each started stream is
/// tagged with a fresh `outputSessionId`; when the player task completes it only touches
/// session state if that id is still current.
private func ensureOutputPlaybackStarted() {
    if self.outputContinuation != nil || self.outputTask != nil {
        return
    }

    self.outputSessionId += 1
    let startedSessionId = self.outputSessionId

    // The stream's build closure stores the continuation so audio chunks can be yielded into it.
    let audioStream = AsyncThrowingStream<Data, Error> { continuation in
        self.outputContinuation = continuation
    }

    self.outputTask = Task { [weak self] in
        guard let self else { return }
        let result = await self.pcmPlayer.play(stream: audioStream, sampleRate: self.outputSampleRateHz)
        await MainActor.run {
            // A newer session (or a stop) has bumped the id: leave its state alone.
            guard self.outputSessionId == startedSessionId else { return }
            self.outputTask = nil
            self.outputContinuation = nil
            if !result.finished, let interruptedAt = result.interruptedAt {
                self.logger.info(" realtime output interrupted at \(interruptedAt, privacy: .public) s ")
            }
            self.markOutputPlaybackFinished()
            // Audio that arrived while this stream was draining gets its own stream now.
            self.startPendingOutputPlaybackIfNeeded()
        }
    }
}
658+
/// Marks the end of the current output audio stream.
///
/// With a live continuation, the stream is finished and the continuation is dropped. Without
/// one, if a playback task still exists and chunks are queued, the queued audio is flagged as
/// complete so it is finished once it has been replayed.
private func finishOutputPlaybackStream() {
    if let activeContinuation = self.outputContinuation {
        activeContinuation.finish()
        self.outputContinuation = nil
        return
    }

    let playbackTaskExists = self.outputTask != nil
    let hasQueuedChunks = !self.pendingOutputChunks.isEmpty
    if playbackTaskExists && hasQueuedChunks {
        self.pendingOutputDone = true
    }
}
600669
601- private func markOutputPlaybackFinished( ) {
/// Replays queued output chunks into a freshly started playback stream.
///
/// The queue and its "done" flag are always cleared. When chunks were queued, a new stream is
/// started, each chunk is marked and yielded in order, and the stream is finished if the
/// queued audio had been flagged as complete.
private func startPendingOutputPlaybackIfNeeded() {
    // Snapshot and clear the queue up front so the replay below works on local copies.
    let queuedChunks = self.pendingOutputChunks
    let queuedAudioIsComplete = self.pendingOutputDone
    self.pendingOutputChunks = []
    self.pendingOutputDone = false
    if queuedChunks.isEmpty {
        return
    }

    self.ensureOutputPlaybackStarted()
    queuedChunks.forEach { chunk in
        self.markOutputAudioStarted(byteCount: chunk.count, nowMs: ProcessInfo.processInfo.systemUptime * 1000)
        self.onSpeakingChanged(true)
        self.outputContinuation?.yield(chunk)
    }

    guard queuedAudioIsComplete else { return }
    self.finishOutputPlaybackStream()
}
689+
/// (Re)schedules the timer that re-evaluates output playback state after playback should be over.
///
/// Any previously scheduled idle task is cancelled. The new task sleeps until 500 ms past
/// `expectedEndMs` (at least 350 ms from now) and then refreshes the playback state on the
/// main actor, unless it was cancelled or the session is closed.
///
/// - Parameter expectedEndMs: Estimated end of playback, in milliseconds of system uptime.
private func scheduleOutputPlaybackIdle(expectedEndMs: Double) {
    if let previousIdleTask = self.outputIdleTask {
        previousIdleTask.cancel()
    }

    let scheduledAtMs = ProcessInfo.processInfo.systemUptime * 1000
    let idleDelayMs = max(350, expectedEndMs - scheduledAtMs + 500)

    self.outputIdleTask = Task { [weak self] in
        // A cancelled sleep throws; that is ignored here and handled by the check below.
        try? await Task.sleep(nanoseconds: UInt64(idleDelayMs * 1_000_000))
        if Task.isCancelled {
            return
        }
        await MainActor.run { [weak self] in
            guard let self, !self.isClosed else { return }
            let wakeMs = ProcessInfo.processInfo.systemUptime * 1000
            // Called from the idle task itself, so it is asked not to cancel the idle task.
            self.refreshOutputPlaybackState(timestampMs: wakeMs, cancelIdleTask: false)
        }
    }
}
704+
/// Marks output playback as finished once `timestampMs` is at least 500 ms past the expected end.
///
/// - Parameters:
///   - timestampMs: The time to evaluate against, in milliseconds.
///   - cancelIdleTask: Forwarded to `markOutputPlaybackFinished(cancelIdleTask:)`.
private func refreshOutputPlaybackState(timestampMs: Double, cancelIdleTask: Bool = true) {
    if !self.isOutputPlaying {
        return
    }
    let finishedAfterMs = self.outputPlaybackExpectedEndMs + 500
    if timestampMs >= finishedAfterMs {
        self.markOutputPlaybackFinished(cancelIdleTask: cancelIdleTask)
    }
}
710+
/// Resets the output playback bookkeeping and reports that speaking has stopped.
///
/// - Parameter cancelIdleTask: When `true`, the scheduled idle task is cancelled and cleared.
private func markOutputPlaybackFinished(cancelIdleTask: Bool = true) {
    // The speaking callback fires last, after all state has been reset.
    defer { self.onSpeakingChanged(false) }

    if cancelIdleTask {
        self.outputIdleTask?.cancel()
        self.outputIdleTask = nil
    }

    self.isOutputPlaying = false
    self.outputStartedAtMs = nil
    self.outputPlaybackExpectedEndMs = 0
}
606721
/// Tears down output playback immediately and discards any queued audio.
///
/// Bumps `outputSessionId` so a still-running player task skips its completion handling,
/// finishes and drops the stream continuation, cancels the playback task, clears the pending
/// queue, stops the PCM player, and finally resets the playback bookkeeping.
private func stopOutputPlayback() {
    // Invalidate the current session first so a late completion cannot touch the new state.
    self.outputSessionId += 1

    self.outputContinuation?.finish()
    self.outputContinuation = nil
    self.outputTask?.cancel()
    self.outputTask = nil

    self.pendingOutputChunks = []
    self.pendingOutputDone = false

    _ = self.pcmPlayer.stop()

    // Shared reset: cancels the idle task, clears the playing/start/expected-end state and
    // reports speaking == false. Using the helper keeps the two reset paths in sync.
    self.markOutputPlaybackFinished()
}
617738
@@ -684,7 +805,7 @@ final class RealtimeTalkRelaySession {
684805
685806extension RealtimeTalkRelaySession {
/// Test hook that forwards to `markOutputAudioStarted(byteCount:nowMs:)`.
///
/// - Parameters:
///   - nowMs: Timestamp in milliseconds to report as "now".
///   - byteCount: Size of the simulated audio chunk in bytes. Defaults to 4800, the value this
///     hook previously hard-coded, so existing callers are unaffected.
func _test_markOutputAudioStarted(nowMs: Double, byteCount: Int = 4800) {
    self.markOutputAudioStarted(byteCount: byteCount, nowMs: nowMs)
}
689810
690811 func _test_markOutputPlaybackFinished( ) {
0 commit comments