MpmcUnboundedXaddArrayQueue#peek/relaxedPeek can load "future" elements  #310

@hl845740757


Reproducing this requires throttling both sides. If the producer is too fast, it always tends to create new chunks, so the producer must be slowed down.
If the consumer is too fast, the peeker always tends to read a newer element, so the consumer must be slowed down as well.

The peeker can read a future element if and only if all chunks form a ring buffer.
@franz1981

Exception in thread "Thread-2" java.lang.IllegalStateException: peekedSequence 9201, lastPeekedSequence 9202
	at com.wjybxx.fastjgame.queue.MpmcUnboundedXaddArrayQueuePeekTest$Peeker.run(MpmcUnboundedXaddArrayQueuePeekTest.java:121)
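
To make the ring-buffer condition concrete, here is a minimal single-threaded sketch of one interleaving that could produce such a failure. It is a simplified model under stated assumptions, not the JCTools implementation: the class `RecycledChunkModel` and its methods `readConsumerIndex` and `loadSlot` are hypothetical names, the pooled chunks are modelled as one flat ring of `chunkSize * poolSize` slots, and the peek is split into two explicit steps so that the "delay" between them can be simulated.

```java
import java.util.Arrays;

/** Hypothetical single-threaded model of recycled chunks; NOT the JCTools code. */
class RecycledChunkModel {
    final long[] slots;          // all pooled chunks laid out as one ring
    long producerIndex = 0;
    long consumerIndex = 0;

    RecycledChunkModel(int chunkSize, int poolSize) {
        slots = new long[chunkSize * poolSize];
        Arrays.fill(slots, -1L); // -1 marks an empty slot
    }

    int slotOf(long index) {
        return (int) (index % slots.length);
    }

    boolean offer() {
        if (producerIndex - consumerIndex == slots.length) {
            return false;        // ring is full in this bounded model
        }
        slots[slotOf(producerIndex)] = producerIndex; // element value == its sequence
        producerIndex++;
        return true;
    }

    long poll() {
        if (consumerIndex == producerIndex) {
            return -1L;          // empty
        }
        int slot = slotOf(consumerIndex);
        long value = slots[slot];
        slots[slot] = -1L;
        consumerIndex++;
        return value;
    }

    /** Step 1 of a peek: read the consumer index. */
    long readConsumerIndex() {
        return consumerIndex;
    }

    /** Step 2 of a peek: load the slot for a previously read index, without re-validating it. */
    long loadSlot(long index) {
        return slots[slotOf(index)];
    }

    /** Returns {value seen by the delayed peek, value seen by a fresh peek}. */
    static long[] demo() {
        RecycledChunkModel q = new RecycledChunkModel(16, 4);
        for (int i = 0; i < 20; i++) q.offer();   // elements 0..19
        long stale = q.readConsumerIndex();        // peeker reads index 0, then is delayed
        for (int i = 0; i < 16; i++) q.poll();     // first chunk fully consumed
        for (int i = 0; i < 50; i++) q.offer();    // producer wraps and reuses the first chunk
        long delayedPeek = q.loadSlot(stale);      // loads the slot that now holds element 64
        long freshPeek = q.loadSlot(q.readConsumerIndex()); // real head is element 16
        return new long[] {delayedPeek, freshPeek};
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("delayed peek saw " + r[0] + ", next peek saw " + r[1]);
    }
}
```

In this model the delayed peek returns 64 while the head of the queue is still 16, so the next peek appears to go backwards, which is the same shape as the `IllegalStateException` above. If chunks were never reused, the stale slot would stay empty and the delayed peek could not observe a later element.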

Test case

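import java.util.concurrent.locks.LockSupport;

import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpmcUnboundedXaddArrayQueue;

public class MpmcUnboundedXaddArrayQueuePeekTest {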

    private static final int chunkSize = 16;
    private static final int poolSize = 4;
    private static final int capacity = chunkSize * poolSize;

    private static volatile boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
        MessagePassingQueue<Long> messageQueue = new MpmcUnboundedXaddArrayQueue<>(chunkSize, poolSize);
        new Producer(messageQueue, 800).start();
        new Consumer(messageQueue, 600).start();
        new Peeker(messageQueue).start();

        try {
            Thread.sleep(10 * 1000);
        } finally {
            stop = true;
        }
    }

    private static class Producer extends Thread {

        final MessagePassingQueue<Long> messageQueue;
        final long sleepNanos;

        long sequence = 0;

        Producer(MessagePassingQueue<Long> messageQueue, long sleepNanos) {
            this.messageQueue = messageQueue;
            this.sleepNanos = sleepNanos;
        }

        @Override
        public void run() {
            while (!stop) {
                if (messageQueue.offer(sequence)) {
                    sequence++;
                }

                // Throttle the producer when the queue is nearly full,
                // otherwise it keeps creating new chunks.
                if (messageQueue.size() >= capacity - chunkSize) {
                    LockSupport.parkNanos(sleepNanos);
                }
            }
        }
    }

    private static class Consumer extends Thread {

        final MessagePassingQueue<Long> messageQueue;
        final long sleepNanos;

        private Consumer(MessagePassingQueue<Long> messageQueue, long sleepNanos) {
            this.messageQueue = messageQueue;
            this.sleepNanos = sleepNanos;
        }

        @Override
        public void run() {
            while (!stop) {
                messageQueue.poll();

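                // Throttle the consumer when the queue is nearly empty,
                // otherwise the peeker nearly always reads a newer element.
                if (messageQueue.size() < chunkSize) {
                    LockSupport.parkNanos(sleepNanos);
                }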
            }
        }

    }

    private static class Peeker extends Thread {

        final MessagePassingQueue<Long> messageQueue;
        long lastPeekedSequence;

        private Peeker(MessagePassingQueue<Long> messageQueue) {
            this.messageQueue = messageQueue;
            setPriority(MIN_PRIORITY);
        }

        @Override
        public void run() {
            while (!stop) {
                final Long peekedSequence = messageQueue.peek();
                if (peekedSequence == null) {
                    continue;
                }

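                // Peeked values must never go backwards: a smaller value here
                // means an earlier peek returned a "future" element.
                if (peekedSequence < lastPeekedSequence) {
                    String msg = String.format("peekedSequence %s, lastPeekedSequence %s", peekedSequence, lastPeekedSequence);
                    throw new IllegalStateException(msg);
                }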

                lastPeekedSequence = peekedSequence;
            }
        }
    }
}
