In what version(s) of Spring for Apache Kafka are you seeing this issue?
Since 4.0.0 (when ShareKafkaMessageListenerContainer was introduced)
Describe the bug
ShareKafkaMessageListenerContainer.doStart() does not await the startLatch before returning. The field is declared and countDown() is called in publishConsumerStartingEvent(), but startLatch.await() is never called in doStart().
The regular container awaits the latch in KafkaMessageListenerContainer.doStart():
// KafkaMessageListenerContainer.doStart(), lines 401-414
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
    this.logger.error("Consumer thread failed to start - ...");
    publishConsumerFailedToStart();
}
The share container does not:
// ShareKafkaMessageListenerContainer.doStart(), lines 207-217
setRunning(true);
for (int i = 0; i < this.concurrency; i++) {
    CompletableFuture.runAsync(consumer, consumerExecutor);
}
// returns immediately, no startLatch.await()
This means:
- start() returns before the consumer threads have subscribed, so code that runs immediately after start() can race with consumer startup
- consumerStartTimeout is silently ignored
- ConsumerFailedToStartEvent is never published
To Reproduce
- Create a ShareKafkaMessageListenerContainer with a slow-starting executor
- Call container.start()
- Observe that start() returns in ~0ms regardless of executor delay
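containerProps.setListenerTaskExecutor(task -> new Thread(() -> {
    try {
        Thread.sleep(2000); // simulate slow thread allocation
        task.run();
    }
    catch (InterruptedException ex) {
        // Thread.sleep throws a checked exception, so it must be handled inside the lambda
        Thread.currentThread().interrupt();
    }
}).start());
long start = System.currentTimeMillis();
container.start();
long elapsed = System.currentTimeMillis() - start;
// elapsed is ~0ms, should be ~2000ms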
Expected behavior
doStart() should await the startLatch with consumerStartTimeout, consistent with KafkaMessageListenerContainer. Since the share container supports concurrency, the latch should be initialized with CountDownLatch(concurrency) to wait for all consumer threads.
When the timeout expires, ConsumerFailedToStartEvent should be published.
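The expected behavior can be illustrated with a minimal, stand-alone sketch that uses only java.util.concurrent. It is not the container's actual code: the class name StartLatchSketch and the helpers startAndAwait and delayedExecutor are hypothetical, and each submitted task simply counts the latch down where a real consumer thread would do so from publishConsumerStartingEvent(). It shows a latch sized to the concurrency being awaited with a timeout, with a false result marking the point where ConsumerFailedToStartEvent would be published.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class StartLatchSketch {

    /**
     * Submits `concurrency` tasks that each count the latch down once they run,
     * then waits up to `timeout` for all of them.
     * Returns true if every task started in time, false on timeout or interrupt.
     */
    public static boolean startAndAwait(int concurrency, Executor executor, Duration timeout) {
        // One count per consumer thread, so the wait covers all of them.
        CountDownLatch startLatch = new CountDownLatch(concurrency);
        for (int i = 0; i < concurrency; i++) {
            // Stand-in for a consumer thread signalling that it has started.
            executor.execute(startLatch::countDown);
        }
        try {
            return startLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Hypothetical executor that delays each task, simulating slow thread allocation. */
    public static Executor delayedExecutor(long delayMillis) {
        return task -> {
            Thread thread = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                    task.run();
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            thread.setDaemon(true); // do not keep the JVM alive for late tasks
            thread.start();
        };
    }

    public static void main(String[] args) {
        // Tasks start after 200 ms, well within the 2 s timeout.
        boolean started = startAndAwait(3, delayedExecutor(200), Duration.ofSeconds(2));
        System.out.println("all consumers started: " + started);

        // Tasks need 2 s but the timeout is 100 ms; a real container would
        // log an error and publish ConsumerFailedToStartEvent at this point.
        boolean startedLate = startAndAwait(3, delayedExecutor(2000), Duration.ofMillis(100));
        System.out.println("started within short timeout: " + startedLate);
    }
}
```

With this shape, start() blocks for roughly the executor delay in the first case, and in the second case the timeout is honored instead of being silently ignored.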