Commit 036a22e

GH-40089: [Go] Concurrent Recordset for receiving huge recordset (#40090)
### Rationale for this change

Enabling support for large recordsets in the Go FlightSQL driver by replacing the **download-all-at-once, read-later** approach with **download-chunk-as-reading**. The primary motivation for these changes is to enhance the driver's capability to handle large recordsets without unnecessary memory pre-allocations. By implementing a concurrent streaming approach, the driver avoids loading the entire dataset into memory at once.

### Description:

Implementing concurrent record streaming to better support the handling of large recordsets.

For retrieving a recordset, the current implementation works as follows:

- An SQL query results in a set of endpoints and a query ticket.
- Each endpoint is requested (with the generated ticket), and its response is a reader.
- Each reader is iterated for records. These records are, in fact, arrays of rows.
- All the retrieved rows are stored at once in an array. This means that data, potentially comprising billions of rows, is synchronously read into an array.
- Only after this array is filled is it returned, all at once, to the consumer.
- This can result in out-of-memory failures or, at the very least, unnecessary waiting times and huge pre-allocations.

### Proposed Changes:

Iterate over endpoints, readers, and records ad hoc, reading only the necessary data according to consumer demand.

### What changes are included in this PR?

1. **Introduction of `sync.Mutex`:** The `Rows` struct has been updated to include a `currentRecordMux` mutex. This addition ensures that operations involving the release of the current record are thread-safe, preventing potential race conditions in a concurrent environment.
2. **Channels for asynchronous record fetching:** A new buffered channel, `recordChan`, has been added to the `Rows` struct. This channel permits the driver to asynchronously fetch and queue records, providing a non-blocking mechanism to manage incoming records, which is particularly advantageous when dealing with large recordsets.
3. **Asynchronous record streaming via goroutines:** The `streamRecordset` function has been introduced and is designed to run concurrently as a goroutine. This permits the driver to begin processing records as soon as they are received, without waiting for the entire recordset to be loaded into memory.
4. **Improved record management:** A new method, `releaseRecord`, manages the lifecycle of the current record. It ensures that resources are released when a record is no longer needed, reducing the memory footprint when processing large datasets.
5. **Refactoring of the `Next` method:** The `Next` method of the `Rows` struct has been refactored to suit the new streaming model. It now waits for and retrieves the next available record from the `recordChan` channel, enabling smooth and memory-efficient iteration over large datasets.

### Are These Changes Tested?

The proposed changes have been validated against the existing tests.

### Are There Any User-Facing Changes?

No, there are no user-facing changes.

* Closes: #40089

Lead-authored-by: miguel pragier <miguel.pragier@ebnerstolz.de>
Co-authored-by: Miguel Pragier <miguelpragier@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
1 parent 65c2b46 commit 036a22e

File tree

2 files changed

+1076
-66
lines changed


go/arrow/flight/flightsql/driver/driver.go

Lines changed: 139 additions & 66 deletions
```diff
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/apache/arrow/go/v16/arrow"
@@ -36,36 +37,77 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 )
 
+const recordChanBufferSizeDefault = 1
+
 type Rows struct {
-	schema        *arrow.Schema
-	records       []arrow.Record
-	currentRecord int
-	currentRow    int
+	// schema stores the row schema, like column names.
+	schema *arrow.Schema
+	// recordChan enables async reading from the server while the client iterates.
+	recordChan chan arrow.Record
+	// currentRecord stores a record with n>=0 rows.
+	currentRecord arrow.Record
+	// currentRow tracks the position (row) within currentRecord.
+	currentRow uint64
+	// initializedChan prevents the rows being used before they are properly initialized.
+	initializedChan chan bool
+	// streamError stores the error that interrupted streaming.
+	streamError    error
+	streamErrorMux sync.RWMutex
+	// ctxCancelFunc, when called, triggers the streaming cancellation.
+	ctxCancelFunc context.CancelFunc
+}
+
+func newRows() *Rows {
+	return &Rows{
+		recordChan:      make(chan arrow.Record, recordChanBufferSizeDefault),
+		initializedChan: make(chan bool),
+	}
+}
+
+func (r *Rows) setStreamError(err error) {
+	r.streamErrorMux.Lock()
+	defer r.streamErrorMux.Unlock()
+
+	r.streamError = err
+}
+
+func (r *Rows) getStreamError() error {
+	r.streamErrorMux.RLock()
+	defer r.streamErrorMux.RUnlock()
+
+	return r.streamError
 }
 
 // Columns returns the names of the columns.
 func (r *Rows) Columns() []string {
-	if len(r.records) == 0 {
+	if r.schema == nil {
 		return nil
 	}
 
-	// All records have the same columns
-	var cols []string
-	for _, c := range r.schema.Fields() {
-		cols = append(cols, c.Name)
+	// All records have the same columns.
+	cols := make([]string, len(r.schema.Fields()))
+	for i, c := range r.schema.Fields() {
+		cols[i] = c.Name
 	}
 
 	return cols
 }
 
+func (r *Rows) releaseRecord() {
+	if r.currentRecord != nil {
+		r.currentRecord.Release()
+		r.currentRecord = nil
+	}
+}
+
 // Close closes the rows iterator.
 func (r *Rows) Close() error {
-	for _, rec := range r.records {
-		rec.Release()
-	}
-	r.currentRecord = 0
+	r.ctxCancelFunc() // interrupts the data streaming.
+
 	r.currentRow = 0
 
+	r.releaseRecord()
+
 	return nil
 }
 
@@ -79,28 +121,37 @@ func (r *Rows) Close() error {
 // should be taken when closing Rows not to modify
 // a buffer held in dest.
 func (r *Rows) Next(dest []driver.Value) error {
-	if r.currentRecord >= len(r.records) {
-		return io.EOF
-	}
-	record := r.records[r.currentRecord]
+	if r.currentRecord == nil || int64(r.currentRow) >= r.currentRecord.NumRows() {
+		if err := r.getStreamError(); err != nil {
+			return err
+		}
+
+		r.releaseRecord()
+
+		// Get the next record from the channel.
+		var ok bool
+		if r.currentRecord, ok = <-r.recordChan; !ok {
+			return io.EOF // Channel closed, no more records.
+		}
 
-	if int64(r.currentRow) >= record.NumRows() {
-		return ErrOutOfRange
+		r.currentRow = 0
+
+		// Safety double-check.
+		if r.currentRecord == nil || int64(r.currentRow) >= r.currentRecord.NumRows() {
+			return io.EOF // Channel closed, no more records.
+		}
 	}
 
-	for i, arr := range record.Columns() {
-		v, err := fromArrowType(arr, r.currentRow)
+	for i, col := range r.currentRecord.Columns() {
+		v, err := fromArrowType(col, int(r.currentRow))
 		if err != nil {
 			return err
 		}
+
 		dest[i] = v
 	}
 
 	r.currentRow++
-	if int64(r.currentRow) >= record.NumRows() {
-		r.currentRecord++
-		r.currentRow = 0
-	}
 
 	return nil
 }
@@ -226,19 +277,14 @@ func (s *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driv
 		return nil, err
 	}
 
-	rows := Rows{}
-	for _, endpoint := range info.Endpoint {
-		schema, records, err := readEndpoint(ctx, s.client, endpoint)
-		if err != nil {
-			return &rows, err
-		}
-		if rows.schema == nil {
-			rows.schema = schema
-		}
-		rows.records = append(rows.records, records...)
-	}
+	rows := newRows()
+	ctx, rows.ctxCancelFunc = context.WithCancel(ctx)
+
+	go rows.streamRecordset(ctx, s.client, info.Endpoint)
 
-	return &rows, nil
+	<-rows.initializedChan // waits for the rows' proper initialization.
+
+	return rows, nil
 }
 
 func (s *Stmt) setParameters(args []driver.NamedValue) error {
@@ -462,43 +508,70 @@ func (c *Connection) QueryContext(ctx context.Context, query string, args []driv
 		return nil, err
 	}
 
-	rows := Rows{}
-	for _, endpoint := range info.Endpoint {
-		schema, records, err := readEndpoint(ctx, c.client, endpoint)
-		if err != nil {
-			return &rows, err
-		}
-		if rows.schema == nil {
-			rows.schema = schema
-		}
-		rows.records = append(rows.records, records...)
-	}
+	rows := newRows()
+	ctx, rows.ctxCancelFunc = context.WithCancel(ctx)
 
-	return &rows, nil
+	go rows.streamRecordset(ctx, c.client, info.Endpoint)
 
+	<-rows.initializedChan // waits for the rows' proper initialization.
+
+	return rows, nil
 }
 
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint *flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
-	reader, err := client.DoGet(ctx, endpoint.GetTicket())
-	if err != nil {
-		return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
-	}
-	defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client, endpoints []*flight.FlightEndpoint) {
+	defer close(r.recordChan)
+
+	// initializeOnceOnly ensures r.initializedChan is signaled once only, preventing a deadlock.
+	initializeOnceOnly := &sync.Once{}
+
+	defer func() { // in case of error, initialize anyway.
+		initializeOnceOnly.Do(func() { r.initializedChan <- true })
+	}()
 
-	schema := reader.Schema()
-	var records []arrow.Record
-	for reader.Next() {
-		if record := reader.Record(); record.NumRows() > 0 {
-			record.Retain()
-			records = append(records, record)
+	// reads each endpoint.
+	for _, endpoint := range endpoints {
+		if ctx.Err() != nil {
+			r.setStreamError(fmt.Errorf("recordset streaming interrupted by context error: %w", ctx.Err()))
+			return
 		}
-	}
 
-	if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
-		return nil, nil, err
-	}
+		func() { // a closure makes {defer reader.Release()} possible per endpoint.
+			reader, err := c.DoGet(ctx, endpoint.GetTicket())
+			if err != nil {
+				r.setStreamError(fmt.Errorf("getting ticket failed: %w", err))
+				return
+			}
+
+			defer reader.Release()
+
+			r.schema = reader.Schema()
+
+			// reads each record into a blocking channel.
+			for reader.Next() {
+				if ctx.Err() != nil {
+					r.setStreamError(fmt.Errorf("recordset streaming interrupted by context error: %w", ctx.Err()))
+					return
+				}
 
-	return schema, records, nil
+				record := reader.Record()
+				record.Retain()
+
+				if record.NumRows() < 1 {
+					record.Release()
+					continue
+				}
+
+				r.recordChan <- record
+
+				go initializeOnceOnly.Do(func() { r.initializedChan <- true })
+			}
+
+			if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
+				r.setStreamError(err)
+				return
+			}
+		}()
+	}
 }
 
 // Close invalidates and potentially stops any current
```
