Conversation

@westonpace (Member) commented Oct 1, 2021

This PR adds the backpressure feature (introduced in ARROW-13611) to the dataset write node (introduced in ARROW-13542).

To note: the Python test here is unfortunately a bit slow, for two reasons. First, we don't expose "how many rows can the dataset writer hold before backpressure applies" as a configurable option (it's not clear in what circumstances a user would ever change it), and it defaults to 64Mi rows.

Second, there is no signal sent from the C++ side when backpressure has been applied, so we are forced to poll and guess when it appears we have stopped reading from the source.

I'm open to suggestions (e.g. don't include the test, mark it as a large-memory test so it doesn't always run, or any other ideas for testing this better).
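For context, here is a minimal sketch of the row-count backpressure idea described above. The names (`BackpressureGate`, `OnRowsQueued`, `OnRowsWritten`) are illustrative only, not the actual DatasetWriter code, and the "resume once the backlog drops to half the threshold" rule is purely an assumption for the example:

```cpp
// Sketch only: pause the source when too many rows are queued but not yet
// written, resume once the backlog drains.  Not Arrow's real implementation.
#include <cstdint>
#include <functional>
#include <iostream>

class BackpressureGate {
 public:
  BackpressureGate(uint64_t max_rows_queued, std::function<void()> pause,
                   std::function<void()> resume)
      : max_rows_queued_(max_rows_queued),
        pause_(std::move(pause)),
        resume_(std::move(resume)) {}

  // Called when a batch arrives at the writer.
  void OnRowsQueued(uint64_t num_rows) {
    rows_queued_ += num_rows;
    if (!paused_ && rows_queued_ > max_rows_queued_) {
      paused_ = true;
      pause_();  // ask the source to stop reading
    }
  }

  // Called when a batch has been flushed to disk.
  void OnRowsWritten(uint64_t num_rows) {
    rows_queued_ -= num_rows;
    if (paused_ && rows_queued_ <= max_rows_queued_ / 2) {
      paused_ = false;
      resume_();  // safe to start reading again
    }
  }

 private:
  uint64_t max_rows_queued_;
  uint64_t rows_queued_ = 0;
  bool paused_ = false;
  std::function<void()> pause_;
  std::function<void()> resume_;
};

int main() {
  // The real default mentioned above is 64Mi rows; shrunk here for visibility.
  BackpressureGate gate(/*max_rows_queued=*/100,
                        [] { std::cout << "pause source\n"; },
                        [] { std::cout << "resume source\n"; });
  gate.OnRowsQueued(150);   // crosses the threshold -> "pause source"
  gate.OnRowsWritten(120);  // backlog drains       -> "resume source"
  return 0;
}
```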

github-actions bot commented Oct 1, 2021

⚠️ Ticket has no components in JIRA, make sure you assign one.

github-actions bot commented Oct 1, 2021

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

westonpace added a commit that referenced this pull request Oct 8, 2021
…ng rows from an ExecPlan to disk

This PR adds a write node.  The write node takes in `FileSystemDatasetWriteOptions` and the projected schema and writes the incoming data to disk.  It is a sink node, but it is a bit different from the existing sink node.  The existing sink node transfers batches via an AsyncGenerator, which puts ownership of the batches outside of the push-based flow of the exec plan.  I added a new ConsumingSinkNode which consumes the batches as part of the push-based flow.  This makes it possible to block the exec plan from finishing until all data has been written to disk.
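To illustrate the shape of this idea (with made-up names — `SinkConsumer`, `FileWritingConsumer`, and a toy `Batch` are not Arrow's actual API), a consuming sink boils down to a consumer that is pushed each batch and whose `Finish()` must complete before the plan is allowed to finish:

```cpp
// Sketch of a "consuming sink": batches are pushed into the consumer as part
// of the plan's own flow rather than pulled out through an AsyncGenerator.
#include <fstream>
#include <string>
#include <vector>

// Stand-in for an Arrow ExecBatch.
struct Batch {
  std::vector<std::string> rows;
};

// The consumer contract: Consume() is called once per incoming batch, and
// Finish() is called after the last batch, before the plan may finish.
class SinkConsumer {
 public:
  virtual ~SinkConsumer() = default;
  virtual void Consume(const Batch& batch) = 0;
  virtual void Finish() = 0;
};

// A toy "dataset writer" consumer that appends every row to one file.
class FileWritingConsumer : public SinkConsumer {
 public:
  explicit FileWritingConsumer(const std::string& path) : out_(path) {}

  void Consume(const Batch& batch) override {
    for (const auto& row : batch.rows) {
      out_ << row << "\n";
    }
  }

  void Finish() override { out_.close(); }

 private:
  std::ofstream out_;
};

int main() {
  FileWritingConsumer consumer("/tmp/consuming_sink_sketch.txt");
  consumer.Consume(Batch{{"a", "b"}});
  consumer.Consume(Batch{{"c"}});
  consumer.Finish();  // only now would the "plan" be considered finished
  return 0;
}
```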

In addition, this PR refines the AsyncTaskGroup a little.  `WaitForTasksToFinish` was not a very clearly named method.  Once called, it actually transitioned the task group from a "top level tasks can be added" state to a "no more top level tasks can be added" state, and that was not clear from the name.  The new name (`End`) is hopefully clearer.

This PR does not solve the backpressure problem.  Instead, a serial async task group was created.  This runs all tasks in order (the default task group allows them to run in parallel) but not necessarily on the calling thread (i.e. unlike SerialTaskGroup, we do not block on the AddTask method).  This allows tasks to pile up in a queue and, if the write is slow, this will become a pile-up point which will eventually run out of memory (provided there is enough data being written).  That problem is solved in a follow-up, #11286.
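A rough sketch of the serialization idea follows. This is a simplification — Arrow's AsyncSerializedTaskGroup chains futures rather than dedicating a thread, and the class below is not the real implementation — but the key behaviors are the same in spirit: tasks run strictly in submission order, AddTask never blocks the caller, and a slow consumer lets the queue grow without bound:

```cpp
// Sketch: tasks are queued without blocking the caller and executed one at a
// time, in order, on a background thread.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class SerializedTaskQueue {
 public:
  SerializedTaskQueue() : worker_([this] { Run(); }) {}

  // Never blocks: the task is simply appended.  If the writer is slow the
  // queue grows -- the unbounded pile-up the follow-up PR addresses.
  void AddTask(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

  // Roughly analogous to End(): no more tasks may be added; drain the queue.
  void End() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // done_ set and queue drained
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // tasks run strictly in submission order
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool done_ = false;
  std::thread worker_;  // declared last so the other members exist first
};

int main() {
  SerializedTaskQueue group;
  for (int i = 0; i < 3; ++i) {
    group.AddTask([i] { std::cout << "write batch " << i << "\n"; });
  }
  group.End();  // prints "write batch 0", 1, 2 in order
  return 0;
}
```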

The `AsyncSerializedTaskGroup` and `AsyncTaskGroup` classes have very similar APIs, but I did not create an interface / abstract base class because I don't yet envision any case where they would be interchangeable.  The distinction is "can these tasks run in parallel or not", not a performance / resource question.

As a consequence of using the ExecPlan, dataset writes no longer have reliable ordering.  If you pass in three batches and they are all destined for the same file, then the batches may be written in any order in the destination file.  This is because the ExecPlan creates a thread task for each input batch, and so batches could arrive at the write node in any order.

Closes #11017 from westonpace/feature/ARROW-13542-write-node

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
@westonpace westonpace force-pushed the feature/ARROW-13611--scanning-datasets-backpressure-with-write branch 2 times, most recently from 72aea77 to edeed63, on October 12, 2021 21:37
@westonpace westonpace marked this pull request as ready for review October 12, 2021 22:16
@westonpace westonpace requested a review from bkietz October 12, 2021 22:16
@bkietz (Member) left a comment

This looks good in principle, just one nit on the test

@aocsa (Contributor) left a comment

This looks good to me. Just a couple of comments related to code style and tests.

Comment on lines 378 to 383
Contributor

Maybe this is a good time to make the code style uniform: since this is a class, all the data members should follow the `name_` style.

Member Author

Good point. I fixed all data members to use the `name_` style.

Comment on lines +3434 to +3447
Contributor

Just a comment: if the Python test is slow, why not write this test in C++? I think there is more control in C++, and even a test with a large workload is achievable, or the test could be run only in certain cases so it doesn't always run.

Member Author

There are already tests in C++ for the scanner backpressure and the dataset writer backpressure. You are correct that we have more control. I was able to use the thread pool's "wait for idle" method to know when backpressure had been hit.

I wanted a Python test to pull everything together and make sure the feature is actually being used correctly (I think it is easy sometimes for Python to get missed due to a configuration parameter or something else). I'd be OK with removing this test, but I don't think we need to add anything to C++. @bkietz, thoughts?

Member

I'd say this is sufficient for this PR

@westonpace westonpace force-pushed the feature/ARROW-13611--scanning-datasets-backpressure-with-write branch from 871b282 to b55c34a on October 14, 2021 11:54
@westonpace westonpace requested a review from bkietz October 14, 2021 11:56
@westonpace (Member Author) commented:

@bkietz Any other concerns? Otherwise I will rebase tomorrow morning and merge if all looks good.

@westonpace westonpace force-pushed the feature/ARROW-13611--scanning-datasets-backpressure-with-write branch from b55c34a to 35d512f on October 15, 2021 19:38
ursabot commented Oct 15, 2021

Benchmark runs are scheduled for baseline = 02f11b9 and contender = cf50b31. cf50b31 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.51% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.13% ⬆️0.0%] ursa-thinkcentre-m75q
Supported benchmarks:
ursa-i9-9960x: langs = Python, R, JavaScript
ursa-thinkcentre-m75q: langs = C++, Java
ec2-t3-xlarge-us-east-2: cloud = True

ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021
…ng rows from an ExecPlan to disk

@westonpace westonpace deleted the feature/ARROW-13611--scanning-datasets-backpressure-with-write branch January 6, 2022 08:15