feat: Change restriction to OffsetByteRange to allow functioning with runnerv2. #674
Force-pushed from 6fce394 to 9fa20d0
pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java
boyuanzz left a comment
Are you going to fix the coder issue with setPartition() in this PR or a separate PR?
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@AutoValue
@DefaultSchema(AutoValueSchema.class)
Just FYI, this change will introduce a backward incompatibility when updating a Dataflow pipeline.
Understood.
It has already been removed since it does not provide an access pattern we want to expose.
Then you should remove this from Beam as well. Or if you decide to keep your own version, you should consider removing your code from Beam.
new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
range.getByteCount());
return SplitResult.of(
    this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
By returning OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0), your getSize() function will use 'Long.MAX_VALUE - nextOffset()' as the backlog. A bad backlog estimate will have side effects on Dataflow's autoscaling strategy.
I don't believe so? getSize() delegates to the underlying OffsetByteRangeTracker if the range is unbounded on the right (as this one is), and so will call out to this.backlogReader.computeMessageStats(range.getRange().getFrom()) to compute forward-looking progress stats. The existing range ships its byte size along with it, and that size will be used since the already-finished range is closed. Is this analysis incorrect given how the runner interacts with these methods?
That's correct.
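For readers following this thread, the sizing behavior described in the exchange above can be sketched roughly as follows. This is a simplified illustration only, not the code in this PR: `SizeSketch`, `Restriction`, `checkpoint`, and the `backlogBytesFrom` lookup are hypothetical stand-ins for `OffsetByteRange`, the tracker's split, and the backlog reader call mentioned above. It assumes a range counts as "unbounded on the right" when its end is `Long.MAX_VALUE`.

```java
import java.util.function.LongUnaryOperator;

/** Hypothetical stand-ins for illustration; not the Beam or Pub/Sub Lite classes. */
final class SizeSketch {

  /** Offsets [from, to) plus the byte count recorded for that span. */
  static final class Restriction {
    final long from;
    final long to;
    final long byteCount;

    Restriction(long from, long to, long byteCount) {
      this.from = from;
      this.to = to;
      this.byteCount = byteCount;
    }

    /** Assumption: an end of Long.MAX_VALUE marks an open-ended range. */
    boolean unboundedOnRight() {
      return to == Long.MAX_VALUE;
    }
  }

  /**
   * Checkpoint-style split at nextOffset. The closed primary keeps the bytes
   * read so far; the open-ended residual starts with a byte count of 0.
   */
  static Restriction[] checkpoint(Restriction current, long nextOffset, long bytesRead) {
    Restriction primary = new Restriction(current.from, nextOffset, bytesRead);
    Restriction residual = new Restriction(nextOffset, Long.MAX_VALUE, 0);
    return new Restriction[] {primary, residual};
  }

  /**
   * Size estimate. Open-ended ranges ask the (hypothetical) backlog lookup for
   * the bytes remaining from the start offset, so the offset width of the
   * range is never used. Closed ranges report their recorded byte count.
   */
  static double getSize(Restriction r, LongUnaryOperator backlogBytesFrom) {
    if (r.unboundedOnRight()) {
      return backlogBytesFrom.applyAsLong(r.from);
    }
    return r.byteCount;
  }

  public static void main(String[] args) {
    Restriction[] parts = checkpoint(new Restriction(10, Long.MAX_VALUE, 0), 25, 4096);
    LongUnaryOperator fakeBacklog = from -> 1_000_000L; // pretend service lookup
    System.out.println("primary size:  " + getSize(parts[0], fakeBacklog));
    System.out.println("residual size: " + getSize(parts[1], fakeBacklog));
  }
}
```

Under these assumptions, the residual's zero byte count and `Long.MAX_VALUE` end are never turned into a backlog figure directly, which is the point made in the reply above.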
Yes, I've already created apache/beam#14976 to copy this and other changes over.
The changes related to Beam look good to me.
Force-pushed from 9fa20d0 to 8e74ecc
Fixes #676