Fix pulsar-io-thread block forever #946
|
Is it related to #942? |
yes |
|
In addition, is there a way to reproduce it? I think we can configure |
Setting both maxQueuedRequests and numIOThreads to 1 should make it easier to reproduce. |
|
@BewareMyPower I now use the common ordered executor to send responses. PTAL, thanks! |
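For readers unfamiliar with the idea of an ordered executor, here is a minimal sketch of the concept, not the class this PR actually uses. `KeyOrderedExecutorSketch`, `executeOrdered`, and the `"channel-1"` key are hypothetical names chosen for illustration. The assumption is that responses for one connection must be sent in order, while different connections may be served by different threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyOrderedExecutorSketch {
    // One single-threaded worker per slot; each worker runs its tasks in FIFO order.
    private final ExecutorService[] workers;

    public KeyOrderedExecutorSketch(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Tasks submitted with the same key always land on the same worker,
    // so they execute in submission order (hypothetical API).
    public void executeOrdered(Object key, Runnable task) {
        workers[Math.floorMod(key.hashCode(), workers.length)].execute(task);
    }

    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        for (ExecutorService w : workers) {
            w.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    // Demo helper: "send" n responses for one channel key and record the order they ran in.
    public static List<Integer> runDemo(int numThreads, int n) throws InterruptedException {
        KeyOrderedExecutorSketch executor = new KeyOrderedExecutorSketch(numThreads);
        List<Integer> sent = new CopyOnWriteArrayList<>();
        for (int i = 0; i < n; i++) {
            final int correlationId = i;
            executor.executeOrdered("channel-1", () -> sent.add(correlationId));
        }
        executor.shutdownAndWait();
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(4, 5));
    }
}
```

Because every task for `"channel-1"` goes to the same single-threaded worker, the recorded order matches the submission order even though the pool has several threads.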
|
Nice catch! |
|
LGTM. I will approve it after I run a simple test with the following configs. Please also take a look, @hangc0276 @Demogorgon314 |
|
I think this PR still doesn't fix the problem. Here are my logs of KoP standalone:

I tested it simply with the following extra configs:

maxQueuedRequests=1
sendKafkaResponseThreads=1

And sent messages via the Kafka command-line tool:

$ ./bin/kafka-producer-perf-test.sh --topic my-topic --num-records 10000 --throughput 1000 --record-size 1024 --producer.config temp.properties
[2021-12-02 18:35:32,240] WARN [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition my-topic-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2021-12-02 18:35:32,241] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition my-topic-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
... |
|
Before this PR, once the pulsar-io thread blocks while putting a request into the queue, it stays blocked forever. This PR solves that problem. |
|
How many BookKeeper threads are configured? You can increase the number of BookKeeper threads to raise the processing capacity. In addition, is managedLedgerDefaultEnsembleSize set to 1? |
|
Sorry I've made a mistake. I configured I think the root cause is that the value of |
Demogorgon314
left a comment
LGTM. The default value of numSendKafkaResponseThreads might need some benchmarking to determine.
So can we set the default value to 4? @Demogorgon314 @hangc0276 |
|
Did you compare it with other values? I think you can give a simple comparison with |
When using Runtime.getRuntime().availableProcessors(), the number of send-response threads on my test broker is 24. |
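As a rough illustration of the default being discussed, the sketch below resolves a thread count from a configured value and falls back to the core count when none is set. The class name, the `resolveThreads` helper, and the "non-positive means unset" convention are assumptions made for this example, not KoP's actual configuration logic.

```java
public class ResponseThreadDefaultSketch {

    // Hypothetical helper: use the configured value when it is positive,
    // otherwise fall back to the number of available processors.
    public static int resolveThreads(int configured) {
        return configured > 0 ? configured : Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        // An explicit setting wins over the fallback.
        System.out.println("configured=4 -> " + resolveThreads(4));
        // With no setting, the result depends on the machine.
        System.out.println("configured=0 -> " + resolveThreads(0));
    }
}
```

On a 24-core broker the fallback would yield 24 threads, which is why a fixed default such as 4 and the core-count default can behave quite differently under load.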
|
It makes sense. |
When the requestQueue is full, the pulsar-io thread blocks indefinitely in requestQueue.put(). As a result, requestQueue.remove() is never called, because writeAndFlushResponseToClient is also executed by the pulsar-io thread:
` ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
});`
This leaves many connections in the CLOSE_WAIT state on the server side.
Related to:
#942
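The blocking described above can be reproduced in isolation. The following is a minimal sketch under stated assumptions, not KoP's code: a single-threaded executor stands in for the pulsar-io event loop, an `ArrayBlockingQueue` of capacity 1 stands in for requestQueue, and the class and method names are hypothetical. A timed `offer()` replaces `put()` only so that the example terminates instead of hanging.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RequestQueueBlockSketch {

    // Returns true if a second request could be enqueued within the timeout.
    // ioThread stands in for the pulsar-io event loop; responseThread is the
    // executor on which the "send response, then remove from queue" step runs.
    public static boolean secondRequestAccepted(ExecutorService ioThread,
                                                ExecutorService responseThread) throws Exception {
        BlockingQueue<Integer> requestQueue = new ArrayBlockingQueue<>(1); // like maxQueuedRequests=1
        Future<Boolean> result = ioThread.submit(() -> {
            requestQueue.put(1); // the first request fills the queue
            // Schedule the response for request 1, which frees a slot when it runs.
            responseThread.execute(() -> requestQueue.remove());
            // With put() this call would wait forever when both executors are the
            // same single thread; the timed offer() makes the stall observable.
            return requestQueue.offer(2, 500, TimeUnit.MILLISECONDS);
        });
        return result.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService responder = Executors.newSingleThreadExecutor();
        try {
            System.out.println("same thread:     " + secondRequestAccepted(io, io));
            System.out.println("separate thread: " + secondRequestAccepted(io, responder));
        } finally {
            io.shutdown();
            responder.shutdown();
        }
    }
}
```

When the response step shares the I/O thread, the second enqueue times out because the removal is queued behind the very task that is waiting for it. Moving the response step to its own executor lets the removal run, so the second enqueue succeeds.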
When the requestQueue is full, the pulsar-io thread blocks indefinitely in requestQueue.put(). As a result, requestQueue.remove() is never called, because writeAndFlushResponseToClient is also executed by the pulsar-io thread:
` ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
});`
This leaves many connections in the CLOSE_WAIT state on the server side.
Related to:
streamnative#942
(cherry picked from commit c4ce316)
This reverts commit c4ce316.

