Kafka 19620 bk#10
Conversation
There was a problem hiding this comment.
Code Review
This pull request migrates the AutoTopicCreationManager and ExpiringErrorCache from Scala to Java, necessitating updates to KafkaApis, BrokerServer, and associated tests to support the new Java interface and types. The migration also includes moving the defaultReplicationFactor configuration to AbstractKafkaConfig and introducing a new ForwardingManagerUtils utility. The review feedback suggests improving code cleanliness in DefaultAutoTopicCreationManager by using explicit imports for exceptions in the logError method instead of fully qualified names.
There was a problem hiding this comment.
Pull request overview
This pull request migrates auto topic creation and its supporting error backoff cache from the legacy Scala kafka.server area into the Java org.apache.kafka.server module, and updates broker/runtime call sites and tests to use the new Java APIs (notably java.util.* collections and Optional).
Changes:
- Added Java implementations for
AutoTopicCreationManager,DefaultAutoTopicCreationManager, andExpiringErrorCache(with TTL-based error caching/backoff for Streams internal topic creation). - Updated broker/runtime integration and call sites (notably
KafkaApisandBrokerServer) to use the new Java interface and Java collection types. - Replaced Scala unit tests for auto topic creation/error cache with Java JUnit tests; updated imports in builder/benchmark code.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| server/src/main/java/org/apache/kafka/server/AutoTopicCreationManager.java | New Java interface for auto topic creation (incl. Streams internal topics + cached error retrieval). |
| server/src/main/java/org/apache/kafka/server/DefaultAutoTopicCreationManager.java | New Java implementation with in-flight tracking and error caching/backoff for Streams internal topics. |
| server/src/main/java/org/apache/kafka/server/ExpiringErrorCache.java | New bounded TTL cache used for topic-creation error backoff. |
| server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java | Adds defaultReplicationFactor() accessor used by Java auto-topic creation. |
| server/src/test/java/org/apache/kafka/server/DefaultAutoTopicCreationManagerTest.java | New Java unit tests covering auto topic creation and Streams error caching/backoff behavior. |
| server/src/test/java/org/apache/kafka/server/ExpiringErrorCacheTest.java | New Java unit tests for cache semantics (expiration, capacity, concurrency). |
| core/src/main/scala/kafka/server/BrokerServer.scala | Wires broker to the new Java DefaultAutoTopicCreationManager using config suppliers. |
| core/src/main/scala/kafka/server/KafkaApis.scala | Updates metadata and Streams heartbeat paths to use Java AutoTopicCreationManager types (util.Set, Optional, etc.). |
| core/src/main/scala/kafka/server/KafkaConfig.scala | Removes Scala defaultReplicationFactor val (relies on AbstractKafkaConfig method). |
| core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | Updates tests/mocking to match Java Optional + java.util collection types. |
| core/src/main/java/kafka/server/builders/KafkaApisBuilder.java | Updates import to Java org.apache.kafka.server.AutoTopicCreationManager. |
| jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java | Updates import to Java org.apache.kafka.server.AutoTopicCreationManager. |
| core/src/main/scala/kafka/server/AutoTopicCreationManager.scala | Deletes the old Scala trait/implementation (migrated to Java). |
| core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala | Deletes Scala unit tests (replaced by Java tests). |
| core/src/test/scala/unit/kafka/server/ExpiringErrorCacheTest.scala | Deletes Scala unit tests (replaced by Java tests). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public DefaultAutoTopicCreationManager( | ||
| AbstractKafkaConfig config, | ||
| Supplier<Properties> groupCoordinatorConfigsSupplier, | ||
| Supplier<Properties> transactionTopicConfigsSupplier, | ||
| Supplier<Properties> shareCoordinatorConfigsSupplier, | ||
| TopicCreator topicCreator, | ||
| Time time | ||
| ) { | ||
| this( | ||
| config, | ||
| groupCoordinatorConfigsSupplier, | ||
| transactionTopicConfigsSupplier, | ||
| shareCoordinatorConfigsSupplier, | ||
| time, | ||
| topicCreator, | ||
| // Hardcoded default capacity; can be overridden in tests via constructor param | ||
| DEFAULT_TOPIC_ERROR_CACHE_CAPACITY | ||
| ); | ||
| } | ||
|
|
||
| // VisibleForTesting | ||
| DefaultAutoTopicCreationManager( | ||
| AbstractKafkaConfig config, | ||
| Supplier<Properties> groupCoordinatorConfigsSupplier, | ||
| Supplier<Properties> transactionTopicConfigsSupplier, | ||
| Supplier<Properties> shareCoordinatorConfigsSupplier, | ||
| Time time, | ||
| TopicCreator topicCreator, | ||
| int topicErrorCacheCapacity | ||
| ) { |
| public class DefaultAutoTopicCreationManager implements AutoTopicCreationManager { | ||
|
|
||
| private static final int DEFAULT_TOPIC_ERROR_CACHE_CAPACITY = 1000; | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAutoTopicCreationManager.class); |
Delete this text and replace it with a detailed description of your
change. The PR title and body will become the squashed commit message.
If you would like to tag individuals, add some commentary, upload
images, or include other supplemental information that should not be
part of the eventual commit message, please use a separate comment.
If applicable, please include a summary of the testing strategy
(including rationale) for the proposed change. Unit and/or integration
tests are expected for any behavior change and system tests should be
considered for larger changes.
Reviewers: gemini-code-assist[bot] (github:gemini-code-assist[bot])