Skip to content

[fix] [broker] fix write all compacted out entry into compacted topic#21917

Merged
Technoboy- merged 10 commits into
apache:masterfrom
thetumbled:Fix_AllCompactdOutEntry
Jan 21, 2024
Merged

[fix] [broker] fix write all compacted out entry into compacted topic#21917
Technoboy- merged 10 commits into
apache:masterfrom
thetumbled:Fix_AllCompactdOutEntry

Conversation

@thetumbled

@thetumbled thetumbled commented Jan 18, 2024

Copy link
Copy Markdown
Member

Fixes #21916

Motivation

The method reader.hasMessageAvailable() return true, but reader.readNext can't return any messages because the messages in compacted topic are all compacted out.

Modifications

Check if the message without partition key is a valid message.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#33

@thetumbled

Copy link
Copy Markdown
Member Author

PTAL, thanks. @coderzc

@codelipenghui codelipenghui left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a test to reproduce this issue?

@coderzc

coderzc commented Jan 19, 2024

Copy link
Copy Markdown
Member

@thetumbled @codelipenghui The following test can reproduce the problem, and this PR can fix it.

    @Test
    public void testAllCompactedOut() throws Exception {
        String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(true).topic(topicName).batchingMaxMessages(3).create();

        producer.newMessage().key("K1").value("V1").sendAsync();
        producer.newMessage().key("K2").value("V2").sendAsync();
        producer.newMessage().key("K2").value(null).sendAsync();
        producer.flush();

        admin.topics().triggerCompaction(topicName);

        Awaitility.await().untilAsserted(() -> {
            assertEquals(admin.topics().compactionStatus(topicName).status,
                    LongRunningProcessStatus.Status.SUCCESS);
        });

        producer.newMessage().key("K1").value(null).sendAsync();
        producer.flush();

        admin.topics().triggerCompaction(topicName);

        Awaitility.await().untilAsserted(() -> {
            assertEquals(admin.topics().compactionStatus(topicName).status,
                    LongRunningProcessStatus.Status.SUCCESS);
        });

        @Cleanup
        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                .subscriptionName("reader-test")
                .topic(topicName)
                .readCompacted(true)
                .startMessageId(MessageId.earliest)
                .create();
        Assert.assertEquals(reader.getLastMessageIds().get(0), MessageIdImpl.earliest);
        while (reader.hasMessageAvailable()) {
            Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
            Assert.assertNotNull(message);
        }
    }

Comment thread pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java Outdated
@Technoboy-

Copy link
Copy Markdown
Contributor
testAllCompactedOut

Need to add this test to this patch

@thetumbled

Copy link
Copy Markdown
Member Author

@thetumbled @codelipenghui The following test can reproduce the problem, and this PR can fix it.

    @Test
    public void testAllCompactedOut() throws Exception {
        String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(true).topic(topicName).batchingMaxMessages(3).create();

        producer.newMessage().key("K1").value("V1").sendAsync();
        producer.newMessage().key("K2").value("V2").sendAsync();
        producer.newMessage().key("K2").value(null).sendAsync();
        producer.flush();

        admin.topics().triggerCompaction(topicName);

        Awaitility.await().untilAsserted(() -> {
            assertEquals(admin.topics().compactionStatus(topicName).status,
                    LongRunningProcessStatus.Status.SUCCESS);
        });

        producer.newMessage().key("K1").value(null).sendAsync();
        producer.flush();

        admin.topics().triggerCompaction(topicName);

        Awaitility.await().untilAsserted(() -> {
            assertEquals(admin.topics().compactionStatus(topicName).status,
                    LongRunningProcessStatus.Status.SUCCESS);
        });

        @Cleanup
        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                .subscriptionName("reader-test")
                .topic(topicName)
                .readCompacted(true)
                .startMessageId(MessageId.earliest)
                .create();
        Assert.assertEquals(reader.getLastMessageIds().get(0), MessageIdImpl.earliest);
        while (reader.hasMessageAvailable()) {
            Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
            Assert.assertNotNull(message);
        }
    }

Great job, but there is something wrong with the test code. We need to set the config topicCompactionRetainNullKey to be true, so that the empty can be written into the compacted topic and the exception is throw out.
I have pushed up the test code, PTAL, thanks! @coderzc

Comment thread pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java Outdated
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java Outdated
@codecov-commenter

codecov-commenter commented Jan 19, 2024

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.58%. Comparing base (b25d2c3) to head (6def666).
⚠️ Report is 1319 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff            @@
##             master   #21917   +/-   ##
=========================================
  Coverage     73.57%   73.58%           
- Complexity    32375    32395   +20     
=========================================
  Files          1861     1861           
  Lines        138568   138591   +23     
  Branches      15185    15185           
=========================================
+ Hits         101958   101978   +20     
- Misses        28704    28730   +26     
+ Partials       7906     7883   -23     
Flag Coverage Δ
inttests 24.16% <0.00%> (+0.02%) ⬆️
systests 23.61% <0.00%> (-0.01%) ⬇️
unittests 72.88% <100.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...g/apache/pulsar/client/impl/RawBatchConverter.java 93.90% <100.00%> (+0.15%) ⬆️

... and 85 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@coderzc coderzc added area/compaction area/broker type/bug The PR fixed a bug or issue reported a bug labels Jan 20, 2024
@Technoboy- Technoboy- merged commit 40eebc0 into apache:master Jan 21, 2024
@Technoboy- Technoboy- modified the milestones: 3.3.0, 3.2.0 Jan 21, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 1, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] [broker] empty entry writed into the compacted topic

6 participants