Skip to content

Fix peek/relaxedPeek implementation of MpmcArrayQueue.#295

Closed
hl845740757 wants to merge 88 commits into
JCTools:masterfrom
hl845740757:mpmcarrayqueuepeek
Closed

Fix peek/relaxedPeek implementation of MpmcArrayQueue.#295
hl845740757 wants to merge 88 commits into
JCTools:masterfrom
hl845740757:mpmcarrayqueuepeek

Conversation

@hl845740757

Copy link
Copy Markdown
Contributor

Summary
Poll/relaxedPoll/Peek/RelaxedPeek should not return an older value.

ConsumerC: prepare to load element (currentConsumerIndex)
ConsumerA: poll the element (currentConsumerIndex)
Producer: offer the element (currentConsumerIndex + capaity)
Producer: complete store element
ConsumerC: complete load element (currentConsumerIndex + capaity)

ConsumerC expects to load currentConsumerIndex, but loads (currentConsumerIndex + capaity).

The implementation of SpmcArrayQueue has the same bug.

Exception in thread "Thread-2" java.lang.IllegalStateException: peekedSequence 18, lastPeekedSequence 25
	at com.wjybxx.fastjgame.concurrenttest.SpmcArrayQueuePeekTest$Peeker.run(SpmcArrayQueuePeekTest.java:111)
public class SpmcArrayQueuePeekTest {

    private static volatile boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
        // Smaller capacity helps test
        MessagePassingQueue<Long> messageQueue = new SpmcArrayQueue<>(8);
        new Producer(messageQueue).start();
        new Consumer(messageQueue).start();
        new Peeker(messageQueue).start();

        try {
            Thread.sleep(10 * 1000);
        } finally {
            stop = true;
        }
    }

    private static class Producer extends Thread {

        final MessagePassingQueue<Long> messageQueue;

        long sequence = 0;

        Producer(MessagePassingQueue<Long> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            while (!stop) {
                if (messageQueue.offer(sequence)) {
                    sequence++;
                }
            }
        }
    }

    private static class Consumer extends Thread {

        final MessagePassingQueue<Long> messageQueue;

        private Consumer(MessagePassingQueue<Long> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            while (!stop) {
                messageQueue.poll();
                // wait producer fill
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    private static class Peeker extends Thread {

        final MessagePassingQueue<Long> messageQueue;
        long lastPeekedSequence;

        private Peeker(MessagePassingQueue<Long> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            while (!stop) {
                final Long peekedSequence = messageQueue.peek();
                if (peekedSequence == null) {
                    continue;
                }

                if (peekedSequence < lastPeekedSequence) {
                    String msg = String.format("peekedSequence %s, lastPeekedSequence %s", peekedSequence, lastPeekedSequence);
                    throw new IllegalStateException(msg);
                }

                lastPeekedSequence = peekedSequence;
            }
        }
    }
}

franz1981 and others added 30 commits November 5, 2019 22:28
This is fixing MpqBurstCost and QueueBurstCost multi-consumer scenario
and will make both to look similar.
Fixes JCTools#272 BurstCost aren't handling multi-consumers correctly
nitsanw and others added 20 commits December 30, 2019 16:55
+ javadoc
+ tweak visibility of methods
…CTools#281)

QueueSanityTest::testSize with chunkSize = 1 and 1 recycled chunks was
failing with a blocked poll because it wasn't handling ccChunkIndex ==
ciChunkIndex as an isFirstElementOfNextChunk case
Though it's already package private, but anyhow
Avoids the field re-ordering introduced by JDK 15
Also handle negative edge cases for index queues (fix issue for Spsc)
Fixes JCTools#292
@coveralls

Copy link
Copy Markdown

Pull Request Test Coverage Report for Build 661

  • 14 of 14 (100.0%) changed or added relevant lines in 1 file are covered.
  • 41 unchanged lines in 4 files lost coverage.
  • Overall coverage decreased (-0.2%) to 85.86%

Files with Coverage Reduction New Missed Lines %
jctools-core/src/main/java/org/jctools/maps/NonBlockingHashMapLong.java 4 78.89%
jctools-core/src/main/java/org/jctools/maps/NonBlockingIdentityHashMap.java 5 77.31%
jctools-core/src/main/java/org/jctools/maps/NonBlockingHashMap.java 11 79.26%
jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java 21 85.19%
Totals Coverage Status
Change from base Build 660: -0.2%
Covered Lines: 4712
Relevant Lines: 5488

💛 - Coveralls

@nitsanw

nitsanw commented May 22, 2020

Copy link
Copy Markdown
Contributor

Thanks for the interesting test case. Can you add it to the existing test suite so all the queues are covered?

// only return null if queue is empty
// Volatile Mode: Loading element must be completed before loading pIndex.
e = lvRefElement(buffer, calcCircularRefElementOffset(cIndex, mask));
pIndex = lvProducerIndex();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original solution relying on the matching sequence is more in line with how poll works and the spirit of this queue altogether. The idea here is to avoid as much as possible cache misses/traffic induced by loading the producer index from the consumer threads.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequence could be read twice to check if the read element is stable, similarly to what a stamped lock does

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stable value is unnecessary

long cIndex = lvConsumerIndex();
// Volatile Mode: Loading element must be completed before loading pIndex.
E e = lvRefElement(buffer, calcCircularRefElementOffset(cIndex, mask));
long pIndex = lvProducerIndex();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relaxed peek should be implemented without loading the producer index at all.

@nitsanw

nitsanw commented May 24, 2020

Copy link
Copy Markdown
Contributor

I think this is now resolved

@nitsanw nitsanw closed this May 24, 2020
@hl845740757 hl845740757 deleted the mpmcarrayqueuepeek branch May 28, 2020 06:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants