Skip to content

Adding BulkProcessor2.addWithBackpressure()#94599

Merged
masseyke merged 2 commits intoelastic:mainfrom
masseyke:feature/BulkProcessor2-addWithBackpressure
Mar 21, 2023
Merged

Adding BulkProcessor2.addWithBackpressure()#94599
masseyke merged 2 commits intoelastic:mainfrom
masseyke:feature/BulkProcessor2-addWithBackpressure

Conversation

@masseyke
Copy link
Copy Markdown
Member

Most clients of BulkProcessor are OK with using it asynchronously. For example failing to load a single ILM history record does not impact what we do with other ILM history records. But some clients do care. For example if we are downsampling a TSDB index we want to abort if a single entry does not make it.
This PR adds a method called addWithBackpressure() to BulkProcessor2. This method is similar to the add() method that already exists, except that it blocks the calling thread until there is room to add the request.
This code is ported from code that was originally in RollupShardIndexer in #94197.

@masseyke masseyke added :Distributed/Ingest Node Execution or management of Ingest Pipelines v8.8.0 labels Mar 21, 2023
@elasticsearchmachine elasticsearchmachine added the Team:Data Management (obsolete) DO NOT USE. This team no longer exists. label Mar 21, 2023
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@masseyke masseyke requested a review from jbaiera March 21, 2023 16:55
Copy link
Copy Markdown
Member

@jbaiera jbaiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, only minor things

add(request);
successfullyAdded = true;
} catch (EsRejectedExecutionException e) {
logger.trace("Rollup doc rejected, and will try again");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Lets make the trace message more generic, i.e. remove "Rollup"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, missed that in the port from it's original location!

boolean successfullyAdded = false;
while (successfullyAdded == false) {
if (shouldAbort.get()) {
throw new EsRejectedExecutionException("Rejecting request because shouldAbort is now true");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: mentioning shouldAbort is a little weird here, should we reformat the message to just say that the caller has cancelled the bulk operation?

@masseyke masseyke merged commit b068ab2 into elastic:main Mar 21, 2023
@masseyke masseyke deleted the feature/BulkProcessor2-addWithBackpressure branch March 21, 2023 18:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Distributed/Ingest Node Execution or management of Ingest Pipelines >non-issue Team:Data Management (obsolete) DO NOT USE. This team no longer exists. v8.8.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants