[BEAM-5437] Allow kryo provider to use multiple registrars.#40
[BEAM-5437] Allow kryo provider to use multiple registrars.#40VaclavPlajt merged 5 commits intodsl-euphoriafrom
Conversation
46cd74b to
a45c0ac
Compare
d436a7c to
4f04ed3
Compare
VaclavPlajt
left a comment
There was a problem hiding this comment.
I like most of the changes. Good work 👍.
I do not see why we need multiple registrars, it is nice features. But complicates it. And it is not exercised in tests. Please consider adding such a test.
Checkstyle have some complains. lease run whole module build.
sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java
Show resolved
Hide resolved
sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java
Show resolved
Hide resolved
| final KryoOptions kryoOptions = pipelineOptions.as(KryoOptions.class); | ||
| return new KryoCoder<>( | ||
| new SerializableOptions( | ||
| kryoOptions.getKryoBufferSize(), |
There was a problem hiding this comment.
I do consider these to be user input. And as such, they should be checked for validity. Please consider to add at least basic range checks.
| return KryoCoder.of(kryoRegistrar); | ||
| throw new CannotProvideCoderException( | ||
| String.format( | ||
| "Cannot provide [%s], given type descriptor's [%s] raw type is not registered in Kryo.", |
There was a problem hiding this comment.
We should settle whenever we want o use "'%s'" or "[%s]" when formatting nested string. :-)
| public void encode(T value, OutputStream outStream) throws IOException { | ||
| final KryoState kryoState = KryoState.get(this); | ||
| if (value == null) { | ||
| throw new CoderException("Cannot encode a null value."); |
There was a problem hiding this comment.
Why not? Kryo can do that. Should it be a configuration option?
| + "Forgotten kryo registration is possible explanation. Kryo registrations where done by '%s'.", | ||
| (value == null) ? null : value.getClass().getSimpleName(), registrarWithId), | ||
| e); | ||
| if (e.getMessage().startsWith("Class is not registered")) { |
There was a problem hiding this comment.
It works (tests are exercising that). But it may easily breaks without notice when Kryo version is updated. Could we check this explicitly, maybe?
| outputChunked.flush(); | ||
| } catch (KryoException e) { | ||
| outputChunked.clear(); | ||
| if (e.getCause() instanceof EOFException) { |
There was a problem hiding this comment.
Why only EOFException and to all IOExceptions ?
...ava/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java
Show resolved
Hide resolved
| * @param <T> type of element coder can handle | ||
| */ | ||
| public class KryoCoder<T> extends CustomCoder<T> { | ||
| public class KryoCoder<T> extends AtomicCoder<T> { |
There was a problem hiding this comment.
Not sure that 'KryoCoder' is an AtomicCoder since docs states:
All atomic coders of the same class are considered to be equal to each other. As a result, an {@link AtomicCoder} should have no associated configuration (instance variables, etc).
|
All the unresolved review comments still holds and should be resolved. |
Please add a meaningful description for your change here
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username) to look at it.Post-Commit Tests Status (on master branch)