Using BulkProcessor2 in RollupShardIndexer #94197

Merged
masseyke merged 11 commits into elastic:main from masseyke:using-bulkprocessor2-in-rollup
Mar 22, 2023

Conversation

@masseyke
Member

In #91238 we rewrote BulkProcessor to avoid deadlock that had been seen in the IlmHistoryStore. This PR ports rollup over to the new BulkProcessor2 implementation. BulkProcessor2 always runs asynchronously, meaning that RollupShardIndexer has to explicitly check for failures and throw an exception, rather than relying on the exception being thrown in-thread during the bulk indexing.

@masseyke masseyke added the :StorageEngine/Rollup, >non-issue, and Team:Analytics labels Feb 28, 2023
@masseyke masseyke marked this pull request as ready for review March 6, 2023 16:23
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

Member

@martijnvg martijnvg left a comment

I think this looks good. I asked a few questions just for my own understanding.

.setBulkActions(ROLLUP_BULK_ACTIONS)
.setBulkSize(ROLLUP_BULK_SIZE)
// execute the bulk request on the same thread
.setConcurrentRequests(0)
Member

I think this is a change in runtime behaviour? BulkProcessor2 would execute the bulk request on different threads and if multiple bulk requests exceed max bytes in flight, then a rejected exception is thrown. Just double checking, I think BulkProcessor2 would work well in this context.

Member Author

Yes you are correct that if searches are happening faster than bulk indexing then eventually we'd start getting EsRejectedExecutionExceptions and losing data. I've added bulkProcessorTooFullMonitor and logic around it to exert backpressure and avoid this. I also added DownsampleActionSingleNodeTests.testTooManyBytesInFlight() to show this problem (and fix).
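The backpressure approach described here can be sketched roughly like this (all names, including the monitor and byte counter, are illustrative stand-ins rather than the PR's actual code): the producing thread blocks on a monitor while too many bytes are in flight, and bulk-completion callbacks free capacity and wake it up, instead of letting the processor throw EsRejectedExecutionException.

```java
// Illustrative sketch only: block producers when in-flight bytes would exceed
// a limit, rather than rejecting the request and losing data.
public class BackpressureSketch {
    private final Object tooFullMonitor = new Object();
    private final long maxBytesInFlight;
    private long bytesInFlight = 0;

    public BackpressureSketch(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    // Called by the search/indexing thread before handing a doc to the processor.
    public void addWithBackpressure(long docBytes) {
        synchronized (tooFullMonitor) {
            while (bytesInFlight + docBytes > maxBytesInFlight) {
                try {
                    tooFullMonitor.wait(500); // re-check capacity periodically
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted while waiting for capacity", e);
                }
            }
            bytesInFlight += docBytes;
        }
    }

    // Called from the bulk-response callback once a request completes.
    public void onBulkComplete(long docBytes) {
        synchronized (tooFullMonitor) {
            bytesInFlight -= docBytes;
            tooFullMonitor.notifyAll();
        }
    }

    public long bytesInFlight() {
        synchronized (tooFullMonitor) {
            return bytesInFlight;
        }
    }
}
```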

XContentBuilder doc = rollupBucketBuilder.buildRollupDocument();
indexBucket(doc);
}
bulkProcessor.flush();
Member

Not needed because this would happen during closing of BulkProcessor2?

Member Author

That's right. There's no need to explicitly flush BulkProcessor2 (it doesn't even have a public flush method).
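As a rough illustration of why callers need no explicit flush (this is a simplified stand-in, not BulkProcessor2's real implementation), a processor can drain whatever is still queued as part of close():

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Illustrative only: a processor whose close() drains queued work, so callers
// never need a public flush() method.
public class DrainOnCloseSketch implements AutoCloseable {
    private final Deque<String> queued = new ArrayDeque<>();
    private final Consumer<String> sink; // stand-in for the bulk-indexing path

    public DrainOnCloseSketch(Consumer<String> sink) {
        this.sink = sink;
    }

    public void add(String doc) {
        queued.add(doc);
    }

    public int pending() {
        return queued.size();
    }

    @Override
    public void close() {
        // Drain everything still queued as part of shutdown.
        while (!queued.isEmpty()) {
            sink.accept(queued.poll());
        }
    }
}
```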

TimeValue.timeValueMillis(System.currentTimeMillis() - startTime)
);

if (task.getNumFailed() > 0) {
Member

When this if statement is reached then all bulk requests have been executed?

Member Author

@masseyke masseyke Mar 8, 2023

Hmm, close only waits up to 30 seconds. So I guess it's possible that we don't have any failures here, and then by the time the next line runs we do, but task.getNumIndexed() == task.getNumSent() so we report success. I think if I swap these two blocks it will solve that, right? Also, do we want to wait more than 30 seconds here? That's effectively 30 seconds for the last 50 MB (the max amount of in-flight bytes we allow) to flush, which seems like plenty.
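The ordering concern here can be sketched as follows (the counters and method are hypothetical stand-ins for the task's accessors, not the PR's actual code): comparing sent vs. indexed counts before inspecting the failure count means that a failure arriving after close, or documents still pending when close times out, surface as an incomplete-count error rather than a false success.

```java
// Illustrative post-close validation. A failed or still-pending document never
// reaches the indexed count, so the count comparison catches late failures too.
public class CompletionCheckSketch {
    public static void checkCompletion(long sent, long indexed, long failed) {
        if (indexed != sent) {
            throw new IllegalStateException(
                "Indexed only " + indexed + " of " + sent + " documents (" + failed + " failures)"
            );
        }
        if (failed > 0) {
            throw new IllegalStateException(failed + " bulk indexing failures");
        }
    }
}
```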

@masseyke masseyke marked this pull request as draft March 7, 2023 13:49
@masseyke masseyke requested a review from martijnvg March 8, 2023 23:06
@masseyke masseyke marked this pull request as ready for review March 8, 2023 23:07
@masseyke
Member Author

masseyke commented Mar 8, 2023

@elasticmachine update branch

private final Rounding.Prepared rounding;
private final List<FieldValueFetcher> fieldValueFetchers;
private final RollupShardTask task;
/*
Member

Maybe there should be a different version of BulkProcessor2 that can execute on the same thread? (Or BulkProcessor2 should be modified to handle this?)

This change now adds quite some additional concurrency logic.

Member Author

We could move the old BulkProcessor into this package? I think BulkProcessor2 is faster (since it allows for multiple index requests to be in flight at once), but I don't actually have any performance tests (I don't know if there are any for this).

Member Author

@masseyke masseyke Mar 10, 2023

What if I add a new addWithBackpressure method in BulkProcessor2 (a separate method will allow us to pass in a Supplier to check whether RollupShardIndexer.abort is true), and move this code in there? If I understand correctly, your concern is not that we run everything on the current thread (which would be fairly difficult with BulkProcessor2), but that you don't want to get exceptions if the search code is running much faster than the index code, and you don't want the complexity (or the need to maintain it) in TSDB code. Right?

Member

> What if I add a new addWithBackpressure method in BulkProcessor2 (a separate method will allow us to pass in a Supplier to check whether RollupShardIndexer.abort is true), and move this code in there?

👍 this sounds good to me. The code added in this change to RollupShardSearcher isn't about rolling up data, but being able to bulk index on the current thread.
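The proposed addWithBackpressure could look roughly like this (the signature and the Supplier-based abort check follow the idea discussed above, but this is an illustrative standalone class, not the actual BulkProcessor2 method):

```java
import java.util.function.Supplier;

// Sketch of an addWithBackpressure-style method: block until there is
// capacity, bailing out early if the caller's abort condition becomes true
// (e.g. RollupShardIndexer.abort in the discussion above).
public class AddWithBackpressureSketch {
    private final Object monitor = new Object();
    private final long maxBytesInFlight;
    private long bytesInFlight = 0;

    public AddWithBackpressureSketch(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    // Returns true if the document was accepted, false if the caller aborted
    // (or the thread was interrupted) while waiting for capacity.
    public boolean addWithBackpressure(long docBytes, Supplier<Boolean> shouldAbort) {
        synchronized (monitor) {
            while (bytesInFlight + docBytes > maxBytesInFlight) {
                if (shouldAbort.get()) {
                    return false;
                }
                try {
                    monitor.wait(500); // re-check capacity and the abort flag periodically
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            bytesInFlight += docBytes;
            return true;
        }
    }

    // Called from the bulk-response callback once a request completes.
    public void onBulkComplete(long docBytes) {
        synchronized (monitor) {
            bytesInFlight -= docBytes;
            monitor.notifyAll();
        }
    }
}
```

Keeping this loop inside the processor (rather than in RollupShardIndexer) means the TSDB code carries none of the concurrency logic.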

Member Author

@masseyke masseyke Mar 21, 2023

OK, I've added this change in #94599 (which drastically reduced the changes needed in RollupShardIndexer).

@masseyke masseyke requested a review from martijnvg March 21, 2023 18:53
Member

@martijnvg martijnvg left a comment

LGTM, thanks for iterating here.

@masseyke masseyke merged commit c97cccb into elastic:main Mar 22, 2023
@masseyke masseyke deleted the using-bulkprocessor2-in-rollup branch March 22, 2023 15:31
@martijnvg
Member

martijnvg commented Jun 12, 2023

It looks like this change also had a positive impact on the downsampling-to-1-minute-fixed-interval tsdb benchmark:

[benchmark chart: downsampling duration, March 16–27]

(The visualization covers March 16th through March 27th.)
On the day this was merged, downsampling the tsdb index to 1 minute interval buckets went from ~1,400,000 ms to ~800,000 ms.


Labels

>non-issue · :StorageEngine/Rollup · Team:Analytics · v8.8.0
