-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Description
Module
Core
Testcontainers version
1.17.6
Using the latest Testcontainers version?
Yes
Host OS
Linux (Ubuntu 22.04.1 LTS)
Host Arch
x86
Docker version
20.10.22What happened?
Cannot connect a Kafka Connect TestContainer instance to the RedpandaContainer.
It seems that this is due to the unfortunate nature of advertised addresses/listeners in the Kafka architecture.
Quarkus RedPandaKafkaContainer solved this by not having a static advertised addresses/listeners
See https://github.com/quarkusio/quarkus/blob/e47a267e13819cf5ebd4322ad2b2c88b06209a34/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/RedPandaKafkaContainer.java#L86
vs.
https://github.com/testcontainers/testcontainers-java/blob/0bb6925fb7d420/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java#L55
My setup:
class KafkaConnectTest {
companion object {
private val log = LoggerFactory.getLogger(KafkaConnectTest::class.java)
val kafka = RedpandaContainer(DockerImageName.parse("docker.redpanda.com/vectorized/redpanda:v22.3.11")).apply {
withNetwork(Network.SHARED)
withNetworkAliases("kafka")
}
val kafkaConnect = GenericContainer(DockerImageName.parse("confluentinc/cp-kafka-connect:7.3.0")).apply {
withNetwork(Network.SHARED)
dependsOn(kafka)
withNetworkAliases("connect")
withEnv("CONNECT_GROUP_ID", "test")
withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs-test")
withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets-test")
withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status-test")
withEnv("CONNECT_LISTENERS", "http://0.0.0.0:8083")
withEnv("CONNECT_REST_HOST_NAME", "0.0.0.0")
withEnv("CONNECT_REST_PORT", "8083")
withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect")
withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:29092")
withExposedPorts(8083)
}
@JvmStatic
@BeforeAll
fun setup() {
Stream.of( kafka, kafkaConnect ).parallel().forEach { it.start() }
val logConsumer = Slf4jLogConsumer(log)
kafkaConnect.followOutput(logConsumer)
}
}
@Test
fun `verify Kafka message publish`() {
println("kafka = $kafka")
println("localStack = $localStack")
println("kafkaConnect: $kafkaConnect")
println("bootstrapServers: $bootstrapServers")
Thread.sleep(20 * 1000)
}
}Relevant log output
2023-01-06 19:10:57 INFO KafkaConnectTest - STDOUT: ===> User
2023-01-06 19:10:57 INFO KafkaConnectTest - STDOUT: uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
2023-01-06 19:10:57 INFO KafkaConnectTest - STDOUT: ===> Configuring ...
2023-01-06 19:10:57 INFO KafkaConnectTest - STDOUT: ===> Running preflight checks ...
2023-01-06 19:10:57 INFO KafkaConnectTest - STDOUT: ===> Check if Kafka is healthy ...
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: Error while getting broker list.
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...
2023-01-06 19:10:57 INFO KafkaConnectTest - STDERR: Expected 1 brokers but found only 0. Brokers found [].
2023-01-06 19:10:57 INFO KafkaConnectTest - STDOUT: Using log4j config /etc/cp-base-new/log4j.propertiesAdditional Information
Indeed, after overriding the command from
command += "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092)to
command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092)it works in my desired setup (should be dynamic of course)