[BEAM-4294] [BEAM-4360] Join translation and ReduceByKey test suite were moved to org.apache.beam.* package. #1
…pplication of windowing in JoinTranslator fixed.
… to yet unsupported features.
…here moved to org.apache.beam.* package. Small imports fixes after rebase to seznam/beam eff3ffd.
dmvk
left a comment
Looks good overall! Good job 👍 Please introduce a check whether RBK can be translated using current translator and we're good to go ;)
ListDataSink<Pair<Integer, Pair<String, Integer>>> output = ListDataSink.get();

BinaryFunctor<Optional<Pair<Integer, String>>, Pair<Integer, Integer>, Pair<String, Integer>>
It's usually more readable to apply the lambda directly, or to create a named static function and use it by reference.
I agree. The code was changed so that the lambda is used directly.
This case is more specific: the API requires the type parameters of BinaryFunctor to be stated explicitly. Otherwise the argument type inferred for the output() method cannot be assigned from ListDataSink<Pair<Integer, Pair<String, Integer>>> (the output variable).
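To illustrate the inference problem described above, here is a minimal, self-contained sketch in plain Java. It does not use the real Euphoria API: `BinaryFunctor`, `OutputBuilder`, `using` and `run` are hypothetical stand-ins, and a `java.util.List` plays the role of both the collector and the sink. The assumption is that the lambda is passed to a generic method whose result is chained immediately, so the compiler has no target type from which to infer the functor's type parameters.

```java
import java.util.ArrayList;
import java.util.List;

final class TypeWitnessSketch {

  /** Hypothetical stand-in for a functor that emits results into a collector. */
  interface BinaryFunctor<L, R, O> {
    void apply(L left, R right, List<O> collector);
  }

  /** Hypothetical builder step whose output element type is fixed by the functor. */
  static final class OutputBuilder<L, R, O> {
    private final BinaryFunctor<L, R, O> functor;

    OutputBuilder(BinaryFunctor<L, R, O> functor) {
      this.functor = functor;
    }

    /** Accepts only a sink whose element type is exactly O. */
    List<O> output(List<O> sink, L left, R right) {
      functor.apply(left, right, sink);
      return sink;
    }
  }

  static <L, R, O> OutputBuilder<L, R, O> using(BinaryFunctor<L, R, O> functor) {
    return new OutputBuilder<>(functor);
  }

  static List<String> run() {
    List<String> sink = new ArrayList<>();
    // The explicit type witness pins L, R and O. The chained call gives
    // using(...) no target type, so nothing else constrains them.
    return TypeWitnessSketch.<Integer, String, String>using(
            (left, right, out) -> out.add(left + ":" + right))
        .output(sink, 1, "a");
  }
}
```

In this sketch the lambda stays inline, as suggested in the review, and the type parameters are stated once at the call site instead of on a separate functor variable.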
private static <InputT, K, V, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
doTranslate(ReduceByKey<InputT, K, V, OutputT, W> operator, BeamExecutorContext context) {

if (operator.getValueComparator() != null) { // TODO Could we even do values sorting?
@@ -54,6 +55,7 @@ class FlowTranslator {
// extended operators
translators.put(ReduceByKey.class, new ReduceByKeyTranslator());
We need to add a predicate for the ReduceByKey translator, as it cannot handle some cases (e.g. when the operator contains a value comparator). Predicates also allow us to have more than one implementation for a single operator, to handle different cases.
I reckon it would be worth adding a boolean canTranslate to the OperatorTranslator interface.
A new method, boolean canTranslate(operator), was added to OperatorTranslator. We can now state explicitly when a given translator cannot translate a given operator, and we can add more specialized translators when needed.
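The idea discussed in this thread can be sketched as follows. This is a simplified illustration, not the code from the pull request: apart from the names `OperatorTranslator` and `canTranslate`, everything here (the `Operator` and `ReduceByKeyOp` stand-ins, the `register` and `find` methods, the `String` result of `translate`) is a hypothetical assumption. It shows a registry that keeps several translators per operator class and picks the first one whose predicate accepts the operator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class TranslatorRegistrySketch {

  /** Hypothetical stand-in for an operator. */
  public interface Operator {}

  /** Hypothetical ReduceByKey-like operator that may carry a value comparator. */
  public static final class ReduceByKeyOp implements Operator {
    final boolean hasValueComparator;

    public ReduceByKeyOp(boolean hasValueComparator) {
      this.hasValueComparator = hasValueComparator;
    }
  }

  /** Translator with the predicate proposed in the review. */
  public interface OperatorTranslator<O extends Operator> {
    boolean canTranslate(O operator);

    String translate(O operator);
  }

  /** Handles only operators without a value comparator. */
  public static final class PlainReduceByKeyTranslator
      implements OperatorTranslator<ReduceByKeyOp> {
    @Override
    public boolean canTranslate(ReduceByKeyOp operator) {
      return !operator.hasValueComparator;
    }

    @Override
    public String translate(ReduceByKeyOp operator) {
      return "plain-reduce-by-key";
    }
  }

  // Several translators may be registered for one operator class.
  private final Map<Class<?>, List<OperatorTranslator<?>>> translators = new HashMap<>();

  public <O extends Operator> void register(Class<O> type, OperatorTranslator<O> translator) {
    translators.computeIfAbsent(type, k -> new ArrayList<>()).add(translator);
  }

  /** Returns the first registered translator whose predicate accepts the operator. */
  @SuppressWarnings("unchecked")
  public <O extends Operator> Optional<OperatorTranslator<O>> find(O operator) {
    List<OperatorTranslator<?>> candidates =
        translators.getOrDefault(operator.getClass(), Collections.emptyList());
    for (OperatorTranslator<?> candidate : candidates) {
      // Safe by construction: register() ties each translator to its operator class.
      OperatorTranslator<O> typed = (OperatorTranslator<O>) candidate;
      if (typed.canTranslate(operator)) {
        return Optional.of(typed);
      }
    }
    return Optional.empty();
  }
}
```

With this shape, an operator that no registered translator accepts yields an empty result, so the caller can report the unsupported case explicitly instead of failing inside the translation.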
PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection;
typedInput.setCoder(valueCoder);

PCollection<KV<K, ValueT>> leftKvInput =
private final UnaryFunction<InputT, K> keyExtractor;

public InputToKvDoFn(UnaryFunction<InputT, K> leftKeyExtractor) {
this.keyExtractor = leftKeyExtractor;
… extended to allow more than one translator per operator.
LGTM 👍 Good job. Can we send a pull request directly to
Fix test expectations and minor code fix-up.
Update forked repository.
This pull request brings together all the changes described in:
Plus small import fixes after the rebase to eff3ffd. Please review those first to get an understanding of this pull request.