Skip to content

Conversation

@westonpace
Copy link
Member

This PR adds backpressure back into the asynchronous scanner. It creates an AsyncToggle which can be shared between the push-based sink and the pull-based scanner. The sink will close the toggle when it's buffer fills up and the scanner will pause delivering items when the toggle is closed.

This PR adds the feature in a way that bypasses the exec plan's backpressure mechanisms as those have not been fully fleshed out and I still am not sure what direction we are planning to go with that. Instead the back pressure is almost completely handled outside of the compute space.

I've got the same mechanism working for dataset writes but I don't want to hold up this PR while I wait for the write node to merge so I have created ARROW-14191 to follow that work.

Currently backpressure is broken for ordered scans. It turns out this has always been the case for the asynchronous scanner, even before it moved to the exec plan. The root cause is that the merge generator will keep reading from files 2-N if the read on file 1 is slow. I have created a test case which demonstrates this but will defer fixing this for ARROW-14192

@github-actions
Copy link

github-actions bot commented Oct 1, 2021

@github-actions
Copy link

github-actions bot commented Oct 1, 2021

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@westonpace westonpace force-pushed the feature/ARROW-13611--scanning-datasets-backpressure branch 2 times, most recently from 0a2e8b8 to dd0b36e Compare October 4, 2021 23:39
Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, this looks good to me, but it seems the tests are unreliable, especially on Windows - the backpressure doesn't seem to get applied quickly enough. Maybe we should deliver the fragments manually as in the sink node test.

This will also conflict with the MakeTask PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with the scanner option, should we document that this is primarily for testing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will document that. ThreadPool is internal (and I'm not proposing we add WaitForIdle to Executor) so I hope that is reasonable. CC @pitrou for thoughts. For context: I added WaitForIdle to the ThreadPool purely for the sake of sequencing some testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I would expect dropping the generator/scanner to clean up everything. Wonder what's keeping it alive. (Not a problem to solve here.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minimal example of the problem is:

TEST(Weston, MemTest) {
  PushGenerator<util::optional<int>> int_prod;
  AsyncGenerator<util::optional<int>> int_gen = int_prod;
  Future<> visit_fut = VisitAsyncGenerator(std::move(int_gen), [] (util::optional<int>) {
    return Status::OK();
  });
}

The above will leak. I have a fix in mind but will address it in a separate PR.

@westonpace westonpace force-pushed the feature/ARROW-13611--scanning-datasets-backpressure branch from 668edf1 to b36ea72 Compare October 8, 2021 01:05
…tly it only works for unordered scans. This commit adds backpressure in a way that bypasses the exec plan entirely and these utilities will eventually change as the exec plan evolves.
…here opening the backpressure while holding a lock. Added a unit test for the sink node
…or simplicity and making it more obvious what is happening. Added backpressure to the OrderBy sink node
…ckpressure tests more predictable. In addition, my previous attempt at fixing the race condition between the producer and the async toggle wasn't correct. I improved it and now it should be more accurate.
…at. Removed disabled ordered backpressure test because I was just updating the same thing twice. It can be added back in when ordering is supported
…tor as the source of an AsyncGenerator pipeline can be somewhat problematic. if the producer is not finished.
@westonpace westonpace force-pushed the feature/ARROW-13611--scanning-datasets-backpressure branch from 1f05f6c to 401e906 Compare October 12, 2021 18:23
@ursabot
Copy link

ursabot commented Oct 12, 2021

Benchmark runs are scheduled for baseline = 5f80ddc and contender = 8837f64. 8837f64 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️1.36% ⬆️0.0%] ursa-i9-9960x
[Failed] ursa-thinkcentre-m75q
Supported benchmarks:
ursa-i9-9960x: langs = Python, R, JavaScript
ursa-thinkcentre-m75q: langs = C++, Java
ec2-t3-xlarge-us-east-2: cloud = True

@westonpace westonpace deleted the feature/ARROW-13611--scanning-datasets-backpressure branch January 6, 2022 08:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants