
Run downsampling using persistent tasks#97557

Merged
martijnvg merged 104 commits into elastic:main from salvatore-campagna:feature/93582-downsample-persistent-task
Aug 15, 2023

Conversation

@salvatore-campagna
Contributor

@salvatore-campagna salvatore-campagna commented Jul 11, 2023

Add support for running downsampling as persistent tasks. We would like the ability to resume downsampling tasks instead of starting from scratch in case of ("retriable") failures.

Instead of keeping track of the task's state, we query the downsampling target index before starting the actual downsampling task, fetching the latest indexed document and its tsid. If we find one, we restart from that tsid, possibly overwriting a subset of the documents already indexed (documents with the same tsid whose timestamp is smaller than that of the latest document).

Querying the downsampling target index is possible after introducing a predictable naming scheme for the target index. Previously, part of the target index name was random; a predictable name is required to be able to resume from where the task left off.

Note that since documents are ordered by (tsid, timestamp), we do not include the timestamp in the query, to avoid skipping downsampling of documents with a larger tsid but a smaller timestamp.
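The resume logic above can be illustrated with a small, self-contained sketch (plain Python, not the actual Elasticsearch code): documents are processed in (tsid, timestamp) order, and on resume we restart from the last indexed tsid only, deliberately leaving the timestamp out of the filter.

```python
# Illustrative sketch of the resume logic (not the shipped Java code).
# Documents are modelled as (tsid, timestamp) tuples, processed in that order.

def resume_candidates(docs, last_tsid):
    """Documents still to (re)process after a restart: everything whose tsid
    is >= the last indexed tsid. Documents with the same tsid and a smaller
    timestamp may be downsampled twice (overwritten), which is safe."""
    return [d for d in sorted(docs) if d[0] >= last_tsid]

docs = [("a", 10), ("a", 20), ("b", 5), ("b", 30)]

# Suppose the task crashed after indexing ("a", 20): the latest document in
# the target index has tsid "a".
remaining = resume_candidates(docs, last_tsid="a")
assert remaining == [("a", 10), ("a", 20), ("b", 5), ("b", 30)]

# Adding a timestamp filter as well would be wrong: it would skip ("b", 5),
# a document with a larger tsid but a smaller timestamp.
wrong = [d for d in sorted(docs) if d[0] >= "a" and d[1] > 20]
assert wrong == [("b", 30)]
```

The last assertion shows the failure mode the description warns about: filtering on the timestamp too would silently drop ("b", 5).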

Note also that we are not removing TransportDownsampleIndexerAction, for backward compatibility: an older master node running a previous version would not use persistent tasks, and an API call would otherwise fail with something like "missing downsample action".

We are also adding an additional downsample REST query parameter, timeout. It lets a user set the maximum time to wait for a downsampling task to complete before returning a timeout error. Note that when the timeout triggers, the task still runs under control of the executor; only the wait for it to finish results in a timeout. The default value for this timeout is 1 day.
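For illustration, a request using the new parameter might look like the following. The index names and the exact request shape here are made up; only the timeout query parameter itself is what this PR describes.

```
POST /my-metrics-index/_downsample/downsample-my-metrics-index-1h?timeout=2h
{
  "fixed_interval": "1h"
}
```

If the task is still running after 2 hours, the call returns a timeout error, but the task itself keeps running under the persistent task executor.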

Resolves #93582

@salvatore-campagna salvatore-campagna self-assigned this Jul 11, 2023
@salvatore-campagna salvatore-campagna added :StorageEngine/Downsampling Downsampling (replacement for rollups) - Turn fine-grained time-based data into coarser-grained data >enhancement labels Jul 11, 2023
@elasticsearchmachine elasticsearchmachine added v8.10.0 Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) labels Jul 11, 2023
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@elasticsearchmachine
Collaborator

Hi @salvatore-campagna, I've created a changelog YAML for you.

@elasticsearchmachine
Collaborator

Hi @salvatore-campagna, I've updated the changelog YAML for you.

@salvatore-campagna
Contributor Author

I removed x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java but I will restore it for BWC.


@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// TODO: update persistent task state (consider multiple bulk requests in flight). Need to access the tsid 'per-bulk'
Contributor Author

No need to keep track of in-flight indexing bulk requests.


@Override
protected void nodeOperation(final AllocatedPersistentTask task, final RollupShardTaskParams params, final PersistentTaskState state) {
final RollupShardPersistentTaskState taskState = state == null
Contributor Author

Here we can query the target index getting the latest indexed document and its tsid and inject the tsid as the starting tsid.

new ActionHandler<>(GetRollupIndexCapsAction.INSTANCE, TransportGetRollupIndexCapsAction.class),
new ActionHandler<>(XPackUsageFeatureAction.ROLLUP, RollupUsageTransportAction.class),
new ActionHandler<>(XPackInfoFeatureAction.ROLLUP, RollupInfoTransportAction.class),
new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class),
Contributor Author

Will add this back for BWC.

indicesService
);

this.indicesService = indicesService;
Contributor Author

This is executed before getPersistentTasksExecutor

PersistentTaskState {

public static final String NAME = RollupShardTask.TASK_NAME;
private static final ParseField ROLLUP_SHARD_INDEXER_STATUS = new ParseField("status");
Contributor Author

We need the task status in addition to the tsid, to avoid resuming a task that ended as FAILED or CANCELLED. We would like to avoid being stuck in a loop resuming tasks that failed for "non-retriable" reasons.
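The resume decision described here can be sketched as follows. The status names are illustrative, loosely modelled on RollupShardIndexerStatus; this is not the shipped code.

```python
# Illustrative sketch: only resume when the persisted status says the task
# was still in flight. Terminal statuses must not be retried.
from enum import Enum

class Status(Enum):
    INITIALIZED = 0
    STARTED = 1
    FAILED = 2      # terminal: non-retriable failure
    CANCELLED = 3   # terminal: cancelled by a user or the system
    COMPLETED = 4   # terminal: nothing left to do

def should_resume(persisted_status):
    # Resuming FAILED/CANCELLED would loop forever on a non-retriable
    # failure; COMPLETED needs no work at all.
    return persisted_status in (Status.INITIALIZED, Status.STARTED)

assert should_resume(Status.STARTED)
assert not should_resume(Status.FAILED)
assert not should_resume(Status.CANCELLED)
```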

@salvatore-campagna
Contributor Author

Will get rid of all deleteIndex.

Member

@martijnvg martijnvg left a comment

I did a first review round. Thanks for working on this!

I think we should write a java integration test, that tests performing downsampling while all nodes get restarted. A similar test exists for CCR, see: FollowerFailOverIT.

// Here we make sure we assign the task to the actual node holding the shard identified by
// the downsampling task shard id.
final ShardId shardId = params.shardId();
final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).shard(shardId.id());
Member

I think we also need to check whether the shard routing is started here.

Contributor Author

@salvatore-campagna salvatore-campagna Jul 13, 2023

What do you mean? Checking ShardRoutingState?

null,
Collections.emptyMap()
);
return new BoolQueryBuilder().filter(new TermQueryBuilder(TimeSeriesIdFieldMapper.NAME, this.state.tsid()))
Member

I think we can use Lucene query api instead of Elasticsearch's QueryBuilder api. That way there is no need to create a SearchExecutionContext here. For example:

return new BooleanQuery.Builder().add(new TermQuery(new Term(TimeSeriesIdFieldMapper.NAME, this.state.tsid())), BooleanClause.Occur.FILTER).build();

Member

Actually, this query will only match documents that have exactly the specified tsid. I think we want to match all documents with the specified tsid and beyond, so we need a range query. If that is the case, we need SortedSetDocValuesField.newSlowRangeQuery(...) because the _tsid field isn't indexed.

Contributor

Hey, just a heads up, we've got this speedy query built for sorted indices that could totally work here too:
link.
Adapting it to our needs shouldn't be a big deal. And if we find it handy, we might even think about adding it directly into Lucene, kind of like what's been done with IndexSortSortedSetDocValuesRangeQuery.

Member

Thanks for pointing this out Jim. The main focus is to get this PR merged sooner than later. But we will definitely change this in a followup change.

);
return new BoolQueryBuilder().filter(new TermQueryBuilder(TimeSeriesIdFieldMapper.NAME, this.state.tsid()))
.toQuery(searchExecutionContext);
} else if (this.state.done()) {
Member

Maybe return in execute() instead of here? That way we can check without running a query.


if (task.getNumIndexed() != task.getNumSent()) {
task.setRollupShardIndexerStatus(RollupShardIndexerStatus.FAILED);
task.updatePersistentTaskState(
Member

Maybe we should do the task.updatePersistentTaskState(...) call in RollupShardPersistentTaskExecutor#nodeOperation(...)? We can then just catch the exceptions thrown here there and set the status to failed.


@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(this.ordinal);
Member

Did we serialise this enum as a string before?

Contributor Author

I will check; in that case I will revert it.

Contributor Author

@salvatore-campagna salvatore-campagna Jul 14, 2023

This was not a Writeable before, so yes, we were using the string representation of the enum.
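The tradeoff between the two wire formats can be shown with a small illustrative sketch (plain Python, not Elasticsearch code): ordinal-based serialization is compact but silently breaks if enum constants are ever reordered between versions, while the name-based (string) form survives reordering.

```python
# Illustrative sketch of ordinal vs. name serialization across versions.
from enum import Enum

class StatusV1(Enum):   # old node's view of the enum
    STARTED = 0
    FAILED = 1

class StatusV2(Enum):   # new node inserted a constant before FAILED
    STARTED = 0
    CANCELLED = 1
    FAILED = 2

# Ordinal on the wire: V1 writes FAILED as ordinal 1 ...
wire_ordinal = list(StatusV1).index(StatusV1.FAILED)
decoded = list(StatusV2)[wire_ordinal]
assert decoded is StatusV2.CANCELLED   # ... V2 decodes the wrong constant

# Name on the wire: still correct after the reordering.
wire_name = StatusV1.FAILED.name
assert StatusV2[wire_name] is StatusV2.FAILED
```

This is why changing an enum from string to ordinal serialization deserves the BWC scrutiny it gets in this thread: the ordinal form bakes the declaration order into the protocol.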

@martijnvg martijnvg requested a review from andreidan August 14, 2023 04:54
Contributor

@andreidan andreidan left a comment

Thanks for iterating on this Martijn, the ILM side LGTM

@@ -93,11 +86,9 @@ public void performAction(
);
listener.onResponse(null);
}
Contributor

If the index doesn't have the SUCCESS tag but we're re-executing the downsample ILM step we should call performDownsampleIndex(indexName, downsampleIndexName, ActionListener.wrap(listener::onResponse, listener::onFailure)); to start waiting for a result or get an error message from the transport action

);
return;
}
final String downsampleIndexName = generateDownsampleIndexName(indexName, fixedInterval);
Contributor

This just occurred to me: if we have downsampling configured multiple times, say in hot and warm, the hot phase will generate the target index name downsample-myindex-1h, and when we get to the warm phase to execute downsampling again we'll end up with the index name downsample-downsample-myindex-1h-3h (as we don't use the original index name anymore).

This problem already exists in ILM but perhaps becomes a bit more visible with the new naming convention? (Currently in ILM we can end up with names like downsample-8hit-downsample-eruv-.ds-my-data-2023.08.14-000001.)

Perhaps the downsampling transport action should configure the provided-name setting index.provided_name to maintain the original index name? We could then make use of it in ILM and elsewhere to generate the new index name based on the originally provided name.

Member

Right, this will look strange (and already does). However, I don't think the downsample action should be responsible for this, because the contract of this API is that the target index name is provided.

I think we should make the DownsampleStep smarter so that it generates a better name based on the source index. If the source index doesn't contain the downsample prefix, it generates the name the same way it does now in this PR. If the source index does contain the downsample prefix, it parses out the original source index and generates a name from that.

I think we should do this in a follow-up PR. This PR is already large and the problem already exists today. I will create a follow-up PR and label it as a bug. We can then backport it as well after the FF.
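The prefix-stripping idea could be sketched like this. The helper below is hypothetical (not the shipped generateDownsampleIndexName) and assumes the downsample-&lt;index&gt;-&lt;interval&gt; convention mentioned above.

```python
# Hypothetical sketch of a prefix-aware name generator: derive the target
# name from the original source index so repeated downsampling (e.g. hot
# then warm) does not stack "downsample-" prefixes.
PREFIX = "downsample-"

def generate_downsample_index_name(source_index, fixed_interval):
    if source_index.startswith(PREFIX):
        # Strip the prefix and the previous interval suffix:
        # "downsample-myindex-1h" -> "myindex"
        core = source_index[len(PREFIX):]
        core, _, _ = core.rpartition("-")
        source_index = core
    return f"{PREFIX}{source_index}-{fixed_interval}"

assert generate_downsample_index_name("myindex", "1h") == "downsample-myindex-1h"
# Downsampling the downsample index again keeps a single prefix:
assert generate_downsample_index_name("downsample-myindex-1h", "3h") == "downsample-myindex-3h"
```

A real implementation would need to handle index names that themselves contain hyphens more carefully (for example via index.provided_name, as suggested above), but the sketch shows the shape of the fix.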

Member

Or just use provided index name in DownsampleStep, since current cluster state is available.

Contributor

Great point, thanks for working on this!

@martijnvg martijnvg merged commit a52ab89 into elastic:main Aug 15, 2023
@andreidan
Contributor

Thanks @salvatore-campagna and @martijnvg for the great work here! This is great!

elasticsearchmachine pushed a commit that referenced this pull request Aug 15, 2023
If the downsampling index exists but is still downsampling, we should invoke the downsample transport action again (and wire up the request listener such that the ILM listener gets notified of success or failure).

This modifies the test to assert the ILM listener is invoked and removes an invariant that no longer holds in ILM after #97557.

Marking as non-issue as this hasn't been released yet.
csoulios pushed a commit to csoulios/elasticsearch that referenced this pull request Aug 18, 2023

Closes elastic#93582

---------

Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
csoulios pushed a commit to csoulios/elasticsearch that referenced this pull request Aug 18, 2023
salvatore-campagna added a commit to salvatore-campagna/elasticsearch that referenced this pull request Mar 30, 2024
All nodes in the mixed cluster need to be at least on version 8.10, since PR elastic#97557 introduced execution of downsampling tasks using the persistent task framework, which is incompatible with how execution was coordinated before.
salvatore-campagna added a commit that referenced this pull request Mar 30, 2024
salvatore-campagna added a commit to salvatore-campagna/elasticsearch that referenced this pull request Mar 30, 2024
elasticsearchmachine pushed a commit that referenced this pull request Mar 31, 2024
…06944)


Labels

>enhancement :StorageEngine/Downsampling Downsampling (replacement for rollups) - Turn fine-grained time-based data into coarser-grained data Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.10.0


Development

Successfully merging this pull request may close these issues.

Integrate downsample operation with persistent tasks

6 participants