Update sparse file tracker complete pointer on progress #109247
tlrx merged 14 commits into elastic:main
henningandersen
left a comment
Looks great. Left a few smaller comments, but I think moving forward makes sense.
| private volatile List<PositionAndListener> listeners; | ||
| protected volatile long progress; | ||
| /** | ||
| * A consumer that accepts progress made by this {@link ProgressListenableActionFuture} |
Can we add this to the constructor javadoc too? Please also mention that the consumer is called prior to notifying listeners (since we'll rely on that), though possibly out of order for concurrent notifications.
| consumer.accept(progress); | ||
| } catch (Exception e) { | ||
| assert false : e; | ||
| throw e; |
I'd prefer to log a warning rather than rethrow. There seems to be no reason to propagate the exception if the consumer were to fail (I am sure it will not for now).
Sure, I pushed b2249a6. I don't expect this to throw though.
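For readers following along, the behaviour discussed in this thread can be sketched roughly as follows. This is only an illustration under stated assumptions: `ProgressNotifierSketch`, `Waiter`, and the use of `java.util.logging` are hypothetical stand-ins, not the actual `ProgressListenableActionFuture` code. It shows the consumer being called before listeners are notified, a consumer failure being logged as a warning instead of rethrown, and the consumer being skipped when progress reaches `end`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for the future discussed above; not the Elasticsearch class. */
final class ProgressNotifierSketch {
    private static final Logger LOGGER = Logger.getLogger(ProgressNotifierSketch.class.getName());

    /** A listener waiting for progress to reach a given position (hypothetical helper). */
    private static final class Waiter {
        final long position;
        final LongConsumer callback;

        Waiter(long position, LongConsumer callback) {
            this.position = position;
            this.callback = callback;
        }
    }

    private final long end;
    private final LongConsumer progressConsumer; // may be null
    private final List<Waiter> waiters = new ArrayList<>(); // guarded by this
    private long progress; // guarded by this

    ProgressNotifierSketch(long start, long end, LongConsumer progressConsumer) {
        this.progress = start;
        this.end = end;
        this.progressConsumer = progressConsumer;
    }

    synchronized void addListener(long position, LongConsumer callback) {
        waiters.add(new Waiter(position, callback));
    }

    void onProgress(long value) {
        List<Waiter> ready = new ArrayList<>();
        synchronized (this) {
            if (value <= progress) {
                return; // no new progress to report
            }
            progress = value;
            // Collect the listeners whose position has now been reached.
            waiters.removeIf(w -> {
                if (w.position <= value) {
                    ready.add(w);
                    return true;
                }
                return false;
            });
        }
        // The consumer runs first, outside the lock, and is skipped once progress reaches the end.
        if (progressConsumer != null && value < end) {
            try {
                progressConsumer.accept(value);
            } catch (Exception e) {
                // Log a warning instead of rethrowing, so listeners are still notified.
                LOGGER.log(Level.WARNING, "failed to consume progress " + value, e);
            }
        }
        for (Waiter w : ready) {
            w.callback.accept(value);
        }
    }
}
```

Because the consumer runs outside the lock in this sketch, two concurrent `onProgress` calls could deliver their values to the consumer out of order, which matches the caveat raised for the javadoc.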
...ck/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java
|
|
||
| private void updateCompletePointerHoldingLock(long value) { | ||
| assert Thread.holdsLock(ranges); | ||
| assert complete <= value : complete + ">" + value; |
When called from maybeUpdateCompletePointer, this assertion depends on ProgressListenableActionFuture not notifying of progress on the last byte. It feels slightly subtle to me, but it works of course. The check I suggested in maybeUpdateCompletePointer should fix it though.
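The exact check suggested for `maybeUpdateCompletePointer` is not shown in this conversation. One plausible shape, purely as an assumption, is a guard that only advances the pointer when the new value is strictly greater, so the assertion in `updateCompletePointerHoldingLock` no longer depends on how the future reports progress. `CompletePointerSketch` and `getComplete()` as written here are hypothetical stand-ins for the tracker.

```java
/** Hypothetical sketch of a monotonic "complete" pointer; not the actual SparseFileTracker code. */
final class CompletePointerSketch {
    private final Object ranges = new Object(); // stands in for the tracker's lock object
    private long complete; // guarded by ranges

    void maybeUpdateCompletePointer(long value) {
        synchronized (ranges) {
            // Assumed guard: ignore stale or out-of-order progress values.
            if (complete < value) {
                updateCompletePointerHoldingLock(value);
            }
        }
    }

    private void updateCompletePointerHoldingLock(long value) {
        assert Thread.holdsLock(ranges);
        assert complete <= value : complete + ">" + value;
        complete = value;
    }

    long getComplete() {
        synchronized (ranges) {
            return complete;
        }
    }
}
```

With such a guard, a late notification carrying a smaller value is simply dropped instead of tripping the assertion.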
Pinging @elastic/es-distributed (Team:Distributed)
| synchronized (this) { | ||
| assert completed == false; | ||
| completed = true; | ||
| assert listeners == null || listeners.stream().allMatch(l -> progress < l.position() && l.position() <= end); |
Hmm the actual length of the data is actionResult() which should be <= end. When actionResult() < end, I think there should be no listener waiting at all when we reach here? Otherwise these listeners are waiting for a range that can never be fulfilled?
> the actual length of the data is actionResult() which should be <= end.
I think that the cache does not allow ranges to be partially completed, so it expects the range (gap) to be fully written or it considers it failed, and any remaining listeners will be failed too.
I pushed cdabee7 to ensure it is always completed with a value equal to end.
OK I see. The SparseFileTracker always completes ProgressListenableActionFuture with the range end. So regardless of the progress this range has actually made, the actionResult() should be equal to end when it completes.
...b-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java
Thanks for your feedback @ywangd!
I see what you mean, but I still think it makes sense not to complete listeners "early" because of progress being made if those listeners are waiting for the full gap to be completed. It feels more correct to me.
ywangd
left a comment
Thanks for answering my question, Tanguy. I have a few more minor ones. This is looking great. I just need another pass for the tests before approval. Thank you!
| @Nullable | ||
| private final LongConsumer progressConsumer; | ||
|
|
||
| private List<PositionAndListener> listeners; |
I think we still need volatile here or alternatively make toString synchronized?
Let's not bring back volatile only for toString(). I pushed af58294.
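As an illustration of the second option mentioned above (a synchronized `toString` instead of a `volatile` field), here is a minimal sketch. It is an assumption about the general technique, not a reproduction of commit af58294, and `SynchronizedToStringSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a non-volatile field that is only ever read and written under the same lock. */
final class SynchronizedToStringSketch {
    private List<Long> listeners; // guarded by this, so no volatile is needed
    private long progress; // guarded by this

    synchronized void addListener(long position) {
        if (listeners == null) {
            listeners = new ArrayList<>();
        }
        listeners.add(position);
    }

    synchronized void onProgress(long value) {
        progress = value;
    }

    @Override
    public synchronized String toString() {
        // Reading under the lock guarantees visibility of writes made by other threads.
        int count = listeners == null ? 0 : listeners.size();
        return "Sketch[progress=" + progress + ", listeners=" + count + "]";
    }
}
```

The trade-off is that `toString()` now contends for the same monitor as the mutating methods, which is usually acceptable for a method used mainly in logs and assertions.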
| if (rangeStart == complete) { | ||
| return this::updateCompletePointer; | ||
| } else { | ||
| return null; | ||
| } |
Mostly making sure I understand how this works. We don't update complete unless every byte before it is fetched. We also adjust targetRange.start so that we can expect one of the ranges will resume fetching from complete. And that range is the only one that should update the complete field?
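The understanding described in this question can be sketched as below. This is a simplified illustration only: `GapConsumerSketch` and its methods are hypothetical names, and the real tracker also handles ranges, listeners, and gap completion, which are omitted here. The point shown is that only a gap starting exactly at `complete` receives a progress consumer, so only that gap can move the pointer forward.

```java
import java.util.function.LongConsumer;

/** Hypothetical sketch of handing a progress consumer only to the gap that starts at "complete". */
final class GapConsumerSketch {
    private final Object lock = new Object();
    private long complete; // guarded by lock

    GapConsumerSketch(long complete) {
        this.complete = complete;
    }

    /** Returns a consumer for a gap starting at the complete pointer, or null for any other gap. */
    LongConsumer progressConsumer(long rangeStart) {
        synchronized (lock) {
            if (rangeStart == complete) {
                return this::updateCompletePointer;
            } else {
                return null;
            }
        }
    }

    private void updateCompletePointer(long value) {
        synchronized (lock) {
            if (complete < value) {
                complete = value; // every byte before value is known to be available
            }
        }
    }

    long getComplete() {
        synchronized (lock) {
            return complete;
        }
    }
}
```

A gap that starts further along gets `null`, so its progress cannot advance `complete` past bytes that have not been fetched yet.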
| var future = new ProgressListenableActionFuture( | ||
| start, | ||
| end, | ||
| p -> assertThat("LongConsumer should not consumed the same value twice", consumed.add(p), equalTo(true)) |
Nit: extra space :)
| p -> assertThat("LongConsumer should not consumed the same value twice", consumed.add(p), equalTo(true)) | |
| p -> assertThat("LongConsumer should not consumed the same value twice", consumed.add(p), equalTo(true)) |
| public void testLongConsumerCalledOnProgressUpdate() { | ||
| // min length of 2 to have at least one progress update before reaching the end | ||
| long length = randomLongBetween(2L, ByteSizeUnit.TB.toBytes(1L)); | ||
| long start = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - length); |
The code does not prevent negative start values. But it seems rather odd to allow it. I wonder whether we should tighten it up in a separate PR to assert start >= 0?
| ); | ||
|
|
||
| long position = start; | ||
| for (int i = 0; i < 25 && position < end - 1L; i++) { |
Nit: Maybe a comment about why 25 is used could be helpful.
This is just a number of iterations. I used a local variable in 16052d4.
| assertThat(consumed.contains(progress), equalTo(true)); | ||
| assertThat(listener.isDone(), equalTo(true)); | ||
| position = progress; | ||
| } |
The consumer is not called for future.onProgress(end). Is there any value to assert that? Or do you think this is an implementation detail and not worth it?
I added a check that the consumer is not called when progress is updated to end in 16052d4
| if (awaitingListener != null && j < gap.end() - 1L) { | ||
| assertThat( | ||
| "Complete pointer should have been updated when a listener is waiting for the gap to be completed", | ||
| tracker.getComplete(), | ||
| equalTo(j + 1L) | ||
| ); | ||
| latestUpdatedCompletePointer = tracker.getComplete(); |
I think this branch is meant to assert awaitingListener.isDone()?
| /** | ||
| * A consumer that accepts progress made by this {@link ProgressListenableActionFuture}. The consumer is called before listeners are | ||
| * notified of the updated progress value in {@link #onProgress(long)}. The consumer can be called with out-of-order progress values. | ||
| */ |
Can we call out that the consumer is not called when progress == end? I find it important and a bit counter-intuitive. So it's helpful to explicitly note it down.
Thanks Yang & Henning!
This change updates the complete pointer when a Gap makes progress.