
Using BulkProcessor2 in Deprecation Logging#94211

Merged
masseyke merged 11 commits into elastic:main from masseyke:using-bulkprocessor2-in-deprecation-indexing
Mar 6, 2023

Conversation

@masseyke
Member

In #91238 we rewrote BulkProcessor to avoid a deadlock that had been seen in the IlmHistoryStore. This PR ports deprecation logging over to the new BulkProcessor2 implementation. The change is somewhat complicated because we queue up deprecation log requests until the deprecation index template and ILM policy are in place. This was previously handled by BulkProcessor in the logic added in #80406, but that logic was not ported to BulkProcessor2 since it was unique to this use case. Instead, there is now a bounded queue in DeprecationIndexingComponent to handle this.
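The gating pattern described above can be sketched as follows. This is a simplified, hypothetical illustration (all class and method names are invented, and a `List<String>` stands in for the real processor): requests are buffered in a bounded queue until the template and policy are ready, then drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the bounded-queue gating: deprecation requests are
// buffered until the index template and ILM policy are in place, then drained.
class DeprecationBuffer {
    private final BlockingQueue<String> pending = new ArrayBlockingQueue<>(1024); // bounded
    private final AtomicBoolean flushEnabled = new AtomicBoolean(false);
    private final List<String> sent = new ArrayList<>();

    void add(String request) {
        if (flushEnabled.get()) {
            sent.add(request); // stand-in for processor.add(request)
        } else if (pending.offer(request) == false) {
            // queue is full: drop the entry rather than block the caller
        }
    }

    // Called once cluster state shows the template and policy are ready.
    void onTemplatesReady() {
        flushEnabled.set(true);
        List<String> drained = new ArrayList<>();
        pending.drainTo(drained);
        sent.addAll(drained);
    }

    List<String> sent() {
        return sent;
    }
}
```

The bound on the queue is what keeps this from growing without limit while the node waits for the template and policy to appear.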

* performance (adding a periodic task to check the queue).
*/
if (flushEnabled.get()) {
processor.add(request);
Member


Had an idea about this. What if we drain the queue in this location as well? It shouldn't replay any previously removed items, and it will clear any items that might have snuck into the queue since the drain operation in the cluster management thread.

Might be overkill, but this is a clunky thing already without fully implementing a producer-consumer model.
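The race this reviewer is pointing at can be sketched like so. This is a hypothetical illustration (invented names, a `List<String>` standing in for the processor): draining on the write path as well catches items that slipped into the queue between the flag flip and the drain on the cluster management thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: drain the buffer on the write path too, so items that
// raced in after the main cluster-state drain are still replayed.
class DrainOnAddBuffer {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean flushEnabled = new AtomicBoolean(false);
    final List<String> sent = new ArrayList<>();

    void add(String request) {
        if (flushEnabled.get()) {
            drainPending();    // clears anything that snuck in after the main drain
            sent.add(request); // stand-in for processor.add(request)
        } else {
            pending.offer(request);
        }
    }

    void enableAndDrain() { // runs on the cluster management thread
        flushEnabled.set(true);
        drainPending();
    }

    private void drainPending() {
        String r;
        while ((r = pending.poll()) != null) {
            sent.add(r);
        }
    }
}
```

Because `poll()` only removes each item once, the extra drain cannot replay anything that was already sent; at worst it finds the queue empty.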

Member Author


That's a good idea. Thanks.

@masseyke masseyke marked this pull request as ready for review March 1, 2023 21:16
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@elasticsearchmachine elasticsearchmachine added the Team:Data Management (obsolete) DO NOT USE. This team no longer exists. label Mar 1, 2023
@masseyke masseyke requested a review from jbaiera March 1, 2023 21:27
@masseyke masseyke marked this pull request as draft March 1, 2023 23:50
Member

@DaveCTurner DaveCTurner left a comment


I like it. One comment.

* drained and its contents are sent to the processor. The queue is unbounded because we are first going through processor::add, which
* starts rejecting documents if the total number of bytes in flight gets too large.
*/
private final BlockingQueue<Tuple<BulkRequest, ActionListener<BulkResponse>>> requestAndListenerBuffer = new LinkedBlockingQueue<>();
Member


I'd be inclined to make this a queue of Runnable; there's no need to capture the request and listener separately. Also, maybe use a ConcurrentLinkedQueue, which avoids the need to handle any InterruptedException since there won't be any blocking here anyway.

Member


In light of fa87b0b, maybe a queue of AbstractRunnable; that way you can fail any requests that you never got around to sending. Better that than just leaking them IMO.
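The idea behind using AbstractRunnable here can be sketched with a minimal stand-in (this is not Elasticsearch's actual AbstractRunnable, just an invented analogue): a task that exposes an `onFailure` callback can be failed without ever being run, so requests still sitting in the buffer at close time get an error callback instead of silently leaking.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal stand-in for the AbstractRunnable idea: a task that can be failed
// without being run. Names are illustrative only.
abstract class FailableTask implements Runnable {
    abstract void onFailure(Exception e);
}

class FailingBuffer {
    final Queue<FailableTask> buffer = new ConcurrentLinkedQueue<>();

    // On shutdown, fail everything that was never sent so listeners are notified.
    int failRemaining() {
        int failed = 0;
        FailableTask t;
        while ((t = buffer.poll()) != null) {
            t.onFailure(new IllegalStateException("closed before request was sent"));
            failed++;
        }
        return failed;
    }
}
```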

*/
public void close() {
try {
awaitClose(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
Member


Oh yeah also I think this should not wait so long especially since it's only used in tests. A minute should be more than enough. It would be good to return something from awaitClose() to indicate whether the wait was successful or not, so that you can e.g. assertTrue(processor.awaitClose(1, TimeUnit.MINUTES)). Also not sure this needs doing here, it seems independent of the other changes?

Member Author


I've actually got several related PRs going on at once. One of them (#94133, just merged to main) changes awaitClose to return a boolean. Another one (#94197) depends on having this close method. The DeprecationIndexingComponent actually calls close(), which is why I made the change here (and modified a BulkProcessor test just to make sure it had a little coverage). I've been going back and forth over what the correct behavior here ought to be -- wait forever? Wait some amount of time? Remove this method and make the client think about what is right for them (although that complicates the double try-with-resources in #94197)?

Member


It's usual to wait a bit (~30s say) but then to proceed regardless. We don't want to block the node shutting down forever, but bailing out straight away will sometimes drop some messages even if everything is working properly.
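The pattern agreed on in this thread can be sketched as follows. This is a simplified stand-in (invented names, not the actual BulkProcessor2 code): `awaitClose` returns a boolean so tests can assert the wait succeeded, while `close()` waits a bounded ~30s and then proceeds regardless, so node shutdown is never blocked forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: a bounded close that reports whether in-flight work finished in time.
class BoundedCloser {
    private final CountDownLatch inFlightDone = new CountDownLatch(1);

    void markDone() { // called when the last in-flight bulk completes
        inFlightDone.countDown();
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        return inFlightDone.await(timeout, unit);
    }

    void close() {
        try {
            awaitClose(30, TimeUnit.SECONDS); // proceed regardless of the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a test this allows e.g. `assertTrue(processor.awaitClose(1, TimeUnit.MINUTES))`, while production shutdown just calls `close()` and moves on.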

@masseyke masseyke marked this pull request as ready for review March 2, 2023 18:00
@masseyke masseyke requested a review from DaveCTurner March 2, 2023 20:37
@masseyke
Member Author

masseyke commented Mar 2, 2023

@elasticmachine update branch

Member

@jbaiera jbaiera left a comment


LGTM pending green CI

@masseyke
Member Author

masseyke commented Mar 2, 2023

@elasticmachine update branch

@masseyke
Member Author

masseyke commented Mar 3, 2023

@elasticmachine update branch

@masseyke masseyke merged commit 87bf9f2 into elastic:main Mar 6, 2023
@masseyke masseyke deleted the using-bulkprocessor2-in-deprecation-indexing branch March 6, 2023 14:30

Labels

>non-issue Team:Data Management (obsolete) DO NOT USE. This team no longer exists. v8.8.0


5 participants