@coderzc coderzc commented Nov 4, 2025

Motivation

[pulsar-ordered-OrderedExecutor-1-0] ERROR org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService - [marketing/prod-retain] Failed to cleanup reader on __change_events topic
java.lang.IllegalStateException: Recursive update
	at java.util.concurrent.ConcurrentHashMap.compute(Unknown Source)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.cleanCacheAndCloseReader(SystemTopicBasedTopicPoliciesService.java:749)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.cleanCacheAndCloseReader(SystemTopicBasedTopicPoliciesService.java:722)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$prepareInitPoliciesCacheAsync$22(SystemTopicBasedTopicPoliciesService.java:599)
	at java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniExceptionallyStage(Unknown Source)
	at java.util.concurrent.CompletableFuture.exceptionally(Unknown Source)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$prepareInitPoliciesCacheAsync$23(SystemTopicBasedTopicPoliciesService.java:592)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.lambda$prepareInitPoliciesCacheAsync$25(SystemTopicBasedTopicPoliciesService.java:580)
	at java.util.concurrent.CompletableFuture.uniComposeStage(Unknown Source)
	at java.util.concurrent.CompletableFuture.thenCompose(Unknown Source)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.prepareInitPoliciesCacheAsync(SystemTopicBasedTopicPoliciesService.java:573)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.addOwnedNamespaceBundleAsync(SystemTopicBasedTopicPoliciesService.java:558)
	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$2.lambda$onLoad$0(SystemTopicBasedTopicPoliciesService.java:649)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:105)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Unknown Source)

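We found that a "Recursive update" IllegalStateException prevents the failed policy cache initFuture from being cleaned up. As the stack trace shows, cleanCacheAndCloseReader calls ConcurrentHashMap.compute while the thread is still inside the mapping function passed to ConcurrentHashMap.computeIfAbsent.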
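
The failure mode in the stack trace can be reproduced in isolation. The following is a minimal sketch, not the broker code: the class name, the map, and the key are hypothetical, and it assumes Java 9 or later, where ConcurrentHashMap detects this kind of nested update on the same key and throws instead of hanging.

```java
import java.util.concurrent.ConcurrentHashMap;

class RecursiveUpdateDemo {
    // Returns true if the nested compute() is rejected with IllegalStateException.
    static boolean nestedComputeThrows() {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        try {
            cache.computeIfAbsent("ns", key -> {
                // Hypothetical stand-in for the cleanup path: it updates the same
                // key while computeIfAbsent is still running for that key.
                cache.compute(key, (k, v) -> null);
                return "value";
            });
            return false;
        } catch (IllegalStateException e) {
            // On Java 9+ the message is "Recursive update".
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("nested compute rejected: " + nestedComputeThrows());
    }
}
```

In the broker, the equivalent exception is caught and logged as "Failed to cleanup reader", so the failed entry stays in the map.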

Modifications

Run initFuture's exception-handling logic asynchronously on a separate thread, so that the cleanup no longer executes inside the computeIfAbsent callback.
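
The idea can be sketched as follows. This is an illustration under assumptions, not the actual patch: the class, the initFutures map, the prepareInit method, and the executor are hypothetical names, and whenCompleteAsync is used here only as one standard way to move a callback onto another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCleanupSketch {
    // Hypothetical stand-in for the per-namespace init-future cache.
    static final ConcurrentHashMap<String, CompletableFuture<Void>> initFutures =
            new ConcurrentHashMap<>();

    static CompletableFuture<Void> prepareInit(String namespace, ExecutorService cleanupExecutor) {
        return initFutures.computeIfAbsent(namespace, key -> {
            // Simulate an initialization that has already failed.
            CompletableFuture<Void> init =
                    CompletableFuture.failedFuture(new RuntimeException("simulated init failure"));
            // The cleanup is handed to another thread, so the map update below
            // does not run inside this mapping function.
            init.whenCompleteAsync((ignored, error) -> {
                if (error != null) {
                    initFutures.compute(key, (k, existing) -> null); // remove the failed entry
                }
            }, cleanupExecutor);
            return init;
        });
    }

    // Returns true if the failed entry is gone once the cleanup task has run.
    static boolean failedEntryIsRemoved() {
        ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();
        try {
            prepareInit("tenant/ns", cleanupExecutor);
            cleanupExecutor.shutdown(); // already-submitted cleanup still runs
            return cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)
                    && !initFutures.containsKey("tenant/ns");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            cleanupExecutor.shutdownNow();
        }
    }
}
```

If the cleanup thread reaches compute before computeIfAbsent has returned, it waits for that update to finish instead of throwing, because the nested-update check only triggers for the thread that is already inside the mapping function.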

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed label Nov 4, 2025
@coderzc coderzc added the type/bug, area/broker, and ready-to-test labels Nov 4, 2025
@coderzc coderzc closed this Nov 4, 2025
@coderzc coderzc reopened this Nov 4, 2025
@lhotari lhotari left a comment

LGTM, however, would it be possible to add a test case?

codecov-commenter commented Nov 4, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.25%. Comparing base (fefe771) to head (1da4db1).
⚠️ Report is 48 commits behind head on master.

@@              Coverage Diff              @@
##             master   #24939       +/-   ##
=============================================
+ Coverage     38.46%   74.25%   +35.78%     
- Complexity    13239    33503    +20264     
=============================================
  Files          1856     1913       +57     
  Lines        145283   149451     +4168     
  Branches      16876    17363      +487     
=============================================
+ Hits          55890   110981    +55091     
+ Misses        81827    29602    -52225     
- Partials       7566     8868     +1302     
Flag Coverage Δ
inttests 26.30% <100.00%> (+0.03%) ⬆️
systests 22.70% <100.00%> (-0.02%) ⬇️
unittests 73.78% <100.00%> (+39.08%) ⬆️

Files with missing lines Coverage Δ
.../service/SystemTopicBasedTopicPoliciesService.java 75.00% <100.00%> (+9.41%) ⬆️

... and 1414 files with indirect coverage changes

@coderzc coderzc self-assigned this Nov 4, 2025
@lhotari lhotari merged commit 344905f into apache:master Nov 4, 2025
143 of 148 checks passed
lhotari commented Nov 4, 2025

@coderzc Cherry-picking this test to branch-4.0 and branch-4.1 causes the test to fail. I had to make some modifications to get it to pass.

    @Test
    public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception {
        final String namespace = "system-topic/namespace-6";

        admin.namespaces().createNamespace(namespace);

        TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1");

        SystemTopicBasedTopicPoliciesService service =
                Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());

        // inject exception when create NamespaceEventsSystemTopicFactory
        Mockito.doThrow(new RuntimeException("test exception"))
                .doCallRealMethod() // <--- replacing Mockito.reset
                .when(service)
                .getNamespaceEventsSystemTopicFactory();

        CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
        Optional<TopicPolicies> topicPoliciesOptional;
        try {
            topicPoliciesFuture =
                    service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
            topicPoliciesOptional = topicPoliciesFuture.join();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e.getCause().getMessage().contains("test exception"));
        }

        // HERE: using Awaitility to retry
        Awaitility.await().untilAsserted(() -> {
            assertThat(service.updateTopicPoliciesAsync(topicName, false, false, topicPolicies ->
                    topicPolicies.setMaxConsumerPerTopic(10)))
                    .succeedsWithin(Duration.ofSeconds(2));
        });

        topicPoliciesFuture =
            service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
        topicPoliciesOptional = topicPoliciesFuture.join();

        Assert.assertNotNull(topicPoliciesOptional);
        Assert.assertTrue(topicPoliciesOptional.isPresent());

        TopicPolicies topicPolicies = topicPoliciesOptional.get();
        Assert.assertNotNull(topicPolicies);
        Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
    }

Is there some other change that would be required for branch-4.0 and branch-4.1 that causes this difference in behavior?

lhotari pushed a commit that referenced this pull request Nov 4, 2025
lhotari pushed a commit that referenced this pull request Nov 4, 2025
lhotari pushed a commit that referenced this pull request Nov 4, 2025
lhotari pushed a commit that referenced this pull request Nov 4, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 6, 2025
…cy cache cleanup (apache#24939)

(cherry picked from commit 344905f)
(cherry picked from commit c2ea6a9)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 6, 2025
…cy cache cleanup (apache#24939)

(cherry picked from commit 344905f)
(cherry picked from commit c2ea6a9)
nodece pushed a commit to nodece/pulsar that referenced this pull request Nov 12, 2025
nodece commented Nov 12, 2025

> @coderzc Cherry-picking this test to branch-4.0 and branch-4.1 causes the test to fail. I had to make some modifications to get it to pass.

@lhotari @coderzc The same issue exists in branch-3.0. Because the update and get operations race with each other, we need to use Awaitility to wait until the topic policies become available.

    @Test
    public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception {
        final String namespace = "system-topic/namespace-6";

        admin.namespaces().createNamespace(namespace);

        TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1");

        SystemTopicBasedTopicPoliciesService service =
                Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());

        // inject exception when create NamespaceEventsSystemTopicFactory
        Mockito.doThrow(new RuntimeException("test exception"))
                .doCallRealMethod()
                .when(service)
                .getNamespaceEventsSystemTopicFactory();

        try {
            service.getTopicPoliciesAsync(topicName, false).join();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e.getCause().getMessage().contains("test exception"));
        }

        TopicPolicies updatedTopicPolicies = new TopicPolicies();
        updatedTopicPolicies.setMaxConsumerPerTopic(10);
        Awaitility.await().untilAsserted(() -> {
            assertThat(service.updateTopicPoliciesAsync(topicName, updatedTopicPolicies))
                    .succeedsWithin(Duration.ofSeconds(2));
        });

        Awaitility.await().untilAsserted(() -> {
            CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
                    service.getTopicPoliciesAsync(topicName, false);
            assertThat(topicPoliciesFuture).succeedsWithin(Duration.ofSeconds(2))
                    .satisfies(n -> {
                        Assert.assertTrue(n.isPresent());
                        TopicPolicies topicPolicies = n.get();
                        Assert.assertNotNull(topicPolicies);
                        Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
                    });
        });
    }
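
The retry behaviour the test relies on can be pictured as a plain polling loop. The sketch below uses only the JDK and is not Awaitility's implementation; the class and method names are hypothetical, and the counter stands in for a policy update that becomes visible only after a short delay.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

class PollingSketch {
    // Re-runs the assertion until it passes or the timeout elapses.
    static void untilAsserted(Duration timeout, Duration pollInterval, Runnable assertion)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError notYet) {
                if (System.nanoTime() - deadline >= 0) {
                    throw notYet; // give up and surface the last failure
                }
                Thread.sleep(pollInterval.toMillis());
            }
        }
    }

    // Simulates a value that only becomes visible on the third read.
    static int attemptsUntilVisible() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            untilAsserted(Duration.ofSeconds(2), Duration.ofMillis(10), () -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new AssertionError("policy not visible yet");
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return attempts.get();
    }
}
```

A single read immediately after the update would fail in this simulation, which is the same shape of failure as asserting on getTopicPoliciesAsync before the updated policies have been received.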

lhotari commented Nov 12, 2025

> @lhotari @coderzc The same issue exists in branch-3.0. Because the update and get operations race with each other, we need to use Awaitility to wait until the topic policies become available.

@nodece Yes, I applied the change to use Awaitility while backporting to the maintenance branches, since the test wouldn't pass otherwise. For example, see bda4fed in branch-3.0.

nodece commented Nov 12, 2025

@lhotari I think your patch is incorrect. Could you verify this test locally? It always fails on my CI.

lhotari commented Nov 12, 2025

> @lhotari I think your patch is incorrect. Could you verify this test locally? It always fails on my CI.

@nodece This PR has been cherry-picked to branch-3.0 and backported with the Awaitility fix. The branch-3.0 CI passed yesterday, before 3.0.15 release candidate 1: https://github.com/apache/pulsar/actions/runs/19274039418
Do you have some other fork of branch-3.0? In that case you can cherry-pick commit bda4fed from branch-3.0 to get a commit where the test has been fixed.

lhotari commented Nov 12, 2025

Ran the test locally in branch-3.0

[INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 59.01 s - in org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:04 min
[INFO] Finished at: 2025-11-12T16:24:06+02:00
[INFO] ------------------------------------------------------------------------
[INFO] 0 goals, 0 executed

manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…cy cache cleanup (apache#24939)

(cherry picked from commit 344905f)
(cherry picked from commit bda4fed)
nodece commented Nov 13, 2025

@lhotari The forked version already includes #24658. I need to wait until the reader has received the updated topic policies after calling service.updateTopicPoliciesAsync.

Thanks, branch-3.0 works fine now.

srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…cy cache cleanup (apache#24939)

(cherry picked from commit 344905f)
(cherry picked from commit bda4fed)