Add topicPattern property to KafkaIO.Read to match topics using a regex#26948
Add topicPattern property to KafkaIO.Read to match topics using a regex#26948chamikaramj merged 3 commits intoapache:masterfrom
Conversation
|
|
||
| @VisibleForTesting final @Nullable List<String> topics; | ||
|
|
||
| private final @Nullable Pattern topicPattern; |
There was a problem hiding this comment.
Not annotated with @VisibleForTesting since the property is not accessed directly in tests, but neither are the properties which do specify @VisibleForTesting.
|
Assigning reviewers. If you would like to opt out of this review, comment R: @bvolpato for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
| } | ||
| } else { | ||
| for (String topic : topics) { | ||
| for (PartitionInfo p : consumer.partitionsFor(topic)) { |
There was a problem hiding this comment.
Any chance this is null at this point? In the split below you have null checks, but not here.
There was a problem hiding this comment.
Is this referring to topics? Both topicPartitions and topics are initialized as empty lists in the builder by default and replaced using .withTopics() and .withTopicPartitions(). The previous Preconditions.checkStateNotNull(topics) expression in the for loop should still not be null under any circumstance. Special care should be taken to carry that property forward when we add support for this property in KafkaIO's ExternalTransformRegistrar though, since it doesn't guarantee the same object state the builder guarantees.
In regards to topicPattern, if both topicPartitions and topics are empty, then topicPattern must be non-null, since the PTransform's expansion checks that at least one of those properties is set and the .withX() builder methods check that none are previously set.
As far as Kafka's topic metadata goes, .partitionsFor() will throw an exception if an unauthorized topic is requested and .listTopics() will only list all authorized topics. Both methods return initialized objects and ensure that potential null responses from the server are translated to empty collections (as far back as org.apache.kafka:kafka-clients:0.11.0.3) or throw an exception in the case of an authorization failure. I'd say that the existing check on partitionInfoList seems superfluous and could potentially be considered for deletion:
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
|
|
@pabloem @johnjcasey can you please TAL / merge? Thanks! |
|
Reminder, please take a look at this pr: @robertwb @chamikaramj |
| p.run(); | ||
| } | ||
|
|
||
| @Test |
There was a problem hiding this comment.
can you add a test, or update this test, such that you don't match all the topics?
There was a problem hiding this comment.
Added one for partial matches and one for no matches.
|
@johnjcasey Unit tests are failing early on a missing dependency. It has been superseded by com.github.davidmc24.gradle.plugin.avro it seems, see changelog. |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
|
run RAT PreCommit |
|
Tests appear to be building fine to me |
| * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the | ||
| * partitions are distributed among the splits. | ||
| */ | ||
| public Read<K, V> withTopicPattern(Pattern topicPattern) { |
There was a problem hiding this comment.
Can change this API to use a portable type (for example, a string regex) so that this can be supported via cross-language wrappers ?
There was a problem hiding this comment.
Read.External.Configuration has fields for keyDeserializer and valueDeserializer which are resolved from String to Class during external transform construction, does it make sense to provide a mapping there instead?
There was a problem hiding this comment.
Ideally we don't want to be using external read configuration as a pattern. It makes it so the configurations for an IO diverge for python v. java users
There was a problem hiding this comment.
Got it. Would an additional method overload with String topicPattern help at all?
Otherwise I'll change the method signature.
There was a problem hiding this comment.
I think we can have both
There was a problem hiding this comment.
Is there a significant advantage to using "Pattern" over a string regex ?
If it's a perf issue, we could just build the Patter object once within "withTopicPattern" and use that ?
There was a problem hiding this comment.
Mostly to make sure that pattern flags can be specified by users, but nearly all (except LITERAL and CANON_EQ) can be specified in the expression as well.
There was a problem hiding this comment.
Thanks. If there is no significant advantage for specifying a Pattern object I would just support specifying a String regex to keep the API simple.
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
This addresses #19217 and #21338, matching Apache Flink's KafkaSource property
topicPattern.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.