Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import org.apache.kafka.common.security.kerberos.KerberosLogin
import org.apache.kafka.common.utils.{LogContext, MockTime}
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -69,15 +69,15 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
serverConfig.put(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG, failedAuthenticationDelayMs.toString)
super.setUp(testInfo)
serverAddr = new InetSocketAddress("localhost",
servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))
brokers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))

clientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name)
clientConfig.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism))
clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000")

// create the test topic with all the brokers as replicas
createTopic(topic, 2, brokerCount)
createTopic(topic, 2, brokerCount, listenerName = listenerName, adminClientConfig = adminClientConfig)
}

@AfterEach
Expand All @@ -92,15 +92,16 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* Tests that Kerberos replay error `Request is a replay (34)` is not handled as an authentication exception
* since replay detection used to detect DoS attacks may occasionally reject valid concurrent requests.
*/
@Test
def testRequestIsAReplay(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testRequestIsAReplay(quorum: String): Unit = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is quorum an input parameter to this test? Seems the value would always be "kraft"? -- I also also don't see where quorum is actually used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the parameter isn't used directly, it affects the test name which some code in TestUtils.scala uses to determine if the test is kraft or not. This is standard for our kraft tests

val successfulAuthsPerThread = 10
val futures = (0 until numThreads).map(_ => executor.submit(new Runnable {
override def run(): Unit = verifyRetriableFailuresDuringAuthentication(successfulAuthsPerThread)
}))
futures.foreach(_.get(60, TimeUnit.SECONDS))
assertEquals(0, TestUtils.totalMetricValue(servers.head, "failed-authentication-total"))
val successfulAuths = TestUtils.totalMetricValue(servers.head, "successful-authentication-total")
assertEquals(0, TestUtils.totalMetricValue(brokers.head, "failed-authentication-total"))
val successfulAuths = TestUtils.totalMetricValue(brokers.head, "successful-authentication-total")
assertTrue(successfulAuths > successfulAuthsPerThread * numThreads, "Too few authentications: " + successfulAuths)
}

Expand All @@ -109,8 +110,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* are able to connect after the second re-login. Verifies that logout is performed only once
* since duplicate logouts without successful login results in NPE from Java 9 onwards.
*/
@Test
def testLoginFailure(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no input parameter to testLoginFailure() -- why change this to being a parametrized test? (Same below.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was supposed to take a parameter. I'll fix it (and the one below). See the explanation above.

def testLoginFailure(quorum: String): Unit = {
val selector = createSelectorWithRelogin()
try {
val login = TestableKerberosLogin.instance
Expand All @@ -132,8 +134,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* is performed when credentials are unavailable between logout and login, we handle it as a
* transient error and not an authentication failure so that clients may retry.
*/
@Test
def testReLogin(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also have the quorum param?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed :)

def testReLogin(quorum: String): Unit = {
val selector = createSelectorWithRelogin()
try {
val login = TestableKerberosLogin.instance
Expand Down Expand Up @@ -163,8 +166,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
* as a fatal authentication failure.
*/
@Test
def testServerNotFoundInKerberosDatabase(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testServerNotFoundInKerberosDatabase(quorum: String): Unit = {
val jaasConfig = clientConfig.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
val invalidServiceConfig = jaasConfig.replace("serviceName=\"kafka\"", "serviceName=\"invalid-service\"")
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, invalidServiceConfig)
Expand Down