GH-40089: [Go] Concurrent Recordset for receiving huge recordset #40090

Merged
zeroshade merged 13 commits into apache:main from RSM-Ebner-Stolz:feat/concurrent-resultset
Feb 23, 2024
Conversation

Contributor

@miguelpragier miguelpragier commented Feb 15, 2024

Rationale for this change

Enabling Support for Large Recordsets in Go FlightSQL Driver
Replacing the download-all-at-once-then-read-later model with a download-chunks-as-you-read approach.

The primary motivation for these changes is to enhance the driver's capability to handle large recordsets without the need for unnecessary memory pre-allocations. By implementing a concurrent streaming approach, the driver avoids loading the entire dataset into memory at once.

Description:

Implementing Concurrent Record Streaming to Better Support the Handling of Large Recordsets.

For retrieving a recordset, the current implementation works as follows:

  • An SQL query results in a set of [endpoints] and a query ticket.
  • Each [endpoint] is requested (with the generated ticket), and its response is a [reader].
  • Each reader is iterated for records. These records are, in fact, arrays of rows.
  • All the retrieved rows are stored at once in an array.
  • This means that data, potentially comprising billions of rows, is synchronously read into an array.
  • After this array is filled, it is then returned, all at once, to the consumer.
  • This can result in out-of-memory failures, or at the very least, unnecessary waiting times and huge pre-allocations.

Proposed Changes:

Iterate over [endpoints], [readers], and [records] ad hoc, reading only the necessary data according to consumer demand.
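The pull-based flow described above can be sketched with plain channels. The `record` and `reader` types here are hypothetical stand-ins for the Arrow Flight types, not the driver's actual API:

```go
package main

import "fmt"

// record stands in for an arrow.Record: one batch of rows.
type record []string

// reader stands in for a Flight stream reader over one endpoint.
type reader struct{ batches []record }

func (r *reader) next() (record, bool) {
	if len(r.batches) == 0 {
		return nil, false
	}
	rec := r.batches[0]
	r.batches = r.batches[1:]
	return rec, true
}

// stream pushes records into a small buffered channel instead of
// accumulating every row in one slice; the consumer pulls on demand,
// so only a bounded number of batches is in flight at any time.
func stream(readers []*reader) <-chan record {
	out := make(chan record, 1)
	go func() {
		defer close(out)
		for _, rd := range readers {
			for rec, ok := rd.next(); ok; rec, ok = rd.next() {
				out <- rec // blocks until the consumer is ready
			}
		}
	}()
	return out
}

func main() {
	readers := []*reader{
		{batches: []record{{"a", "b"}, {"c"}}},
		{batches: []record{{"d"}}},
	}
	for rec := range stream(readers) {
		fmt.Println(rec)
	}
}
```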

What changes are included in this PR?

1. Introduction of sync.Mutex:

  • The Rows struct has been updated to include a currentRecordMux mutex. This addition ensures that operations involving the release of the current record are thread-safe, thus preventing potential race conditions in a concurrent environment.

2. Channels for Asynchronous Record Fetching:

  • A new buffered channel, recordChan, has been added to the Rows struct. This channel permits the driver to asynchronously fetch and queue records. It provides a non-blocking mechanism to manage incoming records, which is particularly advantageous when dealing with large recordsets.

3. Asynchronous Record Streaming via Goroutines:

  • The streamRecordset function has been introduced and is designed to run concurrently using goroutines. This modification permits the driver to begin processing records as soon as they are received, without having to wait for the entire recordset to be loaded into memory.

4. Improved Record Management:

  • A new method, releaseRecord, has been created to manage the lifecycle of the current record. This method ensures that resources are released when a record is no longer needed, thus reducing the memory footprint when processing large datasets.

5. Refactoring of the Next Method:

  • The Next method in the Rows struct has been refactored to suit the new streaming model. It now efficiently waits for and retrieves the next available record from the recordChan channel, enabling a smooth and memory-efficient iteration over large datasets.

Are These Changes Tested?

The proposed changes have been validated against existing tests.

Are There Any User-Facing Changes?

No, there are no user-facing changes.

@github-actions

⚠️ GitHub issue #40089 has been automatically assigned in GitHub to PR creator.

@miguelpragier miguelpragier marked this pull request as ready for review February 15, 2024 09:42
@kou kou changed the title GH-40089: Go Concurrent Recordset for receiving huge recordset GH-40089: [Go] Concurrent Recordset for receiving huge recordset Feb 15, 2024
Member

@zeroshade zeroshade left a comment


So, a quick question above and beyond this review: have you looked into using ADBC's FlightSQL driver instead of this? For reference, this asynchronous pulling of records, instead of pulling them all into memory, is already implemented there on top of the Go Arrow FlightSQL implementation, and you can use https://pkg.go.dev/github.com/apache/arrow-adbc/go/adbc@v0.9.0/sqldriver if you need the database/sql interface rather than just working with Arrow directly.

If you haven't tried it yet, could you try it out and see if it solves your issues?


func newRows() *Rows {
	return &Rows{
		recordChan: make(chan arrow.Record, 1),
Member


This buffer size should probably be configurable: depending on the server in use and how the consumer is operating, getting better performance and backpressure may require tuning the size of the buffer.

Contributor Author


Thanks, @zeroshade.
I removed the magic number but kept the buffer size at one.
Do you have suggestions for implementing a dynamic/configurable buffer size?
That could happen in a new PR anyway. WDYT?

Member


Sure, I'd be fine with making it configurable in a follow-up PR.

As for a suggestion for implementing the configurable buffer size, there are two common patterns:

  • In the Open and Connector handling of the URI, you can use a delimiter and allow passing various options in, similar to how an ODBC DSN works. During Open you can then parse out the options and store them in the driver instance, then have this function take the buffer size as a parameter.
  • You could provide context functions like WithBufferSize and then pull the value out of the context at query time when you construct the Rows object, defaulting to 1 if the user didn't stick it into the context.

Comment on lines +483 to +484
rows := newRows()
go rows.streamRecordset(ctx, c.client, info.Endpoint)
Member


should probably use context.WithCancel on this and store the cancel function so that Rows can call it on Close and tell the streaming goroutine it can end early.

Contributor Author


This is the idea that helped me better resolve the synchronization;
that's why I dropped the mutex, avoiding any risk of deadlocks.
Thanks, it improves the PR a lot.


case <-ctx.Done():
	r.releaseRecord()
	r.streamError = fmt.Errorf("stream recordset context timed out")
Member


Should use ctx.Err() instead; the context could be cancelled or fail for some other reason, not necessarily a timeout.


@zeroshade

Yes, we conducted an evaluation of the ADBC driver but ultimately chose to forgo it in favor of adapting the Arrow driver. Initially, we encountered issues integrating the driver with our Dremio deployment. We have yet to ascertain whether the problem lies with our deployment, with Dremio, or with the driver itself. A more significant consideration is our understanding that the ADBC driver utilizes Arrow data types. Our application is extensive and includes numerous structs, which would necessitate considerable refactoring.

Member


@mgross-ebner We maintain usage of Dremio for the CI testing with the ADBC driver to ensure compatibility. So it would definitely be good for us to determine whether the issue is in the Driver or not.

A more significant consideration is our understanding that the ADBC driver utilizes Arrow data types. Our application is extensive and includes numerous structs, which would necessitate considerable refactoring.

There is an adapter in the arrow-adbc library that wraps the ADBC driver for the Go database/sql interface, which should allow you to drop-in replace the usage without any changes. See the example here: https://pkg.go.dev/github.com/apache/arrow-adbc/go/adbc@v0.9.0/sqldriver/flightsql#example-package

That should avoid the need for refactoring, right?


@zeroshade During my tests, columns of the decimal type were returned as arrow.Decimal instead of float64. I used the drop-in replacement to set up the connection to the Dremio FlightSQL endpoint via database/sql.

I will open an issue in the adbc project regarding the problems between Dremio and ADBC. If I recall correctly, the problem was that the endpoint URI, which is used in the LRU cache, was trying to fetch data from 0.0.0.0:32010 and not the URL behind the ingress.

Member


columns of the decimal type were arrow.Decimal instead of a float64.

On the one hand, decimal128.Num does have a ToFloat function, but this is a good point. We should probably add a configuration option to auto-convert decimals to float64 for you.

If I recall correctly, the problem was that the endpoint uri, which is used in the cache lru, was trying to fetch data from 0.0.0.0:32010 and not the url behind the ingress.

Interesting. In theory it should properly follow the URIs provided in the endpoints or use the same connection as the client. I'll keep an eye out for the issue getting filed so I can look into it.

Contributor Author


Should use ctx.Err() instead, it could be cancelled or any other situation, not necessarily timed out.

✔️ Replaced.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Feb 15, 2024
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Feb 16, 2024
@miguelpragier
Contributor Author

Thanks for reviewing, @zeroshade.
We gave it a second thought before deciding to refactor these routines; the code edits are simple but not trivial.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Feb 16, 2024
@zeroshade
Member

@miguelpragier looks like there's some sort of bad nil dereference happening in the tests that needs to be addressed before we can merge this.

@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Feb 21, 2024
@zeroshade
Member

@miguelpragier The only thing missing here is some tests; otherwise this looks good to me. We should make sure the cancel func is being tested.

@zeroshade zeroshade merged commit 036a22e into apache:main Feb 23, 2024
@zeroshade zeroshade removed the awaiting change review Awaiting change review label Feb 23, 2024
@github-actions github-actions bot added the awaiting merge Awaiting merge label Feb 23, 2024
@conbench-apache-arrow

After merging your PR, Conbench analyzed the 5 benchmarking runs that have been run so far on merge-commit 036a22e.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 7 possible false positives for unstable benchmarks that are known to sometimes produce them.

Successfully merging this pull request may close these issues.

[Go] Can't handle huge recordset with synchronous approach
