If the speed of the producer is too fast, the producer always tends to create new chunks, so we must reduce the speed of the producer.
If the speed of the consumer is too fast, the peeker always tends to read an newer element, so we must reduce the speed of the consumer.
If and only if all chunks form a ring buffer, peeker may read an future element.
@franz1981
Exception in thread "Thread-2" java.lang.IllegalStateException: peekedSequence 9201, lastPeekedSequence 9202
at com.wjybxx.fastjgame.queue.MpmcUnboundedXaddArrayQueuePeekTest$Peeker.run(MpmcUnboundedXaddArrayQueuePeekTest.java:121)
Test case
public class MpmcUnboundedXaddArrayQueuePeekTest {
private static final int chunkSize = 16;
private static final int poolSize = 4;
private static final int capacity = chunkSize * poolSize;
private static volatile boolean stop = false;
public static void main(String[] args) throws InterruptedException {
MessagePassingQueue<Long> messageQueue = new MpmcUnboundedXaddArrayQueue<>(chunkSize, poolSize);
new Producer(messageQueue, 800).start();
new Consumer(messageQueue, 600).start();
new Peeker(messageQueue).start();
try {
Thread.sleep(10 * 1000);
} finally {
stop = true;
}
}
private static class Producer extends Thread {
final MessagePassingQueue<Long> messageQueue;
final long sleepNanos;
long sequence = 0;
Producer(MessagePassingQueue<Long> messageQueue, long sleepNanos) {
this.messageQueue = messageQueue;
this.sleepNanos = sleepNanos;
}
@Override
public void run() {
while (!stop) {
if (messageQueue.offer(sequence)) {
sequence++;
}
if (messageQueue.size() >= capacity - chunkSize) {
LockSupport.parkNanos(sleepNanos);
}
}
}
}
private static class Consumer extends Thread {
final MessagePassingQueue<Long> messageQueue;
final long sleepNanos;
private Consumer(MessagePassingQueue<Long> messageQueue, long sleepNanos) {
this.messageQueue = messageQueue;
this.sleepNanos = sleepNanos;
}
@Override
public void run() {
while (!stop) {
messageQueue.poll();
if (messageQueue.size() < chunkSize) {
LockSupport.parkNanos(sleepNanos);
}
}
}
}
private static class Peeker extends Thread {
final MessagePassingQueue<Long> messageQueue;
long lastPeekedSequence;
private Peeker(MessagePassingQueue<Long> messageQueue) {
this.messageQueue = messageQueue;
setPriority(MIN_PRIORITY);
}
@Override
public void run() {
while (!stop) {
final Long peekedSequence = messageQueue.peek();
if (peekedSequence == null) {
continue;
}
if (peekedSequence < lastPeekedSequence) {
String msg = String.format("peekedSequence %s, lastPeekedSequence %s", peekedSequence, lastPeekedSequence);
throw new IllegalStateException(msg);
}
lastPeekedSequence = peekedSequence;
}
}
}
}
If the speed of the producer is too fast, the producer always tends to create new chunks, so we must reduce the speed of the producer.
If the speed of the consumer is too fast, the peeker always tends to read an newer element, so we must reduce the speed of the consumer.
If and only if all chunks form a ring buffer, peeker may read an future element.
@franz1981
Test case