[Bug]: RedPandaContainer does not allow for container to container Kafka communication #6395

@JapuDCret

Module

Core

Testcontainers version

1.17.6

Using the latest Testcontainers version?

Yes

Host OS

Linux (Ubuntu 22.04.1 LTS)

Host Arch

x86

Docker version

20.10.22

What happened?

I cannot connect a Kafka Connect Testcontainers instance to the RedpandaContainer.

This seems to be caused by how advertised addresses/listeners work in Kafka: RedpandaContainer advertises a static address for its internal listener, which another container cannot use to reach the broker.

Quarkus's RedPandaKafkaContainer solved this by not using static advertised addresses/listeners.

See https://github.com/quarkusio/quarkus/blob/e47a267e13819cf5ebd4322ad2b2c88b06209a34/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/RedPandaKafkaContainer.java#L86
vs.
https://github.com/testcontainers/testcontainers-java/blob/0bb6925fb7d420/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java#L55
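To illustrate the suspected mechanism, here is a toy model in plain Kotlin. It does not talk to a real broker. It only mimics the usual Kafka client behaviour of bootstrapping against one address and then dialing whatever address the broker advertises. The names `Endpoint` and `resolvesToBroker` are hypothetical and exist only for this sketch.

```kotlin
// Toy model only: no real Kafka client or broker is involved.
data class Endpoint(val host: String, val port: Int)

// True when the endpoint's host is one of the names that resolve to the broker
// from inside a *different* container on the shared Docker network.
fun resolvesToBroker(endpoint: Endpoint, brokerAliases: Set<String>): Boolean =
    endpoint.host in brokerAliases

fun main() {
    val brokerAliases = setOf("kafka")

    // Step 1: the client bootstraps against the network alias.
    val bootstrap = Endpoint("kafka", 29092)
    println("bootstrap reachable: ${resolvesToBroker(bootstrap, brokerAliases)}")

    // Step 2: the broker answers with its advertised address, which the client dials next.
    // With a static loopback address the client ends up dialing its own container.
    val staticAdvertised = Endpoint("127.0.0.1", 29092)
    println("static advertised reachable: ${resolvesToBroker(staticAdvertised, brokerAliases)}")

    // Advertising the alias instead keeps the follow-up connection on the broker.
    val aliasAdvertised = Endpoint("kafka", 29092)
    println("alias advertised reachable: ${resolvesToBroker(aliasAdvertised, brokerAliases)}")
}
```

Under this model the bootstrap connection succeeds but the follow-up connection does not, which would be consistent with the "Expected 1 brokers but found only 0" message in the log output.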

My setup:

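import java.util.stream.Stream
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.Network
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.redpanda.RedpandaContainer
import org.testcontainers.utility.DockerImageName

class KafkaConnectTest {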
    companion object {
        private val log = LoggerFactory.getLogger(KafkaConnectTest::class.java)

        val kafka = RedpandaContainer(DockerImageName.parse("docker.redpanda.com/vectorized/redpanda:v22.3.11")).apply {
            withNetwork(Network.SHARED)
            withNetworkAliases("kafka")
        }

        val kafkaConnect = GenericContainer(DockerImageName.parse("confluentinc/cp-kafka-connect:7.3.0")).apply {
            withNetwork(Network.SHARED)
            dependsOn(kafka)

            withNetworkAliases("connect")

            withEnv("CONNECT_GROUP_ID", "test")
            withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect-configs-test")
            withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect-offsets-test")
            withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect-status-test")
            withEnv("CONNECT_LISTENERS", "http://0.0.0.0:8083")
            withEnv("CONNECT_REST_HOST_NAME", "0.0.0.0")
            withEnv("CONNECT_REST_PORT", "8083")
            withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect")
            withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
            withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")
            withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:29092")
            withExposedPorts(8083)
        }

        @JvmStatic
        @BeforeAll
        fun setup() {
            Stream.of( kafka, kafkaConnect ).parallel().forEach { it.start() }

            val logConsumer = Slf4jLogConsumer(log)
            kafkaConnect.followOutput(logConsumer)
        }
    }

    @Test
    fun `verify Kafka message publish`() {
        println("kafka = $kafka")
        println("kafkaConnect: $kafkaConnect")
        // Host-facing bootstrap address reported by the Redpanda container
        println("bootstrapServers: ${kafka.bootstrapServers}")

        Thread.sleep(20 * 1000)
    }
}

Relevant log output

2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> User
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> Configuring ...
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> Running preflight checks ... 
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: ===> Check if Kafka is healthy ...
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Error while getting broker list.
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: 	at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDERR: Expected 1 brokers but found only 0. Brokers found [].
2023-01-06 19:10:57 INFO  KafkaConnectTest - STDOUT: Using log4j config /etc/cp-base-new/log4j.properties

Additional Information

Indeed, after overriding the command from

command += "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092)

to

command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092)

it works in my setup (the host name should of course be determined dynamically rather than hard-coded).
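A minimal sketch of what "dynamic" could look like, assuming the internal host is taken from a network alias supplied by the caller and falls back to 127.0.0.1 when none is set. The helper `advertiseKafkaAddr` and its parameters are hypothetical and not part of Testcontainers. Only the `--advertise-kafka-addr` flag, the listener names, and port 29092 come from the command shown above.

```kotlin
// Hypothetical helper: builds the advertise flag from a caller-supplied alias.
// Falls back to the current static loopback address when no alias is given.
fun advertiseKafkaAddr(networkAlias: String?, host: String, mappedPort: Int): String {
    val internalHost = networkAlias?.takeIf { it.isNotBlank() } ?: "127.0.0.1"
    return "--advertise-kafka-addr PLAINTEXT://$internalHost:29092,OUTSIDE://$host:$mappedPort"
}

fun main() {
    // With an alias, other containers on the same network can follow the advertised address.
    println(advertiseKafkaAddr("kafka", "localhost", 49153))
    // Without an alias, the result matches the current static behaviour.
    println(advertiseKafkaAddr(null, "localhost", 49153))
}
```

Keeping the fallback means existing host-only tests would see the same advertised address as before. Only setups that set an alias would change.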
