Using BulkProcessor2 in Deprecation Logging #94211
Conversation
 * performance (adding a periodic task to check the queue).
 */
if (flushEnabled.get()) {
    processor.add(request);
Had an idea about this. What if we drain the queue in this location as well? It shouldn't replay any previously removed items, and it will clear any items that might have snuck into the queue since the drain operation in the cluster management thread.
Might be overkill, but this is a clunky thing already without fully implementing a producer consumer model.
That's a good idea. Thanks.
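The drain-at-add idea above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the class, field, and method names are hypothetical stand-ins, and a plain `String` stands in for the real bulk request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: drain the buffer at add-time too, so any items that
// snuck into the queue after the cluster-management-thread drain get flushed.
class BufferedSink {
    private final AtomicBoolean flushEnabled = new AtomicBoolean(false);
    private final Queue<String> buffer = new ConcurrentLinkedQueue<>();
    final List<String> sent = new ArrayList<>(); // stands in for the processor

    void add(String request) {
        if (flushEnabled.get()) {
            drainBuffer();     // clear any stragglers first, preserving order
            sent.add(request); // stands in for processor.add(request)
        } else {
            buffer.add(request);
        }
    }

    void enableFlush() {
        flushEnabled.set(true);
        drainBuffer();
    }

    private void drainBuffer() {
        String queued;
        // poll() removes each item exactly once, so nothing is ever replayed
        while ((queued = buffer.poll()) != null) {
            sent.add(queued);
        }
    }
}
```

Because `poll()` removes each element before it is sent, draining in both places cannot replay previously removed items; at worst the second drain finds an empty queue.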
Pinging @elastic/es-data-management (Team:Data Management)
 * drained and its contents are sent to the processor. The queue is unbounded because we are first going through processor::add, which
 * starts rejecting documents if the total number of bytes in flight gets too large.
 */
private final BlockingQueue<Tuple<BulkRequest, ActionListener<BulkResponse>>> requestAndListenerBuffer = new LinkedBlockingQueue<>();
I'd be inclined to make this a queue of Runnable; there's no need to capture the request and listener separately. Also, maybe use a ConcurrentLinkedQueue, which avoids the need to handle any InterruptedException since there won't be any blocking here anyway.
In light of fa87b0b, maybe a queue of AbstractRunnable; that way you can fail any requests that you never got around to even sending. Better that than just leaking them, IMO.
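A rough sketch of that suggestion, with a minimal stand-in for Elasticsearch's AbstractRunnable (the real class has more to it; `FailableTask` and `TaskBuffer` here are illustrative names, not the PR's code):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal stand-in for AbstractRunnable: a task that can either run
// or be failed with an exception if it is never sent.
abstract class FailableTask {
    abstract void doRun();
    abstract void onFailure(Exception e);
}

class TaskBuffer {
    // ConcurrentLinkedQueue is non-blocking, so draining it never throws
    // InterruptedException and needs no special interrupt handling.
    private final Queue<FailableTask> buffer = new ConcurrentLinkedQueue<>();

    void add(FailableTask task) {
        buffer.add(task);
    }

    void drainAndRun() {
        FailableTask t;
        while ((t = buffer.poll()) != null) {
            t.doRun();
        }
    }

    // On close, fail anything that was never sent instead of silently leaking it,
    // so callers' listeners are always completed one way or the other.
    void failRemaining(Exception cause) {
        FailableTask t;
        while ((t = buffer.poll()) != null) {
            t.onFailure(cause);
        }
    }
}
```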
 */
public void close() {
    try {
        awaitClose(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
Oh yeah, also I think this should not wait so long, especially since it's only used in tests. A minute should be more than enough. It would be good to return something from awaitClose() to indicate whether the wait was successful or not, so that you can e.g. assertTrue(processor.awaitClose(1, TimeUnit.MINUTES)). Also, I'm not sure this needs doing here; it seems independent of the other changes.
I've actually got several related PRs going on at once. One of them (#94133, just merged to main) changes awaitClose to return a boolean. Another one (#94197) depends on having this close method. The DeprecationIndexingComponent actually calls close(), which is why I made the change here (and modified a BulkProcessor test just to make sure it had a little coverage). I've been going back and forth over what the correct behavior here ought to be: wait forever? Wait some amount of time? Remove this method and make the client think about what is right for them (although that complicates the double try-with-resources in #94197)?
It's usual to wait a bit (~30s say) but then to proceed regardless. We don't want to block the node shutting down forever, but bailing out straight away will sometimes drop some messages even if everything is working properly.
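The suggested shutdown contract can be sketched as below. This is a simplified illustration, not BulkProcessor2's implementation: a single CountDownLatch stands in for the real in-flight request tracking, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: awaitClose returns whether in-flight work finished within the
// timeout, and close() waits a bounded time (~30s) but then proceeds
// regardless, so node shutdown is never blocked forever.
class ClosableProcessor {
    private final CountDownLatch pendingWork = new CountDownLatch(1);

    // Called when the last in-flight request completes.
    void finishPendingWork() {
        pendingWork.countDown();
    }

    // Returns true if all pending work completed before the timeout,
    // so tests can assertTrue(processor.awaitClose(1, TimeUnit.MINUTES)).
    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        return pendingWork.await(timeout, unit);
    }

    void close() {
        try {
            // Bounded wait: give in-flight messages a chance to land,
            // but proceed with shutdown even if they don't.
            awaitClose(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```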
@elasticmachine update branch
In #91238 we rewrote BulkProcessor to avoid a deadlock that had been seen in the IlmHistoryStore. This PR ports deprecation logging over to the new BulkProcessor2 implementation. The PR is somewhat complicated because we queue up deprecation log requests until the deprecation index template and ILM policy are in place. This was previously handled by BulkProcessor via the logic added in #80406, but that logic was not ported to BulkProcessor2 since it was unique to this use case. Instead, there is now a bounded queue in DeprecationIndexingComponent to handle it.
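The bounded-queue idea can be sketched as follows. The capacity, drop policy, and names here are assumptions for illustration only, not the values or code in DeprecationIndexingComponent:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: buffer deprecation-log requests in a bounded queue
// until the index template and ILM policy are ready, dropping new requests
// rather than blocking or growing without limit when the queue is full.
class PreFlushBuffer {
    private final BlockingQueue<String> queue;

    PreFlushBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // offer() is non-blocking: it returns false instead of waiting when the
    // queue is full, so callers are never blocked by a slow startup.
    boolean buffer(String request) {
        return queue.offer(request);
    }

    // Once the template and policy exist, drain everything to the processor.
    List<String> drainAll() {
        List<String> sink = new ArrayList<>();
        queue.drainTo(sink);
        return sink;
    }
}
```

The bound matters because, unlike the in-processor queue discussed above, nothing is consuming this buffer until cluster setup completes, so an unbounded queue could grow indefinitely.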