In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.11
Describe the bug
Calling ConcurrentMessageListenerContainer.getAssignedPartitions() can throw a ConcurrentModificationException. This is the stack trace we actually observed:
Caused by: java.util.ConcurrentModificationException
at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
at java.base/java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.toList(Unknown Source)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.getAssignedPartitions(ConcurrentMessageListenerContainer.java:161)
...
In #3726 the same issue was reported and resolved by wrapping definedPartitions in a Collections.synchronizedMap and assignedPartitions in Collections.synchronizedSet.
However, the JavaDoc of Collections.synchronizedMap states:
// It is imperative that the user manually synchronize on the returned map
// when traversing any of its collection views via Iterator, Spliterator or Stream:
Map m = Collections.synchronizedMap(new HashMap());
...
Set s = m.keySet(); // Needn't be in synchronized block
...
synchronized (m) { // Synchronizing on m, not s!
Iterator i = s.iterator(); // Must be in synchronized block
while (i.hasNext())
foo(i.next());
}
This synchronization is currently missing: KafkaMessageListenerContainer.getAssignedPartitions() returns Collections.unmodifiableCollection(partitionsListenerConsumer.definedPartitions.keySet()), which is still a live view of the map, and callers then iterate it without holding the map's lock. Other places may be affected as well.
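One possible way to satisfy the JavaDoc requirement is to copy the keys while holding the map's monitor and hand out that snapshot instead of a live view. The sketch below is an illustration only, not the project's actual fix: the class AssignedPartitionsSnapshot, the helper snapshotKeys, and the String keys are hypothetical stand-ins for the container's internals.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; none of these names exist in Spring for Apache Kafka.
class AssignedPartitionsSnapshot {

    // Copies the keys while holding the synchronized map's own monitor,
    // which is what the Collections.synchronizedMap JavaDoc asks for
    // whenever a collection view is traversed.
    static <K> Collection<K> snapshotKeys(Map<K, ?> synchronizedMap) {
        synchronized (synchronizedMap) {
            return Collections.unmodifiableList(new ArrayList<>(synchronizedMap.keySet()));
        }
    }

    public static void main(String[] args) {
        // String keys stand in for TopicPartition (assumption for brevity).
        Map<String, Integer> definedPartitions =
                Collections.synchronizedMap(new LinkedHashMap<>());
        definedPartitions.put("topic-0", 0);
        definedPartitions.put("topic-1", 1);

        Collection<String> snapshot = snapshotKeys(definedPartitions);

        // A later modification no longer affects what the caller iterates.
        definedPartitions.put("topic-2", 2);
        for (String key : snapshot) {
            System.out.println(key);
        }
    }
}
```

The trade-off is that callers see a point-in-time copy rather than a view that tracks later assignments, which may or may not be acceptable for every caller of getAssignedPartitions().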
To Reproduce
Timing issue: call ConcurrentMessageListenerContainer.getAssignedPartitions() while definedPartitions is being modified.
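Because the real failure depends on thread timing, the underlying mechanism can be shown deterministically in a single thread, without Kafka. This is a minimal sketch under stated assumptions: the class LiveViewRepro and its method are hypothetical, String keys stand in for TopicPartition, and the put() call plays the role of the consumer thread updating definedPartitions.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone demo; it does not use any Spring for Apache Kafka classes.
class LiveViewRepro {

    // Returns true if iterating the live key-set view throws after the map changes.
    static boolean iterationFailsAfterPut() {
        Map<String, Integer> definedPartitions =
                Collections.synchronizedMap(new LinkedHashMap<>());
        definedPartitions.put("topic-0", 0);
        definedPartitions.put("topic-1", 1);

        // Unmodifiable for the caller, but still backed by the live map.
        Collection<String> view =
                Collections.unmodifiableCollection(definedPartitions.keySet());

        Iterator<String> it = view.iterator();
        it.next();
        // Stands in for the consumer thread changing the assignment mid-iteration.
        definedPartitions.put("topic-2", 2);
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME reproduced: " + iterationFailsAfterPut());
    }
}
```

The synchronized wrapper guards individual calls such as put(), but the iterator obtained from the view is the unsynchronized iterator of the backing LinkedHashMap, so any structural change between two next() calls trips its fail-fast check.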
Expected behavior
No ConcurrentModificationException.