Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Fix pulsar-io-thread block forever#946

Merged
BewareMyPower merged 5 commits into
streamnative:masterfrom
lordcheng10:fix_pulsar-io-thread_block_forever
Dec 3, 2021
Merged

Fix pulsar-io-thread block forever#946
BewareMyPower merged 5 commits into
streamnative:masterfrom
lordcheng10:fix_pulsar-io-thread_block_forever

Conversation

@lordcheng10

@lordcheng10 lordcheng10 commented Dec 2, 2021

Copy link
Copy Markdown
Contributor

When the requestQueue is full, the pulsar-io thread will always be blocked in: requestQueue.put(), which will cause the requestQueue.remove method to not be called, because the writeAndFlushResponseToClient method is also executed by the pulsar-io thread:
ctx.channel().eventLoop().execute(() -> { writeAndFlushResponseToClient(channel); });
This will cause a lot of close_wait on the server side;

related to:
#942

@BewareMyPower

Copy link
Copy Markdown
Collaborator

Is it related to #942?

@lordcheng10

Copy link
Copy Markdown
Contributor Author

Is it related to #942?

yes

@BewareMyPower

Copy link
Copy Markdown
Collaborator

In addition, is there a way to reproduce it? I think we can configure maxQueuedRequests with 1 so that some channels might stuck at requestQueue.put. But we have many pulsar-io threads to use, how many clients should we start to reproduce it? Or is there another way to reproduce it easily?

@lordcheng10

Copy link
Copy Markdown
Contributor Author

In addition, is there a way to reproduce it? I think we can configure maxQueuedRequests with 1 so that some channels might stuck at requestQueue.put. But we have many pulsar-io threads to use, how many clients should we start to reproduce it? Or is there another way to reproduce it easily?

Both maxQueuedRequests and numIOThreads are set to 1, which should be easier to reproduce

@lordcheng10

lordcheng10 commented Dec 2, 2021

Copy link
Copy Markdown
Contributor Author

@BewareMyPower I use the common order excuter to send response. PLTAL , thanks!

@aloyszhang

Copy link
Copy Markdown
Contributor

Nice catch!

@lordcheng10 lordcheng10 changed the title fix pulsar-io-thread block forever Fix pulsar-io-thread block forever Dec 2, 2021
@BewareMyPower

Copy link
Copy Markdown
Collaborator

LGTM. I will approve it after I have a simple test for following configs.

maxQueuedRequests=1
numIOThreads=1

Please also take a look, @hangc0276 @Demogorgon314

@aloyszhang aloyszhang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@BewareMyPower

Copy link
Copy Markdown
Collaborator

I think this PR still doesn't fix the problem. Here are my logs of KoP standalone:

18:35:22.795 [send-response-OrderedScheduler-0-0] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [[id: 0x952f3998, L:/127.0.0.1:9092 - R:/127.0.0.1:63152]] request RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=producer-1, correlationId=6) is not completed for 30003703129 ns (> 30000 ms)
18:35:23.002 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.proto.PerChannelBookieClient - Timed-out 1 operations to channel [id: 0xda8b8f09, L:/127.0.0.1:63108 - R:/127.0.0.1:3181] for 127.0.0.1:3181
18:35:23.004 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (4, 0): Bookie operation timeout
18:35:23.013 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to find 1 bookies : excludeBookies [<Bookie:127.0.0.1:3181>], allBookies [<Bookie:127.0.0.1:3181>].
18:35:23.013 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to choose a bookie: excluded [<Bookie:127.0.0.1:3181>], fallback to choose bookie randomly from the cluster.

I test it simply with following extra configs:

maxQueuedRequests=1
sendKafkaResponseThreads=1

And send messages via Kafka CMD tool:

$ ./bin/kafka-producer-perf-test.sh --topic my-topic --num-records 10000 --throughput 1000 --record-size 1024 --producer.config temp.properties
[2021-12-02 18:35:32,240] WARN [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition my-topic-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2021-12-02 18:35:32,241] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition my-topic-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
...

@BewareMyPower

Copy link
Copy Markdown
Collaborator
"send-response-OrderedScheduler-0-0" #96 prio=5 os_prio=31 tid=0x00007fddd74d7800 nid=0x19a03 waiting on condition [0x0000700013bee000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000796005f48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
	- None

@lordcheng10

Copy link
Copy Markdown
Contributor Author

I think this PR still doesn't fix the problem. Here are my logs of KoP standalone:

18:35:22.795 [send-response-OrderedScheduler-0-0] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [[id: 0x952f3998, L:/127.0.0.1:9092 - R:/127.0.0.1:63152]] request RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=producer-1, correlationId=6) is not completed for 30003703129 ns (> 30000 ms)
18:35:23.002 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.proto.PerChannelBookieClient - Timed-out 1 operations to channel [id: 0xda8b8f09, L:/127.0.0.1:63108 - R:/127.0.0.1:3181] for 127.0.0.1:3181
18:35:23.004 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (4, 0): Bookie operation timeout
18:35:23.013 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to find 1 bookies : excludeBookies [<Bookie:127.0.0.1:3181>], allBookies [<Bookie:127.0.0.1:3181>].
18:35:23.013 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to choose a bookie: excluded [<Bookie:127.0.0.1:3181>], fallback to choose bookie randomly from the cluster.

I test it simply with following extra configs:

maxQueuedRequests=1
sendKafkaResponseThreads=1

And send messages via Kafka CMD tool:

$ ./bin/kafka-producer-perf-test.sh --topic my-topic --num-records 10000 --throughput 1000 --record-size 1024 --producer.config temp.properties
[2021-12-02 18:35:32,240] WARN [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition my-topic-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2021-12-02 18:35:32,241] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition my-topic-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
...

