Conversation
Hey @sor4chi, thanks for taking this up! I'd added a few more convenience methods to my own implementation:

```ts
// Lets you easily emit data in JSON Lines format
log(obj: any) {
  return this.writeln(JSON.stringify(obj)).flush();
}

// Lets you send a ReadableStream directly, e.g. from a subrequest
async pipe(body: ReadableStream) {
  this.writer.releaseLock();
  await body.pipeTo(this.writable, { preventClose: true });
  this.writer = this.writable.getWriter();
}
```

My only thought is whether this should be called ...
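For context, a minimal sketch of how those two helpers could be used from a handler, assuming they live on the streaming helper that `c.stream()` passes to its callback (the app, route, and upstream URL below are illustrative, not from the PR):

```ts
import { Hono } from 'hono'

const app = new Hono()

app.get('/logs', (c) =>
  c.stream(async (stream) => {
    // Emit one JSON object per line (JSON Lines)
    stream.log({ level: 'info', msg: 'started' })

    // Forward a subrequest body straight into the response stream
    const upstream = await fetch('https://example.com/data') // illustrative URL
    if (upstream.body) {
      await stream.pipe(upstream.body)
    }

    stream.log({ level: 'info', msg: 'done' })
  })
)
```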
---

Review comment on src/utils/stream.ts (outdated):

```ts
  }

  write(str: string): StreamingApi {
    this.buffer.push(str)
```
Does it make sense to buffer here into memory or just write directly to the stream? If the latter, I don't think you'd even need .flush() at all?
Hi @geelen, thanks for your review.

I found a flush function in @geelen's reference implementation, so I implemented a buffer here so that writes only reach the stream at the moment flush is actually called.

Admittedly, I think the buffer may be unnecessary, since the user can buffer on their own side before writing to the stream. Would it be OK to send the data as soon as write or writeln is called?
Yeah, I was testing something at the time with Cloudflare Workers and found that it was only streaming in 4096-byte chunks, so I made flush send a bunch of whitespace bytes to force it to send 🤫

I think I was doing something wrong, though, since when I tried again, chunks were streamed much more rapidly. So I think it's fine to send them as soon as write is called; a user can always buffer on their own side before sending...
I see, I'll fix this in the next commit!
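For illustration, a minimal sketch of what the buffer-free `write` could look like, assuming the class keeps a writer and a `TextEncoder` as in the earlier snippets (this is an assumption about the shape, not the exact code of the follow-up commit):

```ts
// Sketch: write straight through to the underlying writer; no buffer, no flush().
class StreamingApi {
  private writer: WritableStreamDefaultWriter<Uint8Array>
  private encoder = new TextEncoder()

  constructor(private writable: WritableStream<Uint8Array>) {
    this.writer = writable.getWriter()
  }

  async write(str: string): Promise<StreamingApi> {
    await this.writer.write(this.encoder.encode(str))
    return this
  }

  async writeln(str: string): Promise<StreamingApi> {
    return this.write(str + '\n')
  }
}
```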
---
Hi @sor4chi! Awesome! This will be a highlight feature for our next minor update. I haven't looked closely at the implementation yet, but my primary concerns are:

I'll be reviewing your implementation more closely later. Either way, this is an important feature, so take your time making it.
---
Great job! There are two points I'd like us to consider:
---
@yusukebe Is it like leaving it up to the user whether to add ...?
Yes, we have to keep it because I think it should be ...

I've suggested that because of compatibility with ... (see Line 313 in 9cb6b37). I'm planning on making ...
---
Ah, we can refer to Vercel's AI SDK: https://sdk.vercel.ai/docs
---
Back-pressure and cancellation management (detailed in the Vercel AI SDK docs linked above). A minimal example of the code I'm using now with Hono and the Server-Sent Events message format:

```ts
const completionStream = await openai.chat.completions.create(/* ... */)
const completionStreamIterator = completionStream[Symbol.asyncIterator]();

const eventStream = new ReadableStream({
  // The client asks for more data
  async pull(controller) {
    const { done, value } = await completionStreamIterator.next();
    // Nothing more from OpenAI
    if (done) {
      // Close the client connection; we could send a "finish" SSE custom event before closing
      controller.close();
      return; // nothing left to enqueue once closed
    }
    controller.enqueue(`data: ${value.choices[0]?.delta?.content || ""}\n\n`);
  },
  // The client aborts the connection
  async cancel(reason) {
    // Cancel the OpenAI upstream
    await completionStreamIterator.return?.(reason);
  },
}).pipeThrough(new TextEncoderStream());

// Respond with the stream and the required SSE headers
return new Response(eventStream, {
  headers: {
    ...corsHeaders,
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-store",
    Connection: "keep-alive",
  },
});
```
---
@yusukebe When testing my OpenAI proxy (not using Hono's c.stream) with Cloudflare Functions, I get more than 100 ms of CPU time depending on the completion length from OpenAI (so way above the 50 ms CPU time limit). What CPU time do you get with the ...?
---
About SSE and cancellation with Bun, there is this issue: ...
---
Similarly, it appears that AWS Lambda's response streaming only supports HTTP/1.1. You can find more information at ...

It's a challenging question how generic to make the system, but I think it would be good to have options on the roadmap.
---
There are no issues with HTTP/1.1, only the limit of 6 parallel SSE connections per domain. The code stays the same:

```ts
async pull(controller) {
  // With Bun we have to check here whether the request has been cancelled
  if (req.signal.aborted) {
    controller.close();
    // Cancel the OpenAI upstream
    await completionStreamIterator.return?.();
    return;
  }
  // ...
}
```
---
I see... If HTTP/1.1 support is only needed for AWS Lambda, there is no need to support it in this commit. There are other complicated differences, so I will absorb them with an adapter.

---

I think ...
---
Thank you for your comments.

About SSE: SSE is not supported in ... So what I'm saying is that ... So let's build on the current implementation!
---
@yusukebe ...
---
Hey! Now, I'll merge it into the ... branch.

Awesome work, @sor4chi! Thanks!
---

* feat: implement stream api utility-class
* test: write the test of StreamApi
* feat: implement `c.stream` to context
* test: write the test of `c.stream()`
* chore: denoify
* fix: extend for bytes, remove buffer system, add pipe and log interface
* test: update test about log, pipe, etc... for streaming API
* feat: extend textStream interface, remove utf-8 content-type
* test: add test about `c.textStream`
* refactor: update some args name
* chore: denoify
* fix: for deno, removed the optional parameter of `write` and `writeln`
* chore: denoify
* feat: add charset for textStream content-type header
* fix: rename textStream to streamText
* fix: reuse stream in streamText for bundle size
* feat: add `stream.wait()` api
* chore: denoify
* fix: rename `stream.wait` to `stream.sleep`
* test: use `stream.sleep` for waiting
* refactor: remove `stream.log`
* fix: remove preHeader from `c.stream()` and use `transfer-encoding` only `c.streamText()`
* chore: denoify
* refactoring: remove preHeader initialize
* test: reduce sleep duration
* chore: denoify

Co-authored-by: Glen Maddern <glenmaddern@gmail.com>
---

Hi @yusukebe, @geelen,

I implemented `c.stream()` according to #914. I'd like to see @geelen added as a co-author as well, since I'm pretty much quoting the code from the issue. This is the first time I've used a Stream API, so feel free to point out any mistakes.

@yusukebe asked me to give some examples of using `c.stream()`. Below are some examples.
Use cases

1: ChatGPT Proxy

Enable rate limiting to protect or hide external APIs.
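The actual code was collapsed in the original thread; what follows is a minimal sketch of such a proxy on top of the `c.streamText()` API from this PR, assuming the official `openai` SDK and an `OPENAI_API_KEY` binding (the model name and route are illustrative):

```ts
import { Hono } from 'hono'
import OpenAI from 'openai'

const app = new Hono<{ Bindings: { OPENAI_API_KEY: string } }>()

app.post('/chat', async (c) => {
  const openai = new OpenAI({ apiKey: c.env.OPENAI_API_KEY })
  const { prompt } = await c.req.json<{ prompt: string }>()

  // Ask OpenAI for a streamed completion (an async iterable of chunks)
  const chatStream = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo', // illustrative model
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  })

  // Relay each upstream delta to the client as soon as it arrives
  return c.streamText(async (stream) => {
    for await (const chunk of chatStream) {
      await stream.write(chunk.choices[0]?.delta?.content ?? '')
    }
  })
})

export default app
```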
2: Read some large files using a stream

In this example, I'm reading a large file from disk and sending it to the client using a stream.

e.g. read GeoJSON using a stream and gradually draw it on the client
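Again, the original snippet was collapsed; here is a sketch under the assumption of a Node.js runtime (e.g. via `@hono/node-server`), since Workers have no local filesystem, with an illustrative file path:

```ts
import { createReadStream } from 'node:fs'
import { Readable } from 'node:stream'
import { Hono } from 'hono'

const app = new Hono()

app.get('/large.geojson', (c) => {
  c.header('Content-Type', 'application/geo+json')
  return c.stream(async (stream) => {
    // Convert the Node file stream into a Web ReadableStream and pipe it out,
    // so the client can start parsing features before the whole file is read.
    const file = Readable.toWeb(
      createReadStream('./data/large.geojson') // illustrative path
    ) as unknown as ReadableStream<Uint8Array>
    await stream.pipe(file)
  })
})

export default app
```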
Author should do the following, if applicable:

- `yarn denoify` to generate files for Deno