[BEAM-4705] Better Kryo integration #14

Merged
VaclavPlajt merged 4 commits into dsl-euphoria from vasek/kryo-integration
Aug 20, 2018

Conversation

@VaclavPlajt VaclavPlajt commented Jul 9, 2018

Users can now register their own classes with Kryo by supplying a KryoRegistrar implementation through the RegisterCoders API. Users do not need to create KryoCoder instances; they are created automatically. Kryo instances are shared by all KryoCoders with the same Kryo initialization.

@VaclavPlajt VaclavPlajt changed the base branch from dsl-euphoria to vasek/coders-expose July 9, 2018 09:56
@VaclavPlajt VaclavPlajt changed the title Vasek/kryo integration [BEAM-4705] Better Kryo integration Jul 9, 2018
@VaclavPlajt VaclavPlajt requested review from dmvk and mareksimunek July 9, 2018 13:01
@VaclavPlajt VaclavPlajt force-pushed the vasek/coders-expose branch from 7058dae to f8013da Compare July 31, 2018 16:11
@VaclavPlajt VaclavPlajt force-pushed the vasek/kryo-integration branch 2 times, most recently from b03a6d2 to a618f23 Compare August 1, 2018 07:49

@mareksimunek mareksimunek left a comment

LGTM

@dmvk dmvk left a comment

LGTM 👍 I only have a few questions, which may not be relevant.

static Kryo getOrCreateKryo(IdentifiedRegistrar registrarWithId) {
  Objects.requireNonNull(registrarWithId);

  synchronized (kryoByRegistrarId) {

Does this need to be synchronized? I reckon a simple map.computeIfAbsent(k, (x) -> ThreadLocal.withInitial((...) should be enough.

@VaclavPlajt (Author):

I believe it should be synchronized. Otherwise two threads can simultaneously introduce new ThreadLocal objects into the map.
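Which side of this exchange is right depends on the map type, which the excerpt does not show. A plain HashMap is not safe for concurrent mutation and needs the synchronized block, while ConcurrentHashMap.computeIfAbsent applies the mapping function atomically, at most once per key. The sketch below illustrates the second option only. It is not the PR's code: PerRegistrarCache, holderFor, getOrCreate and getOnNewThread are hypothetical names, and StringBuilder stands in for a Kryo instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-registrar cache; StringBuilder plays the role of Kryo. */
final class PerRegistrarCache {

  // ConcurrentHashMap.computeIfAbsent applies the mapping function atomically,
  // so at most one ThreadLocal holder is ever published per registrar id.
  private static final Map<Integer, ThreadLocal<StringBuilder>> BY_REGISTRAR_ID =
      new ConcurrentHashMap<>();

  /** Returns the single shared holder for the given (assumed integer) registrar id. */
  static ThreadLocal<StringBuilder> holderFor(int registrarId) {
    return BY_REGISTRAR_ID.computeIfAbsent(
        registrarId, id -> ThreadLocal.withInitial(StringBuilder::new));
  }

  /** Returns the calling thread's own instance for the given registrar id. */
  static StringBuilder getOrCreate(int registrarId) {
    return holderFor(registrarId).get();
  }

  /** Demo helper: fetches the instance that a freshly started thread would see. */
  static StringBuilder getOnNewThread(int registrarId) {
    StringBuilder[] result = new StringBuilder[1];
    Thread worker = new Thread(() -> result[0] = getOrCreate(registrarId));
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return result[0];
  }

  public static void main(String[] args) {
    StringBuilder here = getOrCreate(1);
    StringBuilder elsewhere = getOnNewThread(1);
    System.out.println("same holder for same id: " + (holderFor(1) == holderFor(1)));
    System.out.println("distinct instance per thread: " + (here != elsewhere));
  }
}
```

With a plain HashMap in place of the ConcurrentHashMap, the same computeIfAbsent call would no longer be safe without external locking, which matches the author's concern.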

Input input = KryoFactory.getKryoInput();

// limit the input stream to not allow kryo to read beyond the current object
BoundedInputStream limitedStream = new BoundedInputStream(inStream, lengthOfDecodedObject);

👍 I'm just a little bit concerned about the Apache Commons dependency. Where did it come from?

@VaclavPlajt (Author):

Good point. It will be removed in BEAM-5179, when Kryo's chunked input/output replaces our own chunking.
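For readers wondering what the bounded stream in the excerpt does, the sketch below shows the general technique using only the JDK: a wrapper that reports end-of-stream once a byte limit is reached, so a decoder cannot consume bytes belonging to the next object. It is an illustration under assumptions, not the Commons IO class and not the PR's code; LimitedInputStream and readFully are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical JDK-only wrapper that stops after a fixed number of bytes. */
final class LimitedInputStream extends InputStream {
  private final InputStream in;
  private long remaining;

  LimitedInputStream(InputStream in, long limit) {
    this.in = in;
    this.remaining = limit;
  }

  @Override
  public int read() throws IOException {
    if (remaining <= 0) {
      return -1; // limit reached: behave like end-of-stream
    }
    int b = in.read();
    if (b >= 0) {
      remaining--;
    }
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    if (remaining <= 0) {
      return -1;
    }
    // Never ask the underlying stream for more than the remaining budget.
    int n = in.read(buf, off, (int) Math.min(len, remaining));
    if (n > 0) {
      remaining -= n;
    }
    return n;
  }

  /** Demo helper: drains a stream into a byte array, wrapping IOException. */
  static byte[] readFully(InputStream stream) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[4];
      int n;
      while ((n = stream.read(chunk, 0, chunk.length)) != -1) {
        out.write(chunk, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ByteArrayInputStream source = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
    byte[] firstObject = readFully(new LimitedInputStream(source, 3));
    System.out.println("bytes read through the wrapper: " + firstObject.length);
    System.out.println("bytes left in the source: " + source.available());
  }
}
```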

// mere NO_OP_REGISTRAR == registrarWithId is not enough since
// NO_OP_REGISTRAR can be deserialized into several instances
if (NO_OP_REGISTRAR.getId() == registrarWithId.getId()) {
  instance.setRegistrationRequired(false);

Does the user need to explicitly opt in to NO_OP_REGISTRAR, or is it the default behavior? If it is the default, do we warn them that it may have performance implications?

@VaclavPlajt (Author):

It is the default. An explicit warning is stated in the documentation. There is also an INFO-level log message informing about use of the default KryoCoder. The log was added in later versions and is therefore not visible in this PR.
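The code comment in the excerpt above says that comparing references is not enough because the no-op registrar can be deserialized into several instances. The sketch below reproduces that effect with plain Java serialization. It is only an illustration: IdentifiedRegistrarSketch, its int id and roundTrip are hypothetical stand-ins, and the real IdentifiedRegistrar type and its id type are not shown in this PR excerpt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a serializable registrar that carries an id. */
final class IdentifiedRegistrarSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  /** Plays the role of the shared default instance; the id value is arbitrary. */
  static final IdentifiedRegistrarSketch NO_OP = new IdentifiedRegistrarSketch(-1);

  private final int id;

  IdentifiedRegistrarSketch(int id) {
    this.id = id;
  }

  int getId() {
    return id;
  }

  /** Serializes and deserializes the object, as happens when it is shipped to a worker. */
  static IdentifiedRegistrarSketch roundTrip(IdentifiedRegistrarSketch original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (IdentifiedRegistrarSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    IdentifiedRegistrarSketch copy = roundTrip(NO_OP);
    // Deserialization builds a new object, so the reference check fails
    // while the id comparison still recognizes the default registrar.
    System.out.println("same reference: " + (copy == NO_OP));
    System.out.println("same id: " + (copy.getId() == NO_OP.getId()));
  }
}
```

The sketch uses a primitive int id so that == compares values; if an id were a boxed or object type, equals would be the safer comparison.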

}

private static ThreadLocal<Output> threadLocalOutput =
    ThreadLocal.withInitial(() -> new Output(DEFAULT_BUFFER_SIZE, -1));

Do we have a case where the user needs to override the buffer size? What happens if the object size exceeds it?

@VaclavPlajt (Author):

Kryo's Output buffers can grow automatically. We do not yet allow the user to choose the initial buffer size; such an API extension could be added in the future.

dmvk pushed a commit that referenced this pull request Aug 17, 2018
…e rather internal Flink record's timestamp.
dmvk pushed a commit that referenced this pull request Aug 17, 2018
@VaclavPlajt VaclavPlajt force-pushed the vasek/kryo-integration branch from a618f23 to 12b4585 Compare August 20, 2018 10:49
@VaclavPlajt VaclavPlajt changed the base branch from vasek/coders-expose to dsl-euphoria August 20, 2018 11:03
@VaclavPlajt VaclavPlajt force-pushed the vasek/kryo-integration branch from 12b4585 to 1f9abd9 Compare August 20, 2018 12:15
@VaclavPlajt VaclavPlajt merged commit a9d1ab0 into dsl-euphoria Aug 20, 2018
@VaclavPlajt VaclavPlajt deleted the vasek/kryo-integration branch August 20, 2018 13:15
dmvk pushed a commit that referenced this pull request Oct 5, 2018
…e rather internal Flink record's timestamp.
dmvk pushed a commit that referenced this pull request Oct 5, 2018