Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Conversation

@Demogorgon314
Copy link
Member

@Demogorgon314 Demogorgon314 commented Nov 24, 2022

#1129

Motivation

Currently, KoP doesn't support publish rate limit, but it is a helpful feature for users,
In this PR, it introduced the message publish rate limit, and the rate limit is reused from
Pulsar, it can work with Pulsar together.

Modifications

  • Support message publish rate limit.
  • Added units test to verify it.

TODO

Need check if the KoP needs to support this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@Demogorgon314 Demogorgon314 self-assigned this Nov 24, 2022
@github-actions github-actions bot added the no-need-doc This pr does not need any document label Nov 24, 2022
@Demogorgon314 Demogorgon314 force-pushed the Demogorgon314/Support-messege-publish-throttling branch from 392abd3 to 2524538 Compare November 24, 2022 06:55
@codecov
Copy link

codecov bot commented Nov 24, 2022

Codecov Report

Merging #1589 (fc7bd67) into master (6970ed5) will decrease coverage by 0.48%.
The diff coverage is 3.57%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1589      +/-   ##
============================================
- Coverage     15.34%   14.85%   -0.49%     
+ Complexity      592      589       -3     
============================================
  Files           164      164              
  Lines         11913    12225     +312     
  Branches       1102     1120      +18     
============================================
- Hits           1828     1816      -12     
- Misses         9931    10254     +323     
- Partials        154      155       +1     
Impacted Files Coverage Δ
...mnative/pulsar/handlers/kop/InternalServerCnx.java 0.00% <ø> (ø)
...ative/pulsar/handlers/kop/KafkaRequestHandler.java 1.09% <0.00%> (-0.07%) ⬇️
...sar/handlers/kop/storage/AppendRecordsContext.java 0.00% <0.00%> (ø)
...tive/pulsar/handlers/kop/storage/PartitionLog.java 10.26% <4.34%> (-0.29%) ⬇️
...ative/pulsar/handlers/kop/PendingTopicFutures.java 50.98% <0.00%> (-25.50%) ⬇️
...nagedLedgerPropertiesMigrationMetadataManager.java 44.11% <0.00%> (-0.89%) ⬇️
.../handlers/kop/coordinator/group/GroupMetadata.java 51.78% <0.00%> (-0.19%) ⬇️
...streamnative/pulsar/handlers/kop/AdminManager.java 0.40% <0.00%> (-0.01%) ⬇️
...e/pulsar/handlers/kop/utils/KafkaRequestUtils.java 0.00% <0.00%> (ø)
... and 10 more

@Demogorgon314 Demogorgon314 marked this pull request as ready for review November 24, 2022 12:58
@Demogorgon314 Demogorgon314 force-pushed the Demogorgon314/Support-messege-publish-throttling branch from 426e704 to 9382ee3 Compare November 28, 2022 08:43
@Demogorgon314
Copy link
Member Author

@BewareMyPower @eolivelli Please help review this PR :-)

@BewareMyPower
Copy link
Collaborator

I will review it next week.

@BewareMyPower
Copy link
Collaborator

Not related to this PR, I think it would be good to implement KIP-219

@BewareMyPower
Copy link
Collaborator

We can reduce duplicated code by adding a method like:

    private void sendMessagesFromMultiThreads(KafkaProducer<Integer, byte[]> producer, String topicName,
                                              int numThread, int numMessage, int msgBytes) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(numThread);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int n = 0; n < numThread; n++) {
            final CompletableFuture<Void> future = new CompletableFuture<>();
            futures.add(future);
            executorService.submit(() -> {
                for (int i = 0; i < numMessage; i++) {
                    try {
                        producer.send(new ProducerRecord<>(topicName, new byte[msgBytes])).get();
                        future.complete(null);
                    } catch (InterruptedException | ExecutionException e) {
                        future.completeExceptionally(e);
                    }
                }
            });
        }
        FutureUtil.waitForAll(futures).get();
    }

It's also a fail-fast style.

In addition, I noticed the two tests use different msgBytes:

        int numMessage = 10;
        int msgBytes = 110;
        int numThread = 4;
        int numMessage = 20;
        int numThread = 4;
        int msgBytes = 50;

Is it possible to share the same config so that we can make these fields private and simplify the method to the following signature?

    private void sendMessagesFromMultiThreads(KafkaProducer<Integer, byte[]> producer, String topicName)
            throws Exception {

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@BewareMyPower BewareMyPower merged commit 60bb8f0 into streamnative:master Dec 8, 2022
Demogorgon314 added a commit that referenced this pull request Dec 8, 2022
### Motivation

Currently, KoP doesn't support publish rate limit, but it is a helpful
feature for users,
In this PR, it introduced the message publish rate limit, and the rate
limit is reused from
Pulsar, it can work with Pulsar together.

### Modifications
* Support message publish rate limit.
* Added units test to verify it.

(cherry picked from commit 60bb8f0)
@Demogorgon314 Demogorgon314 deleted the Demogorgon314/Support-messege-publish-throttling branch December 8, 2022 02:55
Demogorgon314 added a commit that referenced this pull request Dec 19, 2022
### Motivation

Currently, KoP doesn't support publish rate limit, but it is a helpful
feature for users,
In this PR, it introduced the message publish rate limit, and the rate
limit is reused from
Pulsar, it can work with Pulsar together.

### Modifications
* Support message publish rate limit.
* Added units test to verify it.

(cherry picked from commit 60bb8f0)
michaeljmarshall pushed a commit to michaeljmarshall/kop that referenced this pull request Jan 31, 2023
Currently, KoP doesn't support publish rate limit, but it is a helpful
feature for users,
In this PR, it introduced the message publish rate limit, and the rate
limit is reused from
Pulsar, it can work with Pulsar together.

* Support message publish rate limit.
* Added units test to verify it.
Demogorgon314 added a commit that referenced this pull request Feb 6, 2023
### Motivation

Currently, KoP doesn't support publish rate limit, but it is a helpful
feature for users,
In this PR, it introduced the message publish rate limit, and the rate
limit is reused from
Pulsar, it can work with Pulsar together.

### Modifications
* Support message publish rate limit.
* Added units test to verify it.

(cherry picked from commit 60bb8f0)
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants