Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.apache.kafka.server.config.ServerLogConfigs
Expand All @@ -45,7 +44,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
private val topicNameWithCustomConfigs = "foo2"
private var adminClient: Admin = _
private val mockTime: Time = new MockTime(1)
private var version = RecordBatch.MAGIC_VALUE_V2
private val dataFolder = Seq(tempDir().getAbsolutePath, tempDir().getAbsolutePath)

@BeforeEach
Expand Down Expand Up @@ -73,20 +71,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp())
}

@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testListVersion0(quorum: String): Unit = {
// create records for version 0
createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0)
produceMessagesInSeparateBatch()

// update version to version 1 to list offset for max timestamp
createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)
// the offset of max timestamp is always -1 if the batch version is 0
verifyListOffsets(expectedMaxTimestampOffset = -1)
}


@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
Expand Down Expand Up @@ -129,38 +113,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
}

// The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = {
createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)
produceMessagesInOneBatch()
verifyListOffsets()

// test LogAppendTime case
setUpForLogAppendTimeCase()
produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
// So in this one batch test, it'll be the first offset 0
verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
}

// The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)
produceMessagesInSeparateBatch()
verifyListOffsets()

// test LogAppendTime case
setUpForLogAppendTimeCase()
produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
// In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time
// for each batch, So it'll be the last offset 2
verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
Expand Down Expand Up @@ -201,15 +153,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
createTopicWithConfig(topicNameWithCustomConfigs, props)
}

private def createMessageFormatBrokers(recordVersion: Byte): Unit = {
version = recordVersion
recreateBrokers(reconfigure = true, startup = true)
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
adminClient = Admin.create(Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
).asJava)
}

private def createTopicWithConfig(topic: String, props: Properties): Unit = {
createTopic(topic, 1, 1.toShort, topicConfig = props)
}
Expand All @@ -224,12 +167,9 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
if (version >= RecordBatch.MAGIC_VALUE_V2)
// the epoch is related to the returned offset.
// Hence, it should be zero (the earliest leader epoch), regardless of new leader election
assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch())
else
assertEquals(Optional.empty(), maxTimestampOffset.leaderEpoch())
// the epoch is related to the returned offset.
// Hence, it should be zero (the earliest leader epoch), regardless of new leader election
assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch())
}

// case 0: test the offsets from leader's append path
Expand Down Expand Up @@ -336,15 +276,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(2, zkConnectOrNull).zipWithIndex.map{ case (props, index) =>
if (version == RecordBatch.MAGIC_VALUE_V0) {
props.setProperty("log.message.format.version", "0.9.0")
props.setProperty("inter.broker.protocol.version", "0.9.0")
}
if (version == RecordBatch.MAGIC_VALUE_V1) {
props.setProperty("log.message.format.version", "0.10.0")
props.setProperty("inter.broker.protocol.version", "0.10.0")
}
TestUtils.createBrokerConfigs(2, null).zipWithIndex.map{ case (props, index) =>
// We use mock timer so the records can get removed if the test env is too busy to complete
// tests before kafka-log-retention. Hence, we disable the retention to avoid failed tests
props.setProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "-1")
Expand Down
25 changes: 1 addition & 24 deletions core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
overridingProps.put(JmxReporter.EXCLUDE_CONFIG, s"$requiredKafkaServerPrefix=ClusterId")

def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false).
TestUtils.createBrokerConfigs(numNodes, null, enableControlledShutdown = false).
map(KafkaConfig.fromProps(_, overridingProps))

val nMessages = 2
Expand Down Expand Up @@ -214,29 +214,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertTrue(TestUtils.meterCount(bytesOut) > initialBytesOut)
}

@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testZkControllerMetrics(quorum: String): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics

assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=TopicsToDeleteCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ReplicasToDeleteCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveBrokerCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=FencedBrokerCount"), 1)
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ZkMigrationState"), 1)

val zkStateMetricName = metrics.keySet.asScala.filter(_.getMBeanName == "kafka.controller:type=KafkaController,name=ZkMigrationState").head
val zkStateGauge = metrics.get(zkStateMetricName).asInstanceOf[Gauge[Int]]
assertEquals(ZkMigrationState.ZK.value().intValue(), zkStateGauge.value())
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testKRaftControllerMetrics(quorum: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.server

import kafka.utils._
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData
Expand Down Expand Up @@ -103,36 +102,6 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateTopicExists("partial-none")
}

@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {
// When using ZooKeeper, we don't expect a request to ever complete within 1ms.
// A timeout of 1 ms allows us to test the purgatory timeout logic.
//
// Note: we do not test KRaft here because its behavior is different. Server-side
// timeouts are much less likely to happen with KRaft since the operation is much
// faster. Additionally, if a server side timeout does happen, the operation is
// usually not performed.
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout", numPartitions = 10, replicationFactor = 3)), timeout = 1),
Map("error-timeout" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout-zero", numPartitions = 10, replicationFactor = 3)), timeout = 0),
Map("error-timeout-zero" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// Negative timeouts are treated the same as 0
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout-negative", numPartitions = 10, replicationFactor = 3)), timeout = -1),
Map("error-timeout-negative" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// The topics should still get created eventually
TestUtils.waitForPartitionMetadata(servers, "error-timeout", 0)
TestUtils.waitForPartitionMetadata(servers, "error-timeout-zero", 0)
TestUtils.waitForPartitionMetadata(servers, "error-timeout-negative", 0)
validateTopicExists("error-timeout")
validateTopicExists("error-timeout-zero")
validateTopicExists("error-timeout-negative")
}


@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidCreateTopicsRequests(quorum: String): Unit = {
Expand All @@ -149,21 +118,8 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "zkMigration"))
def testNotController(quorum: String): Unit = {
// Note: we don't run this test when in KRaft mode, because KRaft doesn't have this
// behavior of returning NOT_CONTROLLER. Instead, the request is forwarded.
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
val error = if (isZkMigrationTest()) Errors.NONE else Errors.NOT_CONTROLLER
assertEquals(1, response.errorCounts().get(error))
}

@ParameterizedTest
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("kraft"))
def testCreateTopicsRequestVersions(quorum: String): Unit = {
// Note: we don't run this test when in KRaft mode, because kraft does not yet support returning topic
// configs from CreateTopics.
for (version <- ApiKeys.CREATE_TOPICS.oldestVersion to ApiKeys.CREATE_TOPICS.latestVersion) {
val topic = s"topic_$version"
val data = new CreateTopicsRequestData()
Expand All @@ -175,7 +131,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
).asJava.iterator()))

val request = new CreateTopicsRequest.Builder(data).build(version.asInstanceOf[Short])
val response = sendCreateTopicRequest(request)
val response = sendCreateTopicRequest(request, adminSocketServer)

val topicResponse = response.data.topics.find(topic)
assertNotNull(topicResponse)
Expand Down
Loading