Skip to content

Conversation

@Denovo1998
Copy link
Contributor

@Denovo1998 Denovo1998 commented Nov 28, 2023

Fixes #21537

Motivation

Because of #21536 . The consumer with the subscription name __compaction can not reconnect. Therefore, the compaction task triggered before the topic unload fails. In this case, a compaction task can be triggered again after unload.

CleanShot 2023-11-30 at 9 41 32@2x

Modifications

After unload the topic, make sure that the topic has been initialized correctly. Then trigger compaction again after previous task failed.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Denovo1998#5

@coderzc coderzc changed the title [fix][broker] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done Nov 29, 2023
@Technoboy- Technoboy- added this to the 3.2.0 milestone Nov 29, 2023
@Denovo1998
Copy link
Contributor Author

Denovo1998 commented Nov 29, 2023

@coderzc @Technoboy- CI checks reported an error just now and has been fixed now. Please check again.

CleanShot 2023-11-29 at 10 51 02@2x CleanShot 2023-11-30 at 12 59 11@2x

@coderzc
Copy link
Member

coderzc commented Nov 30, 2023

@Denovo1998 After looking at the code, I found that getCompactedTopicContext method will wait for compactedTopicContext to be created, so we don’t seem to need to wait for compactedLedger to be created in the test. I think we just need to wait for topic initialization to complete.

/**
* Getter for CompactedTopicContext.
* @return CompactedTopicContext
*/
public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
}

@Denovo1998
Copy link
Contributor Author

Denovo1998 commented Nov 30, 2023

@Denovo1998 After looking at the code, I found that getCompactedTopicContext method will wait for compactedTopicContext to be created, so we don’t seem to need to wait for compactedLedger to be created in the test. I think we just need to wait for topic initialization to complete.

After the topic initialization is complete, we also wait for the CompactedTopicContext to be created. Because the creation of CompactedTopicContext requires the creation of compactedLedger, otherwise ledgerId will be -1, and an error will be reported if a ledger is not created after 10s. here

private static CompletableFuture<CompactedTopicContext> openCompactedLedger(BookKeeper bk, long id) {
CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
bk.asyncOpenLedger(id,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
(rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
promise.complete(ledger);
}
}, null);
return promise.thenApply((ledger) -> new CompactedTopicContext(
ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
}

@coderzc
Copy link
Member

coderzc commented Nov 30, 2023

2023-11-30T05:30:45,771 - WARN  - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
	at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]

@Denovo1998 I found the above error logs from the CI report. I think this is the root cause. I think we can trigger compaction again after topic unload. Maybe we can get the future of compaction from persistentTopic and judge whether compaction task has failed, then retrigger it.

@Denovo1998
Copy link
Contributor Author

2023-11-30T05:30:45,771 - WARN  - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
	at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
	at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]

@Denovo1998 I found the above error logs from the CI report. I think this is the root cause. I think we can trigger compaction again after topic unload. Maybe we can get the future of compaction from persistentTopic and judge whether compaction task has failed, then retrigger it.

@coderzc I get it! So the cause is that triggerCompactor didn't succeed, not that compactionLedger creation timed out.
We need to unload the topic, make sure the topic properly initialized, and invoke persistentTopic.compactionStatus(), if success is to continue. If that fails, call triggerCompactor again.
If my description is correct, I will modify the solution description and code of PR after work.

@coderzc
Copy link
Member

coderzc commented Nov 30, 2023

@coderzc I get it! So the cause is that triggerCompactor didn't succeed, not that compactionLedger creation timed out.
We need to unload the topic, make sure the topic properly initialized, and invoke persistentTopic.compactionStatus(), if success is to continue. If that fails, call triggerCompactor again.
If my description is correct, I will modify the solution description and code of PR after work.

@Denovo1998 Yes, I think so.

@Denovo1998
Copy link
Contributor Author

@coderzc Something strange.

1. Call persistentTopic.compactionStatus() after unload topic, longRunningProcessStatus always NOT_RUN
CleanShot 2023-11-30 at 9 10 48@2x
CleanShot 2023-11-30 at 9 12 09@2x

2. Even if the topic already exists, the compactedTopicContext is not created correctly.

CleanShot 2023-11-30 at 9 07 18@2x CleanShot 2023-11-30 at 9 07 35@2x

The problem still seems to be the compactedTopicContext, The compactedLedger hasn't been created yet
(org.apache.pulsar.compaction.CompactedTopicImpl#openCompactedLedger).

PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);

