@lhotari
Member

@lhotari lhotari commented Apr 13, 2021

Fixes #10210

Motivation

See #10210

  • a single LightProto BaseCommand instance is reused in the IO thread
    • the instance shouldn't be exposed to other threads
    • See org.apache.pulsar.common.protocol.PulsarDecoder.channelRead for more
      details.

    private final BaseCommand cmd = new BaseCommand();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HAProxyMessage) {
            HAProxyMessage proxyMessage = (HAProxyMessage) msg;
            this.proxyMessage = proxyMessage;
            proxyMessage.release();
            return;
        }
        // Get a buffer that contains the full frame
        ByteBuf buffer = (ByteBuf) msg;
        try {
            // De-serialize the command
            int cmdSize = (int) buffer.readUnsignedInt();
            cmd.parseFrom(buffer, cmdSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received cmd {}", ctx.channel().remoteAddress(), cmd.getType());
            }
            messageReceived();
            switch (cmd.getType()) {

  • fixes IllegalReferenceCountException while getting schema

Modifications

  • make copies of response message instances before sharing with other threads
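
For illustration only, here is a minimal sketch of the hazard described above and of the copy-based fix. `Response`, `parse` and the schema names are hypothetical stand-ins for a reused, mutable message instance; this is not Pulsar or LightProto code, and it runs on a single thread so the effect is deterministic.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a reused protocol message; not Pulsar code.
final class ReusedInstanceDemo {

    /** Mutable message that is re-parsed in place, like a single reused command instance. */
    static final class Response {
        private long requestId;
        private String schemaName;

        /** Overwrites this instance with the content of a new frame. */
        Response parse(long requestId, String schemaName) {
            this.requestId = requestId;
            this.schemaName = schemaName;
            return this;
        }

        /** Copies all fields from another instance into this one. */
        Response copyFrom(Response other) {
            this.requestId = other.requestId;
            this.schemaName = other.schemaName;
            return this;
        }

        String schemaName() {
            return schemaName;
        }
    }

    /** Completes the future with the shared instance, which is then reused for the next frame. */
    static String shareWithoutCopy() {
        Response shared = new Response();
        CompletableFuture<Response> future = new CompletableFuture<>();
        future.complete(shared.parse(1L, "schema-a"));
        shared.parse(2L, "schema-b");      // the next frame overwrites the same instance
        return future.join().schemaName(); // the consumer now observes the second frame
    }

    /** Completes the future with a private copy taken before the shared instance is reused. */
    static String shareWithCopy() {
        Response shared = new Response();
        CompletableFuture<Response> future = new CompletableFuture<>();
        shared.parse(1L, "schema-a");
        future.complete(new Response().copyFrom(shared));
        shared.parse(2L, "schema-b");
        return future.join().schemaName(); // the copy still holds the first frame
    }

    public static void main(String[] args) {
        System.out.println("without copy: " + shareWithoutCopy());
        System.out.println("with copy:    " + shareWithCopy());
    }
}
```

In the real client the consumer of the future typically runs on another thread, so the overwrite is a race rather than a certainty; the sketch only makes the ordering explicit.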

Contributor

@eolivelli eolivelli left a comment

Thanks @lhotari for working on this problem

I am not sure this is the right way.
The problem looks more structural; here we are fixing only this specific case.

@eolivelli
Contributor

@merlimat PTAL

Member

@sijie sijie left a comment

Can we add test cases?

@eolivelli
Contributor

@lhotari please check the answer from @merlimat
#10210 (comment)

Probably the minimal fix we can do in ClientCnx is to eagerly initialize all of the fields:

@Override
    protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchemaResponse) {
        checkArgument(state == State.Ready);

        long requestId = commandGetSchemaResponse.getRequestId();
        commandGetSchemaResponse.getSchema().getSchemaData();
        commandGetSchemaResponse.getSchema().getName();
        commandGetSchemaResponse.getSchema().getPropertiesList();
        commandGetSchemaResponse.getSchema().getType();
        CompletableFuture<CommandGetSchemaResponse> future = (CompletableFuture<CommandGetSchemaResponse>) pendingRequests
                .remove(requestId);
        if (future == null) {
            log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
            return;
        }
        future.complete(commandGetSchemaResponse);
    }

@lhotari
Member Author

lhotari commented Apr 14, 2021

@lhotari please check the answer from @merlimat
#10210 (comment)

Yes, I checked it.

Probably the minimal fix we can do in ClientCnx is to eagerly initialize all of the fields:

@Override
    protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchemaResponse) {
        checkArgument(state == State.Ready);

        long requestId = commandGetSchemaResponse.getRequestId();
        commandGetSchemaResponse.getSchema().getSchemaData();
        commandGetSchemaResponse.getSchema().getName();
        commandGetSchemaResponse.getSchema().getPropertiesList();
        commandGetSchemaResponse.getSchema().getType();
        CompletableFuture<CommandGetSchemaResponse> future = (CompletableFuture<CommandGetSchemaResponse>) pendingRequests
                .remove(requestId);
        if (future == null) {
            log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
            return;
        }
        future.complete(commandGetSchemaResponse);
    }

This won't work since there is no eager initialization. Please take a look at the source code of LightProto generated classes.

A way to do eager initialization would be to move the code that creates SchemaInfo from the sendGetSchema method to handleGetSchemaResponse. I experimented with that here: lhotari@431b41c.
It works, but Pulsar Proxy depends on the current behavior and would break. That is why I came back to the solution in this PR.

@eolivelli
Contributor

For instance, when you call "getName" on the Schema struct, the ByteBuf is decoded and the String is created:

public String getName() {
		if (!hasName()) {
			throw new IllegalStateException("Field 'name' is not set");
		}
		if (name == null) {
			name = LightProtoCodec.readString(_parsedBuffer, _nameBufferIdx, _nameBufferLen);
		}
		return name;
	}

with my approach you eagerly create the "String" for name and then the problem is fixed

@eolivelli
Contributor

Your fix works, but it requires some refactoring of the structures that we pass from method to method.

If LightProto had a utility function to eagerly decode the ByteBufs, we could use it instead of calling every accessor that needs an explicit call to trigger initialization (the ByteBuf decode, I mean).

@merlimat ?

@lhotari
Member Author

lhotari commented Apr 14, 2021

with my approach you eagerly create the "String" for name and then the problem is fixed

For Strings it's fine, but the approach doesn't seem to work for getSchemaData(). Did you check the source code of that method?
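
A minimal sketch of the distinction raised here, under the assumption (not confirmed in this thread) that the string accessor caches its decoded value while the bytes accessor re-reads the parsed buffer on every call. `LazyMessage`, `release()` and the frame layout are hypothetical stand-ins, not LightProto's generated API; a plain `byte[]` stands in for the pooled ByteBuf.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model of lazy field decoding; not LightProto's generated code.
final class LazyDecodeDemo {

    static final class LazyMessage {
        private byte[] parsedBuffer; // stands in for the pooled buffer the message was parsed from
        private final int nameLen;
        private String name;         // cached after the first decode

        LazyMessage(byte[] frame, int nameLen) {
            this.parsedBuffer = frame;
            this.nameLen = nameLen;
        }

        /** Decodes the name on first access and caches the resulting String. */
        String getName() {
            if (name == null) {
                name = new String(buffer(), 0, nameLen, StandardCharsets.UTF_8);
            }
            return name;
        }

        /** Assumed behavior: reads the buffer on every call and caches nothing. */
        byte[] getData() {
            byte[] buf = buffer();
            return Arrays.copyOfRange(buf, nameLen, buf.length);
        }

        /** Simulates the buffer being released back to the pool. */
        void release() {
            parsedBuffer = null;
        }

        private byte[] buffer() {
            if (parsedBuffer == null) {
                throw new IllegalStateException("buffer already released");
            }
            return parsedBuffer;
        }
    }

    /** Builds a message whose frame is the 4-byte name "name" followed by the payload "DATA". */
    static LazyMessage sample() {
        return new LazyMessage("nameDATA".getBytes(StandardCharsets.UTF_8), 4);
    }

    /** Returns true if the bytes field can still be read from the given message. */
    static boolean dataReadable(LazyMessage message) {
        try {
            message.getData();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        LazyMessage message = sample();
        message.getName(); // "eager" call: caches the String
        message.getData(); // "eager" call: result is discarded, nothing is cached
        message.release();
        System.out.println("name after release: " + message.getName());
        System.out.println("data readable after release: " + dataReadable(message));
    }
}
```

Under that assumption, touching the string accessor early is enough to detach it from the buffer, while touching the bytes accessor and discarding the result changes nothing.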

@merlimat
Contributor

The protobuf object is only valid within the context of the IO thread that is deserializing it. If some of the fields need to be used from different threads, their values have to be copied out, e.g.: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L866
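
As a rough sketch of "copying the values out" (not the ServerCnx code linked above), the handler can take an immutable snapshot of the fields it needs while still on the IO thread and hand only that snapshot to other threads. `MutableSchema`, `SchemaSnapshot` and `copyOut` are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of copying field values out of a reused message.
final class CopyOutDemo {

    /** Stand-in for a reused, mutable protocol object owned by the IO thread. */
    static final class MutableSchema {
        String name;
        byte[] data;

        void set(String name, byte[] data) {
            this.name = name;
            this.data = data;
        }
    }

    /** Immutable snapshot that is safe to share with other threads. */
    static final class SchemaSnapshot {
        final String name;
        private final byte[] data;

        SchemaSnapshot(String name, byte[] data) {
            this.name = name;
            this.data = data.clone(); // defensive copy of the payload
        }

        byte[] data() {
            return data.clone();
        }
    }

    /** Called on the "IO thread": copies the needed values before the object is reused. */
    static SchemaSnapshot copyOut(MutableSchema reused) {
        return new SchemaSnapshot(reused.name, reused.data);
    }

    /** Hands the snapshot to a worker thread after the original object was overwritten. */
    static String roundTrip() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            MutableSchema reused = new MutableSchema();
            reused.set("orders", new byte[] {1, 2, 3});
            SchemaSnapshot snapshot = copyOut(reused);
            reused.set("other", new byte[] {9}); // instance reused for the next frame
            return CompletableFuture
                    .supplyAsync(() -> snapshot.name + ":" + snapshot.data().length, worker)
                    .join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```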

@lhotari
Member Author

lhotari commented Apr 15, 2021

/pulsarbot run-failure-checks

@eolivelli
Contributor

I have another idea (that I am testing locally)

We can simply "clone" the CommandGetSchemaResponse inside handleGetSchemaResponse, this way we decode the ByteBuf and we return a safe object to the caller

 @Override
    protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchemaResponse) {
        log.info("handleGetSchemaResponse {}", commandGetSchemaResponse);
        checkArgument(state == State.Ready);
        CommandGetSchemaResponse clone = new CommandGetSchemaResponse();
        clone.copyFrom(commandGetSchemaResponse);
        commandGetSchemaResponse = clone;

…er thread via CompletableFuture

- a single LightProto BaseCommand instance is reused in the IO thread
  - the instance shouldn't be exposed to other threads
  - See org.apache.pulsar.common.protocol.PulsarDecoder.channelRead for more
    details.
@lhotari lhotari force-pushed the lh-fix-get-schema branch from 9bc948d to 0040c36 Compare April 16, 2021 14:00
@lhotari
Member Author

lhotari commented Apr 16, 2021

We can simply "clone" the CommandGetSchemaResponse inside handleGetSchemaResponse, this way we decode the ByteBuf and we return a safe object to the caller

@eolivelli Good idea. I revisited the changes and also found 2 other locations where a copy is needed.

@lhotari
Member Author

lhotari commented Apr 16, 2021

Can we add test cases?

@sijie Do you have a proposal for how to test this?

@lhotari lhotari changed the title from "Fix IllegalReferenceCountException while getting schema" to "[Client|Proxy] Create copy of response messages before passing to other thread via CompletableFuture" Apr 16, 2021
@lhotari
Member Author

lhotari commented Apr 16, 2021

@merlimat I have revisited the change. PTAL

Contributor

@eolivelli eolivelli left a comment

I am running this patch in a local branch.

LGTM

@sijie I am not sure it is worth adding a test case.
Reproducing the issue is not simple, so the only test case we can create is one that ensures we are cloning the value.

WDYT ?
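
A rough idea of what such a "we are cloning the value" check could look like, written against hypothetical stand-ins (`Handler`, `Response`, `pending`) rather than the real ClientCnx and without any test framework. It only asserts that the future receives an independent copy, not that the original race is gone.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical handler and message types; a sketch of the check, not Pulsar test code.
final class CloneCheckDemo {

    static final class Response {
        long requestId;
        String schemaName;

        Response copyFrom(Response other) {
            this.requestId = other.requestId;
            this.schemaName = other.schemaName;
            return this;
        }
    }

    static final class Handler {
        final Map<Long, CompletableFuture<Response>> pending = new ConcurrentHashMap<>();

        /** Completes the pending future with a copy of the reused response instance. */
        void handleResponse(Response reused) {
            CompletableFuture<Response> future = pending.remove(reused.requestId);
            if (future != null) {
                future.complete(new Response().copyFrom(reused));
            }
        }
    }

    /** Returns true if the delivered value is a distinct instance that keeps its content. */
    static boolean completesWithIndependentCopy() {
        Handler handler = new Handler();
        CompletableFuture<Response> future = new CompletableFuture<>();
        handler.pending.put(7L, future);

        Response reused = new Response();
        reused.requestId = 7L;
        reused.schemaName = "s";
        handler.handleResponse(reused);

        Response delivered = future.join();
        reused.schemaName = "overwritten"; // simulate reuse of the shared instance
        return delivered != reused
                && delivered.requestId == 7L
                && "s".equals(delivered.schemaName);
    }

    public static void main(String[] args) {
        System.out.println("independent copy: " + completesWithIndependentCopy());
    }
}
```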

@eolivelli
Contributor

eolivelli commented Apr 16, 2021

@sijie @merlimat I have this integration test that fails due to this problem, so I believe there is no strong need to add a test here:
https://github.com/apache/pulsar/pull/10211/checks?check_run_id=2362726281

@merlimat merlimat added the type/bug The PR fixed a bug or issue reported a bug label Apr 16, 2021
@merlimat merlimat added this to the 2.8.0 milestone Apr 16, 2021
@eolivelli eolivelli requested a review from sijie April 16, 2021 21:06
Development

Successfully merging this pull request may close these issues.

Schema: "IllegalReferenceCountException: refCnt: 0" while getting schema with AutoConsumeSchema

4 participants