
ShareKafkaMessageListenerContainer.doStart() does not await consumer thread startup #4357

@Wordbe

Description

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Since 4.0.0 (when ShareKafkaMessageListenerContainer was introduced)

Describe the bug

ShareKafkaMessageListenerContainer.doStart() does not await the startLatch before returning. The field is declared and countDown() is called in publishConsumerStartingEvent(), but startLatch.await() is never called in doStart().

The regular container awaits the latch in KafkaMessageListenerContainer.doStart():

// KafkaMessageListenerContainer.doStart(), lines 401-414
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
    this.logger.error("Consumer thread failed to start - ...");
    publishConsumerFailedToStart();
}

The share container does not:

// ShareKafkaMessageListenerContainer.doStart(), lines 207-217
setRunning(true);
for (int i = 0; i < this.concurrency; i++) {
    CompletableFuture.runAsync(consumer, consumerExecutor);
}
// returns immediately, no startLatch.await()
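To make the contrast concrete without a broker, here is a minimal plain-Java sketch of the two start strategies. `startAndReturn` models the share container's fire-and-forget loop, and `startAndAwait` models the regular container's latch handshake. The class and method names are hypothetical, and the task body (`countDown()`) only stands in for the consumer's `publishConsumerStartingEvent()`; this is not Spring Kafka code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the two doStart() strategies; not Spring Kafka code.
public class StartStrategies {

    // Models the share container: submit the consumers and return immediately.
    static CountDownLatch startAndReturn(Executor executor, int concurrency) {
        CountDownLatch started = new CountDownLatch(concurrency);
        for (int i = 0; i < concurrency; i++) {
            // Stand-in for the consumer calling countDown() in publishConsumerStartingEvent()
            CompletableFuture.runAsync(started::countDown, executor);
        }
        // The caller regains control, possibly before any consumer has run
        return started;
    }

    // Models the regular container: submit, then block until started or timed out.
    static boolean startAndAwait(Executor executor, int concurrency, long timeoutMillis) {
        CountDownLatch started = new CountDownLatch(concurrency);
        for (int i = 0; i < concurrency; i++) {
            CompletableFuture.runAsync(started::countDown, executor);
        }
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and report that startup did not complete
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With an executor that delays each task, `startAndReturn` hands back a latch whose count is still `concurrency`, while `startAndAwait` blocks for roughly the executor delay before returning `true`.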

This means:

  • start() returns before the consumer threads have subscribed, so callers that assume the container is ready once start() returns can race with consumer initialization
  • consumerStartTimeout is silently ignored
  • ConsumerFailedToStartEvent is never published

To Reproduce

  1. Create a ShareKafkaMessageListenerContainer with a slow-starting executor
  2. Call container.start()
  3. Observe that start() returns in ~0ms regardless of executor delay

containerProps.setListenerTaskExecutor(task -> new Thread(() -> {
    try {
        Thread.sleep(2000); // simulate slow thread allocation
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
    }
    task.run();
}).start());

long start = System.currentTimeMillis();
container.start();
long elapsed = System.currentTimeMillis() - start;
// elapsed is ~0ms; it should be ~2000ms

Expected behavior

doStart() should await the startLatch for up to consumerStartTimeout, consistent with KafkaMessageListenerContainer. Since the share container supports concurrency, the latch should be initialized with CountDownLatch(concurrency) so that it waits for all consumer threads.

If the timeout expires before all consumer threads have started, ConsumerFailedToStartEvent should be published.
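A minimal plain-Java sketch of the expected latch logic, assuming a hypothetical `ShareStartSketch` class. The latch is sized to `concurrency`, each consumer counts down as it starts (standing in for `publishConsumerStartingEvent()`), and `doStart` waits up to the timeout, calling a hypothetical `onFailedToStart` callback in place of `publishConsumerFailedToStart()`. This only illustrates the proposal; it is not the Spring Kafka implementation.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a concurrency-aware doStart(); names are illustrative only.
public class ShareStartSketch {

    private final int concurrency;
    private final Duration consumerStartTimeout;
    private final Runnable onFailedToStart; // stand-in for publishConsumerFailedToStart()

    public ShareStartSketch(int concurrency, Duration consumerStartTimeout, Runnable onFailedToStart) {
        this.concurrency = concurrency;
        this.consumerStartTimeout = consumerStartTimeout;
        this.onFailedToStart = onFailedToStart;
    }

    /** Returns true if every consumer signalled startup before the timeout. */
    public boolean doStart(Executor consumerExecutor, Runnable consumerLoop) {
        // One count per consumer thread, not CountDownLatch(1)
        CountDownLatch startLatch = new CountDownLatch(this.concurrency);
        for (int i = 0; i < this.concurrency; i++) {
            CompletableFuture.runAsync(() -> {
                startLatch.countDown(); // stand-in for publishConsumerStartingEvent()
                consumerLoop.run();     // the long-running consumer loop
            }, consumerExecutor);
        }
        try {
            if (!startLatch.await(this.consumerStartTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                this.onFailedToStart.run(); // e.g. executor lacks threads for all consumers
                return false;
            }
            return true;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Sizing the latch to `concurrency` matters when the executor is undersized. With a single-thread executor and `concurrency = 2`, the first consumer counts down and then occupies the only thread in its loop, so the second never starts. A `CountDownLatch(1)` would report success in that case, while this sketch times out and fires the failure callback.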
