-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][txn]Remove subscription after transaction pending ack init failed. #20927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
During the process of creating a subscription, when checking if the transaction pending has been initialized using `return store.exists(PREFIX + path);`, a connection failure error occurred while connecting to ZooKeeper. However, the created subscription continues to exist in the PersistentTopic, which causes subsequent retries to keep using the same subscription. ### Modification Remove the subscription, if its PendingAckHandle init failed.
poorbarcode
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test?
|
Additional context |
|
|
||
| public void exceptionHandleFuture(Throwable t) { | ||
| final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t); | ||
| persistentSubscription.getTopic().getSubscriptions().remove(subName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not agree with removing the subscription because it will make data lost if users work as the following:
- create topic
- publish messages and consume messages.
- enable transaction and restart the broker.
- the subscription was removed if a pending ack store initialization failed.
Instead of unsubscribing the subscription, we should make the topic reload or retry to initiate the Pending Ack Store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about unloading the subscription?
((PersistentTopic)persistentSubscription.getTopic()).unloadSubscription(subName);There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind unloading the subscription?
This method is only in 3.0.0 now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine. We can make it protect instead of public in the other branch.
...src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
Show resolved
Hide resolved
| } | ||
|
|
||
| @Test | ||
| public void testUnloadSubscriptionWhenFailedInitPendingAck() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test won't pass when I run it locally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@liangyepianzhou In such a scenario, the "subscribe" command should be deemed unsuccessful. The PendingAck module, being a part of the system, should focus solely on cleaning up its own state and subsequently notifying the caller of the failure it encountered. Then the caller("subscribe") should cleanup it's own state. |
This process can happen in loading or creating persistent topics, and the method private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
&& topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
CompactedTopicImpl compactedTopic = pulsarTopicCompactionService.getCompactedTopic();
return new PulsarCompactorSubscription(this, compactedTopic, subscriptionName, cursor);
} else {
return new PersistentSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties);
}
}We can just reload the subscription to load PendingAck again. |
Motivation
During the process of creating a subscription, when checking if the transaction pending has been initialized using
return store.exists(PREFIX + path);, a connection failure error occurred while connecting to ZooKeeper. However, the created subscription continues to exist in the PersistentTopic, which causes subsequent retries to keep using the same subscription.Modification
Remove the subscription, if its PendingAckHandle init failed.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: