GH-40089: [Go] Concurrent Recordset for receiving huge recordset #40090
zeroshade merged 13 commits into apache:main
Conversation
zeroshade
left a comment
So, a quick question above and beyond this review: Have you looked into using ADBC's FlightSQL driver instead of this? For reference, this asynchronous pulling of records instead of pulling them all into memory is already implemented there using the Go Arrow FlightSQL implementation and you can use https://pkg.go.dev/github.com/apache/arrow-adbc/go/adbc@v0.9.0/sqldriver if you need the database/sql interface rather than just working with Arrow directly.
If you haven't tried it yet, could you try it out and see if it solves your issues?
```go
func newRows() *Rows {
	return &Rows{
		recordChan: make(chan arrow.Record, 1),
```
This buffer size should probably be configurable. Depending on the server in use and how the consumer is operating, getting better performance and backpressure may require tuning the size of the buffer.
Thanks, @zeroshade.
I removed the magic number but kept the buffer size at one.
Do you have suggestions for implementing a dynamic/configurable buffer size?
That could happen in a follow-up PR anyway. WDYT?
Sure, I'd be fine with making it configurable in a follow-up PR.
As for a suggestion for implementing the configurable buffer size, there are two common patterns:

- In the `Open` and `Connector` handling of the URI, you can use a delimiter and allow passing in various options, similar to how an ODBC DSN works. During `Open` you can then parse out the options and store them in the driver instance, then have this function take the buffer size as a parameter which can be passed in.
- You could provide context functions like `WithBufferSize` and then pull the value out of the context at query time when you construct the `Rows` object, defaulting to 1 if the user didn't stick it into the context.
```go
rows := newRows()
go rows.streamRecordset(ctx, c.client, info.Endpoint)
```
Should probably use `context.WithCancel` on this and store the cancel function so that `Rows` can call it on `Close` and tell the streaming goroutine it can end early.
This is the idea that helped me resolve the synchronization better; it's why I could drop the mutex without fearing deadlocks.
Thanks, it improves the PR a lot.
```go
case <-ctx.Done():
	r.releaseRecord()
	r.streamError = fmt.Errorf("stream recordset context timed out")
```
Should use `ctx.Err()` instead; the context could have been cancelled or failed for any other reason, not necessarily a timeout.
Yes, we conducted an evaluation of the ADBC driver but ultimately chose to forgo it in favor of adapting the Arrow driver. Initially, we encountered issues integrating the driver with our Dremio deployment. We have yet to ascertain whether the problem lies with our deployment, with Dremio, or with the driver itself. A more significant consideration is our understanding that the ADBC driver utilizes Arrow data types. Our application is extensive and includes numerous structs, which would necessitate considerable refactoring.
@mgross-ebner We maintain usage of Dremio for the CI testing with the ADBC driver to ensure compatibility. So it would definitely be good for us to determine whether the issue is in the Driver or not.
> A more significant consideration is our understanding that the ADBC driver utilizes Arrow data types. Our application is extensive and includes numerous structs, which would necessitate considerable refactoring.
There is an adapter in the arrow-adbc library that wraps the ADBC driver for the go database/sql interface which should allow you to drop in replace the usage without any changes. See the example here: https://pkg.go.dev/github.com/apache/arrow-adbc/go/adbc@v0.9.0/sqldriver/flightsql#example-package
That should avoid the need for refactoring, right?
@zeroshade During my tests, columns of the decimal type were `arrow.Decimal` instead of a float64. I used the drop-in replacement to set up the connection to the Dremio FlightSQL endpoint via database/sql.
I will open an issue in the adbc project regarding the problems between Dremio and ADBC. If I recall correctly, the problem was that the endpoint URI, which is used in the LRU cache, was trying to fetch data from 0.0.0.0:32010 and not the URL behind the ingress.
> columns of the decimal type were arrow.Decimal instead of a float64.
On the one hand, `decimal128.Num` does have a `ToFloat` function, but this is a good point. We should probably add a configuration option to auto-convert decimals to float64 for you.
> If I recall correctly, the problem was that the endpoint uri, which is used in the cache lru, was trying to fetch data from 0.0.0.0:32010 and not the url behind the ingress.
Interesting. In theory it should properly follow the URIs provided in the endpoints, or use the same connection as the client. I'll keep an eye out for the issue being filed so I can look into it.
> Should use `ctx.Err()` instead, it could be cancelled or any other situation, not necessarily timed out.
✔️ Replaced.
Thanks for reviewing, @zeroshade.
@miguelpragier looks like there's some sort of bad nil dereference happening in the tests that needs to be addressed before we can merge this.
@miguelpragier The only thing missing here is some tests, otherwise this looks good to me. We should make sure the cancel func is being tested.
After merging your PR, Conbench analyzed the 5 benchmarking runs that have been run so far on merge-commit 036a22e. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 7 possible false positives for unstable benchmarks that are known to sometimes produce them.
Rationale for this change
Enabling Support for Large Recordsets in Go FlightSQL Driver
This replaces the download-everything-at-once-then-read approach with downloading chunks as they are read.
The primary motivation for these changes is to enhance the driver's capability to handle large recordsets without the need for unnecessary memory pre-allocations. By implementing a concurrent streaming approach, the driver avoids loading the entire dataset into memory at once.
Description:
Implementing Concurrent Record Streaming to Better Support the Handling of Large Recordsets.
For retrieving a recordset, the current implementation downloads the entire recordset into memory before any reading begins.
Proposed Changes:
Iterate over [endpoints], [readers], and [records] ad hoc, reading only the necessary data according to consumer demand.
What changes are included in this PR?
1. Introduction of `sync.Mutex`: the `Rows` struct has been updated to include a `currentRecordMux` mutex. This addition ensures that operations involving the release of the current record are thread-safe, thus preventing potential race conditions in a concurrent environment.
2. Channels for asynchronous record fetching: a channel, `recordChan`, has been added to the `Rows` struct. This channel permits the driver to asynchronously fetch and queue records. It provides a non-blocking mechanism to manage incoming records, which is particularly advantageous when dealing with large recordsets.
3. Asynchronous record streaming via goroutines: a `streamRecordset` function has been introduced and is designed to run concurrently in a goroutine. This modification permits the driver to begin processing records as soon as they are received, without having to wait for the entire recordset to be loaded into memory.
4. Improved record management: a new method, `releaseRecord`, has been created to manage the lifecycle of the current record. This method ensures that resources are released when a record is no longer needed, thus reducing the memory footprint when processing large datasets.
5. Refactoring of the `Next` method: the `Next` method in the `Rows` struct has been refactored to suit the new streaming model. It now efficiently waits for and retrieves the next available record from the `recordChan` channel, enabling a smooth and memory-efficient iteration over large datasets.

Are These Changes Tested?
The proposed changes have been validated against existing tests.
Are There Any User-Facing Changes?
No, there are no user-facing changes.