-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Closed
Labels
type/bug: The PR fixed a bug or issue reported a bug
Description
My Code
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import java.nio.charset.StandardCharsets;
class ConsumerThread extends Thread{
private final long processTime;
private final String name;
// private final ConsumerListener consumerListener;
Consumer<byte[]> consumer;
public ConsumerThread(String name,long processTime) throws PulsarClientException {
this.processTime = processTime;
this.name = name;
// consumerListener = new ConsumerListener(name,processTime);
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://192.168.184.131:6650")
.build();
consumer = pulsarClient.newConsumer()
.receiverQueueSize(1)
// .batchReceivePolicy()
.topic("my_topic")
.subscriptionName("my_subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
}
@Override
public void run(){
while (true){
try {
Message<byte []>message = consumer.receive();
Thread.sleep(processTime);
System.out.println(name + ":" + new String(message.getData()));
consumer.acknowledge(message);
} catch (PulsarClientException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
 * Demo entry point: starts a fast and a slow consumer on the shared subscription,
 * publishes 50 numbered messages to {@code my_topic}, and then waits on the
 * consumer threads.
 */
public class StartManager {
    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws PulsarClientException if connecting, subscribing, producing or closing fails
     * @throws InterruptedException  if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // The consumers must be subscribed before anything is sent, so they are
        // created and started ahead of the producer.
        ConsumerThread fastConsumer = new ConsumerThread("a", 100);
        fastConsumer.start();
        ConsumerThread slowConsumer = new ConsumerThread(" b", 1000);
        slowConsumer.start();

        // The producer and its client are only needed while sending; close both
        // afterwards instead of leaking them for the lifetime of the process.
        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.184.131:6650")
                .build();
             Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("my_topic").create()) {
            for (int i = 0; i < 50; i++) {
                producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }
        }

        // Block on the consumer threads rather than polling with sleep().
        fastConsumer.join();
        slowConsumer.join();
    }
}
While debugging, execution runs into the following method:
# ZeroQueueConsumerImpl.class
/**
 * Handles a batch message arriving at this consumer by rejecting it: logs a
 * warning, starts an asynchronous close of the consumer, and once that close
 * completes (normally or exceptionally) passes an {@code InvalidMessageException}
 * and a {@code null} message to {@code notifyPendingReceivedCallback}.
 *
 * <p>None of the parameters are read by this implementation.
 */
void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) {
    log.warn("Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", this.subscription, this.consumerName);
    this.closeAsync().handle((closeResult, closeError) -> {
        // The outcome of the close is deliberately ignored; the notification
        // below is issued either way.
        String reason = String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", this.subscription, this.consumerName);
        InvalidMessageException failure = new InvalidMessageException(reason);
        this.notifyPendingReceivedCallback((Message)null, failure);
        return null;
    });
}
I can receive messages when I use .receiverQueueSize(1).
I cannot receive messages when I use .receiverQueueSize(0).
Metadata
Metadata
Assignees
Labels
type/bug: The PR fixed a bug or issue reported a bug