-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup #24939
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, however, would it be possible to add a test case?
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #24939 +/- ##
=============================================
+ Coverage 38.46% 74.25% +35.78%
- Complexity 13239 33503 +20264
=============================================
Files 1856 1913 +57
Lines 145283 149451 +4168
Branches 16876 17363 +487
=============================================
+ Hits 55890 110981 +55091
+ Misses 81827 29602 -52225
- Partials 7566 8868 +1302
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
@coderzc Cherry-picking this test to branch-4.0 and branch-4.1 causes the test to fail. I had to do some modifications to make it pass. @Test
public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception {
final String namespace = "system-topic/namespace-6";
admin.namespaces().createNamespace(namespace);
TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1");
SystemTopicBasedTopicPoliciesService service =
Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
// inject exception when create NamespaceEventsSystemTopicFactory
Mockito.doThrow(new RuntimeException("test exception"))
.doCallRealMethod() // <--- replacing Mockito.reset
.when(service)
.getNamespaceEventsSystemTopicFactory();
CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
Optional<TopicPolicies> topicPoliciesOptional;
try {
topicPoliciesFuture =
service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
topicPoliciesOptional = topicPoliciesFuture.join();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause().getMessage().contains("test exception"));
}
// HERE: using Awaitility to retry
Awaitility.await().untilAsserted(() -> {
assertThat(service.updateTopicPoliciesAsync(topicName, false, false, topicPolicies ->
topicPolicies.setMaxConsumerPerTopic(10)))
.succeedsWithin(Duration.ofSeconds(2));
});
topicPoliciesFuture =
service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
topicPoliciesOptional = topicPoliciesFuture.join();
Assert.assertNotNull(topicPoliciesOptional);
Assert.assertTrue(topicPoliciesOptional.isPresent());
TopicPolicies topicPolicies = topicPoliciesOptional.get();
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}Is there some other change that would be required for branch-4.0 and branch-4.1 that causes this difference in behavior? |
…cy cache cleanup (apache#24939) (cherry picked from commit 344905f) (cherry picked from commit c2ea6a9)
…cy cache cleanup (apache#24939) (cherry picked from commit 344905f) (cherry picked from commit c2ea6a9)
…cy cache cleanup (apache#24939) (cherry picked from commit 344905f)
@lhotari @coderzc The same issue exists in the branch-3.0. Since the update and get operations has race issue, we need to use @Test
public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception {
final String namespace = "system-topic/namespace-6";
admin.namespaces().createNamespace(namespace);
TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1");
SystemTopicBasedTopicPoliciesService service =
Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
// inject exception when create NamespaceEventsSystemTopicFactory
Mockito.doThrow(new RuntimeException("test exception"))
.doCallRealMethod()
.when(service)
.getNamespaceEventsSystemTopicFactory();
try {
service.getTopicPoliciesAsync(topicName, false).join();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause().getMessage().contains("test exception"));
}
TopicPolicies updatedTopicPolicies = new TopicPolicies();
updatedTopicPolicies.setMaxConsumerPerTopic(10);
Awaitility.await().untilAsserted(() -> {
assertThat(service.updateTopicPoliciesAsync(topicName, updatedTopicPolicies))
.succeedsWithin(Duration.ofSeconds(2));
});
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
service.getTopicPoliciesAsync(topicName, false);
assertThat(topicPoliciesFuture).succeedsWithin(Duration.ofSeconds(2))
.satisfies(n -> {
Assert.assertTrue(n.isPresent());
TopicPolicies topicPolicies = n.get();
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
});
});
} |
@nodece Yes, I applied the change to use Awaitility while backporting to maintenance branches since the test wouldn't pass otherwise. For example bda4fed in branch-3.0 |
|
@lhotari I think your patch is incorrect, could you verify this test local? This test always fail on my CI. |
@nodece This PR has been cherry-picked to branch-3.0 and it has been backported with the Awaitility fix. branch-3.0 CI passed: yesterday before 3.0.15 release candidate 1: https://github.com/apache/pulsar/actions/runs/19274039418 . |
|
Ran the test locally in branch-3.0 |
…cy cache cleanup (apache#24939) (cherry picked from commit 344905f) (cherry picked from commit bda4fed)
…cy cache cleanup (apache#24939) (cherry picked from commit 344905f) (cherry picked from commit bda4fed)
Motivation
We found a recursive exception lead to failed policy cache initFuture can't be cleaned up.
Modifications
Use an asynchronous thread to execute initFuture's exception handling logic.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: