Skip to content

Commit 73b7e09

Browse files
authored
feat (provider/gateway): add SSE support for video generation with heartbeat keep-alive (#12729)
## Background Video generation requests can take a long time to complete, and HTTP connections may timeout during processing. Server-Sent Events (SSE) with heartbeat keep-alive messages provide a way to maintain the connection while the video is being generated. ## Changes Added SSE support for video generation in the gateway provider with heartbeat keep-alive functionality. The implementation: - Sets `accept: 'text/event-stream'` header in video generation requests - Parses SSE events using `parseJsonEventStream` with a discriminated union schema for result and error events - Properly handles SSE error events by throwing `APICallError` with structured error data ## Manual Verification Ran example video scripts against a gateway server supporting SSE format response. ## Checklist - [x] Tests have been added / updated (for bug fixes / features) - [ ] Documentation has been added / updated (for bug fixes / features) - [x] A _patch_ changeset for relevant packages has been added (for bug fixes / features - run `pnpm changeset` in the project root) - [x] I have reviewed this pull request (self-review) ## Future Work ## Related Issues
1 parent 6f687ac commit 73b7e09

File tree

3 files changed

+217
-22
lines changed

3 files changed

+217
-22
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@ai-sdk/gateway': patch
3+
---
4+
5+
feat (provider/gateway): add SSE support for video generation with heartbeat keep-alive

packages/gateway/src/gateway-video-model.test.ts

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,15 @@ describe('GatewayVideoModel', () => {
8787
>;
8888
providerMetadata?: Record<string, unknown>;
8989
} = {}) {
90+
const ssePayload = {
91+
type: 'result',
92+
videos,
93+
...(warnings && { warnings }),
94+
...(providerMetadata && { providerMetadata }),
95+
};
9096
server.urls['https://api.test.com/video-model'].response = {
91-
type: 'json-value',
92-
body: {
93-
videos,
94-
...(warnings && { warnings }),
95-
...(providerMetadata && { providerMetadata }),
96-
},
97+
type: 'stream-chunks',
98+
chunks: [`data: ${JSON.stringify(ssePayload)}\n\n`],
9799
};
98100
}
99101

@@ -586,6 +588,113 @@ describe('GatewayVideoModel', () => {
586588
).rejects.toThrow();
587589
});
588590

