Add CcrRestoreSourceService to track sessions #36578
Tim-Brooks merged 18 commits into elastic:master
Pinging @elastic/es-distributed
The You can register listeners in an ad-hoc manner with indexes. But there might be some catches with that (like it looks like it has to be registered when the |
The The |
ywelsch
left a comment
I've added some initial thoughts.
...main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java
public static class TransportDeleteCcrRestoreSessionAction
    extends TransportSingleShardAction<DeleteCcrRestoreSessionRequest, DeleteCcrRestoreSessionResponse> {

    private final IndicesService indicesService;
Perhaps it would be nicer for CcrRestoreSourceService to hold a reference to IndicesService, instead of having it here in the TransportAction class.
I'm not sure how to do this. CcrRestoreSourceService is created in createComponents, and we do not have IndicesService there.
I see two other options to the current one:
- Pass IndicesService to createComponents.
- Create CcrRestoreSourceService using Guice, by overriding Collection<Module> createGuiceModules().
Neither sounds really great so let's keep the current model for now.
...main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
    new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryMetadata)).actionGet();
Should we have timeouts on these calls (similar to what we do for peer recovery within a cluster)? Perhaps something to mark as a follow-up item?
I added a todo. I will also add timeout tasks to the meta issue.
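To make the timeout concern concrete, here is a minimal sketch of a bounded wait. It deliberately uses plain `java.util.concurrent` types rather than the Elasticsearch client, so `BoundedRemoteCall`, `awaitResponse`, and the 50 ms value are hypothetical stand-ins, not what the PR or the follow-up will actually use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a blocking remote call; this is not Elasticsearch API.
class BoundedRemoteCall {

    // Waits for the remote response, but gives up after timeoutMillis instead of blocking forever.
    static String awaitResponse(CompletableFuture<String> remoteCall, long timeoutMillis) {
        try {
            return remoteCall.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Stop waiting and surface the failure so the caller can fail or retry the restore.
            remoteCall.cancel(true);
            throw new IllegalStateException("remote call timed out after " + timeoutMillis + "ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for remote call", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("remote call failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A response that is already available is returned immediately.
        System.out.println(awaitResponse(CompletableFuture.completedFuture("session-opened"), 50));
        // A response that never arrives fails once the timeout elapses.
        try {
            awaitResponse(new CompletableFuture<>(), 50);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Without a bound like this, a leader that never answers would leave the restoring thread blocked indefinitely.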
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
    new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryMetadata)).actionGet();
We can derive the correct remote shard id by using indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY).
Engine.IndexCommitRef commit;
if (onGoingRestores.containsKey(sessionUUID)) {
    logger.debug("session [{}] already exists", sessionUUID);
    commit = onGoingRestores.get(sessionUUID);
Should we be so lenient here, or rather reject opening a session that is already supposed to exist? It depends on how we want to handle failures and retries.
I would kind of prefer the put to be idempotent. This is also why I have the session UUID generated on the follower node. I'll explain more about my design in a top-level comment. Maybe we should add validation to prevent (unlikely) UUID conflicts, i.e. ensure that the put session request comes from the same follower node?
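The idempotent put with an ownership check could be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `SessionRegistry`, `Session`, `putSession`, and the string stand-in for the held commit are all hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of idempotent session registration; all names are illustrative.
class SessionRegistry {

    // Minimal session record: which follower node opened it and the commit it holds.
    static final class Session {
        final String followerNodeId;
        final String commit;

        Session(String followerNodeId, String commit) {
            this.followerNodeId = followerNodeId;
            this.commit = commit;
        }
    }

    private final Map<String, Session> sessions = new ConcurrentHashMap<>();

    // A retried put from the same follower returns the existing session without
    // acquiring a second commit. A put that reuses the UUID from a different
    // follower node is rejected as a conflict.
    Session putSession(String sessionUUID, String followerNodeId, Supplier<String> acquireCommit) {
        Session session = sessions.computeIfAbsent(sessionUUID,
            uuid -> new Session(followerNodeId, acquireCommit.get()));
        if (session.followerNodeId.equals(followerNodeId) == false) {
            throw new IllegalArgumentException("session [" + sessionUUID + "] belongs to another follower node");
        }
        return session;
    }
}
```

Because `computeIfAbsent` only invokes the supplier when the key is missing, a retry after a lost response does not acquire the commit a second time.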
    logger.debug("session [{}] already exists", sessionUUID);
    commit = onGoingRestores.get(sessionUUID);
} else {
    commit = indexShard.acquireSafeIndexCommit();
If anything goes wrong later in this method, should we release the index commit?
Made some changes to release it. However, I think local timeouts on how long the index commit is held should also be a future meta task, since they would help here too.
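One common shape for the release-on-failure logic is a success flag checked in a `finally` block. The sketch below assumes a hypothetical `CommitRef` stand-in for the held index commit and a `laterSetup` callback representing whatever work follows acquisition. Only the `onGoingRestores` map name is taken from the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; CommitRef stands in for a held index commit reference.
class CommitTracking {

    static final class CommitRef implements AutoCloseable {
        boolean released = false;

        @Override
        public void close() {
            released = true;
        }
    }

    final Map<String, CommitRef> onGoingRestores = new ConcurrentHashMap<>();

    // Tracks the commit for the session. If any later step throws, the commit is
    // untracked and released before the exception propagates.
    void openSession(String sessionUUID, CommitRef commit, Runnable laterSetup) {
        boolean success = false;
        try {
            onGoingRestores.put(sessionUUID, commit);
            laterSetup.run();
            success = true;
        } finally {
            if (success == false) {
                onGoingRestores.remove(sessionUUID);
                commit.close();
            }
        }
    }
}
```

This only covers failures inside the method. A commit that is held because the follower disappears would still need the local timeout mentioned above.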
...rc/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
...plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java
Thanks @ywelsch I've made changes. Here are my high-level design thoughts:
In this model the |
ywelsch
left a comment
I've left mostly smaller comments. Thanks for the high-level design description. Can you also outline how you want to handle (temporary) network disconnects?
    return new ClearCcrRestoreSessionResponse();
}

public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
TransportNodesAction is only truly useful if you intend to send something to multiple nodes. I think it might be simpler here to use HandledTransportAction directly?
I think I would like to implement a specific node request for the file chunks and delete session in a follow-up. I added a meta task.
public static class PutCcrRestoreSessionResponse extends ActionResponse {

    private String nodeId;
Can this be made final? I see that you implemented both a constructor taking StreamInput and the readFrom method.
I don't think so. Unfortunately you must implement this:
@Override
protected PutCcrRestoreSessionResponse newResponse() {
    return new PutCcrRestoreSessionResponse();
}
on TransportSingleShardAction.
Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
Do we need to get the leader index name from Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY?
No, we don't need that, because the index name provided by the arguments to restoreShard is correct. Only the UUID is not.
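The derivation discussed here (keep the index name and shard number, swap in the leader's index UUID from the custom metadata) can be sketched in isolation. The `ShardId` class and the key value below are simplified, hypothetical stand-ins. The real code uses Elasticsearch's own `ShardId` and the `Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY` constant, whose value is not shown in this conversation.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified sketch of deriving the leader shard id on the follower.
class LeaderShardIds {

    // Assumed placeholder for the metadata key; the real constant lives in Ccr.
    static final String LEADER_INDEX_UUID_KEY = "leader_index_uuid";

    // Simplified stand-in for a shard id: index name, index UUID, shard number.
    static final class ShardId {
        final String indexName;
        final String indexUUID;
        final int id;

        ShardId(String indexName, String indexUUID, int id) {
            this.indexName = indexName;
            this.indexUUID = indexUUID;
            this.id = id;
        }
    }

    // Keeps the index name and shard number passed to the restore and replaces only
    // the UUID with the leader's UUID recorded in the follower's custom metadata.
    static ShardId leaderShardId(ShardId shardId, Map<String, String> ccrMetaData) {
        String leaderUUID = Objects.requireNonNull(
            ccrMetaData.get(LEADER_INDEX_UUID_KEY), "leader index UUID missing from CCR metadata");
        return new ShardId(shardId.indexName, leaderUUID, shardId.id);
    }
}
```

The explicit null check is an addition of this sketch, so that a follower index without the metadata entry fails with a clear message.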
...plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java
...plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
    remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
if (response.hasFailures()) {
    throw response.failures().get(0);
By not making this a BaseNodesResponse, we will not need this weird unwrapping.
I think I would like to implement a specific node request for the file chunks and delete session in a follow-up. I added a meta task.
...c/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java
...n/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java
@ywelsch - I think I would like to implement the mechanism to direct a request to a specific node on the remote cluster in a follow-up. I added a task for that on the meta issue.
}

private void removeSessionForShard(String sessionUUID, IndexShard indexShard) {
    logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard);
IndexShard does not have a toString implementation AFAICS
} else {
    logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard);
    if (indexShard.state() == IndexShardState.CLOSED) {
        throw new IllegalIndexShardStateException(indexShard.shardId(), IndexShardState.CLOSED,
Preferably throw IndexShardClosedException.
* elastic/master: (31 commits)
  enable bwc tests and switch transport serialization version to 6.6.0 for CAS features
  [DOCs] Adds ml-cpp PRs to alpha release notes (elastic#36790)
  Synchronize WriteReplicaResult callbacks (elastic#36770)
  Add CcrRestoreSourceService to track sessions (elastic#36578)
  [Painless] Add tests for boxed return types (elastic#36747)
  Internal: Remove originalSettings from Node (elastic#36569)
  [ILM][DOCS] Update ILM API authorization docs (elastic#36749)
  Core: Deprecate use of scientific notation in epoch time parsing (elastic#36691)
  [ML] Merge the Jindex master feature branch (elastic#36702)
  Tests: Mute SnapshotDisruptionIT.testDisruptionOnSnapshotInitialization
  Update versions in SearchSortValues transport serialization
  Update version in SearchHits transport serialization
  [Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (elastic#36751)
  [Docs] Fix error in Common Grams Token Filter (elastic#36774)
  Fix rollup search statistics (elastic#36674)
  SQL: Fix wrong appliance of StackOverflow limit for IN (elastic#36724)
  [TEST] Added more logging
  Invalidate Token API enhancements - HLRC (elastic#36362)
  Deprecate types in index API (elastic#36575)
  Disable bwc tests until elastic#36555 backport is complete (elastic#36737)
  ...
This commit is related to elastic#36127. It adds a CcrRestoreSourceService to track the Engine.IndexCommitRef needed for in-process file restores. When a follower starts restoring a shard through the CcrRepository, it opens a session with the leader through the PutCcrRestoreSessionAction. The leader responds to the request by telling the follower which files it needs to fetch for a restore (this response is not yet implemented). Once the restore is complete, the follower closes the session with the DeleteCcrRestoreSessionAction.
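The follower-side lifecycle described in the commit message (open a session, restore, close the session) might look roughly like this. The sketch is hedged: `Leader`, `putSession`, `deleteSession`, and `copyFiles` are hypothetical names that only mirror the put/delete session actions, and `java.util.UUID` stands in for the UUID helper used in the PR.

```java
import java.util.UUID;

// Hypothetical sketch of the follower-side restore lifecycle; not the PR's actual code.
class RestoreLifecycle {

    // Assumed minimal view of the leader, mirroring the put and delete session actions.
    interface Leader {
        void putSession(String sessionUUID);

        void deleteSession(String sessionUUID);
    }

    // Opens a session with a follower-generated UUID, runs the file copy, and always
    // closes the session afterwards, even if the copy fails.
    static String restoreShard(Leader leader, Runnable copyFiles) {
        String sessionUUID = UUID.randomUUID().toString();
        leader.putSession(sessionUUID);
        try {
            copyFiles.run();
        } finally {
            leader.deleteSession(sessionUUID);
        }
        return sessionUUID;
    }
}
```

Closing in `finally` keeps the leader from holding the index commit after a failed restore, as long as the delete request itself gets through.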