Before this PR is repaired, once the pulsar-io thread is blocked in the putting queue, the thread will be blocked forever. This PR is to solve the problem.
But we can't handle the situation where the pulsar-io thread is blocked in a limited time when the speed of putting it in is much faster than taking it out of the queue.

@lordcheng10

Copy link
Copy Markdown
Contributor Author

18:35:22.795 [send-response-OrderedScheduler-0-0] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [[id: 0x952f3998, L:/127.0.0.1:9092 - R:/127.0.0.1:63152]] request RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=producer-1, correlationId=6) is not completed for 30003703129 ns (> 30000 ms) 18:35:23.002 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.proto.PerChannelBookieClient - Timed-out 1 operations to channel [id: 0xda8b8f09, L:/127.0.0.1:63108 - R:/127.0.0.1:3181] for 127.0.0.1:3181 18:35:23.004 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (4, 0): Bookie operation timeout 18:35:23.013 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to find 1 bookies : excludeBookies [<Bookie:127.0.0.1:3181>], allBookies [<Bookie:127.0.0.1:3181>]. 18:35:23.013 [BookKeeperClientWorker-OrderedExecutor-4-0] WARN org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to choose a bookie: excluded [<Bookie:127.0.0.1:3181>], fallback to choose bookie randomly from the cluster.

How big is the bookkeeper thread? You can add it to the bookkeeper thread to increase the processing capacity. In addition, is the managedledgerdefaultensemblesize of bookkeeper set to 1

@BewareMyPower

BewareMyPower commented Dec 2, 2021

Copy link
Copy Markdown
Collaborator

Sorry I've made a mistake. I configured numIOThreads=1 instead of sendKafkaResponseThreads=1.

I think the root cause is that the value of numIOThread is too small for a lot of connections because writeAndFlushResponseToClient also uses the pulsar-io thread. The independent sendResponseScheduler can reduce the pressure for the IO threads.

@Demogorgon314 Demogorgon314 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. The default value of numSendKafkaResponseThreads might need some benchmark test to find out.

@lordcheng10

lordcheng10 commented Dec 2, 2021

Copy link
Copy Markdown
Contributor Author

LGTM. The default value of numSendKafkaResponseThreads might need some benchmark test to find out.

  1. Set numsendkafkaresponsethreads = 4,Use Kafka perf tool for pressure measurement,200 perf programs started ,Pulsar broker has only one :
    nohup sh kafka-producer-perf-test.sh --topic test1 --num-records 10000000000 --throughput 200000 --record-size 512 --producer.config ../config/producer.properties &
    .....
    nohup sh kafka-producer-perf-test.sh --topic test200 --num-records 10000000000 --throughput 200000 --record-size 512 --producer.config ../config/producer.properties &

  2. The throughput of a single broker is 200K msg/ s :
    image

So we can set the default value to 4? @Demogorgon314 @hangc0276

@BewareMyPower

Copy link
Copy Markdown
Collaborator

Did you compare it with other values? I think you can give a simple comparison with Runtime.getRuntime().availableProcessors() value.

@lordcheng10

lordcheng10 commented Dec 3, 2021

Copy link
Copy Markdown
Contributor Author

Did you compare it with other values? I think you can give a simple comparison with Runtime.getRuntime().availableProcessors() value.

When numsendkafkaresponsethreads = 4 and the throughput of a single broker reaches 200K msg / s, the send response thread is still very idle. At this time, the bottleneck is not the send response thread,So it doesn't make sense to increase the number of send threads:
image

@lordcheng10

Copy link
Copy Markdown
Contributor Author

Runtime.getRuntime().availableProcessors()

When using runtime. Getruntime(). Availableprocessors(), the number of send response threads on my test broker is 24

@BewareMyPower

Copy link
Copy Markdown
Collaborator

It makes sense.

@BewareMyPower BewareMyPower merged commit c4ce316 into streamnative:master Dec 3, 2021
BewareMyPower pushed a commit that referenced this pull request Dec 3, 2021
When the requestQueue is full, the pulsar-io thread will always be blocked in: requestQueue.put(), which will cause the requestQueue.remove method to not be called, because the writeAndFlushResponseToClient method is also executed by the pulsar-io thread:
`                ctx.channel().eventLoop().execute(() -> {
                     writeAndFlushResponseToClient(channel);
                 });`
This will cause a lot of close_wait on the server side;

related to:
#942
BewareMyPower pushed a commit that referenced this pull request Dec 3, 2021
When the requestQueue is full, the pulsar-io thread will always be blocked in: requestQueue.put(), which will cause the requestQueue.remove method to not be called, because the writeAndFlushResponseToClient method is also executed by the pulsar-io thread:
`                ctx.channel().eventLoop().execute(() -> {
                     writeAndFlushResponseToClient(channel);
                 });`
This will cause a lot of close_wait on the server side;

related to:
#942
eolivelli pushed a commit to eolivelli/kop that referenced this pull request Dec 17, 2021
When the requestQueue is full, the pulsar-io thread will always be blocked in: requestQueue.put(), which will cause the requestQueue.remove method to not be called, because the writeAndFlushResponseToClient method is also executed by the pulsar-io thread:
`                ctx.channel().eventLoop().execute(() -> {
                     writeAndFlushResponseToClient(channel);
                 });`
This will cause a lot of close_wait on the server side;

related to:
streamnative#942

(cherry picked from commit c4ce316)
michaeljmarshall added a commit to michaeljmarshall/kop that referenced this pull request Dec 12, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] producer request timeout

5 participants