Skip to content

feat(arrow/cdata): Add Implementation of Async C Data interface#169

Merged
zeroshade merged 14 commits intoapache:mainfrom
zeroshade:async-c-interface
Nov 12, 2024
Merged

feat(arrow/cdata): Add Implementation of Async C Data interface#169
zeroshade merged 14 commits intoapache:mainfrom
zeroshade:async-c-interface

Conversation

@zeroshade
Copy link
Copy Markdown
Member

This adds a basic implementation of helpers for managing an ArrowAsyncDeviceStreamHandler for using the Async Arrow C Device interface. The corresponding C++ helper implementation can be found at apache/arrow#44495 with the discusson on the actual C structures located at apache/arrow#43632.

Comment on lines +329 to +342
func CreateAsyncDeviceStreamHandler(ctx context.Context, queueSize uint64, out *CArrowAsyncDeviceStreamHandler) <-chan AsyncRecordBatchStream {
ch := make(chan AsyncRecordBatchStream)
exportAsyncHandler(cAsyncState{ctx: ctx, ch: ch, queueSize: queueSize}, out)
return ch
}

// ExportAsyncRecordBatchStream takes in a schema and a channel of RecordMessages along with a
// ArrowAsyncDeviceStreamHandler to export the records as they come across the channel and call
// the appropriate callbacks on the handler. This function will block until the stream is closed
// or a message containing an error comes across the channel.
//
// The returned error will be nil if everything is successful, otherwise it will be the error which
// is encountered on the stream or an AsyncError if one of the handler callbacks returns an error.
func ExportAsyncRecordBatchStream(schema *arrow.Schema, stream <-chan RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One blocks and the other uses a channel; is there a reason to not have them both block or both use a channel?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating the handler needs to then be passed to a producer, so the function to create the handler needs to easily provide for when it is safe to hand off that populated struct. By not immediately returning the AsyncRecordBatchStream it enables simplifying the interactions with waiting until on_schema is called.

ExportAsyncRecordBatchStream receives the handler and a channel. Given that with Go it's a simple matter to just use go ExportAsyncRecordBatchStream() or otherwise if desired, I figured it was easier to let that block rather than doing the extra work ourselves here.

zeroshade and others added 10 commits October 29, 2024 14:10
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
@zeroshade zeroshade requested a review from kou November 11, 2024 17:33
@zeroshade
Copy link
Copy Markdown
Member Author

Any further comments @kou?

Copy link
Copy Markdown
Member

@kou kou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@zeroshade zeroshade merged commit d10a859 into apache:main Nov 12, 2024
@zeroshade zeroshade deleted the async-c-interface branch November 12, 2024 04:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants