Using BulkProcessor2 in RollupShardIndexer #94197
Conversation

Pinging @elastic/es-analytics-geo (Team:Analytics)
martijnvg left a comment
I think this looks good. I asked a few questions just for my own understanding.
```java
.setBulkActions(ROLLUP_BULK_ACTIONS)
.setBulkSize(ROLLUP_BULK_SIZE)
// execute the bulk request on the same thread
.setConcurrentRequests(0)
```
I think this is a change in runtime behaviour? BulkProcessor2 would execute the bulk request on different threads and if multiple bulk requests exceed max bytes in flight, then a rejected exception is thrown. Just double checking, I think BulkProcessor2 would work well in this context.
Yes, you are correct: if searches are happening faster than bulk indexing, then eventually we'd start getting EsRejectedExecutionExceptions and losing data. I've added bulkProcessorTooFullMonitor and logic around it to exert backpressure and avoid this. I also added DownsampleActionSingleNodeTests.testTooManyBytesInFlight() to show this problem (and fix).
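The backpressure idea described here can be illustrated with a minimal plain-Java sketch (class, field names, and the 50 MB limit below are hypothetical stand-ins, not the actual BulkProcessor2 code): the producer waits on a shared monitor while too many bytes are in flight, instead of failing with a rejection.

```java
// Minimal backpressure sketch (hypothetical names): the search side blocks on a
// monitor while in-flight bytes exceed a limit; bulk completions wake it up.
class BackpressureSketch {
    private static final long MAX_BYTES_IN_FLIGHT = 50 * 1024 * 1024;
    private final Object tooFullMonitor = new Object();
    private long bytesInFlight = 0;

    // Producer side (the search loop): blocks while the processor is too full.
    void addWithBackpressure(long requestBytes) throws InterruptedException {
        synchronized (tooFullMonitor) {
            while (bytesInFlight + requestBytes > MAX_BYTES_IN_FLIGHT) {
                tooFullMonitor.wait(500); // wake periodically to re-check
            }
            bytesInFlight += requestBytes;
        }
    }

    // Consumer side: called when a bulk request completes (success or failure).
    void onBulkComplete(long completedBytes) {
        synchronized (tooFullMonitor) {
            bytesInFlight -= completedBytes;
            tooFullMonitor.notifyAll(); // wake any waiting producer
        }
    }

    long bytesInFlight() {
        synchronized (tooFullMonitor) {
            return bytesInFlight;
        }
    }
}
```

The monitor-with-timeout loop means a producer re-checks the limit even if a notify is missed, which keeps the sketch robust without precise signaling.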
```java
XContentBuilder doc = rollupBucketBuilder.buildRollupDocument();
indexBucket(doc);
}
bulkProcessor.flush();
```
Not needed because this would happen during closing of BulkProcessor2?
That's right. There's no need to explicitly flush BulkProcessor2 (it doesn't even have a public flush method).
```java
TimeValue.timeValueMillis(System.currentTimeMillis() - startTime)
);

if (task.getNumFailed() > 0) {
```
When this if statement is reached then all bulk requests have been executed?
Hmm, close only waits up to 30 seconds. So I guess it's possible that we don't have any failures here, and then by the time the next line runs we do, but task.getNumIndexed() == task.getNumSent() so we report success. I think if I swap these two blocks it will solve that, right? Also, do we want to wait more than 30 seconds here? That's effectively 30 seconds for the last 50 MB (the max amount of in-flight bytes we allow) to flush, which seems like plenty.
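The reordering discussed here can be sketched in plain Java (all names below are hypothetical stand-ins for the task counters and the processor's close/await API): drain in-flight bulk requests first, and only then read the failure counters, so failures that land during the close window are not reported as success.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the drain-then-check ordering (hypothetical names).
class CompletionCheckSketch {
    // Stand-ins for the task's counters.
    final AtomicLong numSent = new AtomicLong();
    final AtomicLong numIndexed = new AtomicLong();
    final AtomicLong numFailed = new AtomicLong();

    // Hypothetical stand-in for the processor's awaitClose-style API.
    interface Drainable {
        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
    }

    String finish(Drainable processor) throws InterruptedException {
        // 1. Drain first: block until in-flight bulk requests complete (or time out).
        processor.awaitClose(30, TimeUnit.SECONDS);
        // 2. Only then inspect the counters, so late failures are visible.
        if (numFailed.get() > 0) {
            return "failed: " + numFailed.get();
        }
        if (numIndexed.get() != numSent.get()) {
            return "incomplete: " + numIndexed.get() + " of " + numSent.get();
        }
        return "success";
    }
}
```

Checking the counters before draining would race with late bulk responses, which is exactly the window the comment describes.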
@elasticmachine update branch
```java
private final Rounding.Prepared rounding;
private final List<FieldValueFetcher> fieldValueFetchers;
private final RollupShardTask task;
/*
```
Maybe there should be a different version of BulkProcessor2 that can execute on the same thread? (Or BulkProcessor2 could be modified to handle this?)
This change now adds quite a bit of additional concurrency logic.
We could move the old BulkProcessor into this package? I think BulkProcessor2 is faster (since it allows for multiple index requests to be in flight at once), but I don't actually have any performance tests (I don't know if there are any for this).
What if I add a new addWithBackpressure method in BulkProcessor2 (a separate method will allow us to pass in a Supplier to check whether RollupShardIndexer.abort is true), and move this code in there? If I understand correctly, your concern is not that we run everything on the current thread (which would be fairly difficult with BulkProcessor2), but that you don't want to get exceptions if the search code is running much faster than the index code, and you don't want the complexity (or the need to maintain it) in TSDB code. Right?
> What if I add a new addWithBackpressure method in BulkProcessor2 (a separate method will allow us to pass in a Supplier to check whether RollupShardIndexer.abort is true), and move this code in there?
👍 this sounds good to me. The code added in this change to RollupShardSearcher isn't about rolling up data, but being able to bulk index on the current thread.
OK, I've added this change with #94599 (which drastically reduced the changes needed in RollupShardIndexer).
martijnvg left a comment
LGTM, thanks for iterating here.
It looks like this change also had a positive impact on the downsampling-to-1-minute fixed interval tsdb benchmark (visualization is from the 16th of March until the 27th of March).

In #91238 we rewrote BulkProcessor to avoid a deadlock that had been seen in the IlmHistoryStore. This PR ports rollup over to the new BulkProcessor2 implementation. BulkProcessor2 always runs asynchronously, meaning that RollupShardIndexer has to explicitly check for failures and throw an exception, rather than relying on the exception being thrown in-thread during the bulk indexing.
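The explicit check-and-throw pattern described above can be sketched in plain Java (the class and method names below are hypothetical illustrations, not the actual BulkProcessor2 or RollupShardIndexer code): with an always-asynchronous processor, failures surface on another thread, so the indexer records them and surfaces them only after draining.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch (hypothetical names): async "bulk" failures are recorded on the
// worker thread, then checked and rethrown by the caller after draining.
class AsyncFailureCheckSketch {
    private final AtomicLong numFailed = new AtomicLong();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Submits a "bulk request"; failures are recorded, not thrown to the caller.
    void bulkIndexAsync(Runnable bulkRequest) {
        executor.execute(() -> {
            try {
                bulkRequest.run();
            } catch (Exception e) {
                // The exception lands on the executor thread, not the caller's,
                // so record it for the caller to inspect later.
                numFailed.incrementAndGet();
            }
        });
    }

    // The caller drains the executor and then surfaces any recorded failures.
    void closeAndCheck() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        if (numFailed.get() > 0) {
            throw new IllegalStateException(numFailed.get() + " bulk requests failed");
        }
    }
}
```

This is the key behavioral difference from a synchronous processor: without the final check, a failed bulk request would be silently swallowed on the worker thread.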