Search before asking
Read release policy
Version
- master
- 3.2.x
- 3.1.x
- 3.0.x.
- 2.11.x
Minimal reproduce step
In geo-replication case. Let's say there are two clusters: r1, r2, will replicator topic:my-topic from r1 and r2.
The consumer1 subscribes to the topic my-topic of r1 and enables replicateSubscriptionState. After the subscription state sync to r2: my-topic, create a reader read the message from r2:my-topic will stuck on readNext.
Please copy this test to ReplicatorSubscriptionTest to run it.
@Test
public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
String topicName = "persistent://" + namespace + "/mytopic";
String subscriptionName = "cluster-subscription";
// this setting can be used to manually run the test with subscription replication disabled
// it shows that subscription replication has no impact in behavior for this test case
boolean replicateSubscriptionState = true;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// create subscription in r1
createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// create subscription in r2
createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
Set<String> sentMessages = new LinkedHashSet<>();
// send messages in r1
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
int numMessages = 6;
for (int i = 0; i < numMessages; i++) {
String body = "message" + i;
producer.send(body.getBytes(StandardCharsets.UTF_8));
sentMessages.add(body);
}
producer.close();
// consume 3 messages in r1
Set<String> receivedMessages = new LinkedHashSet<>();
try (Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()) {
readMessages(consumer1, receivedMessages, 3, false);
}
// wait for subscription to be replicated
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
// create a reader in r2
Reader<byte[]> reader = client2.newReader().topic(topicName)
.subscriptionName("new-sub")
.startMessageId(MessageId.earliest)
.create();
int readNum = 0;
while (reader.hasMessageAvailable()) {
Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
System.out.println("Receive message: " + new String(message.getValue()) + " msgId: " + message.getMessageId());
assertNotNull(message);
readNum++;
}
assertEquals(readNum, numMessages);
}
What did you expect to see?
Test can passed.
What did you see instead?
Test will stuck.
Anything else?
No response
Are you willing to submit a PR?
Search before asking
Read release policy
Version
Minimal reproduce step
In geo-replication case. Let's say there are two clusters:
r1,r2, will replicator topic:my-topicfromr1andr2.The consumer1 subscribes to the topic
my-topicofr1and enablesreplicateSubscriptionState. After the subscription state sync tor2:my-topic, create areaderread the message fromr2:my-topicwill stuck onreadNext.Please copy this test to
ReplicatorSubscriptionTestto run it.What did you expect to see?
Test can passed.
What did you see instead?
Test will stuck.
Anything else?
No response
Are you willing to submit a PR?