// Add ledger info for compacted topic ledger if exist.
LedgerInfo info = new LedgerInfo();
info.ledgerId = -1;
info.entries = -1;
info.size = -1;
Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
stats.compactedLedger = info;

3. The error you sent before should be this issue. Does the consumer with the subscription name __compaction not reconnect?

#21536

CleanShot 2023-11-30 at 9 41 32@2x

@coderzc
Copy link
Member

coderzc commented Nov 30, 2023

@Denovo1998

  1. Call persistentTopic.compactionStatus() after unload topic, longRunningProcessStatus always NOT_RUN

Because after unloading, persistentTopic.compactionStatus() will be reset, so we cannot get the running status of the previous task through it. We can get and retain the CompletableFuture object of previous task (persistentTopic .currentCompaction).

@Denovo1998
Copy link
Contributor Author

@coderzc Yes, I just read the code and got it. Let's look at the second and third questions.

@coderzc
Copy link
Member

coderzc commented Nov 30, 2023

Even if the topic already exists, the compactedTopicContext is not created correctly.

@Denovo1998 Maybe the compaction task isn't finished yet? CompactedTopicContext is created only after the compaction task is completed.

  1. The error you sent before should be this issue. Does the consumer with the subscription name __compaction not reconnect?

@Denovo1998 Yes, the reader of compaction may be reconnecting when acknowledging message.

@Denovo1998
Copy link
Contributor Author

@coderzc Therefore, the current code in this pr waits for the CompactedTopicContext to be created successfully, which has solved this issue.
As for the error report in number 3, it seems to be the problem of issue21536?

@coderzc
Copy link
Member

coderzc commented Nov 30, 2023

As for the error report in number 3, it seems to be the problem of #21536?

why? we also need to fix it in StrategicCompactionTest.testCompactionDuplicate. just retrigger compaction if the previous task failed after topic was unloaded.

@Denovo1998
Copy link
Contributor Author

we also need to fix it in StrategicCompactionTest.testCompactionDuplicate. just retrigger compaction if the previous task failed after topic was unloaded.

@coderzc Do you mean to create a new test case in org.apache.pulsar.compaction.StrategicCompactionTest?

All has been changed. Please check again.

@coderzc
Copy link
Member

coderzc commented Dec 1, 2023

we also need to fix it in StrategicCompactionTest.testCompactionDuplicate. just retrigger compaction if the previous task failed after topic was unloaded.

@coderzc Do you mean to create a new test case in org.apache.pulsar.compaction.StrategicCompactionTest?

All has been changed. Please check again.

Oh, Don't need to do that., StrategicCompactionTest already extends from CompactionTest.

@Denovo1998
Copy link
Contributor Author

Oh, Don't need to do that., StrategicCompactionTest already extends from CompactionTest.

@coderzc I'm sorry I misunderstood what you said. I am still learning the code structure of pulsar, and I am not particularly familiar with it. The modification to the org.apache.pulsar.com paction.StrategicCompactionTest has been removed. Please check again.

@Denovo1998 Denovo1998 changed the title [fix][test] Fix the bug that the CompactedLedger is used before the newCompactedLedger done [fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered Dec 1, 2023
Copy link
Member

@coderzc coderzc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@coderzc
Copy link
Member

coderzc commented Dec 1, 2023

/pulsarbot rerun-failure-checks

@codecov-commenter
Copy link

Codecov Report

Merging #21634 (5ae340f) into master (e1d06b5) will increase coverage by 36.49%.
Report is 12 commits behind head on master.
The diff coverage is n/a.

Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #21634       +/-   ##
=============================================
+ Coverage     36.83%   73.33%   +36.49%     
- Complexity      376    32740    +32364     
=============================================
  Files          1715     1893      +178     
  Lines        131097   140730     +9633     
  Branches      14318    15500     +1182     
=============================================
+ Hits          48288   103201    +54913     
+ Misses        76424    29421    -47003     
- Partials       6385     8108     +1723     
Flag Coverage Δ
inttests 24.15% <ø> (-0.09%) ⬇️
systests 24.66% <ø> (-0.26%) ⬇️
unittests 72.66% <ø> (+40.79%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

see 1460 files with indirect coverage changes

@coderzc coderzc merged commit 2bf1354 into apache:master Dec 1, 2023
@Denovo1998 Denovo1998 deleted the wait_for_newCompactedLedger branch December 1, 2023 14:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky-test: StrategicCompactionTest.testCompactionDuplicate

4 participants