Bulk processor#awaitClose to close scheduler #29263
When the `BulkProcessor` is used with the high-level REST client, it internally creates a scheduler to schedule tasks. This scheduler is not exposed to users and needs to be closed once the `BulkProcessor` is closed. There are two ways to close the `BulkProcessor`, though: the ordinary `close` method and `awaitClose`. The former closes the scheduler while the latter doesn't, leaving threads lingering. I discovered this when adding some tests for the bulk processor: we mainly test it with the transport client, and running the same tests with the high-level REST client already uncovered a bug.
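To make the problem concrete, here is a minimal sketch of the pattern, not the actual `BulkProcessor` code. The class `SchedulerOwningProcessor`, its `onClose` field and the `isSchedulerShutdown` helper are hypothetical names chosen for illustration. The point is only that every close path has to end up shutting down the internally owned scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a processor that owns an internal scheduler.
// None of these names are taken from the Elasticsearch code base.
class SchedulerOwningProcessor implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Cleanup hook that releases the scheduler; both close paths must run it.
    private final Runnable onClose = scheduler::shutdownNow;
    private boolean closed;

    SchedulerOwningProcessor() {
        // A periodic no-op task: its thread lingers until the scheduler is shut down.
        scheduler.scheduleWithFixedDelay(() -> { }, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        awaitClose(0, TimeUnit.NANOSECONDS);
    }

    public synchronized boolean awaitClose(long timeout, TimeUnit unit) {
        if (closed) {
            return true;
        }
        closed = true;
        // A real implementation would wait here for in-flight requests to finish.
        onClose.run(); // without this call, only close() would release the scheduler
        return true;
    }

    boolean isSchedulerShutdown() {
        return scheduler.isShutdown();
    }
}
```

In this sketch `close` delegates to `awaitClose`, so the cleanup lives in one place and neither path can skip it.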
Pinging @elastic/es-core-infra
tlrx
left a comment
This looks great, thanks a lot for all the tests. I added minor comments.
Response response = client().performRequest("PUT", "/test-ro", Collections.emptyMap(), entity);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));

//ensureGreen();
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
        .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
        //set interval and size to high values
        .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
Looks like we always use these values. Maybe we could set them in `initBulkProcessorBuilder`?
We almost always use the same values, but not exactly the same; there are subtle differences. I took these tests from the existing `BulkProcessorIT` tests and adapted them from the transport client to the high-level REST client. The idea was to not touch anything and see if things still work the same way, which they mostly do (up to bugs).
}
return this.bulkRequestHandler.awaitClose(timeout, unit);
boolean awaitClose = this.bulkRequestHandler.awaitClose(timeout, unit);
onClose.run();
I'm wondering if we should ensure that the onClose.run() is always executed (like in a finally block) if something goes wrong in bulkRequestHandler.awaitClose(). I don't have a strong feeling about this, and if we decide to add it we should also have a test for it, but I think it would be safer.
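A minimal sketch of what the `finally` variant could look like, under stated assumptions: `AwaitableHandler` and `FinallyClosingProcessor` are hypothetical types invented for this illustration, standing in for the request handler and the processor. This is not the code in the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical interface standing in for the handler that waits for in-flight bulk requests.
interface AwaitableHandler {
    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
}

// Hypothetical processor that runs its cleanup hook even when waiting fails.
class FinallyClosingProcessor {
    private final AwaitableHandler handler;
    private final Runnable onClose;

    FinallyClosingProcessor(AwaitableHandler handler, Runnable onClose) {
        this.handler = handler;
        this.onClose = onClose;
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            return handler.awaitClose(timeout, unit);
        } finally {
            // Runs on normal return and when the handler throws,
            // e.g. if the waiting thread is interrupted.
            onClose.run();
        }
    }
}
```

A test for this could pass a handler that throws and then assert that the `onClose` hook still ran.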