Update sparse file tracker complete pointer on progress#109247

Merged
tlrx merged 14 commits into elastic:main from tlrx:2024/05/31/update-complete-marker-on-progress
Jun 12, 2024
Conversation

@tlrx
Member

@tlrx tlrx commented May 31, 2024

This change updates the complete pointer when a Gap makes progress.

@tlrx tlrx added the >non-issue, :Distributed/Distributed, and v8.15.0 labels May 31, 2024
@henningandersen henningandersen self-requested a review June 3, 2024 08:33
Contributor

@henningandersen henningandersen left a comment

Looks great. Left a few smaller comments, but I think moving forward makes sense.

private volatile List<PositionAndListener> listeners;
protected volatile long progress;
/**
* A consumer that accepts progress made by this {@link ProgressListenableActionFuture}
Contributor

Can we add this to the constructor javadoc too? Also mention that the consumer is called prior to notifying listeners (since we'll rely on that), though possibly out of order when notifications are concurrent.

Member Author

I pushed b15d351
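
The ordering requested in this thread can be illustrated with a minimal sketch. It is not the actual `ProgressListenableActionFuture`: the class `ProgressSketch`, its fields, and the use of `LongConsumer` for listeners are hypothetical, and the only behavior taken from the discussion is that the consumer runs before listeners are notified and may see out-of-order values under concurrency.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch; names and structure are assumptions, not the real class.
final class ProgressSketch {
    private static final class PositionAndListener {
        final long position;
        final LongConsumer listener;

        PositionAndListener(long position, LongConsumer listener) {
            this.position = position;
            this.listener = listener;
        }
    }

    private final LongConsumer progressConsumer; // may be null
    private final List<PositionAndListener> listeners = new ArrayList<>();
    private long progress;

    ProgressSketch(long start, LongConsumer progressConsumer) {
        this.progress = start;
        this.progressConsumer = progressConsumer;
    }

    synchronized void addListener(long position, LongConsumer listener) {
        listeners.add(new PositionAndListener(position, listener));
    }

    void onProgress(long value) {
        // Step 1: the consumer runs first and outside the lock, so two concurrent
        // callers may reach it in a different order than their progress values.
        if (progressConsumer != null) {
            progressConsumer.accept(value);
        }
        // Step 2: collect the listeners whose position has now been reached.
        List<PositionAndListener> ready = new ArrayList<>();
        synchronized (this) {
            progress = Math.max(progress, value);
            for (Iterator<PositionAndListener> it = listeners.iterator(); it.hasNext();) {
                PositionAndListener l = it.next();
                if (l.position <= progress) {
                    it.remove();
                    ready.add(l);
                }
            }
        }
        // Step 3: notify them after releasing the lock.
        for (PositionAndListener l : ready) {
            l.listener.accept(value);
        }
    }
}
```

Because the consumer is invoked before the listener loop, anything a listener reads (for example a pointer the consumer advanced) already reflects the new progress value.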

consumer.accept(progress);
} catch (Exception e) {
assert false : e;
throw e;
Contributor

I'd prefer to log a warning rather than rethrow. There seems to be no reason to propagate the exception if the consumer were to fail (I am sure it will not for now).

Member Author

Sure, I pushed b2249a6. I don't expect this to throw though.
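
A minimal sketch of the "log a warning rather than rethrow" approach follows. The source does not show what b2249a6 contains, so this is only an assumption about the general shape: `SafeProgressConsumer` and `safeAccept` are hypothetical names, and `java.util.logging` stands in for whatever logger the real class uses.

```java
import java.util.function.LongConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; not taken from the pull request.
final class SafeProgressConsumer {
    private static final Logger logger = Logger.getLogger(SafeProgressConsumer.class.getName());

    private SafeProgressConsumer() {}

    /** Invokes the consumer; returns false (after logging a warning) if it threw. */
    static boolean safeAccept(LongConsumer consumer, long progress) {
        try {
            consumer.accept(progress);
            return true;
        } catch (Exception e) {
            // Log instead of rethrowing so a failing consumer cannot break
            // the progress notification that follows.
            logger.log(Level.WARNING, "Failed to consume progress value [" + progress + "]", e);
            return false;
        }
    }
}
```

The boolean return value exists only to make the sketch easy to check; a real implementation could simply return `void`.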


private void updateCompletePointerHoldingLock(long value) {
assert Thread.holdsLock(ranges);
assert complete <= value : complete + ">" + value;
Contributor

When called from maybeUpdateCompletePointer, this assertion depends on ProgressListenableActionFuture not notifying of progress on the last byte. It feels slightly subtle to me, but it works of course. The check I suggested in maybeUpdateCompletePointer should fix it though.

@tlrx tlrx changed the title from "[Draft] Update sparse file tracker complete pointer on progress" to "Update sparse file tracker complete pointer on progress" Jun 5, 2024
@tlrx tlrx marked this pull request as ready for review June 5, 2024 11:36
@tlrx tlrx requested review from henningandersen and ywangd June 5, 2024 11:36
@elasticsearchmachine elasticsearchmachine added the Team:Distributed label Jun 5, 2024
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

synchronized (this) {
assert completed == false;
completed = true;
assert listeners == null || listeners.stream().allMatch(l -> progress < l.position() && l.position() <= end);
Member

Hmm the actual length of the data is actionResult() which should be <= end. When actionResult() < end, I think there should be no listener waiting at all when we reach here? Otherwise these listeners are waiting for a range that can never be fulfilled?

Member Author

> the actual length of the data is actionResult() which should be <= end.

I think the cache does not allow ranges to be partially completed: it expects the range (gap) to be fully written, otherwise it considers the range failed and any remaining listeners are failed too.

I pushed cdabee7 to ensure it is always completed with a value equal to end.

Member

OK I see. The SparseFileTracker always completes ProgressListenableActionFuture with the range end. So regardless of the progress this range has actually made, the actionResult() should be equal to end when it completes.

Member

@ywangd ywangd left a comment

With these changes, I wonder whether we still need the short-circuiting for when progressValue == end introduced in #108095?

@tlrx
Member Author

tlrx commented Jun 6, 2024

Thanks for your feedback @ywangd!

> I wonder whether we still need the short-circuiting for when progressValue == end introduced in #108095?

I see what you mean, but I still think it makes sense not to complete listeners "early" because of progress being made if those listeners are waiting for the full gap to be completed. It feels more correct to me.
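
The short-circuit being discussed can be sketched as follows. This is a simplified, single-threaded illustration under stated assumptions: `GapListenersSketch`, `Waiter`, and `onCompletion` are hypothetical names, and the real class handles locking and results differently. It only shows the idea that progress reaching `end` does not notify listeners waiting for the full gap; completion does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, non-thread-safe sketch of the progress == end short-circuit.
final class GapListenersSketch {
    private static final class Waiter {
        final long position;
        final Runnable callback;

        Waiter(long position, Runnable callback) {
            this.position = position;
            this.callback = callback;
        }
    }

    private final long end;
    private final List<Waiter> waiters = new ArrayList<>();

    GapListenersSketch(long end) {
        this.end = end;
    }

    void addListener(long position, Runnable callback) {
        waiters.add(new Waiter(position, callback));
    }

    /** Progress below end notifies reached listeners; progress == end is a no-op. */
    void onProgress(long value) {
        if (value == end) {
            return; // listeners waiting for the full gap are notified by onCompletion()
        }
        notifyUpTo(value);
    }

    /** The gap is always completed with end, so every remaining listener is notified. */
    void onCompletion() {
        notifyUpTo(end);
    }

    private void notifyUpTo(long value) {
        for (Iterator<Waiter> it = waiters.iterator(); it.hasNext();) {
            Waiter w = it.next();
            if (w.position <= value) {
                it.remove();
                w.callback.run();
            }
        }
    }
}
```

With this shape, a listener registered at `end` observes the gap only once it has been completed as a whole, which matches the "not early" behavior argued for above.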

Member

@ywangd ywangd left a comment

Thanks for answering my question, Tanguy. I have a few more minor ones. This is looking great. I just need another pass for the tests before approval. Thank you!

@Nullable
private final LongConsumer progressConsumer;

private List<PositionAndListener> listeners;
Member

I think we still need volatile here or alternatively make toString synchronized?

Member Author

Let's not bring back volatile only for toString(), I pushed af58294
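
The alternative raised in this thread (a synchronized `toString` instead of a `volatile` field) might look like the sketch below. The source does not show what af58294 changed, so this is an assumption about the general approach rather than the actual commit; `ListenerHolder` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the field is guarded by the monitor on "this" everywhere.
final class ListenerHolder {
    // Not volatile: every read and write happens while holding the lock.
    private List<Long> listeners;

    synchronized void addListener(long position) {
        if (listeners == null) {
            listeners = new ArrayList<>();
        }
        listeners.add(position);
    }

    @Override
    public synchronized String toString() {
        // Reading under the same lock gives the visibility that volatile would
        // otherwise have provided for this single diagnostic read.
        return "ListenerHolder[listeners=" + (listeners != null ? listeners.size() : 0) + "]";
    }
}
```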

Comment on lines +266 to +270
if (rangeStart == complete) {
return this::updateCompletePointer;
} else {
return null;
}
Member

Mostly making sure I understand how this works. We don't update complete unless every byte before it is fetched. We also adjust targetRange.start so that one of the ranges is expected to resume fetching from complete. And that range is the only one that should update the complete field?

Member Author

Exactly.
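
The rule confirmed here can be sketched in isolation. Only the `rangeStart == complete` branch comes from the diff quoted above; the class `CompletePointerSketch`, the `progressConsumer` method name, and the decision to ignore stale values (where the real code asserts `complete <= value`) are assumptions made for illustration.

```java
import java.util.function.LongConsumer;

// Hypothetical sketch of "only the gap starting at the complete pointer may advance it".
final class CompletePointerSketch {
    private long complete; // every byte in [0, complete) is assumed available

    /** Returns a consumer only for the gap that starts exactly at the complete pointer. */
    synchronized LongConsumer progressConsumer(long rangeStart) {
        if (rangeStart == complete) {
            return this::updateCompletePointer;
        } else {
            return null;
        }
    }

    private synchronized void updateCompletePointer(long value) {
        // Assumption: ignore stale (out-of-order) values so the pointer never moves backwards.
        if (value > complete) {
            complete = value;
        }
    }

    synchronized long getComplete() {
        return complete;
    }
}
```

A gap that starts beyond the pointer gets no consumer, since advancing `complete` past bytes that are still missing would break the "every byte before it is fetched" invariant.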

@tlrx tlrx requested a review from ywangd June 7, 2024 15:11
Member

@ywangd ywangd left a comment

LGTM

var future = new ProgressListenableActionFuture(
start,
end,
p -> assertThat("LongConsumer should not consumed the same value twice", consumed.add(p), equalTo(true))
Member

Nit: extra space :)

Suggested change
p -> assertThat("LongConsumer should not consumed the same value twice", consumed.add(p), equalTo(true))
p -> assertThat("LongConsumer should not consumed the same value twice", consumed.add(p), equalTo(true))

Member Author

Eagle eyes again :)

public void testLongConsumerCalledOnProgressUpdate() {
// min length of 2 to have at least one progress update before reaching the end
long length = randomLongBetween(2L, ByteSizeUnit.TB.toBytes(1L));
long start = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - length);
Member

The code does not prevent negative start values. But it seems rather odd to allow it. I wonder whether we should tighten it up in a separate PR to assert start >= 0?

);

long position = start;
for (int i = 0; i < 25 && position < end - 1L; i++) {
Member

Nit: Maybe a comment about why 25 is used could be helpful.

Member Author

This is just a number of iterations, I used a local variable in 16052d4

assertThat(consumed.contains(progress), equalTo(true));
assertThat(listener.isDone(), equalTo(true));
position = progress;
}
Member

The consumer is not called for future.onProgress(end). Is there any value to assert that? Or do you think this is an implementation detail and not worth it?

Member Author

I added a check that the consumer is not called when progress is updated to end in 16052d4

Comment on lines +559 to +565
if (awaitingListener != null && j < gap.end() - 1L) {
assertThat(
"Complete pointer should have been updated when a listener is waiting for the gap to be completed",
tracker.getComplete(),
equalTo(j + 1L)
);
latestUpdatedCompletePointer = tracker.getComplete();
Member

I think this branch was meant to assert awaitingListener.isDone?

Member Author

Sure, pushed in 16052d4

Comment on lines +38 to +41
/**
* A consumer that accepts progress made by this {@link ProgressListenableActionFuture}. The consumer is called before listeners are
* notified of the updated progress value in {@link #onProgress(long)}. The consumer can be called with out-of-order progress values.
*/
Member

Can we call out that the consumer is not called when progress == end? I find it important and a bit counter-intuitive. So it's helpful to explicitly note it down.

Member Author

Done in 16052d4
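
The counter-intuitive detail called out here (no consumer call when progress equals `end`) can be made explicit with a small wrapper. This is a hypothetical illustration: `EndExclusiveConsumer` does not appear in the pull request, and the real class presumably performs the equivalent check inside `onProgress` itself.

```java
import java.util.function.LongConsumer;

// Hypothetical wrapper that forwards intermediate progress only.
final class EndExclusiveConsumer implements LongConsumer {
    private final long end;
    private final LongConsumer delegate;

    EndExclusiveConsumer(long end, LongConsumer delegate) {
        this.end = end;
        this.delegate = delegate;
    }

    @Override
    public void accept(long progress) {
        // progress == end means the range is complete; completion is signalled
        // separately, so the delegate only sees values strictly below end.
        if (progress < end) {
            delegate.accept(progress);
        }
    }
}
```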

@tlrx tlrx merged commit a7cd581 into elastic:main Jun 12, 2024
@tlrx tlrx deleted the 2024/05/31/update-complete-marker-on-progress branch June 12, 2024 06:58
@tlrx
Member Author

tlrx commented Jun 12, 2024

Thanks Yang & Henning!
