Optimize reshuffle for globally windowed data.#4933
Conversation
|
R: @udim |
|
Thanks! Will review tomorrow. |
| _UnwindowedValues(windowed_values), | ||
| MIN_TIMESTAMP, | ||
| self.GLOBAL_WINDOW_TUPLE) | ||
| return [ |
There was a problem hiding this comment.
I agree that return makes more sense here.
Is there a quantifiable difference in performance between yielding a single value vs return a list with the same value in it?
There was a problem hiding this comment.
As it turns out, no. Reverted.
| else: | ||
| # The linter is confused. | ||
| # pylint: disable=abstract-class-instantiated | ||
| cls = hash(1) and _IdentityWindowFn |
There was a problem hiding this comment.
Could you explain this line? What's the purpose of hash(1)?
There was a problem hiding this comment.
Otherwise the linter was smart enough to identify _IdentityWindowFn was being instantiated, and still give the warning (which the pylint comment wasn't suppressing). Spent a good amount of time fighting the linter here--on the one hand too smart, on the other too dumb.
|
|
||
| else: | ||
| # The linter is confused. | ||
| # pylint: disable=abstract-class-instantiated |
There was a problem hiding this comment.
PyCharm says this class is missing to_runner_api_parameter.
There was a problem hiding this comment.
PyCharm is confused too. It inherits this via the urn magic.
| if windowing_saved.is_default(): | ||
| # In this (common) case we can use a trivial trigger driver | ||
| # and avoid the (expensive) window param. | ||
| _globally_windowed = window.GlobalWindows.windowed_value(None) |
There was a problem hiding this comment.
Style: why does this variable get a _ prefix?
| # sorted order. | ||
| if windowing.is_default() and is_batch: | ||
| driver = DefaultGlobalBatchTriggerDriver() | ||
| elif (windowing.windowfn == GlobalWindows() |
There was a problem hiding this comment.
Would this branch also apply to streaming?
If so, it might be a good idea to remove the word Batch from the driver's name.
| class DefaultGlobalBatchTriggerDriver(TriggerDriver): | ||
| """Breaks a bundles into window (pane)s according to the default triggering. | ||
| class DiscardingGlobalTriggerDriver(TriggerDriver): | ||
| """Groups all received values togeather. |
da55268 to
92ff3a6
Compare
DESCRIPTION HERE
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue.mvn clean verifyto make sure basic checks pass. A more thorough check will be performed on your pull request automatically.