Prefetch a chunk of result for stream operation#2137
Merged
jatcwang merged 1 commit intotypelevel:mainfrom Nov 21, 2024
Merged
Conversation
jatcwang
reviewed
Nov 17, 2024
jatcwang
reviewed
Nov 17, 2024
modules/hikari/src/test/scala/doobie/postgres/PGConcurrentSuite.scala
Outdated
Show resolved
Hide resolved
jatcwang
reviewed
Nov 17, 2024
|
|
||
| // if buffer is less than result set, it will be still block new connection since the result set is not drained | ||
| // use sleep to test the result set can be drained | ||
| val streamSmallerBuffer = fr"select * from stream_cancel_test".query[Int].stream.transactBuffer(xa, 50) |
Collaborator
There was a problem hiding this comment.
Hm sorry I can't quite understand how this is testing that the connection is closed before the stream is completed 🤔
An alternative way I can think of to test this is to use some sort of atomic boolean and set it to true when the transactor finishes the transaction, and then you can assert that this was set to true before you process the last chunk of the stream.
import doobie.util.transactor.Strategy
import doobie.FC
import java.util.concurrent.atomic.AtomicBoolean
val hasClosed = new AtomicBoolean(false)
xa.copy(
strategy0 = Strategy.default.copy(
always = Strategy.default.always.flatMap(_ => FC.delay(hasClosed.set(true)))
)
)
It's probably possible to use Deferred instead of AtomicBoolean but there will be more ceremony to instantiates a Deferred inside ConnectionIO
Contributor
Author
There was a problem hiding this comment.
Fixed the test to assert if it's closed
dc077b6 to
267dfbe
Compare
66da6e4 to
2bd4a8d
Compare
Resolves typelevel#2132. Use fs2 Stream's `prefetchN` to buffer query results so that slow downstream operations don't slow down transaction commit. The buffered result size is `chunkSize` (equals to `fetchSize`) by default. If want a different buffer size, can use `transactNoPrefetc` to get a stream without `prefetchN` and append any prefetch operations wanted.
Contributor
Author
|
Hi @jatcwang just follow up on this PR. Please let me know if the updated code addressed your comments or not. |
jatcwang
reviewed
Nov 21, 2024
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Resolves #2132.
Use fs2 Stream's
prefetchNto buffer query results so that slow downstream operations don't slow down transaction commit.The buffered result size is
chunkSize(equals tofetchSize) by default. If want a different buffer size, can usetransactNoPrefetcto get a stream withoutprefetchNand append any prefetch operations wanted.