Skip to content

Commit 203f305

Browse files
authored
[fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (apache#22496)
1 parent bbff29d commit 203f305

File tree

6 files changed

+48
-11
lines changed

6 files changed

+48
-11
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
import org.apache.pulsar.client.api.CompressionType;
8989
import org.apache.pulsar.client.api.MessageId;
9090
import org.apache.pulsar.client.api.Producer;
91-
import org.apache.pulsar.client.api.PulsarClientException;
9291
import org.apache.pulsar.client.api.Schema;
9392
import org.apache.pulsar.client.api.TableView;
9493
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1381,8 +1380,8 @@ private synchronized void doCleanup(String broker) {
13811380
}
13821381

13831382
try {
1384-
producer.flush();
1385-
} catch (PulsarClientException e) {
1383+
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
1384+
} catch (Exception e) {
13861385
log.error("Failed to flush the in-flight non-system bundle override messages.", e);
13871386
}
13881387

@@ -1405,8 +1404,8 @@ private synchronized void doCleanup(String broker) {
14051404
}
14061405

14071406
try {
1408-
producer.flush();
1409-
} catch (PulsarClientException e) {
1407+
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
1408+
} catch (Exception e) {
14101409
log.error("Failed to flush the in-flight system bundle override messages.", e);
14111410
}
14121411

@@ -1584,8 +1583,8 @@ protected void monitorOwnerships(List<String> brokers) {
15841583
}
15851584

15861585
try {
1587-
producer.flush();
1588-
} catch (PulsarClientException e) {
1586+
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
1587+
} catch (Exception e) {
15891588
log.error("Failed to flush the in-flight messages.", e);
15901589
}
15911590

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,11 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
824824
}
825825

826826
public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
827+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
828+
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
829+
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
830+
.thenApply(Optional::isPresent);
831+
}
827832
return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
828833
}
829834

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,8 @@ public CompletableFuture<Void> stopReplProducers() {
589589
@Override
590590
public CompletableFuture<Void> checkReplication() {
591591
TopicName name = TopicName.get(topic);
592-
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
592+
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
593+
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
593594
return CompletableFuture.completedFuture(null);
594595
}
595596

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1704,7 +1704,8 @@ CompletableFuture<Void> checkPersistencePolicies() {
17041704
@Override
17051705
public CompletableFuture<Void> checkReplication() {
17061706
TopicName name = TopicName.get(topic);
1707-
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
1707+
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
1708+
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
17081709
return CompletableFuture.completedFuture(null);
17091710
}
17101711

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.google.common.collect.Sets;
2222
import lombok.Cleanup;
2323
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
24+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
25+
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
2426
import org.apache.pulsar.client.api.MessageRoutingMode;
2527
import org.apache.pulsar.client.api.PulsarClient;
2628
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -32,6 +34,8 @@
3234
import org.testng.annotations.AfterClass;
3335
import org.testng.annotations.BeforeClass;
3436
import org.testng.annotations.BeforeMethod;
37+
import org.testng.annotations.DataProvider;
38+
import org.testng.annotations.Factory;
3539
import org.testng.annotations.Test;
3640

3741
import java.lang.reflect.Method;
@@ -41,6 +45,18 @@
4145
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
4246

4347
protected String methodName;
48+
@DataProvider(name = "loadManagerClassName")
49+
public static Object[][] loadManagerClassName() {
50+
return new Object[][]{
51+
{ModularLoadManagerImpl.class.getName()},
52+
{ExtensibleLoadManagerImpl.class.getName()}
53+
};
54+
}
55+
56+
@Factory(dataProvider = "loadManagerClassName")
57+
public ReplicatorGlobalNSTest(String loadManagerClassName) {
58+
this.loadManagerClassName = loadManagerClassName;
59+
}
4460

4561
@BeforeMethod
4662
public void beforeMethod(Method m) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
119119
protected final String cluster2 = "r2";
120120
protected final String cluster3 = "r3";
121121
protected final String cluster4 = "r4";
122+
protected String loadManagerClassName;
123+
124+
protected String getLoadManagerClassName() {
125+
return loadManagerClassName;
126+
}
122127

123128
// Default frequency
124129
public int getBrokerServicePurgeInactiveFrequency() {
@@ -271,8 +276,9 @@ protected void setup() throws Exception {
271276
.brokerClientTlsTrustStoreType(keyStoreType)
272277
.build());
273278

274-
admin1.tenants().createTenant("pulsar",
275-
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
279+
updateTenantInfo("pulsar",
280+
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
281+
Sets.newHashSet("r1", "r2", "r3")));
276282
admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
277283
admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));
278284

@@ -344,6 +350,7 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
344350
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
345351
config.setEnableReplicatedSubscriptions(true);
346352
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
353+
config.setLoadManagerClassName(getLoadManagerClassName());
347354
}
348355

349356
public void resetConfig1() {
@@ -409,6 +416,14 @@ protected void cleanup() throws Exception {
409416
resetConfig4();
410417
}
411418

419+
protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) throws Exception {
420+
if (!admin1.tenants().getTenants().contains(tenant)) {
421+
admin1.tenants().createTenant(tenant, tenantInfo);
422+
} else {
423+
admin1.tenants().updateTenant(tenant, tenantInfo);
424+
}
425+
}
426+
412427
static class MessageProducer implements AutoCloseable {
413428
URL url;
414429
String namespace;

0 commit comments

Comments
 (0)