[BEAM-4705] Better Kryo integration #14
7058dae to f8013da
b03a6d2 to a618f23
dmvk left a comment
LGTM 👍 I only have a few questions, which may not be relevant.
static Kryo getOrCreateKryo(IdentifiedRegistrar registrarWithId) {
  Objects.requireNonNull(registrarWithId);

  synchronized (kryoByRegistrarId) {
Does this need to be synchronized? I reckon a simple map.computeIfAbsent(k, x -> ThreadLocal.withInitial(...)) should be enough.
I believe that it should be synchronized. Otherwise two threads can simultaneously introduce new ThreadLocal objects to the map.
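The trade-off in this exchange can be sketched in isolation. The following is a minimal sketch, not the PR's code: it assumes the registry is a ConcurrentHashMap (the map type behind kryoByRegistrarId is not shown in this thread), and the class name PerThreadRegistry, the String key and the StringBuilder stand-in for a Kryo instance are all hypothetical. ConcurrentHashMap.computeIfAbsent applies the mapping function atomically, at most once per key, so no explicit lock is needed. With a plain HashMap the same call is not thread-safe, and a synchronized block like the one in the diff is the safe choice.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PerThreadRegistry {
  // Hypothetical stand-in for kryoByRegistrarId, keyed by registrar id.
  private static final Map<String, ThreadLocal<StringBuilder>> BY_ID =
      new ConcurrentHashMap<>();

  // Counts how many ThreadLocal holders were created (for demonstration only).
  static final AtomicInteger holdersCreated = new AtomicInteger();

  // Returns the calling thread's instance for the given registrar id.
  static StringBuilder getOrCreate(String registrarId) {
    ThreadLocal<StringBuilder> holder =
        BY_ID.computeIfAbsent(
            registrarId,
            id -> {
              // Runs at most once per key, even under contention.
              holdersCreated.incrementAndGet();
              return ThreadLocal.withInitial(StringBuilder::new);
            });
    return holder.get();
  }

  public static void main(String[] args) {
    StringBuilder first = getOrCreate("registrar-1");
    StringBuilder second = getOrCreate("registrar-1");
    System.out.println("same instance on one thread: " + (first == second));
  }
}
```

Each thread gets its own instance, while the ThreadLocal holder itself is created once per registrar id.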
Input input = KryoFactory.getKryoInput();

// limit the input stream to not allow kryo to read beyond the current object
BoundedInputStream limitedStream = new BoundedInputStream(inStream, lengthOfDecodedObject);
👍 I'm just a little bit concerned about the Apache Commons dependency. Where did it come from?
Good point. It will be removed in BEAM-5179, when Kryo's chunked input/output replaces our own chunking.
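For reference, the bounding behaviour itself does not need an external library. The following is a minimal sketch under stated assumptions, not the planned BEAM-5179 change and not the Commons implementation: LimitedInputStream is a hypothetical class built on java.io.FilterInputStream that stops reporting data once a fixed number of bytes has been consumed, so a decoder cannot read past the current object.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical JDK-only stand-in for a bounded stream.
class LimitedInputStream extends FilterInputStream {
  private long remaining;

  LimitedInputStream(InputStream in, long limit) {
    super(in);
    this.remaining = limit;
  }

  @Override
  public int read() throws IOException {
    if (remaining <= 0) {
      return -1; // limit reached: report end of stream
    }
    int b = super.read();
    if (b >= 0) {
      remaining--;
    }
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    if (remaining <= 0) {
      return -1;
    }
    // Never request more than what is left of the current object.
    int n = super.read(buf, off, (int) Math.min(len, remaining));
    if (n > 0) {
      remaining -= n;
    }
    return n;
  }

  @Override
  public long skip(long n) throws IOException {
    long skipped = super.skip(Math.min(n, remaining));
    remaining -= skipped;
    return skipped;
  }

  @Override
  public int available() throws IOException {
    return (int) Math.min(super.available(), remaining);
  }

  @Override
  public boolean markSupported() {
    return false; // mark/reset would bypass the byte accounting
  }

  @Override
  public void close() {
    // Design choice for this sketch: the caller owns the underlying stream.
  }

  public static void main(String[] args) throws IOException {
    InputStream source = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
    LimitedInputStream limited = new LimitedInputStream(source, 3);
    byte[] buf = new byte[10];
    System.out.println("bytes read through the limit: " + limited.read(buf, 0, buf.length));
    System.out.println("next byte in the source: " + source.read());
  }
}
```

Bytes beyond the limit stay in the underlying stream, so the next object can be decoded from where the previous one ended.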
// mere NO_OP_REGISTRAR == registrarWithId is not enough since
// NO_OP_REGISTRAR can be deserialized into several instances
if (NO_OP_REGISTRAR.getId() == registrarWithId.getId()) {
  instance.setRegistrationRequired(false);
Does the user need to explicitly allow NO_OP_REGISTRAR, or is it the default behavior? If it is the default, do we warn them that it may have performance implications?
It is the default. An explicit warning is stated in the documentation. There is also an INFO-level log message reporting the use of the default KryoCoder. That log was added in later versions, so it is not visible in this PR.
}

private static ThreadLocal<Output> threadLocalOutput =
    ThreadLocal.withInitial(() -> new Output(DEFAULT_BUFFER_SIZE, -1));
Do we have a case where a user needs to override the buffer size? What happens if the object size exceeds this?
Kryo's Output buffers can grow automatically. We do not allow the user to choose the initial size of the buffers yet. Such an API extension could be added in the future.
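The pattern described here, one reusable buffer per thread whose configured size is only a starting capacity, can be sketched with the JDK alone. This is an analogy under stated assumptions, not Kryo's implementation: java.io.ByteArrayOutputStream stands in for Kryo's Output, and the class name ThreadLocalBuffers and the value 4096 are hypothetical.

```java
import java.io.ByteArrayOutputStream;

class ThreadLocalBuffers {
  // Hypothetical starting capacity; the value used in the PR is not shown in this thread.
  static final int DEFAULT_BUFFER_SIZE = 4096;

  // One buffer per thread, created lazily on first use.
  private static final ThreadLocal<ByteArrayOutputStream> BUFFER =
      ThreadLocal.withInitial(() -> new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE));

  static byte[] encode(byte[] payload) {
    ByteArrayOutputStream out = BUFFER.get();
    out.reset(); // discard previous content but keep the allocated capacity
    out.write(payload, 0, payload.length); // grows the buffer if the payload is larger
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] large = new byte[3 * DEFAULT_BUFFER_SIZE];
    System.out.println("encoded length: " + encode(large).length);
  }
}
```

A payload larger than the starting capacity is still written in full, which mirrors the answer above: the initial size affects how often the buffer has to grow, not how large an object can be.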
…e rather internal Flink record's timestamp.
1066ada to 93026f2
a618f23 to 12b4585
…er to retain identity information after (de)serialization.
12b4585 to 1f9abd9
…e rather internal Flink record's timestamp.
Users can now register their own classes with Kryo by supplying an implementation of KryoRegistrar through the RegisterCoders API. Users do not need to create KryoCoders instances; they are created automatically. Kryo instances are shared by all KryoCoders with the same Kryo initialization.

Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
./gradlew build to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.