Skip to content

Commit ebc684a

Browse files
authored
Execute remote actions on another extension (opensearch-project#588)
* Add ProxyAction with TransportAction and handlers Signed-off-by: Daniel Widdis <widdis@gmail.com> * Give SDKActionModule a copy of ExtensionsRunner to use with transport Signed-off-by: Daniel Widdis <widdis@gmail.com> * Add new ProxyActionRequest Signed-off-by: Daniel Widdis <widdis@gmail.com> * Add SDKTransportService wrapper accessible to actions Signed-off-by: Daniel Widdis <widdis@gmail.com> * Implement ProxyTransportAction Signed-off-by: Daniel Widdis <widdis@gmail.com> * Add test case to HelloWorldExtension Signed-off-by: Daniel Widdis <widdis@gmail.com> * Better naming of ExtensionActionResponse and correct action name Signed-off-by: Daniel Widdis <widdis@gmail.com> * Refactoring with TransportService and latest OpenSearch PR updates Signed-off-by: Daniel Widdis <widdis@gmail.com> * Add ExtensionsActionRequestHandler Signed-off-by: Daniel Widdis <widdis@gmail.com> * Instantiate Proxy Action Request Signed-off-by: Daniel Widdis <widdis@gmail.com> * Working test case! Signed-off-by: Daniel Widdis <widdis@gmail.com> * Properly parse returned byte array into a response Signed-off-by: Daniel Widdis <widdis@gmail.com> * Add sequence diagram to DESIGN.md Signed-off-by: Daniel Widdis <widdis@gmail.com> * Typoo fix Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update with latest changes on companion PR Signed-off-by: Daniel Widdis <widdis@gmail.com> * Rename ProxyFoo to RemoteExtensionFoo Signed-off-by: Daniel Widdis <widdis@gmail.com> * Better handling of response bytes Signed-off-by: Daniel Widdis <widdis@gmail.com> * Handle plugin remote action requests Signed-off-by: Daniel Widdis <widdis@gmail.com> * Address code review comments Signed-off-by: Daniel Widdis <widdis@gmail.com> * Update sequence diagram Signed-off-by: Daniel Widdis <widdis@gmail.com> --------- Signed-off-by: Daniel Widdis <widdis@gmail.com>
1 parent b66d74e commit ebc684a

22 files changed

Lines changed: 1017 additions & 132 deletions

DESIGN.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ The `ExtensionsManager` reads a list of extensions present in `extensions.yml`.
119119

120120
(27) The User receives the response.
121121

122+
#### Remote Action Execution on another Extension
123+
124+
Extensions may invoke actions on other extensions using the `ProxyAction` and `ProxyActionRequest`. The code sequence is shown below.
125+
126+
![](Docs/RemoteActionExecution.svg)
127+
122128
#### Extension Point Implementation Walk Through
123129

124130
An example of a more complex extension point, `getNamedXContent()` is shown below. A similar pattern can be followed for most extension points.

Docs/RemoteActionExecution.svg

Lines changed: 1 addition & 0 deletions
Loading

src/main/java/org/opensearch/sdk/BaseExtension.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,13 @@
1111

1212
import java.io.IOException;
1313

14-
import com.google.inject.Inject;
15-
1614
/**
1715
* An abstract class that simplifies extension initialization and provides an instance of the runner.
1816
*/
1917
public abstract class BaseExtension implements Extension {
2018
/**
2119
* The {@link ExtensionsRunner} instance running this extension
2220
*/
23-
@Inject
2421
private ExtensionsRunner extensionsRunner;
2522

2623
/**
@@ -56,6 +53,11 @@ public ExtensionSettings getExtensionSettings() {
5653
return this.settings;
5754
}
5855

56+
@Override
57+
public void setExtensionsRunner(ExtensionsRunner runner) {
58+
this.extensionsRunner = runner;
59+
}
60+
5961
/**
6062
* Gets the {@link ExtensionsRunner} of this extension.
6163
*

src/main/java/org/opensearch/sdk/Extension.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@
2525
*/
2626
public interface Extension {
2727

28+
/**
29+
* Set the instance of {@link ExtensionsRunner} for this extension.
30+
*
31+
* @param runner The ExtensionsRunner instance.
32+
*/
33+
public void setExtensionsRunner(ExtensionsRunner runner);
34+
2835
/**
2936
* Gets the {@link ExtensionSettings} of this extension.
3037
*

src/main/java/org/opensearch/sdk/ExtensionsRunner.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.extensions.DiscoveryExtensionNode;
2626
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
2727
import org.opensearch.extensions.UpdateSettingsRequest;
28+
import org.opensearch.extensions.action.ExtensionActionRequest;
2829
import org.opensearch.extensions.ExtensionsManager.RequestType;
2930
import org.opensearch.extensions.ExtensionRequest;
3031
import org.opensearch.extensions.ExtensionsManager;
@@ -33,6 +34,7 @@
3334
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
3435
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
3536
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
37+
import org.opensearch.sdk.handlers.ExtensionActionRequestHandler;
3638
import org.opensearch.sdk.action.SDKActionModule;
3739
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
3840
import org.opensearch.sdk.handlers.ExtensionDependencyResponseHandler;
@@ -132,13 +134,15 @@ public class ExtensionsRunner {
132134
private final SDKNamedXContentRegistry sdkNamedXContentRegistry;
133135
private final SDKClient sdkClient;
134136
private final SDKClusterService sdkClusterService;
137+
private final SDKTransportService sdkTransportService;
135138
private final SDKActionModule sdkActionModule;
136139

137-
private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
138-
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
139-
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
140+
private final ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
141+
private final ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
142+
private final ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
140143
new ExtensionsIndicesModuleNameRequestHandler();
141-
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
144+
private final ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
145+
private final ExtensionActionRequestHandler extensionsActionRequestHandler;
142146

143147
/**
144148
* Instantiates a new update settings request handler
@@ -152,7 +156,10 @@ public class ExtensionsRunner {
152156
* @throws IOException if the runner failed to read settings or API.
153157
*/
154158
protected ExtensionsRunner(Extension extension) throws IOException {
159+
// Link these classes together
155160
this.extension = extension;
161+
extension.setExtensionsRunner(this);
162+
156163
// Initialize concrete classes needed by extensions
157164
// These must have getters from this class to be accessible via createComponents
158165
// If they require later initialization, create a concrete wrapper class and update the internals
@@ -175,6 +182,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
175182
this.sdkClient = new SDKClient(extensionSettings);
176183
// initialize SDKClusterService. Must happen after extension field assigned
177184
this.sdkClusterService = new SDKClusterService(this);
185+
// initialize SDKTransportService. Must happen after extension field assigned
186+
this.sdkTransportService = new SDKTransportService();
178187

179188
// Create Guice modules for injection
180189
List<com.google.inject.Module> modules = new ArrayList<>();
@@ -189,6 +198,7 @@ protected ExtensionsRunner(Extension extension) throws IOException {
189198

190199
b.bind(SDKClient.class).toInstance(getSdkClient());
191200
b.bind(SDKClusterService.class).toInstance(getSdkClusterService());
201+
b.bind(SDKTransportService.class).toInstance(getSdkTransportService());
192202
});
193203
// Bind the return values from create components
194204
modules.add(this::injectComponents);
@@ -202,6 +212,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
202212
// initialize SDKClient action map
203213
initializeSdkClient();
204214

215+
extensionsActionRequestHandler = new ExtensionActionRequestHandler(getSdkClient());
216+
205217
if (extension instanceof ActionExtension) {
206218
// store REST handlers in the registry
207219
for (ExtensionRestHandler extensionRestHandler : ((ActionExtension) extension).getExtensionRestHandlers()) {
@@ -391,6 +403,25 @@ public void startTransportService(TransportService transportService) {
391403
((request, channel, task) -> channel.sendResponse(updateSettingsRequestHandler.handleUpdateSettingsRequest(request)))
392404
);
393405

406+
// This handles a remote extension request from OpenSearch or a plugin, sending an ExtensionActionResponse
407+
transportService.registerRequestHandler(
408+
ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION,
409+
ThreadPool.Names.GENERIC,
410+
false,
411+
false,
412+
ExtensionActionRequest::new,
413+
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleExtensionActionRequest(request)))
414+
);
415+
416+
// This handles a remote extension request from another extension, sending a RemoteExtensionActionResponse
417+
transportService.registerRequestHandler(
418+
ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION,
419+
ThreadPool.Names.GENERIC,
420+
false,
421+
false,
422+
ExtensionActionRequest::new,
423+
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleRemoteExtensionActionRequest(request)))
424+
);
394425
}
395426

