[fix][test] Fix the bug caused by unload topic and compaction task failure after task is triggered #21634
@coderzc @Technoboy- The CI checks reported an error just now; it has been fixed. Please check again.
@Denovo1998 After looking at the code, I found that pulsar/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java Lines 298 to 304 in 4e7bff7
|
After the topic initialization is complete, we also wait for the CompactedTopicContext to be created. Creating the CompactedTopicContext requires the compactedLedger to be created first; otherwise the ledgerId is -1, and an error is reported if no ledger has been created after 10s. See pulsar/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java Lines 239 to 253 in e1d06b5
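The wait described above can be sketched as a bounded poll. This is a minimal illustration using only the JDK, not the code in CompactedTopicImpl: the class `CompactedContextWait`, the method `awaitLedgerId`, and the ledger-id probe are hypothetical names, while the -1 sentinel and the timeout idea come from the comment above.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical helper: polls until a compacted ledger id is available or a timeout expires. */
final class CompactedContextWait {
    /** Sentinel from the discussion above: no compacted ledger has been created yet. */
    static final long NO_LEDGER = -1L;

    /**
     * Polls ledgerIdProbe until it returns something other than NO_LEDGER.
     * Throws IllegalStateException if the timeout elapses first.
     */
    static long awaitLedgerId(LongSupplier ledgerIdProbe, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            long ledgerId = ledgerIdProbe.getAsLong();
            if (ledgerId != NO_LEDGER) {
                return ledgerId;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("No compacted ledger after " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for compacted ledger", e);
            }
        }
    }
}
```

In a test, the probe would read the ledger id from whatever the broker exposes for the topic, and the timeout would be set to 10 seconds to match the limit mentioned above.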
|
2023-11-30T05:30:45,771 - WARN - [broker-client-shared-internal-executor-4522-1:PersistentTopic@3397] - [persistent://my-property/use/my-ns/testCompactionDuplicate] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$closeLedger$20(TwoPhaseCompactor.java:388) ~[classes/:?]
at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncClose$1(PulsarMockLedgerHandle.java:96) ~[testmocks-3.2.0-SNAPSHOT.jar:4.16.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.16.3.jar:4.16.3]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.16.3.jar:4.16.3]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Consumer connect fail! consumer state:Connecting
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doImmediateAck(PersistentAcknowledgmentsGroupingTracker.java:363) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doCumulativeAck(PersistentAcknowledgmentsGroupingTracker.java:310) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:243) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:198) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:566) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688) ~[pulsar-client-original-3.2.0-SNAPSHOT.jar:3.2.0-SNAPSHOT]
at org.apache.pulsar.client.impl.RawReaderImpl.acknowledgeCumulativeAsync(RawReaderImpl.java:90) ~[classes/:3.2.0-SNAPSHOT]
at org.apache.pulsar.compaction.TwoPhaseCompactor.lambda$phaseTwoSeekThenLoop$10(TwoPhaseCompactor.java:225) ~[classes/:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150) ~[?:?]
@Denovo1998 I found the above error logs in the CI report. I think this is the root cause. I think we can trigger compaction again after the topic unload. Maybe we can get the compaction future from persistentTopic, check whether the compaction task has failed, and then retrigger it. |
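The "check the future, then retrigger" idea can be sketched with plain `CompletableFuture` composition. This is only an illustration under assumptions: `CompactionRetrigger`, `retriggerIfFailed`, and the `trigger` supplier are hypothetical names, the `Long` result stands in for a compacted ledger id, and no Pulsar API is called here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: retriggers a task once if the previous attempt failed. */
final class CompactionRetrigger {
    /**
     * Returns the result of previous if it succeeded; otherwise calls trigger
     * exactly once and returns the future of the new attempt.
     */
    static CompletableFuture<Long> retriggerIfFailed(
            CompletableFuture<Long> previous,
            Supplier<CompletableFuture<Long>> trigger) {
        return previous
                // Map success to an already-completed future, failure to a new attempt.
                .<CompletableFuture<Long>>handle((ledgerId, error) ->
                        error == null ? CompletableFuture.completedFuture(ledgerId) : trigger.get())
                // Flatten the nested future.
                .thenCompose(f -> f);
    }
}
```

The new attempt is started only after the previous future has completed exceptionally, so a compaction that is still running is never triggered a second time.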
@coderzc I get it! So the cause is that |
@Denovo1998 Yes, I think so. |
|
@coderzc Something strange.
1. When persistentTopic.compactionStatus() is called after the topic unload, longRunningProcessStatus is always NOT_RUN.
2. Even if the topic already exists, the
The problem still seems to be the PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false); pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java Lines 2455 to 2469 in 3377003
3. The error you sent before should be this issue. Does the consumer with the subscription name |
Because after unloading, |
|
@coderzc Yes, I just read the code and got it. Let's look at the second and third questions. |
@Denovo1998 Maybe the compaction task isn't finished yet?
@Denovo1998 Yes, the compaction reader may be reconnecting when it acknowledges the message. |
|
@coderzc Therefore, the current code in this PR waits for the CompactedTopicContext to be created successfully, which solves this issue. |
why? we also need to fix it in |
@coderzc Do you mean to create a new test case in All has been changed. Please check again. |
Oh, no need to do that. StrategicCompactionTest already extends CompactionTest. |
@coderzc I'm sorry, I misunderstood what you said. I am still learning the code structure of Pulsar and am not particularly familiar with it. The modification to the |
coderzc
left a comment
LGTM
|
/pulsarbot rerun-failure-checks |
Codecov Report
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #21634       +/-   ##
=============================================
+ Coverage     36.83%   73.33%   +36.49%
- Complexity      376    32740    +32364
=============================================
  Files          1715     1893      +178
  Lines        131097   140730     +9633
  Branches      14318    15500     +1182
=============================================
+ Hits          48288   103201    +54913
+ Misses        76424    29421    -47003
- Partials       6385     8108     +1723
Flags with carried forward coverage won't be shown. Click here to find out more. |







Fixes #21537
Motivation
Because of #21536, the consumer with the subscription name __compaction cannot reconnect. Therefore, the compaction task triggered before the topic unload fails. In this case, a compaction task can be triggered again after the unload.
Modifications
After unloading the topic, make sure that the topic has been initialized correctly. Then trigger compaction again after the previous task has failed.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: Denovo1998#5