KAFKA-8305: support default partitions & replication factor in AdminClient#createTopic#6728
Conversation
ryannedolan
left a comment
There was a problem hiding this comment.
Looking forward to this!
ijuma
left a comment
There was a problem hiding this comment.
Thanks for the PR, I had a quick look and left some comments.
There was a problem hiding this comment.
The documentation for CreateTopicPolicy says:
@param numPartitions the number of partitions to create or null if replicasAssignments is set.
@param replicationFactor the replication factor for the topic or null if replicaAssignments is set.
It seems like we may end up with a non null numPartitions and replicationFactor even if replicaAssignment is set?
There was a problem hiding this comment.
What's the scenario that causes this? If replicaAssignments is set, then we expect the partitions/replication factor to equal NO_PARTITIONS and NO_REPLICATION_FACTOR respectively, otherwise it fails here:
if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
&& !topic.assignments().isEmpty) {
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
}There was a problem hiding this comment.
You're right. But then why do we need this change at all? It seems like we never end up with a different result than the previous code.
There was a problem hiding this comment.
The difference is that if the value sent was -1 (i.e. NO_REPLICATION_FACTOR) and there are no assignments, it will instead be set the resolvedReplicationFactor, which is the cluster default (i.e. default.replication.factor)
(you can see tests for the changed behavior)
There was a problem hiding this comment.
Hmm.. I think Ismael's point is that if topic.replicationFactor != NO_REPLICATION_FACTOR, then resolvedReplicationFactor == topic.replicationFactor, and this is equivalent to what we already had. It seems like this is what we're trying to do:
val replicationFactor: java.lang.Short =
if (topic.assignments().nonEmpty) null else resolvedReplicationFactorThere was a problem hiding this comment.
🤦♂ I see what you're saying now. I think @hachikuji's code snippet is the easiest to read so I'll change it to that. @ijuma my original line of thought was that it's easier to reason about the code if we only ever use resolvedX everywhere, hence the motivation for this change. That way I would never accidentally use the value of -1 in business logic code and it's easy to confirm by just looking at usages of topic.numPartitions and topic.replicationFactor.
|
Pinging +1'ers @colinhicks @omkreddy @gwenshap @rhauch 😄 thanks in advance! (cc @ijuma) |
|
@agavra I may be missing something, but if we are not bumping the |
|
@hachikuji - thanks for the review!
You are not missing something, this is something that I overlooked (not all errors are the same)! |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the updates. Left a few comments.
There was a problem hiding this comment.
Hmm.. I think Ismael's point is that if topic.replicationFactor != NO_REPLICATION_FACTOR, then resolvedReplicationFactor == topic.replicationFactor, and this is equivalent to what we already had. It seems like this is what we're trying to do:
val replicationFactor: java.lang.Short =
if (topic.assignments().nonEmpty) null else resolvedReplicationFactorThere was a problem hiding this comment.
The error may be a bit obscure if no --partitions or --replication-factor option was provided. Could we detect this case in the argument checks we have below?
There was a problem hiding this comment.
I wasn't sure if it was worth adding bloat for the deprecated code, but I'm happy adding it back (I had it then removed it)
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, looks good. Just a few more small comments.
hachikuji
left a comment
There was a problem hiding this comment.
Just one more comment, but LGTM overall.
There was a problem hiding this comment.
Can we add default assertions here. Maybe we can just use describeTopic like the other tests?
There was a problem hiding this comment.
Oops i missed this comment! Addressing that now.
|
"retest this please" |
…lient#createTopic. This commit makes three changes: - Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>) which allows users to specify Optional.empty() for numPartitions or replicationFactor in order to use the broker default. - Changes AdminManager to accept -1 as valid options for replication factor and numPartitions (resolving to broker defaults). - Adds a dependency on scalaJava8Compat library to make it simpler to convert Scala Option to Java Optional
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, LGTM. Merging to trunk!
…lient#createTopic (KIP-464) (apache#6728) This commit makes three changes: - Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>) which allows users to specify Optional.empty() for numPartitions or replicationFactor in order to use the broker default. - Changes AdminManager to accept -1 as valid options for replication factor and numPartitions (resolving to broker defaults). - Makes --partitions and --replication-factor optional arguments when creating topics using kafka-topics.sh. - Adds a dependency on scalaJava8Compat library to make it simpler to convert Scala Option to Java Optional Reviewers: Ismael Juma <ismael@juma.me.uk>, Ryanne Dolan <ryannedolan@gmail.com>, Jason Gustafson <jason@confluent.io>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: #6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via #10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <dongjin@apache.org>, Ismael Juma <ismael@juma.me.uk>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: #6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via #10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <dongjin@apache.org>, Ismael Juma <ismael@juma.me.uk>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: #6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via #10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <dongjin@apache.org>, Ismael Juma <ismael@juma.me.uk>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: apache/kafka#6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via apache/kafka#10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <dongjin@apache.org>, Ismael Juma <ismael@juma.me.uk>
…he#11429) [KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: apache#6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via apache#10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <dongjin@apache.org>, Ismael Juma <ismael@juma.me.uk>
…he#11429) [KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: apache#6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via apache#10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <dongjin@apache.org>, Ismael Juma <ismael@juma.me.uk>
See: KIP-464 for more information.
Description
This change makes the two required changes to support creating topics using the cluster defaults for replication and partitions:
NewTopic(String)constructor to theNewTopicAPIAdminManagerto accept-1as valid options for replication factor and partitions. If this is the case, it will resolve it using the default configuration.OptiontoOptional.Testing
Committer Checklist (excluded from commit message)