Skip to content

[Bug][client] Delay messages can be ack before reaching the delivery time #23149

@Denovo1998

Description

@Denovo1998

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

master

Minimal reproduce step

I found this when trying to implement the abort function of delayed messages. Here is the test code.

    @Test(timeOut = 20000)
    public void testAbortDelayedMessage() throws Exception {
        final String topic = "persistent://my-property/my-ns/testAbortDelayedMessage";

        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();

        admin.topicPolicies().setDelayedDeliveryPolicy(topic,
                DelayedDeliveryPolicies.builder()
                        .active(true)
                        .tickTime(100L)
                        .maxDeliveryDelayInMillis(5000)
                        .build());

        MessageId msgId = producer.newMessage()
                .value("Test delayed message".getBytes())
                .deliverAfter(5, TimeUnit.SECONDS)
                .send();
        log.warn("Message sent with ID: " + msgId);
        producer.flush();

        consumer.acknowledge(msgId);

        while (true) {
            Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                log.warn("Received message: " + new String(msg.getData()));
                consumer.acknowledge(msg.getMessageId());
            }
            log.warn("Waiting for messages...");
        }
    }

What did you expect to see?

Delayed messages are consumed normally. Because consumer have not received this message.

What did you see instead?

After the delay message corresponding to the active ack is consumer, the delay message is no longer consumed.

Anything else?

Will there be any problem with ack like this?

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions