-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[Client|Proxy] Create copy of response messages before passing to other thread via CompletableFuture #10215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @lhotari for working on this problem
I am not sure this is the right way.
The problem looks more structural, here we are trying to fix only the specific case.
|
@merlimat PTAL |
sijie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add test cases?
|
@lhotari please check the answer from @merlimat Probably this is the minimal fix we can do in ClientCtx, that is to eagerly initialize all of the fields: |
yes I checked it.
This won't work since there is no eager initialization. Please take a look at the source code of LightProto generated classes. A way to do eager initialization would be to move the code for creating |
|
For instance on Schema struct when you call "getName" the String is created and the ByteBuf is decoded: with my approach you eagerly create the "String" for name and then the problem is fixed |
|
your fix works but it needs to refactor a bit the structures that we are passing from method to method. if LightProto had an utility function to eagerly decode the ByteBufs we could use that feature, without the need of calling every method that needs an explicit call for initialization (ByteBuf decode I mean) |
For |
|
The protobuf object is only valid within the context of the IO thread that is deserializing it. If we need some of the fields to be used from different threads, these values have to be copied out. eg: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L866 |
|
/pulsarbot run-failure-checks |
|
I have another idea (that I am testing locally) We can simply "clone" the CommandGetSchemaResponse inside |
…er thread via CompletableFuture
- a single LightProto BaseCommand instance is reused in the IO thread
- the instance shouldn't be exposed to other threads
- See org.apache.pulsar.common.protocol.PulsarDecoder.channelRead for more
details.
9bc948d to
0040c36
Compare
@eolivelli Good idea. I revisited the changes and also found 2 other locations where a copy is needed. |
@sijie Do you have a proposal of how to test this? |
|
@merlimat I have revisited the change. PTAL |
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am running this patch in a local branch.
LGTM
@sijie I am not sure it is worth to add a test case.
I mean, reproducing the issue is not simple, so the only test case we can create is about ensuring that we are cloning the value
WDYT ?
|
@sijie @merlimat I have this integration test that fails due to this problem, I believe there is no strong need to add a test here |

Fixes #10210
Motivation
See #10210
details.
pulsar/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
Lines 93 to 115 in c12765a
IllegalReferenceCountExceptionwhile getting schemaModifications