[BEAM-7495] Add progress reporting to the BigQuery source #9079

Merged
chamikaramj merged 1 commit into apache:master from aryann:bq-storage-progress-reporting
Jul 25, 2019

aryann commented Jul 16, 2019

Progress reporting allows Google Cloud Dataflow to make more intelligent
decisions during dynamic worker rebalancing.


aryann commented Jul 16, 2019

R: @chamikaramj

FYI: @kmjung @mmladenovski

aryann force-pushed the bq-storage-progress-reporting branch 3 times, most recently from 1842db3 to 03c216d, on July 16, 2019 22:47

aryann commented Jul 18, 2019

Run Java PreCommit

aryann force-pushed the bq-storage-progress-reporting branch from 03c216d to 78c27d9 on July 18, 2019 20:55

// N.B.: For simplicity, we update fractionConsumed once a new response is fetched, not
// when we reach the end of the current response. In practice, this choice is not
// consequential.
fractionConsumed = fractionConsumedFromLastResponse;
Contributor

We should only update the fraction consumed after this record has been returned through a getCurrent() call; until then, the record has not been consumed. For example, the fraction consumed should not become 1.0 until the runner gets the final record.

Contributor Author

I was under the impression that getCurrent() should not modify any state.

The JavaDoc doesn't explicitly call out where the progress should be updated, but it mentions, "Should return 1 after [...] Source.Reader.advance() call that returns false."[1] Reading this and the JavaDoc for advance() and getCurrent() leads me to believe that advance() is the appropriate place to update progress.

Please note that in the code I have written, progress is only updated after you have called advance() enough times to go through all rows in the response that returned the new progress. You can see an example of this in the unit tests.

I also found two examples where the progress is updated in advance():

[1] https://beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/io/BoundedSource.BoundedReader.html#getFractionConsumed--

Contributor

Thanks for clarifying. The current implementation (which updates the progress for a record in the advance() call following the getCurrent() call that returns the record) seems to be correct.
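
For readers following this thread, the behavior agreed on above can be sketched without any Beam dependencies. This is a minimal illustration, not the code in this PR: `ProgressSketch`, `Response`, and `Reader` are hypothetical names, and rows are plain strings. It assumes each response carries the fraction consumed as of its last row, and it publishes that value only when advance() moves past the response, so getCurrent() never changes state.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Illustrative only: none of these names come from the Beam SDK or from this PR. */
final class ProgressSketch {

  /** Hypothetical stand-in for one server response: a batch of rows plus reported progress. */
  static final class Response {
    final List<String> rows;
    final double fractionConsumed;

    Response(List<String> rows, double fractionConsumed) {
      this.rows = rows;
      this.fractionConsumed = fractionConsumed;
    }
  }

  /** Hypothetical reader that changes progress state only inside advance(). */
  static final class Reader {
    private final Iterator<Response> responses;
    private Iterator<String> rows = Collections.emptyIterator();
    private String current;
    private double fractionConsumed = 0.0;
    private double fractionConsumedFromLastResponse = 0.0;

    Reader(List<Response> responses) {
      this.responses = responses.iterator();
    }

    boolean advance() {
      while (!rows.hasNext()) {
        if (!responses.hasNext()) {
          // No more data: report completion only once advance() returns false.
          fractionConsumed = 1.0;
          return false;
        }
        // Every row of the previous response has been handed out, so its progress
        // value can now be published before the next response is fetched.
        fractionConsumed = fractionConsumedFromLastResponse;
        Response next = responses.next();
        fractionConsumedFromLastResponse = next.fractionConsumed;
        rows = next.rows.iterator();
      }
      current = rows.next();
      return true;
    }

    /** Pure accessor: returns the current row without touching progress. */
    String getCurrent() {
      return current;
    }

    double getFractionConsumed() {
      return fractionConsumed;
    }
  }

  public static void main(String[] args) {
    Reader reader = new Reader(Arrays.asList(
        new Response(Arrays.asList("a", "b"), 0.5),
        new Response(Arrays.asList("c"), 1.0)));
    while (reader.advance()) {
      System.out.println(reader.getCurrent() + " " + reader.getFractionConsumed());
    }
    System.out.println("done " + reader.getFractionConsumed());
  }
}
```

With the two sample responses in `main`, rows "a" and "b" are read while the reported fraction is still 0.0, row "c" is read at 0.5, and 1.0 appears only after advance() returns false.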


private static float getFractionConsumed(ReadRowsResponse response) {
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use getFractionConsumed().
Contributor

We should be sure to update the client code before 2.15 is released. The use of UnknownField types is probably not appropriate for production code.

Contributor Author

My thinking is that this is okay, since our primary use case is not to be a reference implementation, though I do think we should update the client sometime soon.

I'm not sure it makes sense to block 2.15 on the client being updated, though. The comments I've added explain what the fields we are setting are, for anyone trying to understand the internals of our source.

That said, I'm willing to change my mind if there is a stronger opinion on this. :)
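
To make the concern in this thread concrete, here is a small standard-library sketch of what reading a float through unknown fields involves. It is not the code in this PR. It assumes the progress value is a protobuf `float`, which is encoded on the wire as a 32-bit fixed-width value; the Java protobuf runtime exposes such unknown values as raw integers (for example through `UnknownFieldSet.Field.getFixed32List()`), so the caller has to reinterpret the bits and handle the missing-field case. The class name, method name, and the 0 fallback are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: a hypothetical helper, not part of Beam or the BigQuery client. */
final class UnknownFloatSketch {

  /**
   * Decodes a float from the raw 32-bit values of an unknown field.
   *
   * @param rawFixed32Values bit patterns in wire order, as an unknown-field API would expose them
   */
  static float decodeFloat(List<Integer> rawFixed32Values) {
    if (rawFixed32Values.isEmpty()) {
      // Assumption: treat an absent field as "no progress known" rather than failing.
      return 0f;
    }
    // For a non-repeated scalar, the last value seen on the wire takes precedence.
    int bits = rawFixed32Values.get(rawFixed32Values.size() - 1);
    return Float.intBitsToFloat(bits);
  }

  public static void main(String[] args) {
    int onWire = Float.floatToRawIntBits(0.25f);
    System.out.println(decodeFloat(Arrays.asList(onWire)));
    System.out.println(decodeFloat(Collections.<Integer>emptyList()));
  }
}
```

Nothing here is checked by the compiler against the service definition: a wrong field number or wire type would silently produce 0 or a meaningless value, which is the usual argument for switching to a generated accessor once the client is rebuilt.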

@chamikaramj
Contributor

LGTM.

aryann force-pushed the bq-storage-progress-reporting branch 3 times, most recently from 5a30e73 to b5a4419, on July 23, 2019 17:49
aryann force-pushed the bq-storage-progress-reporting branch from b5a4419 to 090ad02 on July 23, 2019 20:23
@chamikaramj
Contributor

LGTM

@kmjung
Contributor

kmjung commented Jul 25, 2019

LGTM.

chamikaramj merged commit 4eb7610 into apache:master on Jul 25, 2019