
Refactor tasks to improve APM support #87917

Merged
pugnascotia merged 17 commits into elastic:master from pugnascotia:apm-integration-part-1 on Jul 5, 2022

Conversation

@pugnascotia
Contributor

Part of #84369. Split out from #87696. Reworks how certain units of work are executed by creating child tasks for them, so that when traced by APM the result is a more meaningful parent/child task hierarchy in the UI. This also improves how Elasticsearch models the work.

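The parent/child task relationship the description refers to can be sketched with a minimal, self-contained model. Everything below is a hypothetical illustration of the shape of the data, not Elasticsearch's actual TaskManager API:

```java
// Illustrative sketch only: a minimal model of parent/child task registration.
// An APM trace renders each task as a span, with child tasks nested under
// their parent's span. Class and method names here are hypothetical.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TaskTree {
    public record TaskInfo(long id, Long parentId, String action) {}

    private final AtomicLong ids = new AtomicLong();
    private final List<TaskInfo> tasks = new ArrayList<>();

    // Register a top-level task, e.g. a cluster state update.
    public TaskInfo register(String action) {
        TaskInfo t = new TaskInfo(ids.incrementAndGet(), null, action);
        tasks.add(t);
        return t;
    }

    // Register a child task that carries its parent's id, so tracing
    // can attach the child span under the parent span.
    public TaskInfo registerChild(TaskInfo parent, String action) {
        TaskInfo t = new TaskInfo(ids.incrementAndGet(), parent.id(), action);
        tasks.add(t);
        return t;
    }

    public List<TaskInfo> tasks() {
        return List.copyOf(tasks);
    }

    public static void main(String[] args) {
        TaskTree tree = new TaskTree();
        TaskInfo publish = tree.register("publish_cluster_state_update");
        TaskInfo child = tree.registerChild(publish, "apply_cluster_state");
        assert child.parentId() == publish.id();
        System.out.println("parent=" + publish.id() + " child.parentId=" + child.parentId());
    }
}
```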
@pugnascotia pugnascotia added :Distributed/Task Management Issues for anything around the Tasks API - both persistent and node level. >refactoring v8.4.0 labels Jun 22, 2022
@elasticmachine elasticmachine added the Team:Distributed Meta label for distributed team. label Jun 22, 2022
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@pugnascotia
Contributor Author

@elasticmachine run elasticsearch-ci/part-1 because the failure doesn't reproduce.

@pugnascotia pugnascotia changed the title Preliminary refactoring for APM support Refactor tasks to improve APM support Jun 23, 2022
Contributor

@henningandersen left a comment


This looks mostly good. I hope we can fix the constructor order. Also, I would like to see a bit of shallow testing that we actually utilize the task manager/generate child tasks during cluster state publishing and recovery, just to retain the functionality for the future.


static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

public static final String STATE_UPDATE_ACTION_NAME = "internal:cluster/coordination/update_state";
Contributor


This looks like a transport action name. In other places where we register a task, we pick a simpler name with no "internal:" prefix, for instance for enrich, it is just policy_execution. I'd prefer the same here to avoid the confusion. publish_cluster_state_update for instance.

Contributor Author


But there are also examples of the same form; for example, JoinHelper has 3 action names that all start with internal:cluster/coordination. The internal: prefix will also make it easier to filter out cluster management tasks when capturing traces, because we can filter in and filter out by span name, which here equates to task name.
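The prefix-based filtering this comment describes can be sketched as follows. This is a hypothetical illustration; the action names are examples, not an exhaustive list of Elasticsearch actions:

```java
// Illustrative sketch: filtering traced tasks by span name (which here
// equates to task name) prefix. Names below are example actions only.
import java.util.List;
import java.util.stream.Collectors;

public class SpanFilter {
    // Keep only span names that do NOT start with the given prefix.
    public static List<String> excludePrefix(List<String> spanNames, String prefix) {
        return spanNames.stream()
            .filter(name -> !name.startsWith(prefix))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> spans = List.of(
            "internal:cluster/coordination/join",
            "indices:data/write/bulk",
            "internal:cluster/coordination/update_state"
        );
        // Dropping everything under "internal:" leaves the user-facing action.
        System.out.println(excludePrefix(spans, "internal:")); // prints [indices:data/write/bulk]
    }
}
```

As the follow-up comments note, this kind of coarse prefix filter also drops every other internal: action, which is part of the objection raised below.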

Contributor


Those 3 are different: all three are registered as transport actions and used on the wire. A common prefix for all artificial actions would be fine, though; we just have not had one. We could start one now, or leave this without one. I'd prefer not to reuse the space we have. I also think filtering on internal: is not going to be useful anyway; there are so many actions under that prefix.

Member


I agree that filtering on prefixes like internal: is probably not helpful, and indeed end-users probably don't know much about the structure of these action names. It would have been good to have all action names following the same structure as the ones for transport actions, but I think we've already crossed that bridge unfortunately. So yes a bare publish_cluster_state_update would be ok with me.

Contributor Author


I updated the task name, and had a go at adding a test in MasterServiceTests. Please let me know what you think, I hacked it together from other examples.

I looked into adding something similar for recoveries, but I'm a bit lost. I looked at PeerRecoverySourceServiceTests, since it's already being changed to create a task to pass in, but the existing test is very narrow in scope, and it's not obvious how to probe the task-executing behaviour without lots of new setup code or by opening up method visibility, neither of which is appealing. I'm hoping someone can point somewhere and say "oh, you just need something a bit like that."

...anyone?

Contributor


I had something like IndexRecoveryIT.testOngoingRecoveryAndMasterFailOver in mind, i.e., ensure a recovery, then block it through transport, then check that the task is registered correctly (not doing the master failover part of the test).

I'd be happy to hack something together.

Did not look at your test yet, might be sufficient.

@pugnascotia
Contributor Author

@henningandersen CI is green, can you take another look?

Contributor

@henningandersen left a comment


The Node construction part looks good to me. I left a couple more comments.

Also, I would like to see a bit of shallow testing that we actually utilize the task manager/generate child tasks during cluster state publishing and recovery, just to retain the functionality for the future.

I did not see this added, will you look into that please?

@@ -204,6 +225,28 @@ public TransportService(
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
ConnectionManager connectionManager
Contributor


Ideally we'd get rid of both of the old constructors, but one of them is widely used in tests. This one, however, looks like it is only used in a couple of tests, and I'd prefer to just use the one below, creating a task manager in the test.

Can we add a comment to the other constructor that accepts "task headers" that it is used in tests only?
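The constructor arrangement suggested here might look roughly like the sketch below. All names are hypothetical placeholders, not the real TransportService or TaskManager signatures:

```java
// Hedged sketch of the suggested shape: one production constructor that
// accepts an already-built task manager, plus a clearly-marked test-only
// convenience constructor that builds one from task headers and delegates.
import java.util.Set;

public class ServiceSketch {
    // Stand-in for a real task manager type.
    static class TaskManagerSketch {
        final Set<String> taskHeaders;
        TaskManagerSketch(Set<String> taskHeaders) {
            this.taskHeaders = taskHeaders;
        }
    }

    final TaskManagerSketch taskManager;

    // Production constructor: the caller supplies the task manager.
    ServiceSketch(TaskManagerSketch taskManager) {
        this.taskManager = taskManager;
    }

    // For use in tests only: builds a task manager from the given task
    // headers, then delegates to the production constructor.
    ServiceSketch(Set<String> taskHeaders) {
        this(new TaskManagerSketch(taskHeaders));
    }

    public static void main(String[] args) {
        ServiceSketch s = new ServiceSketch(Set.of("X-Opaque-Id"));
        assert s.taskManager.taskHeaders.contains("X-Opaque-Id");
    }
}
```

Delegating the convenience constructor to the production one keeps a single initialization path, which is the property the review is asking to preserve.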



@henningandersen
Contributor

This class can demonstrate it works for recovery too:

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.indices.recovery;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TaskRecoveryIT extends ESIntegTestCase {

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopy(super.nodePlugins(), TaskRecoveryIT.EngineTestPlugin.class);
    }

    public void testTaskForOngoingRecovery() throws Exception {
        String indexName = "test";
        internalCluster().startMasterOnlyNode();
        String nodeWithPrimary = internalCluster().startDataOnlyNode();
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate(indexName)
                .setSettings(
                    Settings.builder()
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                        .put("index.routing.allocation.include._name", nodeWithPrimary)
                )
        );
        try {
            String nodeWithReplica = internalCluster().startDataOnlyNode();
            assertAcked(
                client().admin()
                    .indices()
                    .prepareUpdateSettings(indexName)
                    .setSettings(
                        Settings.builder()
                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                            .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
                    )
            );
            assertBusy(() -> {
                List<TaskInfo> primaryTasks = client().admin()
                    .cluster()
                    .prepareListTasks(nodeWithPrimary)
                    .setActions(PeerRecoverySourceService.Actions.START_RECOVERY)
                    .get()
                    .getTasks();
                assertThat(primaryTasks.size(), equalTo(1));
                List<TaskInfo> replicaTasks = client().admin()
                    .cluster()
                    .prepareListTasks(nodeWithReplica)
                    .setActions(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG)
                    .get()
                    .getTasks();
                assertThat(replicaTasks.size(), equalTo(1));
                assertThat(replicaTasks.get(0).parentTaskId(), equalTo(primaryTasks.get(0).taskId()));
            });
        } finally {
            StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
                .flatMap(ps -> ps.filterPlugins(EnginePlugin.class).stream())
                .map(EngineTestPlugin.class::cast)
                .forEach(EngineTestPlugin::release);
        }
        ensureGreen(indexName);
    }

    public static class EngineTestPlugin extends Plugin implements EnginePlugin {
        private final CountDownLatch latch = new CountDownLatch(1);

        public void release() {
            latch.countDown();
        }

        @Override
        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
            return Optional.of(config -> new InternalEngine(config) {

                @Override
                public void skipTranslogRecovery() {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }
                    super.skipTranslogRecovery();
                }
            });
        }
    }
}

@pugnascotia
Contributor Author

@henningandersen brilliant! Thank you very much for the test 🙏

Can I ask for a final approval now?

Contributor

@henningandersen left a comment


LGTM, thanks for the extra iterations.

@pugnascotia pugnascotia merged commit ca7c21f into elastic:master Jul 5, 2022
@pugnascotia pugnascotia deleted the apm-integration-part-1 branch July 5, 2022 10:34