Skip to content

Kafka 19620 bk#10

Open
brandboat wants to merge 21 commits into
trunkfrom
KAFKA-19620-bk
Open

Kafka 19620 bk#10
brandboat wants to merge 21 commits into
trunkfrom
KAFKA-19620-bk

Conversation

@brandboat

@brandboat brandboat commented May 5, 2026

Copy link
Copy Markdown
Owner

Delete this text and replace it with a detailed description of your
change. The PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload
images, or include other supplemental information that should not be
part of the eventual commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy
(including rationale) for the proposed change. Unit and/or integration
tests are expected for any behavior change and system tests should be
considered for larger changes.

Reviewers: gemini-code-assist[bot] (github:gemini-code-assist[bot])

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request migrates the AutoTopicCreationManager and ExpiringErrorCache from Scala to Java, necessitating updates to KafkaApis, BrokerServer, and associated tests to support the new Java interface and types. The migration also includes moving the defaultReplicationFactor configuration to AbstractKafkaConfig and introducing a new ForwardingManagerUtils utility. The review feedback suggests improving code cleanliness in DefaultAutoTopicCreationManager by using explicit imports for exceptions in the logError method instead of fully qualified names.

Comment thread server/src/main/java/org/apache/kafka/server/DefaultAutoTopicCreationManager.java Outdated

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request migrates auto topic creation and its supporting error backoff cache from the legacy Scala kafka.server area into the Java org.apache.kafka.server module, and updates broker/runtime call sites and tests to use the new Java APIs (notably java.util.* collections and Optional).

Changes:

  • Added Java implementations for AutoTopicCreationManager, DefaultAutoTopicCreationManager, and ExpiringErrorCache (with TTL-based error caching/backoff for Streams internal topic creation).
  • Updated broker/runtime integration and call sites (notably KafkaApis and BrokerServer) to use the new Java interface and Java collection types.
  • Replaced Scala unit tests for auto topic creation/error cache with Java JUnit tests; updated imports in builder/benchmark code.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
server/src/main/java/org/apache/kafka/server/AutoTopicCreationManager.java New Java interface for auto topic creation (incl. Streams internal topics + cached error retrieval).
server/src/main/java/org/apache/kafka/server/DefaultAutoTopicCreationManager.java New Java implementation with in-flight tracking and error caching/backoff for Streams internal topics.
server/src/main/java/org/apache/kafka/server/ExpiringErrorCache.java New bounded TTL cache used for topic-creation error backoff.
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java Adds defaultReplicationFactor() accessor used by Java auto-topic creation.
server/src/test/java/org/apache/kafka/server/DefaultAutoTopicCreationManagerTest.java New Java unit tests covering auto topic creation and Streams error caching/backoff behavior.
server/src/test/java/org/apache/kafka/server/ExpiringErrorCacheTest.java New Java unit tests for cache semantics (expiration, capacity, concurrency).
core/src/main/scala/kafka/server/BrokerServer.scala Wires broker to the new Java DefaultAutoTopicCreationManager using config suppliers.
core/src/main/scala/kafka/server/KafkaApis.scala Updates metadata and Streams heartbeat paths to use Java AutoTopicCreationManager types (util.Set, Optional, etc.).
core/src/main/scala/kafka/server/KafkaConfig.scala Removes Scala defaultReplicationFactor val (relies on AbstractKafkaConfig method).
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala Updates tests/mocking to match Java Optional + java.util collection types.
core/src/main/java/kafka/server/builders/KafkaApisBuilder.java Updates import to Java org.apache.kafka.server.AutoTopicCreationManager.
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java Updates import to Java org.apache.kafka.server.AutoTopicCreationManager.
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala Deletes the old Scala trait/implementation (migrated to Java).
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala Deletes Scala unit tests (replaced by Java tests).
core/src/test/scala/unit/kafka/server/ExpiringErrorCacheTest.scala Deletes Scala unit tests (replaced by Java tests).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +71 to +100
public DefaultAutoTopicCreationManager(
AbstractKafkaConfig config,
Supplier<Properties> groupCoordinatorConfigsSupplier,
Supplier<Properties> transactionTopicConfigsSupplier,
Supplier<Properties> shareCoordinatorConfigsSupplier,
TopicCreator topicCreator,
Time time
) {
this(
config,
groupCoordinatorConfigsSupplier,
transactionTopicConfigsSupplier,
shareCoordinatorConfigsSupplier,
time,
topicCreator,
// Hardcoded default capacity; can be overridden in tests via constructor param
DEFAULT_TOPIC_ERROR_CACHE_CAPACITY
);
}

// VisibleForTesting
DefaultAutoTopicCreationManager(
AbstractKafkaConfig config,
Supplier<Properties> groupCoordinatorConfigsSupplier,
Supplier<Properties> transactionTopicConfigsSupplier,
Supplier<Properties> shareCoordinatorConfigsSupplier,
Time time,
TopicCreator topicCreator,
int topicErrorCacheCapacity
) {
Comment on lines +57 to +60
public class DefaultAutoTopicCreationManager implements AutoTopicCreationManager {

private static final int DEFAULT_TOPIC_ERROR_CACHE_CAPACITY = 1000;
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAutoTopicCreationManager.class);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants