ARROW-13542: [C++][Compute][Dataset] Add dataset::WriteNode for writing rows from an ExecPlan to disk #11017
Conversation
lidavidm left a comment:
LGTM. There are a lot of files touched, but things look straightforward.
cpp/src/arrow/util/future.h (outdated):

    ///
    /// The future will be marked complete if all `futures` complete
    /// successfully. Otherwise, it will be marked failed with the status of
    /// the first failing future.
I left a nit about the docstring on the MakeTask PR: #11210 (comment)
I clarified the comment according to that review and also changed the name to AllFinished.
    // is finished. Use SinkNode if you are transferring the ownership of the data to another
    // system. Use ConsumingSinkNode if the data is being consumed within the exec plan (i.e.
    // the exec plan should not complete until the consumption has completed).
    class ConsumingSinkNode : public ExecNode {
Should we add an explicit test of this node in plan_test.cc, as with the other nodes?
I added a few unit tests to plan_test.cc
LGTM to me too. I reviewed the code; there are some common patterns for handling signals, the input counter, and futures.
lidavidm left a comment:
Thanks for the updates. LGTM
This PR adds a write node. The write node takes in `FileSystemDatasetWriteOptions` and the projected schema and writes the incoming data to disk. It is a sink node, but it differs from the existing sink node: the existing sink node transferred batches via an AsyncGenerator, which puts ownership of the batches outside the push-based flow of the exec plan. I added a new ConsumingSinkNode which consumes the batches as part of the push-based flow. This makes it possible to block the exec plan from finishing until all data has been written to disk.

In addition, this PR refines the AsyncTaskGroup a little. `WaitForTasksToFinish` was not a clearly named method: once called, it actually transitioned the task group from a "top level tasks can be added" state to a "no more top level tasks can be added" state, and that was not clear from the name. The new name (`End`) is hopefully clearer.

This PR does not solve the backpressure problem. Instead, a serial async task group was created. This runs all tasks in order (the default task group allows them to run in parallel) but not necessarily on the calling thread (i.e., unlike SerialTaskGroup, we do not block in the AddTask method). This allows tasks to pile up in a queue; if the write is slow, this becomes a pile-up point that will eventually run out of memory (provided there is enough data being written). That problem is solved in a follow-up, apache#11286.

The `AsyncSerializedTaskGroup` and `AsyncTaskGroup` classes have very similar APIs, but I did not create an interface / abstract base class because I don't yet envision any case where they would be interchangeable. The distinction is "can these tasks run in parallel or not", not a performance / resource question.

As a consequence of using the ExecPlan, dataset writes no longer have reliable ordering. If you pass in three batches and they are all destined for the same file, the batches may be written in any order in the destination file. This is because the ExecPlan creates a thread task for each input batch, so batches can arrive at the write node in any order.

Closes apache#11017 from westonpace/feature/ARROW-13542-write-node

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>