feat(arrow/cdata): Add Implementation of Async C Data interface#169
feat(arrow/cdata): Add Implementation of Async C Data interface#169zeroshade merged 14 commits intoapache:mainfrom
Conversation
| func CreateAsyncDeviceStreamHandler(ctx context.Context, queueSize uint64, out *CArrowAsyncDeviceStreamHandler) <-chan AsyncRecordBatchStream { | ||
| ch := make(chan AsyncRecordBatchStream) | ||
| exportAsyncHandler(cAsyncState{ctx: ctx, ch: ch, queueSize: queueSize}, out) | ||
| return ch | ||
| } | ||
|
|
||
| // ExportAsyncRecordBatchStream takes in a schema and a channel of RecordMessages along with a | ||
| // ArrowAsyncDeviceStreamHandler to export the records as they come across the channel and call | ||
| // the appropriate callbacks on the handler. This function will block until the stream is closed | ||
| // or a message containing an error comes across the channel. | ||
| // | ||
| // The returned error will be nil if everything is successful, otherwise it will be the error which | ||
| // is encountered on the stream or an AsyncError if one of the handler callbacks returns an error. | ||
| func ExportAsyncRecordBatchStream(schema *arrow.Schema, stream <-chan RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error { |
There was a problem hiding this comment.
One blocks and the other uses a channel; is there a reason to not have them both block or both use a channel?
There was a problem hiding this comment.
Creating the handler needs to then be passed to a producer, so the function to create the handler needs to easily provide for when it is safe to hand off that populated struct. By not immediately returning the AsyncRecordBatchStream it enables simplifying the interactions with waiting until on_schema is called.
ExportAsyncRecordBatchStream receives the handler and a channel. Given that with Go it's a simple matter to just use go ExportAsyncRecordBatchStream() or otherwise if desired, I figured it was easier to let that block rather than doing the extra work ourselves here.
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
|
Any further comments @kou? |
This adds a basic implementation of helpers for managing an ArrowAsyncDeviceStreamHandler for using the Async Arrow C Device interface. The corresponding C++ helper implementation can be found at apache/arrow#44495 with the discusson on the actual C structures located at apache/arrow#43632.