Skip to content

Conversation

@liangyepianzhou
Copy link
Contributor

Motivation

During the process of creating a subscription, when checking if the transaction pending has been initialized using return store.exists(PREFIX + path);, a connection failure error occurred while connecting to ZooKeeper. However, the created subscription continues to exist in the PersistentTopic, which causes subsequent retries to keep using the same subscription.

Modification

Remove the subscription, if its PendingAckHandle init failed.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

During the process of creating a subscription, when checking if the transaction pending has been initialized using `return store.exists(PREFIX + path);`, a connection failure error occurred while connecting to ZooKeeper. However, the created subscription continues to exist in the PersistentTopic, which causes subsequent retries to keep using the same subscription.
### Modification
Remove the subscription, if its PendingAckHandle init failed.
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 3, 2023
Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test?

@poorbarcode
Copy link
Contributor

Additional context

2023-08-02T02:21:46,026+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:54641][persistent://sn/system/__xxx-partition-0][multiTopicsReader-3ec782c191] Failed to create consumer: consumerId=0, org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1177) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:215) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:536) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$16(PersistentTopic.java:785) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$18(PersistentTopic.java:780) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:701) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:677) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$11(ServerCnx.java:1135) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$Functions.lambda$getLedgerEntry$0(BookkeeperSchemaStorage.java:655) ~[io.streamnative-pulsar-broker-2.11.2.2.jar:2.11.2.2]
	at org.apache.bookkeeper.client.LedgerHandle$6.onSuccess(LedgerHandle.java:809) ~[org.apache.bookkeeper-bookkeeper-server-4.15.4.jar:4.15.4]
	at org.apache.bookkeeper.client.LedgerHandle$6.onSuccess(LedgerHandle.java:806) ~[org.apache.bookkeeper-bookkeeper-server-4.15.4.jar:4.15.4]
	at org.apache.bookkeeper.common.concurrent.FutureEventListener.accept(FutureEventListener.java:42) ~[org.apache.bookkeeper-bookkeeper-common-4.15.4.jar:4.15.4]
	at org.apache.bookkeeper.common.concurrent.FutureEventListener.accept(FutureEventListener.java:26) ~[org.apache.bookkeeper-bookkeeper-common-4.15.4.jar:4.15.4]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.93.Final.jar:4.1.93.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:452) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$existsFromStore$11(ZKMetadataStore.java:331) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	... 4 more
Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/sn/system/persistent/__xxx-partition-0-multiTopicsReader-3ec782c191__transaction_pending_ack
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:102) ~[org.apache.zookeeper-zookeeper-3.8.1.jar:3.8.1]
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[org.apache.zookeeper-zookeeper-3.8.1.jar:3.8.1]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:442) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$existsFromStore$11(ZKMetadataStore.java:331) ~[io.streamnative-pulsar-metadata-2.11.2.2.jar:2.11.2.2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	... 4 more


public void exceptionHandleFuture(Throwable t) {
final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
persistentSubscription.getTopic().getSubscriptions().remove(subName);
Copy link
Contributor

@poorbarcode poorbarcode Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not agree with removing the subscription because it will make data lost if users work as the following:

  • create topic
  • publish messages and consume messages.
  • enable transaction and restart the broker.
  • the subscription was removed if a pending ack store initialization failed.

Instead of unsubscribing the subscription, we should make the topic reload or retry to initiate the Pending Ack Store.

Copy link
Contributor Author

@liangyepianzhou liangyepianzhou Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about unloading the subscription?

((PersistentTopic)persistentSubscription.getTopic()).unloadSubscription(subName);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind unloading the subscription?

This method is only in 3.0.0 now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine. We can make it protect instead of public in the other branch.

@poorbarcode poorbarcode added this to the 3.2.0 milestone Aug 3, 2023
}

@Test
public void testUnloadSubscriptionWhenFailedInitPendingAck() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test won't pass when I run it locally

Copy link
Contributor Author

@liangyepianzhou liangyepianzhou Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@codelipenghui
Copy link
Contributor

During the process of creating a subscription, when checking if the transaction pending has been initialized using return store.exists(PREFIX + path);, a connection failure error occurred while connecting to ZooKeeper. However, the created subscription continues to exist in the PersistentTopic, which causes subsequent retries to keep using the same subscription.

@liangyepianzhou In such a scenario, the "subscribe" command should be deemed unsuccessful. The PendingAck module, being a part of the system, should focus solely on cleaning up its own state and subsequently notifying the caller of the failure it encountered. Then the caller("subscribe") should cleanup it's own state.

@liangyepianzhou
Copy link
Contributor Author

During the process of creating a subscription, when checking if the transaction pending has been initialized using return store.exists(PREFIX + path);, a connection failure error occurred while connecting to ZooKeeper. However, the created subscription continues to exist in the PersistentTopic, which causes subsequent retries to keep using the same subscription.

@liangyepianzhou In such a scenario, the "subscribe" command should be deemed unsuccessful. The PendingAck module, being a part of the system, should focus solely on cleaning up its own state and subsequently notifying the caller of the failure it encountered. Then the caller("subscribe") should cleanup it's own state.

This process can happen in loading or creating persistent topics, and the method createPersistentSubscription does not concern the errors that happened in constructing PersistentSubscription. So it's not easy for us to fail the caller.

    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
            boolean replicated, Map<String, String> subscriptionProperties) {
        requireNonNull(topicCompactionService);
        if (isCompactionSubscription(subscriptionName)
                && topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
            CompactedTopicImpl compactedTopic = pulsarTopicCompactionService.getCompactedTopic();
            return new PulsarCompactorSubscription(this, compactedTopic, subscriptionName, cursor);
        } else {
            return new PersistentSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties);
        }
    }

We can just reload the subscription to load PendingAck again.
Whether it is the broker initiating a retry or returning an exception to the client side, which then triggers a retry, the result is identical. Users are oblivious to this, as we do not throw retryable exceptions to the users.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/transaction doc-not-needed Your PR changes do not impact docs type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants