Register CcrRepository based on settings update #36086
Tim-Brooks merged 26 commits into elastic:master
Pinging @elastic/es-distributed
@bleskes - In response to your comment here: Since we are using the client for the propagation to
ywelsch left a comment
Thanks for the PR @tbrooks8. I've left some initial comments. I think we should also do more unit-level testing. Start e.g. with the newly added methods to RepositoriesService. They can easily be unit-tested. The repo manager might also allow some unit-tests.
    import org.elasticsearch.action.support.master.AcknowledgedResponse;
    import org.elasticsearch.common.io.stream.Writeable;

    public class DeleteInternalRepositoryAction extends Action<AcknowledgedResponse> {
can you move these actions to the CCR plugin?
Sure. Although I will note that the concept of "internal repositories" still exists in open source, even if the actions to manipulate them do not.
Do you want me to change the names from what they are such as:
"cluster:admin/internal_repository/put"
to something ccr oriented?
yes, same for the action names and requests. For now, these can be DeleteInternalCCRRepositoryAction, DeleteInternalCCRRepositoryRequest, ...
    }

    @Override
    public AcknowledgedResponse newResponse() {
why AcknowledgedResponse? You're not interested in checking the acknowledged flag, so maybe just an ActionResponse.
    public class DeleteInternalRepositoryRequest extends ActionRequest {

        private String name;

        public ActionRequestValidationException validate() {
            ActionRequestValidationException validationException = null;
            if (name == null) {
                validationException = addValidationError("name is missing", validationException);
maybe easier to check this right away on object creation, i.e., this.name = Objects.requireNonNull(name);
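The suggestion above can be illustrated with a minimal sketch. The class below is a hypothetical stand-in (`DeleteInternalRepositoryRequestSketch` is not part of the PR) and deliberately does not extend the real `ActionRequest`, so that it compiles on its own. It only shows the fail-fast idea: a null name is rejected in the constructor, so a later `validate()` step has nothing left to check for this field.

```java
import java.util.Objects;

// Hypothetical stand-in for the request class under review; it does not
// extend the real Elasticsearch ActionRequest so that it is self-contained.
final class DeleteInternalRepositoryRequestSketch {

    private final String name;

    DeleteInternalRepositoryRequestSketch(String name) {
        // Fail fast: a null name is rejected when the object is created,
        // instead of being reported later by a validate() method.
        this.name = Objects.requireNonNull(name, "name must not be null");
    }

    String name() {
        return name;
    }
}
```

With this shape the field can also be `final`, which is not possible when the check is deferred to a separate validation method on a mutable request.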
    private String name;
    private String type;
    private Settings settings;

        closeRepository(repository);
        repository.close();
    } else {
        logger.warn(() -> new ParameterizedMessage("Attempted to unregistered internal repository [{}][{}]. " +

    Repository existingRepository = internalRepositories.putIfAbsent(name, repository);

    if (existingRepository != null) {
        logger.error(new ParameterizedMessage("Error registering internal repository [{}][{}]. " +
start logging messages with lower case (in style with the rest of the class / code)?
    private final boolean enabled;
    private final Settings settings;
    private final CcrLicenseChecker ccrLicenseChecker;
    private SetOnce<ClusterService> clusterService = new SetOnce<>();
this is not used anywhere?
    private final Settings settings;
    private final CcrLicenseChecker ccrLicenseChecker;
    private SetOnce<ClusterService> clusterService = new SetOnce<>();
    private SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();

    class CcrRepositoryManager extends RemoteClusterAware {

        private final NodeClient client;
        private final Set<String> clusters = ConcurrentCollections.newConcurrentSet();
instead of caching this list here, can we directly ask RepositoriesService whether it already has a repo for this thing, and add / remove based on what RepositoriesService has?
Having these extra caches is always tricky, in particular when there are failure scenarios and the list of internal repositories in RepositoriesService might go out of sync with the cached clusters here.
Since put and delete should be idempotent (as they are for other normal repository requests), I removed the cache and just call the action each time.
The other option was to create a get action, but I thought that was unnecessary as we need to handle potential concurrency in the put and delete methods anyway.
> I removed the cache and just call the action each time.

sounds good
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
    for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
        if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
            throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
should we enforce that these types are distinct to the non-internal ones?
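One way to enforce the distinctness asked about above is to check each registry against the other before inserting. The following is only a sketch under simplifying assumptions: `RepositoryTypeRegistrySketch` and its methods are hypothetical names, and plain `String` values stand in for `Repository.Factory` so the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified registry. String values stand in for
// Repository.Factory; none of these names come from the PR itself.
final class RepositoryTypeRegistrySketch {

    private final Map<String, String> factories = new HashMap<>();
    private final Map<String, String> internalFactories = new HashMap<>();

    // Registers a normal (externally visible) repository type.
    void registerType(String type, String factory) {
        if (internalFactories.containsKey(type)) {
            throw new IllegalArgumentException(
                "Repository type [" + type + "] is already registered as an internal repository type");
        }
        if (factories.putIfAbsent(type, factory) != null) {
            throw new IllegalArgumentException("Repository type [" + type + "] is already registered");
        }
    }

    // Registers an internal repository type, rejecting any overlap with normal types.
    void registerInternalType(String type, String factory) {
        if (factories.containsKey(type)) {
            throw new IllegalArgumentException(
                "Internal repository type [" + type + "] is already registered as a non-internal repository type");
        }
        if (internalFactories.putIfAbsent(type, factory) != null) {
            throw new IllegalArgumentException("Internal repository type [" + type + "] is already registered");
        }
    }
}
```

The sketch uses `putIfAbsent` rather than `put` so that a rejected registration does not overwrite the factory that was registered first.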
    Repository newRepository = createRepository(metaData, internalTypesRegistry);
    Repository repositoryToClose = null;
    boolean updated = false;
    synchronized (internalRepositories) {
the concurrency logic looks complicated here. Maybe just add synchronized on the registerInternalRepository and unregisterInternalRepository methods? We will not be calling those concurrently anyway?
After the changes where we do not support updates, we can rely on the ConcurrentMap to provide concurrency control.
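A minimal sketch of what relying on the `ConcurrentMap` might look like once updates are no longer supported. The names here (`RepoSketch`, `InternalRepositoriesSketch`) are hypothetical and the repository type is reduced to a closeable placeholder; the point is only that `putIfAbsent` and `remove` are atomic, so no explicit `synchronized` block is needed for register and unregister.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical placeholder for a repository that can be closed.
final class RepoSketch {
    final String name;
    volatile boolean closed;

    RepoSketch(String name) {
        this.name = name;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical holder for node-local internal repositories.
final class InternalRepositoriesSketch {

    private final ConcurrentMap<String, RepoSketch> internalRepositories = new ConcurrentHashMap<>();

    // Returns true if the repository was newly registered, false if one already existed.
    boolean register(String name) {
        RepoSketch candidate = new RepoSketch(name);
        // putIfAbsent is atomic: only one concurrent caller can win for a given name.
        RepoSketch existing = internalRepositories.putIfAbsent(name, candidate);
        if (existing != null) {
            // Lost the race or repeated call: discard the unused instance, keep the existing one.
            candidate.close();
            return false;
        }
        return true;
    }

    // Returns true if a repository was removed and closed, false if none was registered.
    boolean unregister(String name) {
        // remove is atomic: the repository is closed by exactly one caller.
        RepoSketch removed = internalRepositories.remove(name);
        if (removed == null) {
            return false;
        }
        removed.close();
        return true;
    }

    boolean isRegistered(String name) {
        return internalRepositories.containsKey(name);
    }
}
```

Because a second `register` or `unregister` call for the same name is a harmless no-op in this sketch, callers can invoke the operations on every settings update without keeping their own cache of registered names.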
      Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
      if (isCrossClusterSearchEnabled()) {
    -     final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
    +     final Map<String, List<String>> groupedIndices = groupClusterIndices(remoteClusters.keySet(), indices, indexExists);
use getRemoteClusterNames() here?
    @Override
    public Writeable.Reader<ActionResponse> getResponseReader() {
        return in -> new ActionResponse() {};
safer to use the ActionResponse(StreamInput in) constructor here. Maybe add a dummy response subclass here.
        client.executeLocally(DeleteInternalRepositoryAction.INSTANCE, request, future);
        assert future.isDone() : "Should be completed as it is executed synchronously";
    } else {
        ActionRequest request = new PutInternalRepositoryRequest(clusterAlias, CcrRepository.TYPE);
instead of using the clusterAlias name verbatim here, let's prepend something like "ccr" to avoid name conflicts with standard repositories.
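As a small illustration of the prefixing idea, the sketch below derives the repository name from the cluster alias. The class name `CcrRepositoryNamesSketch` is hypothetical, and the prefix value `"_ccr_"` is an assumption chosen for the example; the conversation only suggests "something like ccr" and does not state the constant's value.

```java
// Hypothetical helper; the prefix value is an assumption for illustration only.
final class CcrRepositoryNamesSketch {

    // Assumed value. The review only asks for "something like ccr".
    static final String NAME_PREFIX = "_ccr_";

    private CcrRepositoryNamesSketch() {
    }

    // Builds the internal repository name for a remote cluster alias, so that a
    // user-defined repository with the same name as the alias cannot collide with it.
    static String repositoryName(String clusterAlias) {
        return NAME_PREFIX + clusterAlias;
    }
}
```

With such a helper, a remote cluster with alias `leader` and a standard repository that a user also named `leader` would end up under different names.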
        this(name, type, Settings.EMPTY);
    }

    public PutInternalRepositoryRequest(String name, String type, Settings settings) {
If we rename this to PutInternalCCRRepositoryRequest, we can get rid of the settings parameter here. We don't use it for CCR.
    // TODO: Normally we would do validation when we update a repository to ensure that it is not in use.
    // Are we okay with not including that validation under the assumption that internal operations
    // will do the right thing.
I think it's fine not having this validation here. If we remove the settings parameter from this method, a repo will never be updated with different settings.
ywelsch left a comment
I've left a few more comments. Looking great already.
    assertAcked(followerClient().admin().cluster().updateSettings(putFollowerRequest).actionGet());

    String followerCopyRepoName = CcrRepository.NAME_PREFIX + "follower_cluster_copy";
    assertBusy(() -> {
the assertBusy should not be needed here. The update settings call will only return if the corresponding cluster state has been updated on all nodes, and the repositories are created as part of that CS update.
run the docbldesx
run default distro tests
run the docbldesx
This is a follow-up to elastic#36086. It renames the internal repository actions to be prefixed by "internal". This allows the system user to execute the actions. Additionally, this PR stops casting Client to NodeClient. The client we have is a NodeClient so executing the actions will be local.
This commit adds an empty CcrRepository snapshot/restore repository. When a new cluster is registered in the remote cluster settings, a new CcrRepository is registered for that cluster. This is implemented using a new concept of "internal repositories". RepositoryPlugin now allows implementations to return factories for "internal repositories". The "internal repositories" are different from normal repositories in that they cannot be registered through the external repository api. Additionally, "internal repositories" are local to a node and are not stored in the cluster state. The repository will be unregistered if the remote cluster is removed.
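The behaviour described in this commit message, registering a repository when a remote cluster appears in the settings and unregistering it when the cluster is removed, could be sketched roughly as follows. This is a simplified illustration and not the PR's implementation: `CcrRepositoryManagerSketch`, `onRemoteClusterUpdate`, the `"_ccr_"` prefix, and the convention that an empty seed list means "cluster removed" are all assumptions made for the example. The set stands in for the node-local repository registry, which is not stored in the cluster state.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; names and conventions are assumptions, not the PR's API.
final class CcrRepositoryManagerSketch {

    // Stands in for the node-local registry of internal repositories.
    private final Set<String> registeredRepositories = ConcurrentHashMap.newKeySet();

    // Called for each remote cluster whose settings changed.
    // Assumption: an empty seed list means the remote cluster was removed.
    void onRemoteClusterUpdate(String clusterAlias, List<String> seedAddresses) {
        String repositoryName = "_ccr_" + clusterAlias; // assumed prefix, illustration only
        if (seedAddresses.isEmpty()) {
            // Remote cluster removed: unregister its repository (no-op if absent).
            registeredRepositories.remove(repositoryName);
        } else {
            // Remote cluster added or updated: register its repository (no-op if present).
            registeredRepositories.add(repositoryName);
        }
    }

    // Read-only snapshot of the currently registered repository names.
    Set<String> registeredRepositories() {
        return Collections.unmodifiableSet(new HashSet<>(registeredRepositories));
    }
}
```

Both branches are idempotent in this sketch, so repeated settings updates for the same cluster leave the registry unchanged.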