396427
/**
@@ -638,6 +669,10 @@ public TransportService getExtensionTransportService() {
638669
return extensionTransportService;
639670
}
640671

672+
public SDKTransportService getSdkTransportService() {
673+
return sdkTransportService;
674+
}
675+
641676
/**
642677
* Starts an ActionListener.
643678
*
@@ -660,6 +695,8 @@ public static void run(Extension extension) throws IOException {
660695
// initialize the transport service
661696
NettyTransport nettyTransport = new NettyTransport(runner);
662697
runner.extensionTransportService = nettyTransport.initializeExtensionTransportService(runner.getSettings(), runner.getThreadPool());
698+
// TODO: merge above line with below line when refactoring out extensionTransportService
699+
runner.getSdkTransportService().setTransportService(runner.extensionTransportService);
663700
runner.startActionListener(0);
664701
}
665702

src/main/java/org/opensearch/sdk/SDKClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.io.IOException;
1414
import java.util.Collections;
1515
import java.util.Map;
16+
import java.util.stream.Collectors;
1617

1718
import com.fasterxml.jackson.annotation.JsonTypeInfo;
1819
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -100,6 +101,9 @@ public SDKClient(ExtensionSettings extensionSettings) {
100101
// Used by client.execute, populated by initialize method
101102
@SuppressWarnings("rawtypes")
102103
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
104+
// Used by remote client execution where we get a string for the class name
105+
@SuppressWarnings("rawtypes")
106+
private Map<String, ActionType> actionClassToInstanceMap = Collections.emptyMap();
103107

104108
/**
105109
* Initialize this client.
@@ -109,6 +113,7 @@ public SDKClient(ExtensionSettings extensionSettings) {
109113
@SuppressWarnings("rawtypes")
110114
public void initialize(Map<ActionType, TransportAction> actions) {
111115
this.actions = actions;
116+
this.actionClassToInstanceMap = actions.keySet().stream().collect(Collectors.toMap(a -> a.getClass().getName(), a -> a));
112117
}
113118

114119
/**
@@ -285,6 +290,17 @@ public void close() throws IOException {
285290
doCloseHighLevelClient();
286291
}
287292

293+
/**
294+
* Gets an instance of {@link ActionType} from its corresponding class name, suitable for using as the first parameter in {@link #execute(ActionType, ActionRequest, ActionListener)}.
295+
*
296+
* @param className The class name of the action type
297+
* @return The instance corresponding to the class name
298+
*/
299+
@SuppressWarnings("unchecked")
300+
public ActionType<? extends ActionResponse> getActionFromClassName(String className) {
301+
return actionClassToInstanceMap.get(className);
302+
}
303+
288304
/**
289305
* Executes a generic action, denoted by an {@link ActionType}.
290306
*
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.sdk;
11+
12+
import java.nio.ByteBuffer;
13+
import java.nio.charset.StandardCharsets;
14+
import java.util.Map;
15+
import java.util.Set;
16+
import java.util.concurrent.TimeoutException;
17+
import java.util.stream.Collectors;
18+
19+
import org.apache.logging.log4j.LogManager;
20+
import org.apache.logging.log4j.Logger;
21+
import org.opensearch.cluster.node.DiscoveryNode;
22+
import org.opensearch.extensions.ExtensionsManager;
23+
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
24+
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
25+
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
26+
import org.opensearch.sdk.ActionExtension.ActionHandler;
27+
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
28+
import org.opensearch.sdk.action.SDKActionModule;
29+
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
30+
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
31+
import org.opensearch.transport.TransportService;
32+
33+
/**
34+
* Wrapper class for {@link TransportService} and associated methods.
35+
*
36+
* TODO: Move all the sendFooRequest() methods here
37+
* TODO: Replace usages of getExtensionTransportService with this class
38+
* https://github.com/opensearch-project/opensearch-sdk-java/issues/585
39+
*/
40+
public class SDKTransportService {
41+
private final Logger logger = LogManager.getLogger(SDKTransportService.class);
42+
43+
private TransportService transportService;
44+
private DiscoveryNode opensearchNode;
45+
private String uniqueId;
46+
47+
/**
48+
* Requests that OpenSearch register the Transport Actions for this extension.
49+
*
50+
* @param actions The map of registered actions from {@link SDKActionModule#getActions()}
51+
*/
52+
public void sendRegisterTransportActionsRequest(Map<String, ActionHandler<?, ?>> actions) {
53+
logger.info("Sending Register Transport Actions request to OpenSearch");
54+
Set<String> actionNameSet = actions.values()
55+
.stream()
56+
.filter(h -> !h.getAction().name().startsWith("internal"))
57+
.map(h -> h.getAction().getClass().getName())
58+
.collect(Collectors.toSet());
59+
AcknowledgedResponseHandler registerTransportActionsResponseHandler = new AcknowledgedResponseHandler();
60+
try {
61+
transportService.sendRequest(
62+
opensearchNode,
63+
ExtensionsManager.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
64+
new RegisterTransportActionsRequest(uniqueId, actionNameSet),
65+
registerTransportActionsResponseHandler
66+
);
67+
} catch (Exception e) {
68+
logger.error("Failed to send Register Transport Actions request to OpenSearch", e);
69+
}
70+
}
71+
72+
/**
73+
* Requests that OpenSearch execute a Transport Actions on another extension.
74+
*
75+
* @param request The request to send
76+
* @return A buffer serializing the response from the remote action if successful, otherwise null
77+
*/
78+
public RemoteExtensionActionResponse sendRemoteExtensionActionRequest(RemoteExtensionActionRequest request) {
79+
logger.info("Sending Remote Extension Action request to OpenSearch for [" + request.getAction() + "]");
80+
// Combine class name string and request bytes
81+
byte[] requestClassBytes = request.getRequestClass().getBytes(StandardCharsets.UTF_8);
82+
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + request.getRequestBytes().length)
83+
.put(requestClassBytes)
84+
.put(RemoteExtensionActionRequest.UNIT_SEPARATOR)
85+
.put(request.getRequestBytes())
86+
.array();
87+
ExtensionActionResponseHandler extensionActionResponseHandler = new ExtensionActionResponseHandler();
88+
try {
89+
transportService.sendRequest(
90+
opensearchNode,
91+
ExtensionsManager.TRANSPORT_ACTION_REQUEST_FROM_EXTENSION,
92+
new TransportActionRequestFromExtension(request.getAction(), proxyRequestBytes, uniqueId),
93+
extensionActionResponseHandler
94+
);
95+
// Wait on response
96+
extensionActionResponseHandler.awaitResponse();
97+
} catch (TimeoutException e) {
98+
logger.error("Failed to receive Remote Extension Action response from OpenSearch", e);
99+
} catch (Exception e) {
100+
logger.error("Failed to send Remote Extension Action request to OpenSearch", e);
101+
}
102+
// At this point, response handler has read in the response bytes
103+
return new RemoteExtensionActionResponse(
104+
extensionActionResponseHandler.isSuccess(),
105+
extensionActionResponseHandler.getResponseBytes()
106+
);
107+
}
108+
109+
public TransportService getTransportService() {
110+
return transportService;
111+
}
112+
113+
public DiscoveryNode getOpensearchNode() {
114+
return opensearchNode;
115+
}
116+
117+
public String getUniqueId() {
118+
return uniqueId;
119+
}
120+
121+
public void setTransportService(TransportService transportService) {
122+
this.transportService = transportService;
123+
}
124+
125+
public void setOpensearchNode(DiscoveryNode opensearchNode) {
126+
this.opensearchNode = opensearchNode;
127+
}
128+
129+
public void setUniqueId(String uniqueId) {
130+
this.uniqueId = uniqueId;
131+
}
132+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.sdk.action;
11+
12+
import org.opensearch.action.ActionType;
13+
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
14+
15+
/**
16+
* The {@link ActionType} used as they key for the {@link RemoteExtensionTransportAction}.
17+
*/
18+
public class RemoteExtensionAction extends ActionType<RemoteExtensionActionResponse> {
19+
20+
/**
21+
* The name to look up this action with
22+
*/
23+
public static final String NAME = "internal:remote-extension-action";
24+
/**
25+
* The singleton instance of this class
26+
*/
27+
public static final RemoteExtensionAction INSTANCE = new RemoteExtensionAction();
28+
29+
private RemoteExtensionAction() {
30+
super(NAME, RemoteExtensionActionResponse::new);
31+
}
32+
}

0 commit comments

Comments
 (0)