Skip to content

KafkaMessageListenerContainer.getAssignedPartitions can throw ConcurrentModificationException #4273

@wucop

Description

@wucop

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.11

Describe the bug

Calling ConcurrentMessageListenerContainer.getAssignedPartitions() can throw a ConcurrentModificationException, see our actual stacktrace:

Caused by: java.util.ConcurrentModificationException
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
	at java.base/java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
	at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.toList(Unknown Source)
	at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.getAssignedPartitions(ConcurrentMessageListenerContainer.java:161)
	...

In #3726 the same issue was reported and resolved by wrapping definedPartitions in a Collections.synchronizedMap and assignedPartitions in Collections.synchronizedSet.

However, the JavaDoc of Collections.synchronizedMap states:

// It is imperative that the user manually synchronize on the returned map 
// when traversing any of its collection views via Iterator, Spliterator or Stream:

   Map m = Collections.synchronizedMap(new HashMap());
       ...
   Set s = m.keySet();  // Needn't be in synchronized block
       ...
   synchronized (m) {  // Synchronizing on m, not s!
       Iterator i = s.iterator(); // Must be in synchronized block
       while (i.hasNext())
           foo(i.next());
   }

This synchronization is currently missing when Collections.unmodifiableCollection(partitionsListenerConsumer.definedPartitions.keySet()); is returned in KafkaMessageListenerContainer.getAssignedPartitions(). Other places may be affected as well.

To Reproduce

Timing issue: access ConcurrentMessageListenerContainer.getAssignedPartitions() while the definedPartitions is modified.

Expected behavior

No ConcurrentModificationException.

Metadata

Metadata

Assignees

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions