Skip to content

Java client does not allow newConsumer().receiverQueueSize(0) #11850

@graceon

Description

@graceon

My Code

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

import java.nio.charset.StandardCharsets;

/**
 * Worker thread that subscribes to {@code my_topic} with a shared subscription and
 * handles messages one at a time: receive, sleep for a fixed time to simulate work,
 * print the payload, then acknowledge.
 */
class ConsumerThread extends Thread{
    /** Simulated per-message processing time in milliseconds (passed to {@link Thread#sleep(long)}). */
    private final long processTime;
    /** Label printed in front of every received payload. */
    private final String name;
    /** Consumer created in the constructor and read by {@link #run()}. */
    Consumer<byte[]> consumer;

    /**
     * Builds a dedicated client and subscribes to the topic.
     *
     * @param name        label printed in front of every received payload
     * @param processTime simulated processing time per message, in milliseconds
     * @throws PulsarClientException if the client cannot be built or the subscription fails
     */
    public ConsumerThread(String name,long processTime) throws PulsarClientException {
        this.processTime = processTime;
        this.name = name;
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.184.131:6650")
                .build();
        consumer = pulsarClient.newConsumer()
                // Queue size 1 is the configuration reported as working in this repro.
                .receiverQueueSize(1)
                .topic("my_topic")
                .subscriptionName("my_subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
    }

    /**
     * Receives, "processes", prints and acknowledges messages until the thread is interrupted.
     */
    @Override
    public void run(){
        // Stop once the thread has been interrupted instead of looping forever.
        while (!Thread.currentThread().isInterrupted()){
            try {
                Message<byte[]> message = consumer.receive();
                Thread.sleep(processTime);
                // The producer encodes payloads as UTF-8, so decode with the same charset
                // rather than the platform default.
                System.out.println(name + ":" + new String(message.getData(), StandardCharsets.UTF_8));
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
                // Best effort: report the failure and keep consuming.
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop so the thread can terminate.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

/**
 * Repro driver: starts a fast and a slow consumer thread, publishes 50 numbered
 * messages to {@code my_topic}, then idles forever.
 */
public class StartManager {
    /**
     * Entry point of the repro.
     *
     * @param args unused
     * @throws PulsarClientException if the client, a consumer or the producer cannot be created,
     *                               or if a send fails
     * @throws InterruptedException  if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.184.131:6650")
                .build();

        // One consumer needs 100 ms per message, the other 1000 ms.
        ConsumerThread fastConsumer = new ConsumerThread("a", 100);
        fastConsumer.start();
        ConsumerThread slowConsumer = new ConsumerThread("      b", 1000);
        slowConsumer.start();

        Producer<byte[]> sender = client.newProducer()
                .topic("my_topic")
                .create();

        // Publish the numbers 0..49 as UTF-8 text payloads.
        int sequence = 0;
        while (sequence < 50) {
            byte[] payload = String.valueOf(sequence).getBytes(StandardCharsets.UTF_8);
            sender.send(payload);
            sequence++;
        }

        // Idle in one-second sleeps; this loop never exits on its own.
        for (;;) {
            Thread.sleep(1000);
        }
    }
}

While debugging, execution runs into the following method:

# ZeroQueueConsumerImpl.class
    /**
     * Handles a batch message on this consumer by refusing it: logs a warning, starts an
     * asynchronous close, and in the handler attached to that close reports an
     * {@code InvalidMessageException} to the pending receive callback. None of the
     * parameters are read by this implementation.
     */
    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet,
            ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) {
        log.warn("Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
                subscription, consumerName);
        closeAsync().handle((closeResult, closeError) -> {
            // The callback is notified regardless of how the close finished.
            String reason = String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ",
                    subscription, consumerName);
            notifyPendingReceivedCallback((Message) null, new InvalidMessageException(reason));
            return null;
        });
    }

I can receive messages when I use .receiverQueueSize(1).
I cannot receive messages when I use .receiverQueueSize(0).

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions