Skip to content

Commit 5fd5772

Browse files
committed
Fix inconsistent isEmpty/poll/peek/offer dynamics for SpscLinkedQueue
Also handle negative edge cases for index queues (fix issue for Spsc) Fixes #292
1 parent 9157a5c commit 5fd5772

File tree

9 files changed

+586
-272
lines changed

9 files changed

+586
-272
lines changed

jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import java.util.AbstractQueue;
1717
import java.util.Iterator;
18+
import java.util.Queue;
1819

1920
import static org.jctools.util.UnsafeAccess.UNSAFE;
2021
import static org.jctools.util.UnsafeAccess.fieldOffset;
@@ -45,20 +46,23 @@ abstract class BaseLinkedQueueProducerNodeRef<E> extends BaseLinkedQueuePad0<E>
4546
{
4647
final static long P_NODE_OFFSET = fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode");
4748

48-
private LinkedQueueNode<E> producerNode;
49+
private volatile LinkedQueueNode<E> producerNode;
4950

5051
final void spProducerNode(LinkedQueueNode<E> newValue)
5152
{
52-
producerNode = newValue;
53+
UNSAFE.putObject(this, P_NODE_OFFSET, newValue);
54+
}
55+
56+
final void soProducerNode(LinkedQueueNode<E> newValue)
57+
{
58+
UNSAFE.putOrderedObject(this, P_NODE_OFFSET, newValue);
5359
}
5460

55-
@SuppressWarnings("unchecked")
5661
final LinkedQueueNode<E> lvProducerNode()
5762
{
58-
return (LinkedQueueNode<E>) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET);
63+
return producerNode;
5964
}
6065

61-
@SuppressWarnings("unchecked")
6266
final boolean casProducerNode(LinkedQueueNode<E> expect, LinkedQueueNode<E> newValue)
6367
{
6468
return UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, expect, newValue);
@@ -212,7 +216,9 @@ public final int size()
212216
@Override
213217
public boolean isEmpty()
214218
{
215-
return lvConsumerNode() == lvProducerNode();
219+
LinkedQueueNode<E> consumerNode = lvConsumerNode();
220+
LinkedQueueNode<E> producerNode = lvProducerNode();
221+
return consumerNode == producerNode;
216222
}
217223

218224
protected E getSingleConsumerNodeValue(LinkedQueueNode<E> currConsumerNode, LinkedQueueNode<E> nextNode)
@@ -229,6 +235,91 @@ protected E getSingleConsumerNodeValue(LinkedQueueNode<E> currConsumerNode, Link
229235
return nextValue;
230236
}
231237

238+
/**
239+
* {@inheritDoc} <br>
240+
* <p>
241+
* IMPLEMENTATION NOTES:<br>
242+
* Poll is allowed from a SINGLE thread.<br>
243+
* Poll is potentially blocking here as the {@link Queue#poll()} does not allow returning {@code null} if the queue is not
244+
* empty. This is very different from the original Vyukov guarantees. See {@link #relaxedPoll()} for the original
245+
* semantics.<br>
246+
* Poll reads {@code consumerNode.next} and:
247+
* <ol>
248+
* <li>If it is {@code null} AND the queue is empty return {@code null}, <b>if queue is not empty spin wait for
249+
* value to become visible</b>.
250+
* <li>If it is not {@code null} set it as the consumer node and return it's now evacuated value.
251+
* </ol>
252+
* This means the consumerNode.value is always {@code null}, which is also the starting point for the queue.
253+
* Because {@code null} values are not allowed to be offered this is the only node with it's value set to
254+
* {@code null} at any one time.
255+
*
256+
* @see MessagePassingQueue#poll()
257+
* @see java.util.Queue#poll()
258+
*/
259+
@Override
260+
public E poll()
261+
{
262+
final LinkedQueueNode<E> currConsumerNode = lpConsumerNode();
263+
LinkedQueueNode<E> nextNode = currConsumerNode.lvNext();
264+
if (nextNode != null)
265+
{
266+
return getSingleConsumerNodeValue(currConsumerNode, nextNode);
267+
}
268+
else if (currConsumerNode != lvProducerNode())
269+
{
270+
nextNode = spinWaitForNextNode(currConsumerNode);
271+
// got the next node...
272+
return getSingleConsumerNodeValue(currConsumerNode, nextNode);
273+
}
274+
return null;
275+
}
276+
277+
/**
278+
* {@inheritDoc} <br>
279+
* <p>
280+
* IMPLEMENTATION NOTES:<br>
281+
* Peek is allowed from a SINGLE thread.<br>
282+
* Peek is potentially blocking here as the {@link Queue#peek()} does not allow returning {@code null} if the queue is not
283+
* empty. This is very different from the original Vyukov guarantees. See {@link #relaxedPeek()} for the original
284+
* semantics.<br>
285+
* Poll reads the next node from the consumerNode and:
286+
* <ol>
287+
* <li>If it is {@code null} AND the queue is empty return {@code null}, <b>if queue is not empty spin wait for
288+
* value to become visible</b>.
289+
* <li>If it is not {@code null} return it's value.
290+
* </ol>
291+
*
292+
* @see MessagePassingQueue#peek()
293+
* @see java.util.Queue#peek()
294+
*/
295+
@Override
296+
public E peek()
297+
{
298+
final LinkedQueueNode<E> currConsumerNode = lpConsumerNode();
299+
LinkedQueueNode<E> nextNode = currConsumerNode.lvNext();
300+
if (nextNode != null)
301+
{
302+
return nextNode.lpValue();
303+
}
304+
else if (currConsumerNode != lvProducerNode())
305+
{
306+
nextNode = spinWaitForNextNode(currConsumerNode);
307+
// got the next node...
308+
return nextNode.lpValue();
309+
}
310+
return null;
311+
}
312+
313+
LinkedQueueNode<E> spinWaitForNextNode(LinkedQueueNode<E> currNode)
314+
{
315+
LinkedQueueNode<E> nextNode;
316+
while ((nextNode = currNode.lvNext()) == null)
317+
{
318+
// spin, we are no longer wait free
319+
}
320+
return nextNode;
321+
}
322+
232323
@Override
233324
public E relaxedPoll()
234325
{

jctools-core/src/main/java/org/jctools/queues/IndexedQueueSizeUtil.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,22 @@
1515

1616
import org.jctools.util.InternalAPI;
1717

18+
/**
19+
* A note to maintainers on index assumptions: in a single threaded world it would seem intuitive to assume:
20+
* <pre>
21+
* <code>producerIndex >= consumerIndex</code>
22+
* </pre>
23+
* As an invariant, but in a concurrent, long running settings all of the following need to be considered:
24+
* <ul>
25+
* <li> <code>consumerIndex > producerIndex</code> : due to counter overflow (unlikey with longs, but easy to reason)
26+
* <li> <code>consumerIndex > producerIndex</code> : due to consumer FastFlow like implementation discovering the
27+
* element before the counter is updated.
28+
* <li> <code>producerIndex - consumerIndex < 0</code> : due to above.
29+
* <li> <code>producerIndex - consumerIndex > Integer.MAX_VALUE</code> : as linked buffers allow constructing queues
30+
* with more than <code>Integer.MAX_VALUE</code> elements.
31+
*
32+
* </ul>
33+
*/
1834
@InternalAPI
1935
public final class IndexedQueueSizeUtil
2036
{
@@ -45,6 +61,10 @@ public static int size(IndexedQueue iq)
4561
{
4662
return Integer.MAX_VALUE;
4763
}
64+
else if (size < 0)
65+
{
66+
return 0;
67+
}
4868
else
4969
{
5070
return (int) size;
@@ -57,7 +77,7 @@ public static boolean isEmpty(IndexedQueue iq)
5777
// Loading consumer before producer allows for producer increments after consumer index is read.
5878
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
5979
// nothing we can do to make this an exact method.
60-
return (iq.lvConsumerIndex() == iq.lvProducerIndex());
80+
return (iq.lvConsumerIndex() >= iq.lvProducerIndex());
6181
}
6282

6383
@InternalAPI

jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java

Lines changed: 2 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -72,81 +72,6 @@ public boolean offer(final E e)
7272
return true;
7373
}
7474

75-
/**
76-
* {@inheritDoc} <br>
77-
* <p>
78-
* IMPLEMENTATION NOTES:<br>
79-
* Poll is allowed from a SINGLE thread.<br>
80-
* Poll is potentially blocking here as the {@link Queue#poll()} does not allow returning {@code null} if the queue is not
81-
* empty. This is very different from the original Vyukov guarantees. See {@link #relaxedPoll()} for the original
82-
* semantics.<br>
83-
* Poll reads {@code consumerNode.next} and:
84-
* <ol>
85-
* <li>If it is {@code null} AND the queue is empty return {@code null}, <b>if queue is not empty spin wait for
86-
* value to become visible</b>.
87-
* <li>If it is not {@code null} set it as the consumer node and return it's now evacuated value.
88-
* </ol>
89-
* This means the consumerNode.value is always {@code null}, which is also the starting point for the queue.
90-
* Because {@code null} values are not allowed to be offered this is the only node with it's value set to
91-
* {@code null} at any one time.
92-
*
93-
* @see MessagePassingQueue#poll()
94-
* @see java.util.Queue#poll()
95-
*/
96-
@Override
97-
public E poll()
98-
{
99-
final LinkedQueueNode<E> currConsumerNode = lpConsumerNode();
100-
LinkedQueueNode<E> nextNode = currConsumerNode.lvNext();
101-
if (nextNode != null)
102-
{
103-
return getSingleConsumerNodeValue(currConsumerNode, nextNode);
104-
}
105-
else if (currConsumerNode != lvProducerNode())
106-
{
107-
nextNode = spinWaitForNextNode(currConsumerNode);
108-
// got the next node...
109-
return getSingleConsumerNodeValue(currConsumerNode, nextNode);
110-
}
111-
return null;
112-
}
113-
114-
/**
115-
* {@inheritDoc} <br>
116-
* <p>
117-
* IMPLEMENTATION NOTES:<br>
118-
* Peek is allowed from a SINGLE thread.<br>
119-
* Peek is potentially blocking here as the {@link Queue#peek()} does not allow returning {@code null} if the queue is not
120-
* empty. This is very different from the original Vyukov guarantees. See {@link #relaxedPeek()} for the original
121-
* semantics.<br>
122-
* Poll reads the next node from the consumerNode and:
123-
* <ol>
124-
* <li>If it is {@code null} AND the queue is empty return {@code null}, <b>if queue is not empty spin wait for
125-
* value to become visible</b>.
126-
* <li>If it is not {@code null} return it's value.
127-
* </ol>
128-
*
129-
* @see MessagePassingQueue#peek()
130-
* @see java.util.Queue#peek()
131-
*/
132-
@Override
133-
public E peek()
134-
{
135-
final LinkedQueueNode<E> currConsumerNode = lpConsumerNode();
136-
LinkedQueueNode<E> nextNode = currConsumerNode.lvNext();
137-
if (nextNode != null)
138-
{
139-
return nextNode.lpValue();
140-
}
141-
else if (currConsumerNode != lvProducerNode())
142-
{
143-
nextNode = spinWaitForNextNode(currConsumerNode);
144-
// got the next node...
145-
return nextNode.lpValue();
146-
}
147-
return null;
148-
}
149-
15075
/**
15176
* {@inheritDoc}
15277
* <p>
@@ -248,13 +173,13 @@ private LinkedQueueNode<E> xchgProducerNode(LinkedQueueNode<E> newVal)
248173
}
249174
else
250175
{
251-
Object oldVal;
176+
LinkedQueueNode<E> oldVal;
252177
do
253178
{
254179
oldVal = lvProducerNode();
255180
}
256181
while (!UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, oldVal, newVal));
257-
return (LinkedQueueNode<E>) oldVal;
182+
return oldVal;
258183
}
259184
}
260185

@@ -268,13 +193,4 @@ private LinkedQueueNode<E> getNextConsumerNode(LinkedQueueNode<E> currConsumerNo
268193
return nextNode;
269194
}
270195

271-
private LinkedQueueNode<E> spinWaitForNextNode(LinkedQueueNode<E> currNode)
272-
{
273-
LinkedQueueNode<E> nextNode;
274-
while ((nextNode = currNode.lvNext()) == null)
275-
{
276-
// spin, we are no longer wait free
277-
}
278-
return nextNode;
279-
}
280196
}

jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public SpscLinkedQueue()
4646
* Offer is allowed from a SINGLE thread.<br>
4747
* Offer allocates a new node (holding the offered value) and:
4848
* <ol>
49-
* <li>Sets that node as the producerNode.next
5049
* <li>Sets the new node as the producerNode
50+
* <li>Sets that node as the lastProducerNode.next
5151
* </ol>
5252
* From this follows that producerNode.next is always null and for all other nodes node.next is not null.
5353
*
@@ -62,37 +62,15 @@ public boolean offer(final E e)
6262
throw new NullPointerException();
6363
}
6464
final LinkedQueueNode<E> nextNode = newNode(e);
65-
lpProducerNode().soNext(nextNode);
66-
spProducerNode(nextNode);
65+
LinkedQueueNode<E> oldNode = lpProducerNode();
66+
soProducerNode(nextNode);
67+
// Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
68+
// and completes the store in prev.next. This is a "bubble".
69+
// Inverting the order here will break the `isEmpty` invariant, and will require matching adjustments elsewhere.
70+
oldNode.soNext(nextNode);
6771
return true;
6872
}
6973

70-
/**
71-
* {@inheritDoc} <br>
72-
* <p>
73-
* IMPLEMENTATION NOTES:<br>
74-
* Poll is allowed from a SINGLE thread.<br>
75-
* Poll reads the next node from the consumerNode and:
76-
* <ol>
77-
* <li>If it is null, the queue is empty.
78-
* <li>If it is not null set it as the consumer node and return it's now evacuated value.
79-
* </ol>
80-
* This means the consumerNode.value is always null, which is also the starting point for the queue.
81-
* Because null values are not allowed to be offered this is the only node with it's value set to null at
82-
* any one time.
83-
*/
84-
@Override
85-
public E poll()
86-
{
87-
return relaxedPoll();
88-
}
89-
90-
@Override
91-
public E peek()
92-
{
93-
return relaxedPeek();
94-
}
95-
9674
@Override
9775
public int fill(Supplier<E> s)
9876
{
@@ -118,31 +96,15 @@ public int fill(Supplier<E> s, int limit)
11896
tail = temp;
11997
}
12098
final LinkedQueueNode<E> oldPNode = lpProducerNode();
121-
oldPNode.soNext(head);
12299
spProducerNode(tail);
100+
// same bubble as offer, and for the same reasons.
101+
oldPNode.soNext(head);
123102
return limit;
124103
}
125104

126105
@Override
127106
public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit)
128107
{
129-
if (null == wait)
130-
throw new IllegalArgumentException("waiter is null");
131-
if (null == exit)
132-
throw new IllegalArgumentException("exit condition is null");
133-
if (null == s)
134-
throw new IllegalArgumentException("supplier is null");
135-
136-
LinkedQueueNode<E> chaserNode = lpProducerNode();
137-
while (exit.keepRunning())
138-
{
139-
for (int i = 0; i < 4096; i++)
140-
{
141-
final LinkedQueueNode<E> nextNode = newNode(s.get());
142-
chaserNode.soNext(nextNode);
143-
chaserNode = nextNode;
144-
this.spProducerNode(chaserNode);
145-
}
146-
}
108+
MessagePassingQueueUtil.fill(this, s, wait, exit);
147109
}
148110
}

0 commit comments

Comments
 (0)