KAFKA-19384: The passing of BrokerRegistrationRequestTest is a false positive #5

Draft
apalan60 wants to merge 12 commits into trunk from Kafka-19384

Conversation

@apalan60

Owner

No description provided.

@gemini-code-assist gemini-code-assist Bot left a comment

Summary of Changes

Hello @apalan60, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a false positive in the BrokerRegistrationRequestTest by migrating the test from Scala to Java and correcting a critical assertion. The original Scala test incorrectly expected BROKER_ID_NOT_REGISTERED when INVALID_REGISTRATION was the correct error for certain broker registration scenarios, leading to a misleading test pass. The new Java test accurately validates broker registration behavior under different metadata versions, ensuring the test suite correctly identifies issues.

Highlights

  • Test Migration: The BrokerRegistrationRequestTest has been migrated from Scala to Java, ensuring consistency and maintainability within the test suite.
  • Bug Fix: False Positive: The expected error code in testRegisterZkWith33Controller was corrected from BROKER_ID_NOT_REGISTERED to INVALID_REGISTRATION for scenarios where no metadata version features are sent during broker registration. This resolves a false positive in the test, ensuring accurate validation of broker registration behavior.
  • Metadata Version Updates: New metadata version constants, specifically IBP_3_3_IV3_FEATURE_LEVEL and IBP_3_4_IV0_FEATURE_LEVEL, were added to MetadataVersionTestUtils.java to support updated test scenarios and future compatibility.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request successfully migrates the BrokerRegistrationRequestTest from Scala to Java and, more importantly, corrects several false-positive assertions. The expected error codes in various broker registration scenarios are now accurate, which improves the reliability of the test. The overall implementation is clean. I have a couple of suggestions for the new Java test file to further improve maintainability by removing an unused method and adopting a more standard approach for Java-Scala interoperability.

Comment on lines +150 to +142
public Errors createTopics(NodeToControllerChannelManager channelManager, String topicName) throws Exception {
    var createTopics = new CreateTopicsRequestData();
    createTopics.setTopics(new CreateTopicsRequestData.CreatableTopicCollection());
    createTopics.topics().add(
        new CreatableTopic()
            .setName(topicName)
            .setNumPartitions(10)
            .setReplicationFactor((short) 1)
    );
    createTopics.setTimeoutMs(500);

    var req = new CreateTopicsRequest.Builder(createTopics);
    var resp = sendAndReceive(channelManager, req, 3000);
    var responseData = ((CreateTopicsResponse) resp).data();
    return Errors.forCode(responseData.topics().find(topicName).errorCode());
}

medium

This createTopics method is not called anywhere within the test class. It appears to be a remnant from the original Scala test, where it was also unused. To improve code clarity and maintainability, it's best to remove this dead code.

Owner Author

@gemini-code-assist
review again

Thank you for asking me to review again, @apalan60. I've re-evaluated the createTopics method in core/src/test/java/kafka/server/BrokerRegistrationRequestTest.java. My previous assessment still stands: this method is not called anywhere within the test class and appears to be dead code. Removing it would improve code clarity and maintainability.

To address this, I suggest removing the entire method.

Comment on lines +232 to +214
return ControllerInformation.apply(
    Option.apply(node().orElse(null)),
    listenerName(),
    securityProtocol(),
    saslMechanism()
);

medium

The conversion from java.util.Optional to scala.Option using Option.apply(node().orElse(null)) is functional but not very idiomatic. A cleaner, more maintainable approach is to use the dedicated converter scala.jdk.javaapi.OptionConverters.toScala(). This makes the intent clearer and is the standard way for such conversions in modern Scala/Java interop code. You'll likely need to add an import for scala.jdk.javaapi.OptionConverters.

Suggested change
- return ControllerInformation.apply(
-     Option.apply(node().orElse(null)),
-     listenerName(),
-     securityProtocol(),
-     saslMechanism()
- );
+ return ControllerInformation.apply(
+     scala.jdk.javaapi.OptionConverters.toScala(node()),
+     listenerName(),
+     securityProtocol(),
+     saslMechanism()
+ );

@apalan60

Owner Author

/review
auto_approve
--pr_reviewer.extra_instructions="
In the possible issues section, emphasize the following:

  • Is the code logic efficient?

"
--pr_reviewer.inline_code_comments=true
-i
--pr_reviewer.require_score_review=true
--pr_reviewer.require_can_be_split_review=true
--pr_reviewer.require_soc2_ticket=true
--pr_reviewer.num_code_suggestions="5"

@qodo-code-review

Manual-approval option for PR-Agent is disabled. You can enable it via a configuration file.

@apalan60

Owner Author

/improve
--pr_code_suggestions.extra_instructions="
Emphasize the following:

  • Does the code logic cover relevant edge cases?
  • Is the code logic clear and easy to understand?
  • Is the code logic efficient?

"
--pr_code_suggestions.num_code_suggestions_per_chunk="4"
--pr_code_suggestions.commitable_code_suggestions=true
--pr_code_suggestions.suggestions_score_threshold="0"

Comment on lines +81 to +95
public <T extends AbstractRequest, R extends AbstractResponse> R sendAndReceive(
    NodeToControllerChannelManager channelManager,
    AbstractRequest.Builder<T> reqBuilder,
    int timeoutMs
) throws Exception {
    var responseFuture = new CompletableFuture<R>();
    channelManager.sendRequest(reqBuilder, new ControllerRequestCompletionHandler() {
        @Override
        public void onTimeout() {
            responseFuture.completeExceptionally(new TimeoutException());
        }

        @Override
        public void onComplete(ClientResponse response) {
            responseFuture.complete((R) response.responseBody());
        }
    });
    return responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
}

Suggestion: Validate that a response was actually received and is non-null before completing the future, to avoid NPEs or ClassCastExceptions. Also check response.wasDisconnected() and that response.responseBody() is present, so that disconnects and empty responses fail fast instead of completing the future with a bad value. [possible issue, importance: 6]

Suggested change
public <T extends AbstractRequest, R extends AbstractResponse> R sendAndReceive(
    NodeToControllerChannelManager channelManager,
    AbstractRequest.Builder<T> reqBuilder,
    int timeoutMs
) throws Exception {
    var responseFuture = new CompletableFuture<R>();
    channelManager.sendRequest(reqBuilder, new ControllerRequestCompletionHandler() {
        @Override
        public void onTimeout() {
            responseFuture.completeExceptionally(new TimeoutException());
        }
        @Override
        public void onComplete(ClientResponse response) {
            if (response == null || response.wasDisconnected() || response.responseBody() == null) {
                responseFuture.completeExceptionally(new IllegalStateException("No valid response from controller"));
                return;
            }
            @SuppressWarnings("unchecked")
            R body = (R) response.responseBody();
            responseFuture.complete(body);
        }
    });
    return responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
}
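
The callback-to-future pattern in this suggestion can be tried in isolation. The following is a minimal sketch, not the PR's code: FakeResponse, Handler, and Transport are hypothetical stand-ins for ClientResponse, ControllerRequestCompletionHandler, and the channel manager. It also swaps the unchecked (R) cast for a Class<R> token, an assumption made here so that a response of the wrong type fails the future rather than the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallbackBridgeSketch {
    /** Hypothetical stand-in for ClientResponse; not a Kafka class. */
    public record FakeResponse(boolean disconnected, Object body) { }

    /** Hypothetical stand-in for ControllerRequestCompletionHandler. */
    public interface Handler {
        void onTimeout();
        void onComplete(FakeResponse response);
    }

    /** Hypothetical transport that eventually invokes the handler. */
    public interface Transport {
        void send(Handler handler);
    }

    public static <R> R sendAndReceive(Transport transport, Class<R> type, long timeoutMs) throws Exception {
        var future = new CompletableFuture<R>();
        transport.send(new Handler() {
            @Override
            public void onTimeout() {
                future.completeExceptionally(new TimeoutException());
            }

            @Override
            public void onComplete(FakeResponse response) {
                // Reject missing, disconnected, or empty responses before touching the body.
                if (response == null || response.disconnected() || response.body() == null) {
                    future.completeExceptionally(new IllegalStateException("No valid response"));
                    return;
                }
                try {
                    // Checked cast via the class token; a mismatch fails the future.
                    future.complete(type.cast(response.body()));
                } catch (ClassCastException e) {
                    future.completeExceptionally(e);
                }
            }
        });
        // Failures recorded above surface here wrapped in an ExecutionException.
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

A caller that wants the underlying cause has to unwrap the ExecutionException thrown by get(); a test helper could do that once so that assertions see the original exception.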

Comment on lines +167 to +180
@ClusterTest(types = {Type.KRAFT}, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3)
public void testRegisterZkWith33Controller(ClusterInstance clusterInstance) throws Exception {
    // Verify that a controller running an old metadata.version cannot register a ZK broker
    var clusterId = clusterInstance.clusterId();
    var channelManager = brokerToControllerChannelManager(clusterInstance);
    try {
        channelManager.start();
        // Invalid registration (isMigratingZkBroker, but MV does not support migrations)
        Assertions.assertEquals(
            Errors.BROKER_ID_NOT_REGISTERED,
            registerBroker(channelManager, clusterId, 100, 1L,
                new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL))
        );

        // No features (MV) sent with registration, controller can't verify
        Assertions.assertEquals(
            Errors.INVALID_REGISTRATION,
            registerBroker(channelManager, clusterId, 100, null, null)
        );

        // Given MV is too high for controller to support
        Assertions.assertEquals(
            Errors.UNSUPPORTED_VERSION,
            registerBroker(channelManager, clusterId, 100, null,
                new FeatureLevel(MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL))
        );

        // Controller supports this MV and isMigratingZkBroker is false, so this one works
        Assertions.assertEquals(
            Errors.NONE,
            registerBroker(channelManager, clusterId, 100, null,
                new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL))
        );
    } finally {
        channelManager.shutdown();
    }
}

Suggestion: The test name and comment say a ZK broker should not register with a 3.3 controller, but assertions send zkEpoch=null for some cases, not modeling a migrating ZK broker. For correctness, keep zkEpoch non-null for the negative cases to truly simulate ZK migration attempts. [general, importance: 7]

Suggested change
@ClusterTest(types = {Type.KRAFT}, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3)
public void testRegisterZkWith33Controller(ClusterInstance clusterInstance) throws Exception {
    var clusterId = clusterInstance.clusterId();
    var channelManager = brokerToControllerChannelManager(clusterInstance);
    try {
        channelManager.start();
        // Invalid registration (isMigratingZkBroker, but MV does not support migrations)
        Assertions.assertEquals(
            Errors.BROKER_ID_NOT_REGISTERED,
            registerBroker(channelManager, clusterId, 100, 1L,
                new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL))
        );
        // No features (MV) sent with registration, controller can't verify (still migrating ZK)
        Assertions.assertEquals(
            Errors.INVALID_REGISTRATION,
            registerBroker(channelManager, clusterId, 100, 1L, null)
        );
        // Given MV is too high for controller to support (still migrating ZK)
        Assertions.assertEquals(
            Errors.UNSUPPORTED_VERSION,
            registerBroker(channelManager, clusterId, 100, 1L,
                new FeatureLevel(MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL))
        );
        // Controller supports this MV and isMigratingZkBroker is false, so this one works
        Assertions.assertEquals(
            Errors.NONE,
            registerBroker(channelManager, clusterId, 100, null,
                new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL))
        );
    } finally {
        channelManager.shutdown();
    }
}

@apalan60

Owner Author

@gemini-code-assist review again

@apalan60

Owner Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request addresses a false positive in BrokerRegistrationRequestTest by migrating the test from Scala to Java and refining the test logic. The previous implementation had flawed assertions that didn't correctly distinguish between different broker registration failure scenarios. The new Java test provides clearer, more accurate test cases for both migrating and non-migrating brokers with specific error code assertions, which is a solid improvement. The overall code quality is high, and I have one suggestion to enhance type safety.

Comment on lines +138 to +143
var resp = sendAndReceive(
    channelManager,
    new BrokerRegistrationRequest.Builder(req),
    30000
);
return Errors.forCode(((BrokerRegistrationResponse) resp).data().errorCode());

medium

The call to sendAndReceive can be made more type-safe by providing explicit type arguments. This avoids the need for a cast on the response object, improving readability and compile-time safety.

Suggested change
- var resp = sendAndReceive(
-     channelManager,
-     new BrokerRegistrationRequest.Builder(req),
-     30000
- );
- return Errors.forCode(((BrokerRegistrationResponse) resp).data().errorCode());
+ BrokerRegistrationResponse resp = this.<BrokerRegistrationRequest, BrokerRegistrationResponse>sendAndReceive(
+     channelManager,
+     new BrokerRegistrationRequest.Builder(req),
+     30000
+ );
+ return Errors.forCode(resp.data().errorCode());
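
How Java picks the return type of a generic helper like this can be seen in a small standalone sketch. The classes below (Response, RegistrationResponse, receive) are hypothetical stand-ins, not Kafka types. With var there is no target type, so R falls back to its bound and a cast is needed. An explicit type witness or a typed assignment target both let the compiler choose the subtype.

```java
public class TypeWitnessSketch {
    /** Hypothetical base type, standing in for AbstractResponse. */
    public static class Response { }

    /** Hypothetical subtype, standing in for BrokerRegistrationResponse. */
    public static class RegistrationResponse extends Response {
        public short errorCode() { return 42; }
    }

    // Hypothetical helper: R appears only in the return type, as in sendAndReceive.
    @SuppressWarnings("unchecked")
    public static <R extends Response> R receive(Response raw) {
        return (R) raw; // unchecked: erased at runtime
    }

    public static short viaVar(Response raw) {
        var resp = receive(raw); // no target type, so R is inferred as Response
        return ((RegistrationResponse) resp).errorCode();
    }

    public static short viaWitness(Response raw) {
        // Explicit type witness, as in the suggested change.
        RegistrationResponse resp = TypeWitnessSketch.<RegistrationResponse>receive(raw);
        return resp.errorCode();
    }

    public static short viaTargetType(Response raw) {
        // R is inferred from the declared type of the variable.
        RegistrationResponse resp = receive(raw);
        return resp.errorCode();
    }
}
```

All three variants still rely on the unchecked cast inside the helper, so a wrong runtime type fails with a ClassCastException at the call site in each case. The witness form removes the visible cast but not the runtime check.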

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request successfully addresses a false positive in BrokerRegistrationRequestTest by migrating it from Scala to Java and refining the assertions to check for more specific error codes. The new test is more accurate and robust. The changes also include necessary updates to metadata version constants and cleanup of the old Scala file. The overall implementation is solid, but I have one suggestion to enhance code readability in the new test file.

Comment on lines +121 to +136
var req = new BrokerRegistrationRequestData()
    .setBrokerId(brokerId)
    .setLogDirs(List.of(Uuid.randomUuid()))
    .setClusterId(clusterId)
    .setIncarnationId(Uuid.randomUuid())
    .setIsMigratingZkBroker(zkEpoch != null)
    .setFeatures(features)
    .setListeners(new BrokerRegistrationRequestData.ListenerCollection(
        List.of(
            new BrokerRegistrationRequestData.Listener()
                .setName("EXTERNAL")
                .setHost("example.com")
                .setPort(8082)
                .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
        ).iterator()
    ));

medium

The construction of BrokerRegistrationRequestData with the inline ListenerCollection is a bit dense and harms readability due to deep nesting. Extracting the Listener creation into a local variable would make the code cleaner and easier to follow.

        var listener = new BrokerRegistrationRequestData.Listener()
                .setName("EXTERNAL")
                .setHost("example.com")
                .setPort(8082)
                .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id);
        var req = new BrokerRegistrationRequestData()
            .setBrokerId(brokerId)
            .setLogDirs(List.of(Uuid.randomUuid()))
            .setClusterId(clusterId)
            .setIncarnationId(Uuid.randomUuid())
            .setIsMigratingZkBroker(zkEpoch != null)
            .setFeatures(features)
            .setListeners(new BrokerRegistrationRequestData.ListenerCollection(List.of(listener).iterator()));

@apalan60

Owner Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request addresses a false positive in BrokerRegistrationRequestTest by converting the test from Scala to Java and correcting the assertions. The previous test had incorrect logic for certain scenarios, which is now fixed. The changes also include adding IDE files to .gitignore and updating metadata version constants. The new Java test is well-structured. I've provided a couple of minor suggestions to improve code style and readability.

public record FeatureLevel(short min, short max) { }

private record TestControllerNodeProvider(SocketServer controllerSocketServer,
ClusterInstance clusterInstance) implements ControllerNodeProvider {

medium

For better code formatting and readability, the parameters of the record should be aligned. The ClusterInstance parameter should be aligned with SocketServer on the previous line.

Suggested change
- ClusterInstance clusterInstance) implements ControllerNodeProvider {
+                                           ClusterInstance clusterInstance) implements ControllerNodeProvider {

@apalan60 force-pushed the Kafka-19384 branch 2 times, most recently from 3380690 to a7b82db (August 11, 2025 17:44)

Comment on lines +155 to +163
  // No features (MV) sent with registration, controller can't verify
  assertEquals(
-   Errors.BROKER_ID_NOT_REGISTERED,
-   registerBroker(channelManager, clusterId, 100, Some(1), None))
+   Errors.INVALID_REGISTRATION,
+   registerBroker(channelManager, clusterId, 100, None, None))

  // Given MV is too high for controller to support
  assertEquals(
-   Errors.BROKER_ID_NOT_REGISTERED,
-   registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0.featureLevel, MetadataVersion.IBP_3_4_IV0.featureLevel))))
+   Errors.UNSUPPORTED_VERSION,
+   registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0.featureLevel, MetadataVersion.IBP_3_4_IV0.featureLevel))))

Owner Author

Previously, passing Some(1) as the zkEpoch parameter caused zkEpoch.isDefined to always be true, which led to a BrokerIdNotRegisteredException being thrown.
As a result, the intended test scenario was never actually tested.
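
The masking effect described here can be reproduced with a toy validator. This is a hypothetical, simplified sketch and not Kafka's controller logic: the check order, the Result enum, the FeatureRange record, and the numeric levels are all assumptions chosen to mirror the comment. The point is that once the migrating-broker flag is set, the early exit hides every later branch, so the other assertions could never see a different error.

```java
import java.util.Optional;

public class MaskedBranchSketch {
    /** Toy result codes named after the errors discussed in this thread. */
    public enum Result { NONE, BROKER_ID_NOT_REGISTERED, INVALID_REGISTRATION, UNSUPPORTED_VERSION }

    /** Hypothetical supported feature range sent by the broker. */
    public record FeatureRange(short min, short max) { }

    // Hypothetical validator; the order of checks is an assumption for illustration.
    public static Result register(boolean migratingZkBroker,
                                  Optional<FeatureRange> features,
                                  short controllerLevel,
                                  boolean controllerSupportsMigration) {
        if (migratingZkBroker && !controllerSupportsMigration) {
            // Early exit: nothing below runs while the migrating flag is set.
            return Result.BROKER_ID_NOT_REGISTERED;
        }
        if (features.isEmpty()) {
            return Result.INVALID_REGISTRATION;
        }
        var range = features.get();
        if (controllerLevel < range.min() || controllerLevel > range.max()) {
            return Result.UNSUPPORTED_VERSION;
        }
        return Result.NONE;
    }
}
```

In this sketch, calling register(true, Optional.empty(), ...) against a controller without migration support returns BROKER_ID_NOT_REGISTERED regardless of the features argument, which is the shape of the false positive. Only with the flag cleared do the INVALID_REGISTRATION and UNSUPPORTED_VERSION branches become reachable.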

@apalan60

Owner Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a new feature for KRaft controllers to automatically join the metadata quorum, which is a valuable addition for dynamic cluster management. It also addresses a false-positive test case by replacing an existing Scala test with a more reliable Java integration test for broker registration. Additionally, the PR includes several nice refactorings, such as converting a Scala test to Java and improving logging for auto topic creation, which enhances maintainability and debuggability. Overall, these are solid improvements to the codebase. I have one suggestion regarding the test setup for the new auto-join feature.

props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");

// do not include auto join config in broker nodes
if (brokerNode != null) {

medium

The controller.quorum.auto.join.enable configuration is valid for combined broker/controller nodes, but this logic removes it for any node with a broker role. This could prevent tests for combined nodes from correctly testing this feature.

This condition should be more specific to only remove the configuration for broker-only nodes.

Suggested change
- if (brokerNode != null) {
+ if (brokerNode != null && controllerNode == null) {
