@linlinnn
Contributor

@linlinnn linlinnn commented Apr 15, 2021

Fixes #10222
Fixes #10173

Motivation

Fix the ordering guarantee for MultiTopicsConsumerImpl.

Cause of the out-of-order delivery:

Original order of the messages: 1, 2, 3, 4

// ConsumerImpl receive message
if (peekPendingReceive() != null) { //code link 1
    notifyPendingReceivedCallback(message, null); // code link 2
} else if (enqueueMessageAndCheckBatchReceive(message) /* code link 3 */ && hasPendingBatchReceive()) {
    notifyPendingBatchReceivedCallBack();
}

// ConsumerImpl#internalReceiveAsync
Message<T> message = incomingMessages.poll(); // code link 4 
if (message == null) { 
    pendingReceives.add(result); // code link 5
    cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
    messageProcessed(message);
    result.complete(beforeConsume(message));
}

Receive message 3:
Thread pulsar-client-io: ConsumerImpl peeks the pending receives and finds none (code link 1)
Thread pulsar-client-internal: ConsumerImpl polls incomingMessages and gets null (code link 4)
Thread pulsar-client-internal: ConsumerImpl adds a pending receive, intended for message 3 (code link 5)
Thread pulsar-client-io: ConsumerImpl offers message 3 to incomingMessages (code link 3)
Receive message 4:
Thread pulsar-client-io: ConsumerImpl completes the pending receive above with message 4 (code link 2)
Thread pulsar-client-internal: MultiTopicsConsumerImpl accepts message 4
Thread pulsar-client-internal: MultiTopicsConsumerImpl accepts message 3
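
For illustration, here is a minimal sketch that replays the interleaving above on a single thread, so the result is deterministic. The class and its methods are hypothetical stand-ins for the incomingMessages / pendingReceives handling in ConsumerImpl, not the real Pulsar code; messages are plain integers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the unlocked receive path (not Pulsar code). */
final class ReceiveRaceReplay {
    private final Queue<Integer> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<Integer>> pendingReceives = new ArrayDeque<>();

    boolean hasPendingReceive() { return pendingReceives.peek() != null; }                  // code link 1
    void notifyPendingReceive(int message) { pendingReceives.poll().complete(message); }    // code link 2
    void enqueue(int message) { incomingMessages.offer(message); }                          // code link 3
    Integer pollIncoming() { return incomingMessages.poll(); }                              // code link 4
    void addPendingReceive(CompletableFuture<Integer> f) { pendingReceives.add(f); }        // code link 5

    /** Replays the steps in the order listed in the PR description. */
    static List<Integer> replay() {
        ReceiveRaceReplay consumer = new ReceiveRaceReplay();
        List<Integer> delivered = new ArrayList<>();

        // Message 3 arrives.
        boolean pendingSeenByIo = consumer.hasPendingReceive();   // "io" step, link 1: false
        CompletableFuture<Integer> receive = new CompletableFuture<>();
        receive.thenAccept(delivered::add);
        Integer polled = consumer.pollIncoming();                 // "internal" step, link 4: null
        if (polled == null) {
            consumer.addPendingReceive(receive);                  // "internal" step, link 5
        }
        if (!pendingSeenByIo) {
            consumer.enqueue(3);                                  // "io" step, link 3: acts on a stale check
        }

        // Message 4 arrives.
        if (consumer.hasPendingReceive()) {
            consumer.notifyPendingReceive(4);                     // links 1 and 2: message 4 overtakes 3
        } else {
            consumer.enqueue(4);
        }

        // The next receive call finally drains message 3 from the queue.
        Integer next = consumer.pollIncoming();
        if (next != null) {
            delivered.add(next);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [4, 3]
    }
}
```

The check at code link 1 and the enqueue at code link 3 are not atomic with respect to links 4 and 5, which is the window the replay exploits.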

Modification

Revert "Remove consumer unnecessary locks (#9261)"
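
As a rough sketch of why restoring a lock closes this window: if both the "message arrived" path and the "receive requested" path do their check-then-act under the same lock, a message can only be queued when no receive is pending, and vice versa. The class below is hypothetical and assumes a plain ReentrantLock and callback-style receives; the code actually restored by the revert may use different lock types and structure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;

/** Hypothetical lock-guarded receive path (not the actual Pulsar implementation). */
final class LockedReceiveSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<Integer> incomingMessages = new ArrayDeque<>();
    private final Queue<IntConsumer> pendingReceives = new ArrayDeque<>();

    /** Called once per message, in arrival order. */
    void messageReceived(int message) {
        lock.lock();
        try {
            IntConsumer pending = pendingReceives.poll();
            if (pending != null) {
                pending.accept(message);          // hand over directly to the waiting receive
            } else {
                incomingMessages.offer(message);  // nobody is waiting: queue it
            }
        } finally {
            lock.unlock();
        }
    }

    /** Requests one message; the callback runs while the lock is held. */
    void receive(IntConsumer callback) {
        lock.lock();
        try {
            Integer message = incomingMessages.poll();
            if (message == null) {
                pendingReceives.add(callback);
            } else {
                callback.accept(message);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Runs a producer and a receiver thread concurrently and returns the delivery order. */
    static List<Integer> run(int count) {
        LockedReceiveSketch consumer = new LockedReceiveSketch();
        List<Integer> delivered = new ArrayList<>(); // only modified while the lock is held
        Thread io = new Thread(() -> {
            for (int i = 1; i <= count; i++) consumer.messageReceived(i);
        });
        Thread internal = new Thread(() -> {
            for (int i = 0; i < count; i++) consumer.receive(m -> delivered.add(m));
        });
        io.start();
        internal.start();
        try {
            io.join();
            internal.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // deliveries come out in arrival order
    }
}
```

The cost is that both threads contend on one lock for every message.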

@linlinnn
Contributor Author

@MarvinCai @codelipenghui @merlimat Please take a look.

@codelipenghui
Contributor

codelipenghui commented Apr 15, 2021

@linlinnn I have a question here: why do we need to guarantee message order across partitions? (The MultiTopicsConsumerImpl consumes messages from multiple partitions.)

@linlinnn
Contributor Author

linlinnn commented Apr 15, 2021

> @linlinnn I have a question here: why do we need to guarantee message order across partitions

@codelipenghui
Yes, we don't need to guarantee message order across partitions, but we should guarantee message order within the same partition. The MultiTopicsConsumerImpl iterates over its ConsumerImpl instances (each consuming messages from a single partition) to consume messages from multiple topics and multiple partitions. In this issue, the MultiTopicsConsumerImpl broke the order of messages from the same partition.

newConsumers.forEach(consumer -> {
    // Each ConsumerImpl consumes messages from a single partition.
    consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
    receiveMessageFromConsumer(consumer);
});

@MarvinCai
Contributor

I think we should guarantee order by processing incoming messages on a single thread, instead of adding a lock, which can hurt performance due to context switching and waiting.
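
A hedged sketch of what the single-thread idea could look like: every "message arrived" and "receive requested" event is submitted as a task to one single-threaded executor, so the two queues are only ever touched by that thread and no lock is needed. All names here are hypothetical, and this is not necessarily the approach taken in Pulsar.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical receive path serialized on one executor thread (not Pulsar code). */
final class SingleThreadReceiveSketch {
    private final ExecutorService internalExecutor = Executors.newSingleThreadExecutor();
    // Both queues are accessed only from tasks running on internalExecutor.
    private final Queue<Integer> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<Integer>> pendingReceives = new ArrayDeque<>();

    void messageReceived(int message) {
        internalExecutor.execute(() -> {
            CompletableFuture<Integer> pending = pendingReceives.poll();
            if (pending != null) {
                pending.complete(message);
            } else {
                incomingMessages.offer(message);
            }
        });
    }

    CompletableFuture<Integer> receiveAsync() {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        internalExecutor.execute(() -> {
            Integer message = incomingMessages.poll();
            if (message == null) {
                pendingReceives.add(result);
            } else {
                result.complete(message);
            }
        });
        return result;
    }

    void close() {
        internalExecutor.shutdown();
    }

    /** Produces 1..count on one thread while requesting receives on the caller's thread. */
    static List<Integer> run(int count) {
        SingleThreadReceiveSketch consumer = new SingleThreadReceiveSketch();
        Thread io = new Thread(() -> {
            for (int i = 1; i <= count; i++) consumer.messageReceived(i);
        });
        io.start();
        List<CompletableFuture<Integer>> receives = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            receives.add(consumer.receiveAsync());
        }
        List<Integer> delivered = new ArrayList<>();
        for (CompletableFuture<Integer> receive : receives) {
            delivered.add(receive.join()); // the k-th receive gets the k-th message
        }
        consumer.close();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

The trade-off is an extra task hop per message and per receive call in exchange for lock-free access to the queues.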

@linlinnn
Contributor Author

linlinnn commented Apr 15, 2021

> I think we should guarantee order by processing incoming messages on a single thread, instead of adding a lock, which can hurt performance due to context switching and waiting.

@MarvinCai
Hmm, I agree with your concern. But the notify and offer logic runs concurrently, which seems to conflict with single-threaded processing without a lock.

@linlinnn
Contributor Author

/pulsarbot run-failure-checks

@315157973
Contributor

315157973 commented Apr 15, 2021

This is indeed a problem.

// 1
Message<T> message = incomingMessages.poll();
if (message == null) {
    // 2
    pendingReceives.add(result);
    cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
    // 3
    messageProcessed(message);
    result.complete(beforeConsume(message));
}
// 4
if (peekPendingReceive() != null) {
    notifyPendingReceivedCallback(message, null);
}
...
// Message enters the incoming queue

When the execution order is 1, 4, 2, 1, 3, 4, the order of 3 and 4 cannot be guaranteed.

@315157973
Contributor

Hmm... we have orderExecutor. CC @hangc0276

@linlinnn
Contributor Author

/pulsarbot run-failure-checks

@MarvinCai
Contributor

I actually don't see the point of the locking we are trying to add here. Polling from the receiver queue or notifying a pending request is not the root cause. The BlockingQueue is itself thread-safe, and no matter which thread does the notification, it will always be a thread from "pulsar-external-listener" that processes the pending request.
The root cause is that when a message is retrieved in the line, the receiveAsync can be completed from two different threads depending on the situation: if there are messages in the receiver queue, it is completed from the "pulsar-client-internal" thread; if it becomes a pending request, it is later completed from the "pulsar-external-listener" thread. Even if messages arrive at the underlying single ConsumerImpl in the correct order, the MultiTopicsConsumerImpl can see them out of order because two threads are processing messages independently.
This only happens with the multi-topic consumer when the message volume is small and there is no proper batching, as in our test case.
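
The general Java behavior this argument relies on can be shown with a small sketch: a callback attached with thenAccept runs on the caller's thread if the future is already complete, and otherwise on whichever thread later completes it. The thread name "listener" and the class are made up for the example; this says nothing about which Pulsar threads are actually involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Shows which thread runs a thenAccept callback (illustrative names only). */
final class CompletionThreadDemo {
    /** Returns the name of the thread that executed the callback. */
    static String callbackThread(boolean completeBeforeAttach) {
        CompletableFuture<Integer> receive = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        if (completeBeforeAttach) {
            // Like finding a message already queued: the future is complete on return.
            receive.complete(3);
        }
        receive.thenAccept(message -> ranOn.set(Thread.currentThread().getName()));

        if (!completeBeforeAttach) {
            // Like a pending receive: another thread completes the future later.
            Thread listener = new Thread(() -> receive.complete(3), "listener");
            listener.start();
            try {
                listener.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThread(true));  // the calling thread's name
        System.out.println(callbackThread(false)); // listener
    }
}
```

Two consecutive receives can therefore have their callbacks run on different threads, with no ordering between them unless something else enforces it.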

@MarvinCai
Contributor

MarvinCai commented Apr 15, 2021

@merlimat @sijie @codelipenghui any thoughts on this?

@linlinnn
Contributor Author

linlinnn commented Apr 15, 2021

@MarvinCai
The poll method is not a blocking method; it returns the head of the queue, or null if the queue is empty.
pulsar-external-listener executes receivedFuture.complete(message),
and then the message is accepted by pulsar-client-internal for final processing.

Also, I see a similar lock in the batch receive method.
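
The non-blocking behavior of poll mentioned above is easy to check in isolation. This sketch uses java.util.concurrent.LinkedBlockingQueue as a stand-in; the queue type used by the Pulsar client may differ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Demonstrates that BlockingQueue.poll() returns immediately. */
final class PollDemo {
    /** poll() on an empty queue returns null instead of waiting. */
    static Integer pollEmpty() {
        BlockingQueue<Integer> incomingMessages = new LinkedBlockingQueue<>();
        return incomingMessages.poll();
    }

    /** poll() on a non-empty queue removes and returns the head. */
    static Integer pollAfterOffer() {
        BlockingQueue<Integer> incomingMessages = new LinkedBlockingQueue<>();
        incomingMessages.offer(3);
        return incomingMessages.poll();
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty());      // null
        System.out.println(pollAfterOffer()); // 3
    }
}
```

Because poll can return null while a message is about to be offered by another thread, the null check and the follow-up action need some form of coordination.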

@linlinnn linlinnn requested a review from eolivelli April 16, 2021 05:47
@linlinnn
Contributor Author

@eolivelli @codelipenghui @lhotari Please help review this PR, thanks.

@linlinnn
Contributor Author

Emm.. any feedback : ) @eolivelli @MarvinCai @merlimat @315157973

@linlinnn
Contributor Author

/pulsarbot run-failure-checks

@linlinnn
Contributor Author

/pulsarbot run-failure-checks

@linlinnn
Contributor Author

CICD passed. Could you please help review if you are available? @lhotari

@lhotari
Member

lhotari commented Apr 17, 2021

@linlinnn please rebase on the latest apache/pulsar master, since the problem in the master branch was fixed separately by #10250.

@linlinnn
Contributor Author

linlinnn commented Apr 17, 2021

> @linlinnn please rebase on the latest apache/pulsar master, since the problem in the master branch was fixed separately by #10250.

@lhotari Thanks. I meant, could you please help review this PR?

@315157973
Contributor

IMO, it’s better to revert first, so that we can easily tell whether the flaky unit tests are caused by my PR.

@linlinnn
Contributor Author

> IMO, it’s better to revert first, so that we can easily tell whether the flaky unit tests are caused by my PR.

+1

@codelipenghui
Contributor

codelipenghui commented Apr 26, 2021

Looks good to me. @linlinnn, I think you can use this PR to revert #9261 first. @hangc0276, please also take a look.

@linlinnn linlinnn changed the title [consumer] fix order guarantee for MultiTopicsConsumerImpl [consumer] Revert "Remove consumer unnecessary locks (#9261)" Apr 26, 2021
@linlinnn
Contributor Author

@codelipenghui @315157973 I rebased onto master and reverted #9261, PTAL.

Contributor

@eolivelli eolivelli left a comment

@hangc0276 please take a look

@MarvinCai
Contributor

LGTM. This is probably the hidden cause of many other flaky tests.

@MarvinCai
Contributor

But we should still try to improve the performance with a lock-free solution in a separate issue ASAP.

@codelipenghui codelipenghui requested a review from eolivelli April 27, 2021 06:46
@codelipenghui
Contributor

Ping @eolivelli. Please help review this PR; this issue makes many tests unstable.

@codelipenghui
Contributor

codelipenghui commented Apr 27, 2021

> But we should still try to improve the performance with a lock-free solution in a separate issue ASAP.

@MarvinCai Yes, we should continue to improve the performance. @315157973 is also working on a lock-free solution. We'd better revert first and then find a better solution to handle this issue.

@codelipenghui
Contributor

@linlinnn Could you please help resolve the conflict?

@eolivelli
Contributor

+1
Let's merge as soon as CI passes; then we can figure out the best approach to improve the consumer.

@codelipenghui codelipenghui merged commit e95d6f0 into apache:master Apr 27, 2021
sijie pushed a commit that referenced this pull request May 5, 2021
### Motivation
Lock-free solution for #10240
sijie pushed a commit that referenced this pull request May 8, 2021
)

### Motivation

#10240 reverted the changes introduced by #9261, which made the key_shared tests flaky. So it's better to move the tests out of the quarantine group.

### Modifications

Move the key_shared-related tests out of the quarantine group.
eolivelli pushed a commit to eolivelli/pulsar that referenced this pull request May 11, 2021
eolivelli pushed a commit to eolivelli/pulsar that referenced this pull request May 11, 2021
…che#10508)

### Motivation

apache#10240 reverted the changes introduced by apache#9261, which made the key_shared tests flaky. So it's better to move the tests out of the quarantine group.

### Modifications

Move the key_shared-related tests out of the quarantine group.
codelipenghui pushed a commit that referenced this pull request Jul 1, 2021
Lock-free solution for #10240

(cherry picked from commit def1932)
dlg99 pushed a commit to dlg99/pulsar that referenced this pull request Dec 2, 2021
Lock-free solution for apache#10240

(cherry picked from commit def1932)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Dec 2, 2021
Lock-free solution for apache#10240

(cherry picked from commit def1932)

Development

Successfully merging this pull request may close these issues.

[Bug] Message can arrive out of order for MultiTopicConsumer
Tests of class NullValueTest are randomly failing