-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-14191: [C++][Dataset] Dataset writes should respect backpressure #11286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-14191: [C++][Dataset] Dataset writes should respect backpressure #11286
Conversation
|
|
|
|
…ng rows from an ExecPlan to disk This PR adds a write node. The write node takes in `FileSystemDatasetWriteOptions` and the projected schema and writes the incoming data to disk. It is a sink node but it is a bit different than the existing sink node. The existing sink node transferred batches via an AsyncGenerator but that puts ownership of the batches outside of the push-based flow of the exec plan. I added a new ConsumingSinkNode which consumes the batches as part of the push-based flow. This makes it possible to block the exec plan from finishing until all data has been written to disk. In addition this PR refines the AsyncTaskGroup a little. `WaitForTasksToFinish` was not a very clearly named method. Once called it actually transitioned the task group from a "top level tasks can be added" state to a "no more top level tasks can be added" state and that was not clear in the name. The new name (`End`) is hopefully more clear. This PR does not solve the backpressure problem. Instead a serial async task group was created. This will run all tasks in order (the default task group allows them to run in parallel) but not necessarily on the calling thread (i.e. unlike SerialTaskGroup we do not block on the AddTask method). This allows tasks to pile up in a queue and, if the write is slow, this will become a pile-up point which will eventually run out of memory (provided there is enough data being written). That problem is solved in a follow-up #11286 The `AsyncSerializedTaskGroup` and `AsyncTaskGroup` classes have very similar APIs but I did not create an interface / abstract base class because I don't yet envision any case where they would be interchangeable. The distinction is a "can these tasks run in parallel or not" and is not a performance / resource question. As a consequence of using the ExecPlan dataset writes no longer have reliable ordering. If you pass in three batches and they are all destined for the same file then the batches may be written in any order in the destination file. This is because the ExecPlan creates a thread task for each input batch and so they could arrive at the write node in any order. Closes #11017 from westonpace/feature/ARROW-13542-write-node Authored-by: Weston Pace <weston.pace@gmail.com> Signed-off-by: Weston Pace <weston.pace@gmail.com>
72aea77 to
edeed63
Compare
bkietz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good in principle, just one nit on the test
aocsa
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, Just a couple of comments related to code style and tests.
cpp/src/arrow/dataset/file_base.cc
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this is a good time to uniformize code styling as this is a class all the data members should follow name_ style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I fixed all data members to use name_ style.
python/pyarrow/tests/test_dataset.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a comment, If python test is slow why don't write this test in C++. I think there is more control in the C++, and even we a test with large workload is achivable, or run test cases when something so it doesn't always run,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are already tests in C++ for the scanner backpressure and the dataset writer backpressure. You are correct that we have more control. I was able to use the thread pool's "wait for idle" method to know when backpressure had been hit.
I wanted a python test to pull everything together and make sure it is actually being utilized correctly (I think it is easy sometimes for python to get missed due to a configuration parameter or something else). I'd be ok with removing this test but I don't think we need to add anything to C++. @bkietz thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say this is sufficient for this PR
871b282 to
b55c34a
Compare
|
@bkietz Any other concerns? Otherwise I will rebase tomorrow morning and merge if all looks good. |
b55c34a to
35d512f
Compare
|
Benchmark runs are scheduled for baseline = 02f11b9 and contender = cf50b31. cf50b31 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
…ng rows from an ExecPlan to disk This PR adds a write node. The write node takes in `FileSystemDatasetWriteOptions` and the projected schema and writes the incoming data to disk. It is a sink node but it is a bit different than the existing sink node. The existing sink node transferred batches via an AsyncGenerator but that puts ownership of the batches outside of the push-based flow of the exec plan. I added a new ConsumingSinkNode which consumes the batches as part of the push-based flow. This makes it possible to block the exec plan from finishing until all data has been written to disk. In addition this PR refines the AsyncTaskGroup a little. `WaitForTasksToFinish` was not a very clearly named method. Once called it actually transitioned the task group from a "top level tasks can be added" state to a "no more top level tasks can be added" state and that was not clear in the name. The new name (`End`) is hopefully more clear. This PR does not solve the backpressure problem. Instead a serial async task group was created. This will run all tasks in order (the default task group allows them to run in parallel) but not necessarily on the calling thread (i.e. unlike SerialTaskGroup we do not block on the AddTask method). This allows tasks to pile up in a queue and, if the write is slow, this will become a pile-up point which will eventually run out of memory (provided there is enough data being written). That problem is solved in a follow-up apache#11286 The `AsyncSerializedTaskGroup` and `AsyncTaskGroup` classes have very similar APIs but I did not create an interface / abstract base class because I don't yet envision any case where they would be interchangeable. The distinction is a "can these tasks run in parallel or not" and is not a performance / resource question. As a consequence of using the ExecPlan dataset writes no longer have reliable ordering. If you pass in three batches and they are all destined for the same file then the batches may be written in any order in the destination file. This is because the ExecPlan creates a thread task for each input batch and so they could arrive at the write node in any order. Closes apache#11017 from westonpace/feature/ARROW-13542-write-node Authored-by: Weston Pace <weston.pace@gmail.com> Signed-off-by: Weston Pace <weston.pace@gmail.com>
This PR adds the backpressure feature (introduced in ARROW-13611) to the dataset write node (introduced in ARROW-13542).
To note: The python test here is unfortunately a bit slow for two reasons. First, we don't make the "how many rows can the dataset writer hold before backpressure applies" option a configurable option (it's not clear in what circumstances a user would ever change it) and it defaults to 64Mi rows.
Second, there is no signal sent from the C++ side when backpressure has been applied. So we are forced to poll and guess when it seems we have stopped reading from the source.
I'm open to suggestions (e.g. don't include the test, make the test a large memory test or something so it doesn't always run, any ideas we can use to test this better, etc.)