@@ -26,6 +26,47 @@ func NewHandler(provider core.RoutableProvider, logger auditlog.LoggerInterface)
2626 }
2727}
2828
29+ // handleStreamingResponse handles SSE streaming responses for both ChatCompletion and Responses endpoints.
30+ // It wraps the stream with audit logging and sets appropriate SSE headers.
31+ func (h * Handler ) handleStreamingResponse (c echo.Context , streamFn func () (io.ReadCloser , error )) error {
32+ // Mark as streaming so middleware doesn't log (StreamLogWrapper handles it)
33+ auditlog .MarkEntryAsStreaming (c , true )
34+ auditlog .EnrichEntryWithStream (c , true )
35+
36+ stream , err := streamFn ()
37+ if err != nil {
38+ return handleError (c , err )
39+ }
40+
41+ // Get entry from context and wrap stream for logging
42+ entry := auditlog .GetStreamEntryFromContext (c )
43+ streamEntry := auditlog .CreateStreamEntry (entry )
44+ if streamEntry != nil {
45+ streamEntry .StatusCode = http .StatusOK // Streaming always starts with 200 OK
46+ }
47+ wrappedStream := auditlog .WrapStreamForLogging (stream , h .logger , streamEntry , c .Request ().URL .Path )
48+ defer func () {
49+ _ = wrappedStream .Close () //nolint:errcheck
50+ }()
51+
52+ c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
53+ c .Response ().Header ().Set ("Cache-Control" , "no-cache" )
54+ c .Response ().Header ().Set ("Connection" , "keep-alive" )
55+
56+ // Capture response headers on stream entry AFTER setting them
57+ if streamEntry != nil && streamEntry .Data != nil {
58+ streamEntry .Data .ResponseHeaders = map [string ]string {
59+ "Content-Type" : "text/event-stream" ,
60+ "Cache-Control" : "no-cache" ,
61+ "Connection" : "keep-alive" ,
62+ }
63+ }
64+
65+ c .Response ().WriteHeader (http .StatusOK )
66+ _ , _ = io .Copy (c .Response ().Writer , wrappedStream )
67+ return nil
68+ }
69+
2970// ChatCompletion handles POST /v1/chat/completions
3071func (h * Handler ) ChatCompletion (c echo.Context ) error {
3172 var req core.ChatRequest
@@ -42,43 +83,9 @@ func (h *Handler) ChatCompletion(c echo.Context) error {
4283
4384 // Handle streaming: proxy the raw SSE stream
4485 if req .Stream {
45- // Mark as streaming so middleware doesn't log (StreamLogWrapper handles it)
46- auditlog .MarkEntryAsStreaming (c , true )
47- auditlog .EnrichEntryWithStream (c , true )
48-
49- stream , err := h .provider .StreamChatCompletion (c .Request ().Context (), & req )
50- if err != nil {
51- return handleError (c , err )
52- }
53-
54- // Get entry from context and wrap stream for logging
55- entry := auditlog .GetStreamEntryFromContext (c )
56- streamEntry := auditlog .CreateStreamEntry (entry )
57- if streamEntry != nil {
58- streamEntry .StatusCode = http .StatusOK // Streaming always starts with 200 OK
59- }
60- wrappedStream := auditlog .WrapStreamForLogging (stream , h .logger , streamEntry , c .Request ().URL .Path )
61- defer func () {
62- _ = wrappedStream .Close () //nolint:errcheck
63- }()
64-
65- c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
66- c .Response ().Header ().Set ("Cache-Control" , "no-cache" )
67- c .Response ().Header ().Set ("Connection" , "keep-alive" )
68-
69- // Capture response headers on stream entry AFTER setting them
70- if streamEntry != nil && streamEntry .Data != nil {
71- streamEntry .Data .ResponseHeaders = map [string ]string {
72- "Content-Type" : "text/event-stream" ,
73- "Cache-Control" : "no-cache" ,
74- "Connection" : "keep-alive" ,
75- }
76- }
77-
78- c .Response ().WriteHeader (http .StatusOK )
79-
80- _ , _ = io .Copy (c .Response ().Writer , wrappedStream )
81- return nil
86+ return h .handleStreamingResponse (c , func () (io.ReadCloser , error ) {
87+ return h .provider .StreamChatCompletion (c .Request ().Context (), & req )
88+ })
8289 }
8390
8491 // Non-streaming
@@ -125,43 +132,9 @@ func (h *Handler) Responses(c echo.Context) error {
125132
126133 // Handle streaming: proxy the raw SSE stream
127134 if req .Stream {
128- // Mark as streaming so middleware doesn't log (StreamLogWrapper handles it)
129- auditlog .MarkEntryAsStreaming (c , true )
130- auditlog .EnrichEntryWithStream (c , true )
131-
132- stream , err := h .provider .StreamResponses (c .Request ().Context (), & req )
133- if err != nil {
134- return handleError (c , err )
135- }
136-
137- // Get entry from context and wrap stream for logging
138- entry := auditlog .GetStreamEntryFromContext (c )
139- streamEntry := auditlog .CreateStreamEntry (entry )
140- if streamEntry != nil {
141- streamEntry .StatusCode = http .StatusOK // Streaming always starts with 200 OK
142- }
143- wrappedStream := auditlog .WrapStreamForLogging (stream , h .logger , streamEntry , c .Request ().URL .Path )
144- defer func () {
145- _ = wrappedStream .Close () //nolint:errcheck
146- }()
147-
148- c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
149- c .Response ().Header ().Set ("Cache-Control" , "no-cache" )
150- c .Response ().Header ().Set ("Connection" , "keep-alive" )
151-
152- // Capture response headers on stream entry AFTER setting them
153- if streamEntry != nil && streamEntry .Data != nil {
154- streamEntry .Data .ResponseHeaders = map [string ]string {
155- "Content-Type" : "text/event-stream" ,
156- "Cache-Control" : "no-cache" ,
157- "Connection" : "keep-alive" ,
158- }
159- }
160-
161- c .Response ().WriteHeader (http .StatusOK )
162-
163- _ , _ = io .Copy (c .Response ().Writer , wrappedStream )
164- return nil
135+ return h .handleStreamingResponse (c , func () (io.ReadCloser , error ) {
136+ return h .provider .StreamResponses (c .Request ().Context (), & req )
137+ })
165138 }
166139
167140 // Non-streaming
0 commit comments