591+
it('should throw on SSE error event with correct message and status', async () => {
592+
server.urls['https://api.test.com/video-model'].response = {
593+
type: 'stream-chunks',
594+
chunks: [
595+
`data: ${JSON.stringify({
596+
type: 'error',
597+
message: 'Rate limit exceeded',
598+
errorType: 'rate_limit_exceeded',
599+
statusCode: 429,
600+
param: null,
601+
})}\n\n`,
602+
],
603+
};
604+
605+
await expect(
606+
createTestModel().doGenerate({
607+
prompt: 'Test prompt',
608+
image: undefined,
609+
n: 1,
610+
aspectRatio: undefined,
611+
resolution: undefined,
612+
duration: undefined,
613+
fps: undefined,
614+
seed: undefined,
615+
providerOptions: {},
616+
}),
617+
).rejects.toThrow('Rate limit exceeded');
618+
});
619+
620+
it('should throw on SSE error event with provider routing failure', async () => {
621+
server.urls['https://api.test.com/video-model'].response = {
622+
type: 'stream-chunks',
623+
chunks: [
624+
`data: ${JSON.stringify({
625+
type: 'error',
626+
message: 'All providers failed',
627+
errorType: 'internal_server_error',
628+
statusCode: 500,
629+
param: null,
630+
})}\n\n`,
631+
],
632+
};
633+
634+
await expect(
635+
createTestModel().doGenerate({
636+
prompt: 'Test prompt',
637+
image: undefined,
638+
n: 1,
639+
aspectRatio: undefined,
640+
resolution: undefined,
641+
duration: undefined,
642+
fps: undefined,
643+
seed: undefined,
644+
providerOptions: {},
645+
}),
646+
).rejects.toThrow('All providers failed');
647+
});
648+
649+
it('should throw on empty SSE stream', async () => {
650+
server.urls['https://api.test.com/video-model'].response = {
651+
type: 'stream-chunks',
652+
chunks: [],
653+
};
654+
655+
await expect(
656+
createTestModel().doGenerate({
657+
prompt: 'Test prompt',
658+
image: undefined,
659+
n: 1,
660+
aspectRatio: undefined,
661+
resolution: undefined,
662+
duration: undefined,
663+
fps: undefined,
664+
seed: undefined,
665+
providerOptions: {},
666+
}),
667+
).rejects.toThrow();
668+
});
669+
670+
it('should ignore SSE heartbeat comments and parse data event', async () => {
671+
const videos = [
672+
{ type: 'base64' as const, data: 'base64-1', mediaType: 'video/mp4' },
673+
];
674+
server.urls['https://api.test.com/video-model'].response = {
675+
type: 'stream-chunks',
676+
chunks: [
677+
':\n\n',
678+
':\n\n',
679+
`data: ${JSON.stringify({ type: 'result', videos })}\n\n`,
680+
],
681+
};
682+
683+
const result = await createTestModel().doGenerate({
684+
prompt: 'Test prompt',
685+
image: undefined,
686+
n: 1,
687+
aspectRatio: undefined,
688+
resolution: undefined,
689+
duration: undefined,
690+
fps: undefined,
691+
seed: undefined,
692+
providerOptions: {},
693+
});
694+
695+
expect(result.videos).toEqual(videos);
696+
});
697+
589698
it('should include providerOptions object in request body', async () => {
590699
prepareJsonResponse();
591700

packages/gateway/src/gateway-video-model.ts

Lines changed: 97 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import type {
66
SharedV3ProviderMetadata,
77
SharedV3Warning,
88
} from '@ai-sdk/provider';
9+
import { APICallError } from '@ai-sdk/provider';
910
import {
1011
combineHeaders,
1112
convertUint8ArrayToBase64,
12-
createJsonResponseHandler,
1313
createJsonErrorResponseHandler,
14+
parseJsonEventStream,
1415
postJsonToApi,
1516
resolve,
1617
type Resolvable,
@@ -61,17 +62,14 @@ export class GatewayVideoModel implements Experimental_VideoModelV3 {
6162
}> {
6263
const resolvedHeaders = await resolve(this.config.headers());
6364
try {
64-
const {
65-
responseHeaders,
66-
value: responseBody,
67-
rawValue,
68-
} = await postJsonToApi({
65+
const { responseHeaders, value: responseBody } = await postJsonToApi({
6966
url: this.getUrl(),
7067
headers: combineHeaders(
7168
resolvedHeaders,
7269
headers ?? {},
7370
this.getModelConfigHeaders(),
7471
await resolve(this.config.o11yHeaders),
72+
{ accept: 'text/event-stream' },
7573
),
7674
body: {
7775
prompt,
@@ -84,9 +82,82 @@ export class GatewayVideoModel implements Experimental_VideoModelV3 {
8482
...(providerOptions && { providerOptions }),
8583
...(image && { image: maybeEncodeVideoFile(image) }),
8684
},
87-
successfulResponseHandler: createJsonResponseHandler(
88-
gatewayVideoResponseSchema,
89-
),
85+
successfulResponseHandler: async ({
86+
response,
87+
url,
88+
requestBodyValues,
89+
}: {
90+
url: string;
91+
requestBodyValues: unknown;
92+
response: Response;
93+
}) => {
94+
if (response.body == null) {
95+
throw new APICallError({
96+
message: 'SSE response body is empty',
97+
url,
98+
requestBodyValues,
99+
statusCode: response.status,
100+
});
101+
}
102+
103+
const eventStream = parseJsonEventStream({
104+
stream: response.body,
105+
schema: gatewayVideoEventSchema,
106+
});
107+
108+
const reader = eventStream.getReader();
109+
const { done, value: parseResult } = await reader.read();
110+
reader.releaseLock();
111+
112+
if (done || !parseResult) {
113+
throw new APICallError({
114+
message: 'SSE stream ended without a data event',
115+
url,
116+
requestBodyValues,
117+
statusCode: response.status,
118+
});
119+
}
120+
121+
if (!parseResult.success) {
122+
throw new APICallError({
123+
message: 'Failed to parse video SSE event',
124+
cause: parseResult.error,
125+
url,
126+
requestBodyValues,
127+
statusCode: response.status,
128+
});
129+
}
130+
131+
const event = parseResult.value;
132+
133+
if (event.type === 'error') {
134+
throw new APICallError({
135+
message: event.message,
136+
statusCode: event.statusCode,
137+
url,
138+
requestBodyValues,
139+
responseHeaders: Object.fromEntries([...response.headers]),
140+
responseBody: JSON.stringify(event),
141+
data: {
142+
error: {
143+
message: event.message,
144+
type: event.errorType,
145+
param: event.param,
146+
},
147+
},
148+
});
149+
}
150+
151+
// event.type === 'result'
152+
return {
153+
value: {
154+
videos: event.videos,
155+
warnings: event.warnings,
156+
providerMetadata: event.providerMetadata,
157+
},
158+
responseHeaders: Object.fromEntries([...response.headers]),
159+
};
160+
},
90161
failedResponseHandler: createJsonErrorResponseHandler({
91162
errorSchema: z.any(),
92163
errorToMessage: data => data,
@@ -169,10 +240,20 @@ const gatewayVideoWarningSchema = z.discriminatedUnion('type', [
169240
}),
170241
]);
171242

172-
const gatewayVideoResponseSchema = z.object({
173-
videos: z.array(gatewayVideoDataSchema),
174-
warnings: z.array(gatewayVideoWarningSchema).optional(),
175-
providerMetadata: z
176-
.record(z.string(), providerMetadataEntrySchema)
177-
.optional(),
178-
});
243+
const gatewayVideoEventSchema = z.discriminatedUnion('type', [
244+
z.object({
245+
type: z.literal('result'),
246+
videos: z.array(gatewayVideoDataSchema),
247+
warnings: z.array(gatewayVideoWarningSchema).optional(),
248+
providerMetadata: z
249+
.record(z.string(), providerMetadataEntrySchema)
250+
.optional(),
251+
}),
252+
z.object({
253+
type: z.literal('error'),
254+
message: z.string(),
255+
errorType: z.string(),
256+
statusCode: z.number(),
257+
param: z.unknown().nullable(),
258+
}),
259+
]);

0 commit comments

Comments
 (0)