[Feature] WebSocket streaming audio input for ASR #22848
SammLSH wants to merge 10 commits into sgl-project:main
Conversation
Force-pushed from 236cea0 to 2f6c8b5
- Wrap the session lifecycle in a `RealtimeConnection` class; remove the free-standing handler functions and the `RealtimeTranscriptionSession` dataclass. State and methods now live on a single per-connection instance, mirroring vLLM's `RealtimeConnection` structure.
- Replace ad-hoc dict messages with Pydantic event models (`SessionStartEvent`, `SessionStartedEvent`, `TranscriptDeltaEvent`, `TranscriptFinalEvent`, `ErrorEvent`) for schema-validated send/receive. Pydantic now catches `invalid_payload` cases (wrong types, missing fields) automatically.
- Drop the `RealtimeMessageType` and `RealtimeErrorCode` enums in favor of bare strings, matching sglang's `serving_base.py` and vLLM's `realtime/connection.py` conventions.
- The module docstring now shows concrete wire examples and explicitly notes the protocol is sglang-specific (it does not align with OpenAI Realtime / vLLM `/v1/realtime`).
- Add a note in `_pcm_to_wav` about the cumulative re-encoding cost; deferred to M2 (RadixCache prefix caching).
- Update a test docstring to drop a reference to a renamed internal helper.
Reject non-positive values at startup so misconfiguration fails fast instead of producing immediately-closing sessions (`value == 0`) or disabling the backpressure cap entirely (`value < 0`, so the comparison never trips). Validation runs before the dummy-model short-circuit, so it applies regardless of which model is being served.
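A minimal sketch of that startup check (the helper name `_handle_asr_validation` appears in the files-touched list below; the exact body is an assumption):

```python
def _handle_asr_validation(asr_max_buffer_seconds: int) -> None:
    # 0 would close every session immediately; negative values would make the
    # "buffered audio > cap" comparison never trip, silently disabling the cap.
    if asr_max_buffer_seconds <= 0:
        raise ValueError(
            f"--asr-max-buffer-seconds must be > 0, got {asr_max_buffer_seconds}"
        )
```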
```python
TranscriptionRequest(language=event.language)
if event.language
else TranscriptionRequest()
```

Suggested change:

```python
TranscriptionRequest(language=event.language)
```
```python
adapter = self.serving._adapter
if not adapter.supports_chunked_streaming:
```
`_adapter` is a private member of `OpenAIServingTranscription`, and we are now accessing it across classes.
I'm considering two alternatives:

1. Expose a public `adapter` property, or create delegation to re-export the needed properties and methods of `_adapter`.
2. Have `handle_realtime_transcription` accept `tokenizer_manager` and `_adapter` instead of `OpenAIServingTranscription` (this also gets rid of the circular imports and `TYPE_CHECKING`).
Option 2 sounds good to me. `RealtimeConnection` only uses `tokenizer_manager` and `adapter` from `serving`, and passing them in also lets us drop the `TYPE_CHECKING` import.
```python
tokenizer_manager,
adapter,
```
please add type hints
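A sketch of what the requested hints could look like on the option-2 constructor (the import paths and the `TranscriptionAdapter` module location are assumptions, not the PR's actual layout):

```python
# Sketch only: import paths are assumptions.
from sglang.srt.managers.tokenizer_manager import TokenizerManager
from sglang.srt.entrypoints.openai.serving_transcription import TranscriptionAdapter


class RealtimeConnection:
    def __init__(
        self,
        tokenizer_manager: TokenizerManager,
        adapter: TranscriptionAdapter,
    ) -> None:
        self.tokenizer_manager = tokenizer_manager
        self.adapter = adapter
```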
```python
import io

import numpy as np
import soundfile as sf

_SAMPLE_RATE = 16000  # protocol-fixed: PCM16 mono 16 kHz


def _pcm_to_wav(pcm_buffer: bytes) -> bytes:
    """Wrap raw PCM16 mono 16 kHz bytes in a WAV container so
    ``soundfile.read`` (called by the multimodal processor) can decode it.

    Note: callers re-encode the entire cumulative buffer per chunk (M1
    constraint). Cost is bounded by ``--asr-max-buffer-seconds``; M2 plans
    RadixCache prefix caching to remove this overhead.
    """
    if not pcm_buffer:
        raise ValueError("pcm_buffer is empty")
    samples = np.frombuffer(pcm_buffer, dtype=np.int16)
    buf = io.BytesIO()
    sf.write(buf, samples, _SAMPLE_RATE, format="WAV")
    return buf.getvalue()
```
Here PCM is converted to WAV, but later the multimodal processor will need to decode the WAV back to a float32 ndarray. Potentially we can make `load_audio` accept a pre-decoded ndarray to skip the round trip.
(Maybe a follow-up PR.)
Will do the ndarray path as a follow-up.
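For reference, a minimal sketch of that follow-up direction: decode the PCM buffer straight to the float32 ndarray the processor ultimately wants, skipping the WAV container (the helper name `_pcm_to_float32` is hypothetical, and `load_audio` accepting a pre-decoded ndarray is proposed, not current, behavior):

```python
import numpy as np


def _pcm_to_float32(pcm_buffer: bytes) -> np.ndarray:
    # int16 PCM spans [-32768, 32767]; dividing by 32768.0 maps it into
    # [-1.0, 1.0), the float32 range soundfile.read would have produced.
    samples = np.frombuffer(pcm_buffer, dtype=np.int16)
    return samples.astype(np.float32) / 32768.0
```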
| "sampling parameters if available. Default is 'model'.", | ||
| ) | ||
| parser.add_argument( | ||
| "--asr-max-buffer-seconds", |
There was a problem hiding this comment.
This is a cap per session; globally we also need an `--asr-max-concurrent-sessions`.
Will add it in the next commit.
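A sketch of one way such a global cap could sit at the connection-accept site (the semaphore approach and the close code are illustrative; the PR later lands this as `--asr-max-concurrent-sessions` with a pre-connection `too_many_sessions` error):

```python
import asyncio

MAX_CONCURRENT_SESSIONS = 32  # illustrative; in practice this comes from the CLI flag
_session_slots = asyncio.Semaphore(MAX_CONCURRENT_SESSIONS)


async def run_session(ws) -> None:
    """Placeholder for the real per-connection receive/inference loop."""


async def handle_connection(ws) -> None:
    if _session_slots.locked():
        # Every slot is taken: reject before doing any per-session work.
        await ws.close(code=1013)  # 1013 = "try again later"
        return
    async with _session_slots:
        await run_session(ws)
```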
```
This protocol is sglang-specific; it does not align with OpenAI's Realtime
API spec (vLLM's ``/v1/realtime``). Clients written against OpenAI Realtime
will not work as-is. See ``test/manual/models/test_qwen3_asr.py`` for a
reference client and the Pydantic event models below for full schema.
```
I think it would be nice to have an OpenAI-compatible protocol from the start, because it will influence some future designs.
cc @mickqian
Agreed, I can update it in the next commit.
Move `/v1/audio/transcriptions/stream` to `/v1/realtime` and switch from the M1 `session.start`/binary-PCM protocol to OpenAI's Realtime transcription wire format. The shared inference driver is untouched, so HTTP SSE and WS still produce byte-identical transcripts; this is purely a transport rewrite.

sglang deviations from the spec live in the module docstring: `sample_rate` is a sglang extension accepting 16/24/48 kHz with internal resample (OpenAI fixes `audio/pcm` at 24 kHz), `turn_detection` and `noise_reduction` must be null (no server-side VAD), `include[]` is dropped, `model` is echo-only.

Addresses sgl-project#22848 review threads #1 (decouple from `OpenAIServingTranscription`), #4 (type hints), and #5 (`--asr-max-concurrent-sessions`, default 32). #2 (skip the PCM round trip) is deferred since it changes `process_asr_chunk`'s input contract.
Force-pushed from 0725add to 3167135
Move `/v1/audio/transcriptions/stream` to `/v1/realtime` and switch from the M1 `session.start`/binary-PCM protocol to OpenAI's Realtime transcription wire format. Add `--asr-max-concurrent-sessions` (default 32) as the global session cap. sglang deviations from the spec are documented in the module docstring: `sample_rate` accepts 16/24/48 kHz with internal resample to 16 kHz, `turn_detection` and `noise_reduction` must be null, `include[]` is dropped, `model` is echo-only.
Force-pushed from 3167135 to 296fff3
The 5 PR-added HTTP fixture tests now use `_assert_close_to_ref(max_wer=0.05)` instead of `len > 0`. New `test_websocket_rejects_unsupported_sample_rate` covers the 22050 Hz reject path. The Spanish WS test keeps native 48 kHz so the server-side resample runs (it was previously bypassed by client pre-resample). Wraps the WS receive task in `asyncio.wait_for(timeout=60)` so a stuck inference fails fast with the deltas seen so far. Trims docstrings, section banners, and `assertIn` scaffolding. 20/20 pass, ~53 s.
`isinstance` guards now reject non-dict `session.audio`, `session.audio.input`, and `session.audio.input.transcription` with `invalid_value` instead of crashing into the unrecoverable handler. Tests added for each. `buffer_overflow` now reports `error_type=server_error` (was `rate_limit_exceeded`, which would have misled clients into backoff when the right action is reconnect). Extracted `_format_error_event` so `_send_error` and the pre-connection `too_many_sessions` path share one error-event schema. Also makes `_send_error`'s `param`/`error_type` keyword-only, drops a stale "per spec" claim from the commit-failure comment, and notes the long-frame single-inference behavior in `_on_audio_append`.
Replaces ad-hoc `isinstance` guards in `_on_session_update` / `_on_audio_append` with Pydantic models for `SessionUpdate` / `InputAudioBufferAppend` / `Commit` / `Clear`. `ValidationError` is caught in `_run_loop` and emitted as `invalid_value` with the joined `loc` as `param`, preserving the per-field error shape the existing reject tests expect (`session.audio`, `session.audio.input`, `session.audio.input.transcription`, `audio`).

Also closes the WebSocket with code 1009 ("message too big") after `buffer_overflow` so clients can distinguish session-resource exhaustion from a normal close.
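A minimal sketch of that pattern (the field shape and the error-carrying exception are illustrative; the real models mirror the OpenAI Realtime payloads):

```python
from pydantic import BaseModel, ValidationError


class ProtocolError(Exception):
    def __init__(self, code: str, param: str) -> None:
        super().__init__(f"{code}: {param}")
        self.code, self.param = code, param


class InputAudioBufferAppend(BaseModel):
    # Illustrative shape for the "input_audio_buffer.append" event.
    type: str
    audio: str  # base64-encoded PCM chunk


def parse_append(raw: dict) -> InputAudioBufferAppend:
    try:
        return InputAudioBufferAppend(**raw)
    except ValidationError as e:
        # Join the loc tuple into the param field, e.g. ("audio",) -> "audio",
        # so clients keep getting the same per-field invalid_value shape.
        param = ".".join(str(part) for part in e.errors()[0]["loc"])
        raise ProtocolError(code="invalid_value", param=param) from e
```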
Motivation
Implements M1 of the RFC in #22474.
PR #22089 shipped chunked streaming output for Qwen3-ASR via `POST /v1/audio/transcriptions?stream=true` (SSE over an HTTP upload), which assumes the entire audio file is known up-front. Real-time use cases (live captioning, voice assistants, meeting transcription) need the opposite direction: the server accepts audio as it arrives and pushes partial transcripts back as the speaker talks.

Modifications
New WebSocket endpoint

- `WS /v1/audio/transcriptions/stream` (registered in `http_server.py`)
- Client → server: `session.start` (JSON) → binary PCM16 frames → `session.end` (JSON)
- Server → client: `session.started` / `transcript.delta` (per word) / `transcript.final` / `error`
- `session.start` accepts optional `model` and `language` fields; any other format will be rejected at PCM-frame validation (`invalid_audio_format` if the frame byte length is not a multiple of 2, i.e. a truncated int16). A future milestone can add an `audio_format` negotiation field without breaking the existing wire format.
- `--asr-max-buffer-seconds` CLI flag (default 60 s, validated `> 0` at startup). If accumulated server-side audio exceeds the cap, the server sends a `buffer_overflow` error and closes the socket. Below the cap, the single-task coroutine alternates receive and inference, so while a chunk is inferring the client experiences standard TCP-level backpressure (`ws.send` blocks on a full socket buffer). No silent drop. `session.end` is therefore serialized after any in-flight chunk.
Note on the HTTP SSE path: switching `get_prefix_text()` from `confirmed_text` to `emitted_text` (see the `streaming_asr.py` row below) also incidentally fixes a latent prompt-prefix continuation issue in the HTTP SSE path from #22089, where `confirmed_text` could roll back mid-sentence and cause the model to re-emit from scratch on long English audio. Regression covered by the existing 8 HTTP tests.

Architecture
The WS transport uses an OO `RealtimeConnection` class (mirroring vLLM's `RealtimeConnection` in `vllm/entrypoints/openai/realtime/connection.py`): one instance per WebSocket connection, holding session state and exposing lifecycle methods. Wire messages are typed Pydantic models, so schema validation, type-checking, and JSON serialization flow from a single source of truth.

The HTTP SSE and WebSocket paths both route through a shared inference driver, keeping streaming state in `StreamingASRState` at the adapter layer rather than lifting it into the transport layer.
```mermaid
flowchart TB
    HTTP["HTTP<br/>(file upload)"]
    WS["WebSocket<br/>(live PCM frames)"]
    subgraph serving["OpenAIServingTranscription"]
        SSE["_generate_chunked_asr_stream"]
        WSH["handle_websocket<br/>(delegator)"]
    end
    PAC["process_asr_chunk<br/>(shared inference driver)"]
    subgraph state["StreamingASRState"]
        CT["confirmed_text<br/>chunk-local rollback, delta diff basis"]
        ET["emitted_text<br/>monotonic accumulator, prompt prefix source"]
        UF["update() / finalize()"]
    end
    HTTP --> SSE
    WS --> WSH
    SSE --> PAC
    WSH --> PAC
    PAC --> state
```

WebSocket session lifecycle
Inside `serving_transcription_websocket.py`, one coroutine alternates receive and inference. Each PCM batch of `chunk_size_bytes` triggers exactly one inference pass through the shared driver.

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant H as RealtimeConnection
    participant I as process_asr_chunk
    participant S as StreamingASRState
    C->>H: session.start (JSON)
    H->>H: _init<br/>accept + adapter capability check
    H-->>C: session.started
    loop per chunk_size_bytes of new audio
        C->>H: binary PCM16 frames
        H->>H: _on_audio_frame<br/>accumulate into pcm_buffer
        H->>I: _run_inference(_pcm_to_wav(buffer))
        I->>S: update() → delta
        I-->>H: delta str
        H-->>C: transcript.delta (per word)
    end
    C->>H: session.end (JSON)
    H->>I: _run_inference(is_last=True)
    I->>S: finalize() → tail
    I-->>H: tail delta
    H-->>C: transcript.final
    H->>C: close socket
```

Files touched
| File | Change |
|---|---|
| `python/sglang/srt/entrypoints/http_server.py` | `WS /v1/audio/transcriptions/stream` route |
| `python/sglang/srt/entrypoints/openai/serving_transcription.py` | extract `process_asr_chunk` into `streaming_asr.py` so HTTP and WS share the inference driver; add `handle_websocket` delegator |
| `python/sglang/srt/entrypoints/openai/serving_transcription_websocket.py` | `RealtimeConnection` class encapsulating per-session state and lifecycle methods (`_init` / `_run_loop` / `_on_session_start` / `_on_session_end` / `_on_audio_frame` / `_run_inference`); Pydantic event models (`SessionStartEvent`, `SessionStartedEvent`, `TranscriptDeltaEvent`, `TranscriptFinalEvent`, `ErrorEvent`) as protocol schema source-of-truth; private `_pcm_to_wav` adapter for protocol-fixed PCM16 / 16 kHz / mono; bare-string error codes matching sglang/vLLM convention (no enum) |
| `python/sglang/srt/entrypoints/openai/streaming_asr.py` | `StreamingASRState` with an `emitted_text` accumulator used as the prompt prefix in `get_prefix_text()` (previously `confirmed_text`); also extract `process_asr_chunk` as the shared HTTP/WS inference driver and add a `normalize_whitespace` helper for batched-inference punctuation jitter |
| `python/sglang/srt/server_args.py` | `asr_max_buffer_seconds: int = 60` + CLI flag + `_handle_asr_validation()` startup check (rejects `<= 0` to fail fast on misconfiguration) |
| `test/manual/models/test_qwen3_asr.py` | `EXPECTED_TRANSCRIPTS` reference dict, `_wer` Levenshtein helper, WS assertions via `_assert_close_to_ref` (WER ≤ 0.15) |

Accuracy Tests
Manual end-to-end verification against HTTP non-streaming (one-shot) ground truth across 7 audio fixtures × 3 paths (HTTP JSON / HTTP SSE / WebSocket), all captured against a single live `sglang.launch_server` running `Qwen/Qwen3-ASR-0.6B` on a GH200:

| Fixture | One-shot HTTP JSON (ground truth) | HTTP SSE | WebSocket | Notes |
|---|---|---|---|---|
| EN conversational (15 s) | Oh yeah, yeah. He wasn't even that big when I started listening to him. But and his solo music didn't do overly well, but he did very well when he started writing for other people. | Uh huh. Oh yeah, yeah. He wasn't even that big when I started listening to him, but and his solo music didn't do overly well, but he did very well when he started writing for other people. | Uh huh. Oh yeah, yeah. He wasn't even that big when I started listening to him, but and his solo music didn't do overly well, but he did very well when he started writing for other people. | `Uh huh.` prefix hallucination (vocal-fry intro, chunked-inference artifact from #22089) + `But`→`but` casing + `.`→`,` mid-sentence drift. 37 WS deltas. SSE ≡ WS byte-for-byte. |
| ZH | 甚至出现交易几乎停滞的情况。 | 甚至出现交易几乎停滞的情况。 | 甚至出现交易几乎停滞的情况。 | Identical across all three paths. |
| EN speech | I have a dream that one day this nation will rise up and live out the true meaning of its creed. | I have a dream that one day this nation will rise up and live out the true meaning of its creed. | I have a dream that one day this nation will rise up and live out the true meaning of its creed. | Identical across all three paths. |
| EN prose | He hoped there would be stew for dinner—turnips and carrots and bruised potatoes and fat mutton pieces—to be ladled out in thick peppered flour-fatted sauce. | He hoped there would be stew for dinner: turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick peppered flour-fatted sauce. | He hoped there would be stew for dinner: turnips and carrots and bruised potatoes and fat mutton pieces to be ladled out in thick peppered flour-fatted sauce. | `—`→`:` + second `—` dropped. Normalized WER 0.000. 27 WS deltas. SSE ≡ WS byte-for-byte. |
| ES | y en las ramas medio sumergidas revoloteaban algunos pájaros de químico y legendario plumaje | Y en las ramas medio sumergidas revoloteaban algunos pájaros de químico y legendario plumaje. | Y en las ramas medio sumergidas revoloteaban algunos pájaros de químico y legendario plumaje. | Initial capitalization + final period added vs. one-shot. |
| HI | मिर्ची में कितने विभिन्न प्रजातियाँ हैं | मिर्ची में कितने विभिन्न प्रजातियाँ हैं | मिर्ची में कितने विभिन्न प्रजातियाँ हैं | Identical across all three paths. |
| EN short (3 s) | I know kung fu. | I know kung fu. | I know kung fu. | Identical across all three paths. |

Cross-path consistency: on every fixture the HTTP SSE output and the WebSocket output are byte-for-byte identical; you can read the SSE and WS columns and see this for yourself. This is the observable consequence of the two transports routing through the same `process_asr_chunk` driver over the same `StreamingASRState`; the transport layer introduces no additional drift beyond what chunked inference itself produces vs. one-shot.

WER threshold rationale: WS assertions use WER ≤ 0.15. This tolerates the chunked-inference boundary artifacts inherited from #22089 (e.g. the `Uh huh.` prefix on the EN clip) while still catching real regressions: the maximum observed WER across these 7 fixtures is 0.054, so the 0.15 threshold leaves ~3× headroom for flakiness without hiding regressions.

5 consecutive stability runs of the 19-test suite passed (all 19 tests green each run), with byte-identical WS transcripts across runs on a GH200. Median wall-clock 52.3 s per run once the model weights are cached.
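For reference, a minimal sketch of a `_wer`-style helper as described in the test notes (normalize, word-tokenize, Levenshtein distance over words); the exact normalization and the character-level CJK fallback in the real `_wer` may differ:

```python
import re


def _wer(ref: str, hyp: str) -> float:
    """Word error rate via Levenshtein distance over normalized word tokens."""
    norm = lambda s: re.sub(r"[^\w\s]", "", s.lower()).split()
    r, h = norm(ref), norm(hyp)
    # Classic dynamic program: d[i][j] = edits to turn r[:i] into h[:j].
    d = [[0] * (len(h) + 1) for _ in range(len(r) + 1)]
    for i in range(len(r) + 1):
        d[i][0] = i
    for j in range(len(h) + 1):
        d[0][j] = j
    for i in range(1, len(r) + 1):
        for j in range(1, len(h) + 1):
            cost = 0 if r[i - 1] == h[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,  # deletion
                d[i][j - 1] + 1,  # insertion
                d[i - 1][j - 1] + cost,  # substitution
            )
    return d[len(r)][len(h)] / max(len(r), 1)


assert _wer("i know kung fu", "I know kung fu.") == 0.0
```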
Streaming verification: on the 15 s EN audio in realtime-pacing mode (the client sends PCM at 0.5 s/frame, i.e. wall-clock rate), ~30 out of 37 `transcript.delta` events arrive at the client before the client sends `session.end`, confirming true incremental server push (not batch-at-end).

M1 completion checklist (from RFC)
- [x] `WS /v1/audio/transcriptions/stream`
- [x] `session.start` / binary PCM / `session.end` ⇄ `session.started` / `transcript.delta` / `transcript.final` / `error`
- [x] Error codes: `invalid_json`, `invalid_payload`, `invalid_state`, `invalid_audio_format`, `unknown_message`, `buffer_overflow`, `unsupported_model`, `internal_error`
- [x] `--asr-max-buffer-seconds` backpressure
- [x] Shared `StreamingASRState`, `process_asr_chunk`, `TranscriptionAdapter`
- [ ] (`GenerateReqInput.stream=True`, forced-alignment timestamps): out of scope

Known limitations
Listed here so reviewers don't have to re-discover them:
"Uh huh."prefix hallucination because the first 2s chunk sees only vocal fry / silence, and the append-only delta protocol can't retract it later. WER ~0.054.transcript.deltaevent becausestr.split()can't word-tokenize them. Final transcript is still correct. TODO already inStreamingASRStatedocstring.mm_utils.py:_adjust_embedding_length. Upstream bug, unrelated to this PR.嗯哼。). Short-clip test uses 3s MP3 to avoid this.Test plan
Tests live under `test/manual/` because they require downloading the `Qwen/Qwen3-ASR-0.6B` checkpoint (~1.2 GB) and a GPU. They are runnable locally with a single command and complete in ~52 s on one H100.

Automated (single command)
```bash
cd /path/to/sglang
python test/manual/models/test_qwen3_asr.py
```

The file uses `popen_launch_server` to spin up its own `sglang.launch_server`, runs 19 tests, and tears the server down.

Test breakdown:
- … (`has_new_audio → is_last=True` path), 4 s chunk-boundary clip (an exact multiple of `chunk_size_bytes`, hitting `_on_session_end`'s `elif state.full_transcript` flush-tail branch without running another inference)

WS assertions use `_assert_close_to_ref` (WER ≤ 0.15) against `EXPECTED_TRANSCRIPTS` (a dict of canonical transcripts captured from one-shot non-streaming inference). `_wer` normalizes case/punctuation and falls back to character-level comparison for CJK.

Manual (single audio, step-by-step)
Launch the server:
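A plausible invocation (the exact flags are an assumption; the model and port match the curl examples in this section):

```bash
python -m sglang.launch_server --model-path Qwen/Qwen3-ASR-0.6B --port 30000
```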
HTTP non-streaming (baseline / ground truth):
```bash
curl -s http://127.0.0.1:30000/v1/audio/transcriptions \
  -F file=@audio.wav -F model=qwen3-asr | jq -r .text
```

HTTP SSE streaming:
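A sketch of the SSE call, assuming the `stream=true` query parameter from #22089 (`-N` disables curl's output buffering):

```bash
curl -sN "http://127.0.0.1:30000/v1/audio/transcriptions?stream=true" \
  -F file=@audio.wav -F model=qwen3-asr
```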
WebSocket streaming: save as `wsasr.py` and run `python wsasr.py audio.wav [language]`. The script sends PCM in 0.5 s frames at wall-clock realtime pacing while concurrently reading `transcript.delta` events; each printed line is prefixed with `t=<sec>` from session start, so you can see deltas arriving while audio is still uploading. A sketch of such a client is shown below.
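A minimal sketch of such a client against the M1 protocol described above (the `websockets` and `soundfile` dependencies, and the `text` field on delta events, are assumptions; the PR's reference client lives in `test/manual/models/test_qwen3_asr.py`):

```python
# wsasr.py: hypothetical reference client for the M1 wire protocol.
import asyncio, json, sys, time

import numpy as np
import soundfile as sf
import websockets

URL = "ws://127.0.0.1:30000/v1/audio/transcriptions/stream"
RATE, FRAME_SEC = 16000, 0.5  # protocol-fixed PCM16 mono 16 kHz, 0.5 s frames


async def main(path: str, language: str | None) -> None:
    audio, _ = sf.read(path, dtype="int16")  # assumes mono 16 kHz input audio
    pcm = np.asarray(audio, dtype=np.int16).tobytes()
    frame = int(RATE * FRAME_SEC) * 2  # bytes per 0.5 s of int16 samples
    async with websockets.connect(URL) as ws:
        start: dict = {"type": "session.start"}
        if language:
            start["language"] = language
        await ws.send(json.dumps(start))
        t0 = time.monotonic()

        async def read_events() -> None:
            async for msg in ws:
                ev = json.loads(msg)
                # Field name "text" is an assumption for the delta payload.
                print(f"t={time.monotonic() - t0:5.1f}s {ev['type']}: {ev.get('text', '')}")
                if ev["type"] in ("transcript.final", "error"):
                    return

        reader = asyncio.create_task(read_events())
        for off in range(0, len(pcm), frame):  # realtime pacing: one frame per 0.5 s
            await ws.send(pcm[off : off + frame])
            await asyncio.sleep(FRAME_SEC)
        await ws.send(json.dumps({"type": "session.end"}))
        await reader


if __name__ == "__main__":
    asyncio.run(main(sys.argv[1], sys.argv[2] if len(sys.argv) > 2 else None))
```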
Error-path verification

Manually tested all 8 error codes against a running server:
`invalid_json`, `invalid_payload`, `invalid_state` × 3 variants, `invalid_audio_format`, `unknown_message`, `buffer_overflow`, `unsupported_model`, `internal_error`. All return the documented error event and close the socket.

Speed Tests and Profiling
No impact on inference speed: this PR is a thin WebSocket transport layer on top of unchanged chunked inference. Per-chunk latency is bound by the existing `chunk_size_sec` (2 s) + model inference time (~0.5–1.5 s on H100 for Qwen3-ASR-0.6B). No new CUDA kernels, no new memory patterns, no scheduler changes.

Checklist
- [x] Tests added (`test/manual/models/test_qwen3_asr.py`)
Related
TranscriptionAdapterrefactor, merged)