KAFKA-19384: The passing of BrokerRegistrationRequestTest is a false positive#5
KAFKA-19384: The passing of BrokerRegistrationRequestTest is a false positive#5apalan60 wants to merge 12 commits into
Conversation
There was a problem hiding this comment.
Summary of Changes
Hello @apalan60, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request addresses a false positive in the BrokerRegistrationRequestTest by migrating the test from Scala to Java and correcting a critical assertion. The original Scala test incorrectly expected BROKER_ID_NOT_REGISTERED when INVALID_REGISTRATION was the correct error for certain broker registration scenarios, leading to a misleading test pass. The new Java test accurately validates broker registration behavior under different metadata versions, ensuring the test suite correctly identifies issues.
Highlights
- Test Migration: The
BrokerRegistrationRequestTesthas been migrated from Scala to Java, ensuring consistency and maintainability within the test suite. - Bug Fix: False Positive: The expected error code in
testRegisterZkWith33Controllerwas corrected fromBROKER_ID_NOT_REGISTEREDtoINVALID_REGISTRATIONfor scenarios where no metadata version features are sent during broker registration. This resolves a false positive in the test, ensuring accurate validation of broker registration behavior. - Metadata Version Updates: New metadata version constants, specifically
IBP_3_3_IV3_FEATURE_LEVELandIBP_3_4_IV0_FEATURE_LEVEL, were added toMetadataVersionTestUtils.javato support updated test scenarios and future compatibility.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Code Review
This pull request successfully migrates the BrokerRegistrationRequestTest from Scala to Java and, more importantly, corrects several false-positive assertions. The expected error codes in various broker registration scenarios are now accurate, which improves the reliability of the test. The overall implementation is clean. I have a couple of suggestions for the new Java test file to further improve maintainability by removing an unused method and adopting a more standard approach for Java-Scala interoperability.
| public Errors createTopics(NodeToControllerChannelManager channelManager, String topicName) throws Exception { | ||
| var createTopics = new CreateTopicsRequestData(); | ||
| createTopics.setTopics(new CreateTopicsRequestData.CreatableTopicCollection()); | ||
| createTopics.topics().add( | ||
| new CreatableTopic() | ||
| .setName(topicName) | ||
| .setNumPartitions(10) | ||
| .setReplicationFactor((short) 1) | ||
| ); | ||
| createTopics.setTimeoutMs(500); | ||
|
|
||
| var req = new CreateTopicsRequest.Builder(createTopics); | ||
| var resp = sendAndReceive(channelManager, req, 3000); | ||
| var responseData = ((CreateTopicsResponse) resp).data(); | ||
| return Errors.forCode(responseData.topics().find(topicName).errorCode()); | ||
| } |
There was a problem hiding this comment.
Thank you for asking me to review again, @apalan60. I've re-evaluated the createTopics method in core/src/test/java/kafka/server/BrokerRegistrationRequestTest.java. My previous assessment still stands: this method is not called anywhere within the test class and appears to be dead code. Removing it would improve code clarity and maintainability.
To address this, I suggest removing the entire method.
| return ControllerInformation.apply( | ||
| Option.apply(node().orElse(null)), | ||
| listenerName(), | ||
| securityProtocol(), | ||
| saslMechanism() | ||
| ); |
There was a problem hiding this comment.
The conversion from java.util.Optional to scala.Option using Option.apply(node().orElse(null)) is functional but not very idiomatic. A cleaner, more maintainable approach is to use the dedicated converter scala.jdk.javaapi.OptionConverters.toScala(). This makes the intent clearer and is the standard way for such conversions in modern Scala/Java interop code. You'll likely need to add an import for scala.jdk.javaapi.OptionConverters.
| return ControllerInformation.apply( | |
| Option.apply(node().orElse(null)), | |
| listenerName(), | |
| securityProtocol(), | |
| saslMechanism() | |
| ); | |
| return ControllerInformation.apply( | |
| scala.jdk.javaapi.OptionConverters.toScala(node()), | |
| listenerName(), | |
| securityProtocol(), | |
| saslMechanism() | |
| ); |
|
/review
" |
|
Manual-approval option for PR-Agent is disabled. You can enable it via a configuration file |
|
/improve
" |
| public <T extends AbstractRequest, R extends AbstractResponse> R sendAndReceive( | ||
| NodeToControllerChannelManager channelManager, | ||
| AbstractRequest.Builder<T> reqBuilder, | ||
| int timeoutMs | ||
| ) throws Exception { | ||
| var responseFuture = new CompletableFuture<R>(); | ||
| channelManager.sendRequest(reqBuilder, new ControllerRequestCompletionHandler() { | ||
| @Override | ||
| public void onTimeout() { | ||
| responseFuture.completeExceptionally(new TimeoutException()); | ||
| } | ||
|
|
||
| @Override | ||
| public void onComplete(ClientResponse response) { | ||
| responseFuture.complete((R) response.responseBody()); | ||
| } | ||
| }); | ||
| return responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS); | ||
| } |
There was a problem hiding this comment.
Suggestion: Validate that a response was actually received and is non-null before completing the future to avoid NPEs or ClassCast issues. Also propagate server-side errors by checking response.wasDisconnected() and response.responseBody() presence to fail fast on network errors. [possible issue, importance: 6]
| public <T extends AbstractRequest, R extends AbstractResponse> R sendAndReceive( | |
| NodeToControllerChannelManager channelManager, | |
| AbstractRequest.Builder<T> reqBuilder, | |
| int timeoutMs | |
| ) throws Exception { | |
| var responseFuture = new CompletableFuture<R>(); | |
| channelManager.sendRequest(reqBuilder, new ControllerRequestCompletionHandler() { | |
| @Override | |
| public void onTimeout() { | |
| responseFuture.completeExceptionally(new TimeoutException()); | |
| } | |
| @Override | |
| public void onComplete(ClientResponse response) { | |
| responseFuture.complete((R) response.responseBody()); | |
| } | |
| }); | |
| return responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS); | |
| } | |
| public <T extends AbstractRequest, R extends AbstractResponse> R sendAndReceive( | |
| NodeToControllerChannelManager channelManager, | |
| AbstractRequest.Builder<T> reqBuilder, | |
| int timeoutMs | |
| ) throws Exception { | |
| var responseFuture = new CompletableFuture<R>(); | |
| channelManager.sendRequest(reqBuilder, new ControllerRequestCompletionHandler() { | |
| @Override | |
| public void onTimeout() { | |
| responseFuture.completeExceptionally(new TimeoutException()); | |
| } | |
| @Override | |
| public void onComplete(ClientResponse response) { | |
| if (response == null || response.wasDisconnected() || response.responseBody() == null) { | |
| responseFuture.completeExceptionally(new IllegalStateException("No valid response from controller")); | |
| return; | |
| } | |
| @SuppressWarnings("unchecked") | |
| R body = (R) response.responseBody(); | |
| responseFuture.complete(body); | |
| } | |
| }); | |
| return responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS); | |
| } |
| @ClusterTest(types = {Type.KRAFT}, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3) | ||
| public void testRegisterZkWith33Controller(ClusterInstance clusterInstance) throws Exception { | ||
| // Verify that a controller running an old metadata.version cannot register a ZK broker | ||
| var clusterId = clusterInstance.clusterId(); | ||
| var channelManager = brokerToControllerChannelManager(clusterInstance); | ||
| try { | ||
| channelManager.start(); | ||
| // Invalid registration (isMigratingZkBroker, but MV does not support migrations) | ||
| Assertions.assertEquals( | ||
| Errors.BROKER_ID_NOT_REGISTERED, | ||
| registerBroker(channelManager, clusterId, 100, 1L, | ||
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL)) | ||
| ); | ||
|
|
||
| // No features (MV) sent with registration, controller can't verify | ||
| Assertions.assertEquals( | ||
| Errors.INVALID_REGISTRATION, | ||
| registerBroker(channelManager, clusterId, 100, null, null) | ||
| ); | ||
|
|
||
| // Given MV is too high for controller to support | ||
| Assertions.assertEquals( | ||
| Errors.UNSUPPORTED_VERSION, | ||
| registerBroker(channelManager, clusterId, 100, null, | ||
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL)) | ||
| ); | ||
|
|
||
| // Controller supports this MV and isMigratingZkBroker is false, so this one works | ||
| Assertions.assertEquals( | ||
| Errors.NONE, | ||
| registerBroker(channelManager, clusterId, 100, null, | ||
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL)) | ||
| ); | ||
| } finally { | ||
| channelManager.shutdown(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Suggestion: The test name and comment say a ZK broker should not register with a 3.3 controller, but assertions send zkEpoch=null for some cases, not modeling a migrating ZK broker. For correctness, keep zkEpoch non-null for the negative cases to truly simulate ZK migration attempts. [general, importance: 7]
| @ClusterTest(types = {Type.KRAFT}, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3) | |
| public void testRegisterZkWith33Controller(ClusterInstance clusterInstance) throws Exception { | |
| // Verify that a controller running an old metadata.version cannot register a ZK broker | |
| var clusterId = clusterInstance.clusterId(); | |
| var channelManager = brokerToControllerChannelManager(clusterInstance); | |
| try { | |
| channelManager.start(); | |
| // Invalid registration (isMigratingZkBroker, but MV does not support migrations) | |
| Assertions.assertEquals( | |
| Errors.BROKER_ID_NOT_REGISTERED, | |
| registerBroker(channelManager, clusterId, 100, 1L, | |
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL)) | |
| ); | |
| // No features (MV) sent with registration, controller can't verify | |
| Assertions.assertEquals( | |
| Errors.INVALID_REGISTRATION, | |
| registerBroker(channelManager, clusterId, 100, null, null) | |
| ); | |
| // Given MV is too high for controller to support | |
| Assertions.assertEquals( | |
| Errors.UNSUPPORTED_VERSION, | |
| registerBroker(channelManager, clusterId, 100, null, | |
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL)) | |
| ); | |
| // Controller supports this MV and isMigratingZkBroker is false, so this one works | |
| Assertions.assertEquals( | |
| Errors.NONE, | |
| registerBroker(channelManager, clusterId, 100, null, | |
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL)) | |
| ); | |
| } finally { | |
| channelManager.shutdown(); | |
| } | |
| } | |
| @ClusterTest(types = {Type.KRAFT}, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3) | |
| public void testRegisterZkWith33Controller(ClusterInstance clusterInstance) throws Exception { | |
| var clusterId = clusterInstance.clusterId(); | |
| var channelManager = brokerToControllerChannelManager(clusterInstance); | |
| try { | |
| channelManager.start(); | |
| // Invalid registration (isMigratingZkBroker, but MV does not support migrations) | |
| Assertions.assertEquals( | |
| Errors.BROKER_ID_NOT_REGISTERED, | |
| registerBroker(channelManager, clusterId, 100, 1L, | |
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL)) | |
| ); | |
| // No features (MV) sent with registration, controller can't verify (still migrating ZK) | |
| Assertions.assertEquals( | |
| Errors.INVALID_REGISTRATION, | |
| registerBroker(channelManager, clusterId, 100, 1L, null) | |
| ); | |
| // Given MV is too high for controller to support (still migrating ZK) | |
| Assertions.assertEquals( | |
| Errors.UNSUPPORTED_VERSION, | |
| registerBroker(channelManager, clusterId, 100, 1L, | |
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL)) | |
| ); | |
| // Controller supports this MV and isMigratingZkBroker is false, so this one works | |
| Assertions.assertEquals( | |
| Errors.NONE, | |
| registerBroker(channelManager, clusterId, 100, null, | |
| new FeatureLevel(MetadataVersionTestUtils.IBP_3_3_IV3_FEATURE_LEVEL, MetadataVersionTestUtils.IBP_3_4_IV0_FEATURE_LEVEL)) | |
| ); | |
| } finally { | |
| channelManager.shutdown(); | |
| } | |
| } |
|
@gemini-code-assist review again |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request addresses a false positive in BrokerRegistrationRequestTest by migrating the test from Scala to Java and refining the test logic. The previous implementation had flawed assertions that didn't correctly distinguish between different broker registration failure scenarios. The new Java test provides clearer, more accurate test cases for both migrating and non-migrating brokers with specific error code assertions, which is a solid improvement. The overall code quality is high, and I have one suggestion to enhance type safety.
| var resp = sendAndReceive( | ||
| channelManager, | ||
| new BrokerRegistrationRequest.Builder(req), | ||
| 30000 | ||
| ); | ||
| return Errors.forCode(((BrokerRegistrationResponse) resp).data().errorCode()); |
There was a problem hiding this comment.
The call to sendAndReceive can be made more type-safe by providing explicit type arguments. This avoids the need for a cast on the response object, improving readability and compile-time safety.
| var resp = sendAndReceive( | |
| channelManager, | |
| new BrokerRegistrationRequest.Builder(req), | |
| 30000 | |
| ); | |
| return Errors.forCode(((BrokerRegistrationResponse) resp).data().errorCode()); | |
| BrokerRegistrationResponse resp = this.<BrokerRegistrationRequest, BrokerRegistrationResponse>sendAndReceive( | |
| channelManager, | |
| new BrokerRegistrationRequest.Builder(req), | |
| 30000 | |
| ); | |
| return Errors.forCode(resp.data().errorCode()); |
There was a problem hiding this comment.
Code Review
This pull request successfully addresses a false positive in BrokerRegistrationRequestTest by migrating it from Scala to Java and refining the assertions to check for more specific error codes. The new test is more accurate and robust. The changes also include necessary updates to metadata version constants and cleanup of the old Scala file. The overall implementation is solid, but I have one suggestion to enhance code readability in the new test file.
| var req = new BrokerRegistrationRequestData() | ||
| .setBrokerId(brokerId) | ||
| .setLogDirs(List.of(Uuid.randomUuid())) | ||
| .setClusterId(clusterId) | ||
| .setIncarnationId(Uuid.randomUuid()) | ||
| .setIsMigratingZkBroker(zkEpoch != null) | ||
| .setFeatures(features) | ||
| .setListeners(new BrokerRegistrationRequestData.ListenerCollection( | ||
| List.of( | ||
| new BrokerRegistrationRequestData.Listener() | ||
| .setName("EXTERNAL") | ||
| .setHost("example.com") | ||
| .setPort(8082) | ||
| .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) | ||
| ).iterator() | ||
| )); |
There was a problem hiding this comment.
The construction of BrokerRegistrationRequestData with the inline ListenerCollection is a bit dense and harms readability due to deep nesting. Extracting the Listener creation into a local variable would make the code cleaner and easier to follow.
var listener = new BrokerRegistrationRequestData.Listener()
.setName("EXTERNAL")
.setHost("example.com")
.setPort(8082)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id);
var req = new BrokerRegistrationRequestData()
.setBrokerId(brokerId)
.setLogDirs(List.of(Uuid.randomUuid()))
.setClusterId(clusterId)
.setIncarnationId(Uuid.randomUuid())
.setIsMigratingZkBroker(zkEpoch != null)
.setFeatures(features)
.setListeners(new BrokerRegistrationRequestData.ListenerCollection(List.of(listener).iterator()));|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request addresses a false positive in BrokerRegistrationRequestTest by converting the test from Scala to Java and correcting the assertions. The previous test had incorrect logic for certain scenarios, which is now fixed. The changes also include adding IDE files to .gitignore and updating metadata version constants. The new Java test is well-structured. I've provided a couple of minor suggestions to improve code style and readability.
| public record FeatureLevel(short min, short max) { } | ||
|
|
||
| private record TestControllerNodeProvider(SocketServer controllerSocketServer, | ||
| ClusterInstance clusterInstance) implements ControllerNodeProvider { |
There was a problem hiding this comment.
For better code formatting and readability, the parameters of the record should be aligned. The ClusterInstance parameter should be aligned with SocketServer on the previous line.
| ClusterInstance clusterInstance) implements ControllerNodeProvider { | |
| ClusterInstance clusterInstance) implements ControllerNodeProvider { |
3380690 to
a7b82db
Compare
| // No features (MV) sent with registration, controller can't verify | ||
| assertEquals( | ||
| Errors.BROKER_ID_NOT_REGISTERED, | ||
| registerBroker(channelManager, clusterId, 100, Some(1), None)) | ||
| Errors.INVALID_REGISTRATION, | ||
| registerBroker(channelManager, clusterId, 100, None, None)) | ||
|
|
||
| // Given MV is too high for controller to support | ||
| assertEquals( | ||
| Errors.BROKER_ID_NOT_REGISTERED, | ||
| registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0.featureLevel, MetadataVersion.IBP_3_4_IV0.featureLevel)))) | ||
| Errors.UNSUPPORTED_VERSION, | ||
| registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0.featureLevel, MetadataVersion.IBP_3_4_IV0.featureLevel)))) |
There was a problem hiding this comment.
Previously, passing Some(1) as the zkEpoch parameter caused zkEpoch.isDefined to always be true, which led to a BrokerIdNotRegisteredException being thrown.
As a result, the intended test scenario was never actually tested.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a new feature for KRaft controllers to automatically join the metadata quorum, which is a valuable addition for dynamic cluster management. It also addresses a false-positive test case by replacing an existing Scala test with a more reliable Java integration test for broker registration. Additionally, the PR includes several nice refactorings, such as converting a Scala test to Java and improving logging for auto topic creation, which enhances maintainability and debuggability. Overall, these are solid improvements to the codebase. I have one suggestion regarding the test setup for the new auto-join feature.
| props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); | ||
|
|
||
| // do not include auto join config in broker nodes | ||
| if (brokerNode != null) { |
There was a problem hiding this comment.
The controller.quorum.auto.join.enable configuration is valid for combined broker/controller nodes, but this logic removes it for any node with a broker role. This could prevent tests for combined nodes from correctly testing this feature.
This condition should be more specific to only remove the configuration for broker-only nodes.
| if (brokerNode != null) { | |
| if (brokerNode != null && controllerNode == null) { |
…gistrationRequestTest to avoid thread leaks
No description provided.