[fix][broker]Consumer can't consume messages because there has two sames topics in one broker #17526
liangyepianzhou
left a comment
Good work!
    if (!createTopicFuture.isDone()) {
        return CompletableFuture.completedFuture(null);
    }
    return createTopicFuture.thenCompose(topicOptional -> {
Could `createTopicFuture` have completed exceptionally?
This is a good point.
I have covered this branch: if the future in the cache completed exceptionally, the topic instance in the cache is not the same as the `topic` parameter, so the delete returns success.
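The branch discussed in this thread can be illustrated with a minimal sketch. It is not the code from this PR: `TopicCacheSketch`, `removeIfSame`, and the use of `Object` as the topic type are hypothetical stand-ins, and the sketch simply treats a missing, unfinished, or failed future as "the cached entry is not this topic", so nothing is removed.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker's topic cache; names are illustrative only.
final class TopicCacheSketch {
    final ConcurrentHashMap<String, CompletableFuture<Optional<Object>>> topics =
            new ConcurrentHashMap<>();

    // Removes the cache entry only if it holds exactly `topic`.
    // Returns true if an entry was removed, false if there was nothing to remove.
    boolean removeIfSame(String name, Object topic) {
        CompletableFuture<Optional<Object>> cached = topics.get(name);
        if (cached == null || !cached.isDone() || cached.isCompletedExceptionally()) {
            // Missing, still loading, or failed: the cached entry cannot be `topic`.
            return false;
        }
        Optional<Object> loaded = cached.join(); // safe: completed normally
        if (loaded.isPresent() && loaded.get() == topic) {
            // Two-argument remove: succeeds only if the key still maps to this future.
            return topics.remove(name, cached);
        }
        return false;
    }
}
```

In this sketch a caller that gets `false` can still report the delete as successful, since the instance it wanted to remove is no longer what the cache holds.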
…mes topics in one broker (#17526)
…mes topics in one broker (apache#17526) (cherry picked from commit 260f5c6) (cherry picked from commit ddd642e)
… race conditions - solution was introduced in apache#17526 - however, it was accidentally replaced with a call to the incorrect method signature in apache#17736
heesung-sohn
left a comment
Do we use the same thread to update states of the same pulsar resource?
I think we can make the same thread update the same resource to minimize update conflicts.
One idea is to use resource update task queues and have one thread consume and run the tasks for the same topic.
For example,
    // Publish a topic operation task
    TopicTask task = new TopicRemoveTask(topic);
    int qid = Math.floorMod(hash(topic), queues.size());
    queues.get(qid).add(task); // a set-like structure could dedup operations on the same topic
    // Consume topic operation tasks (each thread polls only its own queue)
    TopicTask next = queues.get(threadIndex).poll();
    if (next != null) run(next);
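The queue-per-topic idea in this comment could look roughly like the following sketch. It is only an illustration under assumptions: `TopicTaskRouter`, `workerIndexFor`, and `submit` are hypothetical names that do not exist in Pulsar, and single-threaded executors stand in for the task queues (deduplication of repeated operations is left out).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper, not part of Pulsar: routes every operation for a given
// topic name to the same single-threaded executor.
final class TopicTaskRouter implements AutoCloseable {
    private final List<ExecutorService> workers = new ArrayList<>();

    TopicTaskRouter(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    // Same topic name -> same index -> same thread, so operations on one
    // topic run one after another instead of concurrently.
    int workerIndexFor(String topic) {
        return Math.floorMod(topic.hashCode(), workers.size());
    }

    <T> CompletableFuture<T> submit(String topic, Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, workers.get(workerIndexFor(topic)));
    }

    @Override
    public void close() {
        workers.forEach(ExecutorService::shutdown);
    }
}
```

With this routing, a close and a delete for the same topic would be serialized on one thread, at the cost of operations on different topics sharing a worker whenever their hashes collide.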
Motivation
With the transaction feature enabled, we sent and received messages while executing `admin API: unload namespace` 1000 times. Then the problem occurred: the consumer could not receive any messages, and there was no error log. We then tried `admin API: get topic stats`. The response showed that only producers were registered on the topic and no consumers, yet the consumer state was `Ready` in the client. This means the state of the consumer was inconsistent between the broker and the client.

Locating the problem

We then found the cause: two `PersistentTopic` instances with the same name were registered on one broker node. The consumer was registered on one (aka `topic-c`) and the producer on the other (aka `topic-p`). At this point, messages sent by the producer flow through `topic-p` (the data-flow diagram is not preserved here). But the consumer is actually registered on the other topic, `topic-c`, so it cannot receive any messages.

Reproduce
Make `transaction buffer recover`, `admin unload namespace`, `client create consumer`, and `client create producer` execute at the same time. The process flows as follows (the problem occurs at step 11):

[Step table not recoverable from the source. Columns: `Time`, `transaction buffer recover`, `admin unload namespace`, `client create consumer`, `client create producer`.]

`Time` is used only to indicate the order of each step, not the actual time.

Even if the persistent topic property `isClosingOrDeleting` has already changed to `true`, close can still be executed once more; see line 1247:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 1240 to 1249 in f230d15
Whether close can be executed depends on two predicates: whether the topic is already closing or deleting, and whether the parameter `closeWithoutWaitingClientDisconnect` is `true`. This means that `topic.close` can be executed reentrantly when `closeWithoutWaitingClientDisconnect` is `true`, and in the implementation of `admin API: unload namespace` the parameter `closeWithoutWaitingClientDisconnect` is exactly `true`.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
Lines 723 to 725 in f230d15
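One possible way to make a close operation non-reentrant is sketched below. This is not the change made in this PR and not the code of `PersistentTopic`: `CloseOnceSketch` and its `closeRuns` counter are hypothetical, and the sketch assumes that a second caller should simply receive the future of the close already in progress, even when `closeWithoutWaitingClientDisconnect` is `true`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Pulsar's PersistentTopic: the first caller starts the
// close, and every later caller (including forced ones) gets the same future.
final class CloseOnceSketch {
    private final AtomicReference<CompletableFuture<Void>> closeFuture =
            new AtomicReference<>();
    final AtomicInteger closeRuns = new AtomicInteger(); // counts real close executions

    CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
        CompletableFuture<Void> mine = new CompletableFuture<>();
        if (!closeFuture.compareAndSet(null, mine)) {
            // Already closing or closed: reuse the existing future, do not re-run.
            return closeFuture.get();
        }
        closeRuns.incrementAndGet(); // stands in for the real close work
        mine.complete(null);
        return mine;
    }
}
```

Because the flag no longer bypasses the guard, the cleanup that removes the topic from the cache runs at most once per instance in this sketch.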
So when `transaction buffer recover` fails and `admin unload namespace` executes at the same time, and the recovery failure happens before the unload, the topic is removed from `brokerService.topics` twice.

The current implementation of `BrokerService.removeTopicFromCache` calls `map.remove(key)` rather than `map.remove(key, value)`, so it removes whatever value is mapped to the key, even if it is not the intended one.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
Line 1956 in f230d15
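The difference between the two `remove` overloads can be shown with a small, self-contained sketch on `java.util.concurrent.ConcurrentHashMap`. The class `RemoveByValueDemo`, the topic name, and the plain `Object` values are illustrative assumptions; the real cache stores futures of topics.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative demo: a stale owner removes "its" entry twice while a new
// instance has been registered under the same key in between.
final class RemoveByValueDemo {
    // Returns true if the newer instance is still in the map afterwards.
    static boolean survives(boolean removeByValue) {
        ConcurrentHashMap<String, Object> topics = new ConcurrentHashMap<>();
        Object stale = new Object();   // instance that is being closed
        Object current = new Object(); // instance created later under the same name

        topics.put("my-topic", stale);
        topics.remove("my-topic");       // first removal of the stale instance
        topics.put("my-topic", current); // a new instance registers under the same name

        // Second removal, still issued on behalf of the stale instance.
        if (removeByValue) {
            topics.remove("my-topic", stale); // no-op: the mapped value is `current`
        } else {
            topics.remove("my-topic");        // evicts `current` by mistake
        }
        return topics.get("my-topic") == current;
    }

    public static void main(String[] args) {
        System.out.println("remove(key):        survives = " + survives(false));
        System.out.println("remove(key, value): survives = " + survives(true));
    }
}
```

With `remove(key)` the newer instance is evicted, so the next lookup creates yet another instance while clients stay attached to the evicted one; with `remove(key, value)` the second removal has no effect.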
To sum up, we should make these two changes:
- Make `topic.close` non-reentrant. Also prevent reentrancy between `topic.close` and `topic.delete`.
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.

Modifications

- Make `topic.close` non-reentrant. Also prevent reentrancy between `topic.close` and `topic.delete`.
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.` [truncated in source]

Documentation

- doc-required
- doc-not-needed
- doc
- doc-complete

Matching PR in forked repository

PR in forked repository (all checks passed):