Run downsampling using persistent tasks #97557
Conversation
Pinging @elastic/es-analytics-geo (Team:Analytics)

Hi @salvatore-campagna, I've created a changelog YAML for you.

Hi @salvatore-campagna, I've updated the changelog YAML for you.

I removed
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    // TODO: update persistent task state (consider multiple bulk requests in flight). Need to access the tsid 'per-bulk'
No need to keep track of in-flight indexing bulk requests.
@Override
protected void nodeOperation(final AllocatedPersistentTask task, final RollupShardTaskParams params, final PersistentTaskState state) {
    final RollupShardPersistentTaskState taskState = state == null
Here we can query the target index, get the latest indexed document and its tsid, and inject that tsid as the starting tsid.
new ActionHandler<>(GetRollupIndexCapsAction.INSTANCE, TransportGetRollupIndexCapsAction.class),
new ActionHandler<>(XPackUsageFeatureAction.ROLLUP, RollupUsageTransportAction.class),
new ActionHandler<>(XPackInfoFeatureAction.ROLLUP, RollupInfoTransportAction.class),
new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class),
Will add this back for BWC.
indicesService
);

this.indicesService = indicesService;
This is executed before getPersistentTasksExecutor.
PersistentTaskState {

public static final String NAME = RollupShardTask.TASK_NAME;
private static final ParseField ROLLUP_SHARD_INDEXER_STATUS = new ParseField("status");
We need the task status, in addition to the tsid, to avoid resuming a task that ended as FAILED or CANCELLED. We would like to avoid being stuck in a loop resuming tasks that failed for "non-retriable" reasons.
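A minimal sketch of the guard described here, with illustrative names (not the actual Elasticsearch classes): persist the indexer status alongside the tsid, and only resume a run that was still in progress when it was interrupted.

```java
// Hypothetical sketch of the resume guard discussed above; class and enum
// names are illustrative, not the real Elasticsearch types.
public class ResumeGuard {
    // Mirrors the statuses mentioned in the review.
    public enum RollupShardIndexerStatus { INITIALIZED, STARTED, FAILED, COMPLETED, CANCELLED }

    // FAILED and CANCELLED are treated as terminal, non-retriable states:
    // resuming them would just loop on the same error forever.
    public static boolean shouldResume(RollupShardIndexerStatus status) {
        return status == RollupShardIndexerStatus.STARTED;
    }
}
```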
Will get rid of all
martijnvg
left a comment
I did a first review round. Thanks for working on this!
I think we should write a Java integration test that performs downsampling while all nodes get restarted. A similar test exists for CCR, see: FollowerFailOverIT.
// Here we make sure we assign the task to the actual node holding the shard identified by
// the downsampling task shard id.
final ShardId shardId = params.shardId();
final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).shard(shardId.id());
I think we also need to check whether the shard routing is started here.
What do you mean? Checking ShardRoutingState?
null,
Collections.emptyMap()
);
return new BoolQueryBuilder().filter(new TermQueryBuilder(TimeSeriesIdFieldMapper.NAME, this.state.tsid()))
I think we can use Lucene query api instead of Elasticsearch's QueryBuilder api. That way there is no need to create a SearchExecutionContext here. For example:
return new BooleanQuery.Builder().add(new TermQuery(new Term(TimeSeriesIdFieldMapper.NAME, this.state.tsid())), BooleanClause.Occur.FILTER).build();
Actually this query will only match documents that have exactly the specified tsid. I think we want to match all documents that have the specified tsid and beyond? So I think we need a range query. If that is the case we need SortedSetDocValuesField.newSlowRangeQuery(...) because the _tsid field isn't indexed.
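A tiny model of the distinction being made here, in plain Java (the real fix would use a Lucene range query such as SortedSetDocValuesField.newSlowRangeQuery over the _tsid doc values): a term query matches only the exact last tsid, while resuming needs the last tsid and everything after it in sort order.

```java
// Models the semantics under discussion; tsids are represented as Strings
// here purely for illustration (the real _tsid values are BytesRefs that
// compare in byte order).
public class TsidQueryDemo {
    // Term-query semantics: exact match only.
    public static boolean termMatches(String docTsid, String lastTsid) {
        return docTsid.equals(lastTsid);
    }

    // Range-query semantics: inclusive lower bound, unbounded above,
    // so every tsid from the last indexed one onwards matches.
    public static boolean rangeMatches(String docTsid, String lastTsid) {
        return docTsid.compareTo(lastTsid) >= 0;
    }
}
```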
Hey, just a heads up, we've got this speedy query built for sorted indices that could totally work here too:
link.
Adapting it to our needs shouldn't be a big deal. And if we find it handy, we might even think about adding it directly into Lucene, kind of like what's been done with IndexSortSortedSetDocValuesRangeQuery.
Thanks for pointing this out Jim. The main focus is to get this PR merged sooner rather than later, but we will definitely change this in a follow-up change.
);
return new BoolQueryBuilder().filter(new TermQueryBuilder(TimeSeriesIdFieldMapper.NAME, this.state.tsid()))
    .toQuery(searchExecutionContext);
} else if (this.state.done()) {
maybe return in execute() instead of here? This way we can check without running the query.
if (task.getNumIndexed() != task.getNumSent()) {
    task.setRollupShardIndexerStatus(RollupShardIndexerStatus.FAILED);
    task.updatePersistentTaskState(
maybe we should move the task.updatePersistentTaskState(...) call into RollupShardPersistentTaskExecutor#nodeOperation(...)? We can then just catch the exceptions thrown here and set the status to failed there.
@Override
public void writeTo(final StreamOutput out) throws IOException {
    out.writeVInt(this.ordinal);
Did we serialise this enum as a string before?
I will check; in that case I will revert it.
This was not a Writable before, so yes we were using the string representation of the enum.
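To illustrate why this matters for the wire format (a stdlib sketch, not the actual StreamOutput API): the ordinal-based and name-based encodings of the same enum constant produce different bytes, so switching between them breaks mixed-version clusters unless the change is gated on the transport version.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrates the wire difference between ordinal-based and name-based
// enum serialization. The encodings are not interchangeable: an old node
// expecting a string cannot decode an int, and vice versa.
public class EnumWireDemo {
    public enum Status { STARTED, FAILED, COMPLETED }

    // Ordinal encoding: a fixed-width integer (4 bytes here).
    public static byte[] asOrdinal(Status s) {
        return ByteBuffer.allocate(4).putInt(s.ordinal()).array();
    }

    // Name encoding: the UTF-8 bytes of the constant's name.
    public static byte[] asName(Status s) {
        return s.name().getBytes(StandardCharsets.UTF_8);
    }
}
```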
…we can't execute multiple downsample operations for the same source index concurrently.
andreidan
left a comment
Thanks for iterating on this Martijn, the ILM side LGTM
@@ -93,11 +86,9 @@ public void performAction(
    );
    listener.onResponse(null);
}
If the index doesn't have the SUCCESS tag but we're re-executing the downsample ILM step, we should call performDownsampleIndex(indexName, downsampleIndexName, ActionListener.wrap(listener::onResponse, listener::onFailure)) to start waiting for a result or get an error message from the transport action.
);
return;
}
final String downsampleIndexName = generateDownsampleIndexName(indexName, fixedInterval);
This just occurred to me - if we have downsampling configured multiple times, say in hot and warm. The hot phase will generate the target index name downsample-myindex-1h and then when we get to the warm phase to execute downsampling again we'll end up with the index name downsample-downsample-myindex-1h-3h
(as we don't use the original index name anymore).
This problem exists already in ILM but perhaps becomes a bit more visible with the new naming convention?
(currently in ILM we can end up with names like downsample-8hit-downsample-eruv-.ds-my-data-2023.08.14-000001)
Perhaps the Downsampling transport action should configure the index provided name setting index.provided_name to maintain the original index name? And we can make use of it in ILM and such to generate the new index name based on the originally provided name?
Right, this will look strange (and already looks strange). However I don't think the downsample action should be responsible for this, because the contract of this API is that the target index name is provided.
I think we should make the DownsampleStep smarter, so that it generates a better name based on the source index. If the source index doesn't contain the downsample prefix, it generates the name the same way it does now in the PR. If the source index does contain the downsample index name prefix, it parses out the original source index and generates a name from that.
I think we should do this in a follow-up PR. This PR is already large and the problem already exists today. I will create a follow-up PR and label it as a bug. We can then backport this as well after the FF.
Or just use the provided index name in DownsampleStep, since the current cluster state is available.
Great point, thanks for working on this!
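A hypothetical sketch of the naming fix discussed in this thread, assuming target names of the form downsample-&lt;source&gt;-&lt;interval&gt; as in the examples above. The interval-stripping logic is naive and purely illustrative (it would misbehave if the original index name itself ends in a dash-separated segment):

```java
// Hypothetical helper for the follow-up discussed above: derive the new
// downsample index name from the *original* index rather than stacking
// "downsample-" prefixes. Name format is assumed, not taken from the code.
public class DownsampleNaming {
    public static String targetName(String sourceIndex, String interval) {
        String original = sourceIndex;
        if (original.startsWith("downsample-")) {
            // The source is itself a downsample index: recover
            // "<original>-<prevInterval>" and drop the trailing interval
            // segment instead of prefixing again.
            String rest = original.substring("downsample-".length());
            int lastDash = rest.lastIndexOf('-');
            if (lastDash > 0) {
                original = rest.substring(0, lastDash);
            }
        }
        return "downsample-" + original + "-" + interval;
    }
}
```

With this, downsampling "downsample-myindex-1h" at 3h would yield "downsample-myindex-3h" rather than "downsample-downsample-myindex-1h-3h".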
Thanks @salvatore-campagna and @martijnvg for the great work here! This is great!
If the downsampling index exists but it's still downsampling, we should invoke the downsample transport action again (and wire up the request listener such that the ILM listener gets notified of success or failure). This modifies the test to assert the ILM listener is invoked and removes an invariant that doesn't hold anymore in ILM (i.e. it held previously, before #97557, but now we can, and do). Marking as non-issue as this hasn't been released yet.
Add support for downsampling persistent tasks. We would like to have the ability
to resume downsampling tasks, instead of starting from scratch in case of
("retriable") failures.
Instead of keeping track of the state of the task we just try to query the downsampling
target index before starting the actual downsampling task, getting the latest document
indexed and its tsid. If we find one, we just restart from that tsid, maybe
overwriting a subset of the documents already indexed (documents with the same
tsid whose timestamp is smaller than that of the latest document).
Querying the downsampling target index is possible after introducing a predictable
naming scheme for the target index. This was not the case before: a part of the target
index name was random. A predictable name is required to be able to start from where the task left off.
Note that since the ordering of documents is based on (tsid, timestamp) we do not
include the timestamp in the query to avoid skipping downsampling of documents
with a larger tsid but smaller timestamp.
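The point about excluding the timestamp can be made concrete with a small model (plain Java, illustrative names): documents are sorted by (tsid, timestamp), so a later tsid may carry an earlier timestamp, and filtering on the timestamp as well would wrongly skip it.

```java
import java.util.ArrayList;
import java.util.List;

// Models the resume semantics described above. Documents are assumed sorted
// by (tsid, timestamp); names are illustrative, not Elasticsearch classes.
public class ResumeFilterDemo {
    public static final class Doc {
        final String tsid;
        final long timestamp;
        public Doc(String tsid, long timestamp) { this.tsid = tsid; this.timestamp = timestamp; }
    }

    // Correct: resume using only the tsid lower bound. Documents with the
    // same tsid are re-processed (idempotently overwritten), and no later
    // tsid is ever skipped.
    public static List<Doc> resumeByTsid(List<Doc> sorted, String lastTsid) {
        List<Doc> out = new ArrayList<>();
        for (Doc d : sorted) {
            if (d.tsid.compareTo(lastTsid) >= 0) out.add(d);
        }
        return out;
    }

    // Incorrect alternative: also requiring a later timestamp drops
    // documents with a larger tsid but a smaller timestamp.
    public static List<Doc> resumeByTsidAndTimestamp(List<Doc> sorted, String lastTsid, long lastTs) {
        List<Doc> out = new ArrayList<>();
        for (Doc d : sorted) {
            if (d.tsid.compareTo(lastTsid) >= 0 && d.timestamp > lastTs) out.add(d);
        }
        return out;
    }
}
```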
Note also that we are not removing TransportDownsampleIndexerAction, for
backward compatibility. An older master node running a previous version would
not use persistent tasks, and an API call would result in something like "missing
downsample action".
We are also adding an additional downsample REST query parameter, timeout.
It allows a user to set the maximum time to wait for a downsampling
task to complete before returning a timeout error. Note that when the timeout
triggers, the task still runs under control of the executor; only waiting for it to finish
results in a timeout. The default value for this timeout is 1 day.
Closes elastic#93582
---------
Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
All nodes on the mixed cluster need to be at least on version 8.10, since PR elastic#97557 introduced execution of downsampling tasks using the persistent task framework, which is incompatible with how execution was coordinated before.