[Bug] key-SHARED subscription is not performing redelivery and going in infinite while loop of reading messages from disk and not dispatching any messages #21656

@rdhabalia

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

A shared or key-shared subscription must dispatch redelivered messages in every scenario: any shared subscription should be able to dispatch messages that were already delivered but are still unacknowledged. The broker can enforce strict ordering for new messages, which it reads for the first time by advancing the cursor's readPosition, but it should dispatch already delivered unacked messages whenever required, without restricting any scenario.

However, the key-shared subscription handles redelivered messages incorrectly: it keeps reading them, discards them, and never dispatches a single message to the consumer, which wrongly changes the semantics of consumer delivery ordering. The broker does not dispatch a redelivered message if its message id is smaller than the offset message id assigned to the consumer when it joined. The broker assigns the cursor's current read position as the consumer's min-message-id offset to manage ordering, but the id of an already delivered message can be smaller than that position, and redelivery should not be restricted by ordering, as discussed earlier for the semantics of shared subscriptions.

Because of this handling, key-shared subscription topics that have connected consumers with positive permits cannot receive any messages, and dispatching is stuck. The broker also keeps performing the same cold reads across those stuck topics, wasting storage and CPU resources on messages it reads and then discards. This impacts the application, the broker, and the bookies, and such handling is both semantically and practically invalid.

Right now, multiple such topics with key-shared subscriptions and pending redelivery messages can significantly impact the broker and bookies, which keep reading a large number of messages without dispatching them. Client applications cannot consume any messages, so they are significantly impacted as well.
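To make the reported loop concrete, here is a minimal, self-contained sketch in plain Java. It is a toy model of the behavior as this report describes it, not the broker's actual dispatcher code: `Position`, `Outcome`, `reportedFilter`, `expectedFilter`, and `simulate` are all hypothetical names introduced only for illustration, and the "re-read everything that was not dispatched" rule is an assumption that stands in for the repeated cold reads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of the stuck replay loop described in this report; not Pulsar's broker code. */
final class StuckReplaySketch {

    /** Hypothetical stand-in for a message position, ordered by ledger id and then entry id. */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    /** Totals collected over the simulated dispatch rounds. */
    static final class Outcome {
        final int entriesRead;
        final int entriesDispatched;

        Outcome(int entriesRead, int entriesDispatched) {
            this.entriesRead = entriesRead;
            this.entriesDispatched = entriesDispatched;
        }
    }

    /** Filter as the report describes it: replayed entries below the join position are dropped. */
    static Predicate<Position> reportedFilter(Position joinPosition) {
        return position -> position.compareTo(joinPosition) >= 0;
    }

    /** Filter the report asks for: redelivery is never restricted by ordering. */
    static Predicate<Position> expectedFilter() {
        return position -> true;
    }

    /** Re-reads all pending entries each round and dispatches only those the filter accepts. */
    static Outcome simulate(List<Position> pendingRedelivery,
                            Predicate<Position> mayDispatch,
                            int maxRounds) {
        List<Position> pending = new ArrayList<>(pendingRedelivery);
        int read = 0;
        int dispatched = 0;
        for (int round = 0; round < maxRounds && !pending.isEmpty(); round++) {
            read += pending.size(); // every pending entry is read again in each round
            List<Position> rejected = new ArrayList<>();
            for (Position position : pending) {
                if (mayDispatch.test(position)) {
                    dispatched++;
                } else {
                    rejected.add(position); // discarded now, read again next round
                }
            }
            pending = rejected;
        }
        return new Outcome(read, dispatched);
    }

    public static void main(String[] args) {
        // Three unacked messages that sit below the position the consumer joined at.
        List<Position> unacked = Arrays.asList(
                new Position(1, 0), new Position(1, 1), new Position(1, 2));
        Position joinPosition = new Position(1, 5);

        Outcome stuck = simulate(unacked, reportedFilter(joinPosition), 4);
        Outcome fixed = simulate(unacked, expectedFilter(), 4);
        System.out.println("reported: read=" + stuck.entriesRead
                + " dispatched=" + stuck.entriesDispatched);
        System.out.println("expected: read=" + fixed.entriesRead
                + " dispatched=" + fixed.entriesDispatched);
    }
}
```

In this model the reported filter rejects every pending entry in every round, so the read count grows with the number of rounds while nothing is dispatched. With the unrestricted filter, the pending entries are dispatched in the first round and the loop ends.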

Test case


    @Test
    public void test()
            throws Exception {
        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
        boolean enableBatch = false;
        Set<Integer> values = new HashSet<>();

        @Cleanup
        Consumer<Integer> consumer1 = createConsumer(topic);

        @Cleanup
        Producer<Integer> producer = createProducer(topic, enableBatch);
        int count = 0;
        for (int i = 0; i < 10; i++) {
            // Publish one message per iteration with a random key (batching is disabled above)
            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
            producer.newMessage().key(key).value(count++).send();
        }

        @Cleanup
        Consumer<Integer> consumer2 = createConsumer(topic);

        for (int i = 0; i < 10; i++) {
            // Publish one message per iteration with a random key (batching is disabled above)
            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
            producer.newMessage().key(key).value(count++).send();
        }

        @Cleanup
        Consumer<Integer> consumer3 = createConsumer(topic);

        consumer2.redeliverUnacknowledgedMessages();

        for (int i = 0; i < 10; i++) {
            // Publish one message per iteration with a random key (batching is disabled above)
            String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
            producer.newMessage().key(key).value(count++).send();
        }
        consumer1.close();

        for (int i = 0; i < count; i++) {
            Message<Integer> msg = consumer2.receive(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                values.add(msg.getValue());
            } else {
                break;
            }
        }
        for (int i = 0; i < count; i++) {
            Message<Integer> msg = consumer3.receive(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                values.add(msg.getValue());
            } else {
                break;
            }
        }
        assertEquals(values.size(), count);
    }

What did you expect to see?

  • Consumers with available permits must be able to consume delivered messages
  • Serving the consumers of such a subscription should not be an end-of-the-world scenario for brokers

What did you see instead?

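Delivery is stuck: consumers with available permits receive no messages while the broker keeps reading and discarding the redelivered messages. The broker should not block delivery based on such incorrect ordering assumptions.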

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Assignees

No one assigned

    Labels

    type/bug: The PR fixed a bug or issue reported a bug
