-
Notifications
You must be signed in to change notification settings - Fork 15.3k
MINOR: convert GssapiAuthenticationTest to KRaft #17786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7dca456
c4a0444
fd12a04
c3a4170
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,9 +38,9 @@ import org.apache.kafka.common.security.kerberos.KerberosLogin | |
| import org.apache.kafka.common.utils.{LogContext, MockTime} | ||
| import org.apache.kafka.network.SocketServerConfigs | ||
| import org.junit.jupiter.api.Assertions._ | ||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} | ||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} | ||
| import org.junit.jupiter.params.ParameterizedTest | ||
| import org.junit.jupiter.params.provider.MethodSource | ||
| import org.junit.jupiter.params.provider.{MethodSource, ValueSource} | ||
|
|
||
| import scala.jdk.CollectionConverters._ | ||
|
|
||
|
|
@@ -69,15 +69,15 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { | |
| serverConfig.put(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG, failedAuthenticationDelayMs.toString) | ||
| super.setUp(testInfo) | ||
| serverAddr = new InetSocketAddress("localhost", | ||
| servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT))) | ||
| brokers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT))) | ||
|
|
||
| clientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name) | ||
| clientConfig.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism) | ||
| clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism)) | ||
| clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000") | ||
|
|
||
| // create the test topic with all the brokers as replicas | ||
| createTopic(topic, 2, brokerCount) | ||
| createTopic(topic, 2, brokerCount, listenerName = listenerName, adminClientConfig = adminClientConfig) | ||
| } | ||
|
|
||
| @AfterEach | ||
|
|
@@ -92,15 +92,16 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { | |
| * Tests that Kerberos replay error `Request is a replay (34)` is not handled as an authentication exception | ||
| * since replay detection used to detect DoS attacks may occasionally reject valid concurrent requests. | ||
| */ | ||
| @Test | ||
| def testRequestIsAReplay(): Unit = { | ||
| @ParameterizedTest | ||
| @ValueSource(strings = Array("kraft")) | ||
| def testRequestIsAReplay(quorum: String): Unit = { | ||
| val successfulAuthsPerThread = 10 | ||
| val futures = (0 until numThreads).map(_ => executor.submit(new Runnable { | ||
| override def run(): Unit = verifyRetriableFailuresDuringAuthentication(successfulAuthsPerThread) | ||
| })) | ||
| futures.foreach(_.get(60, TimeUnit.SECONDS)) | ||
| assertEquals(0, TestUtils.totalMetricValue(servers.head, "failed-authentication-total")) | ||
| val successfulAuths = TestUtils.totalMetricValue(servers.head, "successful-authentication-total") | ||
| assertEquals(0, TestUtils.totalMetricValue(brokers.head, "failed-authentication-total")) | ||
| val successfulAuths = TestUtils.totalMetricValue(brokers.head, "successful-authentication-total") | ||
| assertTrue(successfulAuths > successfulAuthsPerThread * numThreads, "Too few authentications: " + successfulAuths) | ||
| } | ||
|
|
||
|
|
@@ -109,8 +110,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { | |
| * are able to connect after the second re-login. Verifies that logout is performed only once | ||
| * since duplicate logouts without successful login results in NPE from Java 9 onwards. | ||
| */ | ||
| @Test | ||
| def testLoginFailure(): Unit = { | ||
| @ParameterizedTest | ||
| @ValueSource(strings = Array("kraft")) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no input parameter to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was supposed to take a parameter. I'll fix it (and the one below). See the explanation above. |
||
| def testLoginFailure(quorum: String): Unit = { | ||
| val selector = createSelectorWithRelogin() | ||
| try { | ||
| val login = TestableKerberosLogin.instance | ||
|
|
@@ -132,8 +134,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { | |
| * is performed when credentials are unavailable between logout and login, we handle it as a | ||
| * transient error and not an authentication failure so that clients may retry. | ||
| */ | ||
| @Test | ||
| def testReLogin(): Unit = { | ||
| @ParameterizedTest | ||
| @ValueSource(strings = Array("kraft")) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this also have the quorum param?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed :) |
||
| def testReLogin(quorum: String): Unit = { | ||
| val selector = createSelectorWithRelogin() | ||
| try { | ||
| val login = TestableKerberosLogin.instance | ||
|
|
@@ -163,8 +166,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { | |
| * Tests that Kerberos error `Server not found in Kerberos database (7)` is handled | ||
| * as a fatal authentication failure. | ||
| */ | ||
| @Test | ||
| def testServerNotFoundInKerberosDatabase(): Unit = { | ||
| @ParameterizedTest | ||
| @ValueSource(strings = Array("kraft")) | ||
| def testServerNotFoundInKerberosDatabase(quorum: String): Unit = { | ||
| val jaasConfig = clientConfig.getProperty(SaslConfigs.SASL_JAAS_CONFIG) | ||
| val invalidServiceConfig = jaasConfig.replace("serviceName=\"kafka\"", "serviceName=\"invalid-service\"") | ||
| clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, invalidServiceConfig) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is
quoruman input parameter to this test? Seems the value would always be"kraft"? -- I also also don't see wherequorumis actually used?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While the parameter isn't used directly, it affects the test name which some code in
TestUtils.scalauses to determine if the test is kraft or not. This is standard for our kraft tests