[BEAM-7495] Add progress reporting to the BigQuery source#9079
[BEAM-7495] Add progress reporting to the BigQuery source#9079chamikaramj merged 1 commit intoapache:masterfrom
Conversation
|
R: @chamikaramj FYI: @kmjung @mmladenovski |
1842db3 to
03c216d
Compare
|
Run Java PreCommit |
03c216d to
78c27d9
Compare
| // N.B.: For simplicity, we update fractionConsumed once a new response is fetched, not | ||
| // when we reach the end of the current response. In practice, this choice is not | ||
| // consequential. | ||
| fractionConsumed = fractionConsumedFromLastResponse; |
There was a problem hiding this comment.
We should only update the fraction consumed after this record is returned through an getCurrent() call. Otherwise this record has not been consumed yet. For example, fraction consumed should not become 1.0 till the runner gets the final record.
There was a problem hiding this comment.
I was under the impression that getCurrent() should not modify any state.
The JavaDoc doesn't explicitly call out where the progress should be updated, but it mentions, "Should return 1 after [...] Source.Reader.advance() call that returns false."[1] Reading this and the JavaDoc for advance() and getCurrent() leads me to believe that advance() is the appropriate place to update progress.
Please note that in the code I have written, progress is only updated after you have called advance() enough times to go through all rows in the response that returned the new progress. You can see an example of this in the unit tests.
I also found two examples where the progress is updated in advance():
- (uses range tracker that updates the progress; note that getCurrent() does not modify any state)
There was a problem hiding this comment.
Thanks for clarifying. Current implementation (that updates the progress for a record in the advance() call following the getCurrent() call that returns the record) seems to be correct.
|
|
||
| private static float getFractionConsumed(ReadRowsResponse response) { | ||
| // TODO(aryann): Once we rebuild the generated client code, we should change this to | ||
| // use getFractionConsumed(). |
There was a problem hiding this comment.
We should be sure to update the client code before 2.15 is released. The use of UnknownField types is probably not appropriate for production code.
There was a problem hiding this comment.
My thinking is that since our primary use case is not to be a reference implementation that this is okay, though I do think that sometime soon we should update the client.
I'm not sure if it makes sense to block 2.15 on the client being updated, though. The comments I've added make it clear what the fields we are setting are for someone who is trying to understand the internals of our source.
That said, I'm willing to change my mind if there is a stronger opinion on this. :)
|
LGTM. |
5a30e73 to
b5a4419
Compare
Progress reporting allows Google Cloud Dataflow to make more intelligent decisions during dynamic worker rebalancing.
b5a4419 to
090ad02
Compare
|
LGTM |
|
LGTM. |
Progress reporting allows Google Cloud Dataflow to make more intelligent
decisions during dynamic worker rebalancing.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username).[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.