Skip to content

Commit d0abb01

Browse files
committed
Only send one extension info when initializing
Signed-off-by: Daniel Widdis <widdis@gmail.com>
1 parent d377ab1 commit d0abb01

3 files changed

Lines changed: 86 additions & 48 deletions

File tree

server/src/main/java/org/opensearch/discovery/InitializeExtensionsRequest.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.opensearch.transport.TransportRequest;
1616

1717
import java.io.IOException;
18-
import java.util.List;
1918
import java.util.Objects;
2019

2120
/**
@@ -25,52 +24,49 @@
2524
*/
2625
public class InitializeExtensionsRequest extends TransportRequest {
2726
private final DiscoveryNode sourceNode;
28-
/*
29-
* TODO change DiscoveryNode to Extension information
30-
*/
31-
private final List<DiscoveryExtension> extensions;
27+
private final DiscoveryExtension extension;
3228

33-
public InitializeExtensionsRequest(DiscoveryNode sourceNode, List<DiscoveryExtension> extensions) {
29+
public InitializeExtensionsRequest(DiscoveryNode sourceNode, DiscoveryExtension extension) {
3430
this.sourceNode = sourceNode;
35-
this.extensions = extensions;
31+
this.extension = extension;
3632
}
3733

3834
public InitializeExtensionsRequest(StreamInput in) throws IOException {
3935
super(in);
4036
sourceNode = new DiscoveryNode(in);
41-
extensions = in.readList(DiscoveryExtension::new);
37+
extension = new DiscoveryExtension(in);
4238
}
4339

4440
@Override
4541
public void writeTo(StreamOutput out) throws IOException {
4642
super.writeTo(out);
4743
sourceNode.writeTo(out);
48-
out.writeList(extensions);
49-
}
50-
51-
public List<DiscoveryExtension> getExtensions() {
52-
return extensions;
44+
extension.writeTo(out);
5345
}
5446

5547
public DiscoveryNode getSourceNode() {
5648
return sourceNode;
5749
}
5850

51+
public DiscoveryExtension getExtension() {
52+
return extension;
53+
}
54+
5955
@Override
6056
public String toString() {
61-
return "InitializeExtensionsRequest{" + "sourceNode=" + sourceNode + ", extensions=" + extensions + '}';
57+
return "InitializeExtensionsRequest{" + "sourceNode=" + sourceNode + ", extension=" + extension + '}';
6258
}
6359

6460
@Override
6561
public boolean equals(Object o) {
6662
if (this == o) return true;
6763
if (o == null || getClass() != o.getClass()) return false;
6864
InitializeExtensionsRequest that = (InitializeExtensionsRequest) o;
69-
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extensions, that.extensions);
65+
return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(extension, that.extension);
7066
}
7167

7268
@Override
7369
public int hashCode() {
74-
return Objects.hash(sourceNode, extensions);
70+
return Objects.hash(sourceNode, extension);
7571
}
7672
}

server/src/main/java/org/opensearch/extensions/ExtensionsOrchestrator.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -207,36 +207,40 @@ private void extensionsDiscovery() throws IOException {
207207
throw new IOException("Could not read from extensions.yml", e);
208208
}
209209
for (Extension extension : extensions) {
210-
try {
211-
DiscoveryExtension discoveryExtension = new DiscoveryExtension(
212-
extension.getName(),
213-
extension.getUniqueId(),
214-
// placeholder for ephemeral id, will change with POC discovery
215-
extension.getUniqueId(),
216-
extension.getHostName(),
217-
extension.getHostAddress(),
218-
new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())),
219-
new HashMap<String, String>(),
220-
Version.fromString(extension.getOpensearchVersion()),
221-
new PluginInfo(
210+
if (extensionIdMap.containsKey(extension.getUniqueId())) {
211+
logger.info("Duplicate uniqueId " + extension.getUniqueId() + ". Did not load extension: " + extension);
212+
} else {
213+
try {
214+
DiscoveryExtension discoveryExtension = new DiscoveryExtension(
222215
extension.getName(),
223-
extension.getDescription(),
224-
extension.getVersion(),
216+
extension.getUniqueId(),
217+
// placeholder for ephemeral id, will change with POC discovery
218+
extension.getUniqueId(),
219+
extension.getHostName(),
220+
extension.getHostAddress(),
221+
new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())),
222+
new HashMap<String, String>(),
225223
Version.fromString(extension.getOpensearchVersion()),
226-
extension.getJavaVersion(),
227-
extension.getClassName(),
228-
new ArrayList<String>(),
229-
Boolean.parseBoolean(extension.hasNativeController())
230-
)
231-
);
232-
extensionsList.add(discoveryExtension);
233-
extensionIdMap.put(extension.getUniqueId(), discoveryExtension);
234-
logger.info("Loaded extension: " + extension + " with id " + extension.getUniqueId());
235-
} catch (IllegalArgumentException e) {
236-
logger.error(e.toString());
224+
new PluginInfo(
225+
extension.getName(),
226+
extension.getDescription(),
227+
extension.getVersion(),
228+
Version.fromString(extension.getOpensearchVersion()),
229+
extension.getJavaVersion(),
230+
extension.getClassName(),
231+
new ArrayList<String>(),
232+
Boolean.parseBoolean(extension.hasNativeController())
233+
)
234+
);
235+
extensionsList.add(discoveryExtension);
236+
extensionIdMap.put(extension.getUniqueId(), discoveryExtension);
237+
logger.info("Loaded extension: " + extension + " with id " + extension.getUniqueId());
238+
} catch (IllegalArgumentException e) {
239+
logger.error(e.toString());
240+
}
237241
}
238242
}
239-
if (!extensionsList.isEmpty()) {
243+
if (!extensionIdMap.isEmpty()) {
240244
logger.info("Loaded all extensions");
241245
}
242246
} else {
@@ -245,12 +249,12 @@ private void extensionsDiscovery() throws IOException {
245249
}
246250

247251
public void extensionsInitialize() {
248-
for (DiscoveryNode extensionNode : extensionsList) {
249-
extensionInitialize(extensionNode);
252+
for (DiscoveryExtension extension : extensionsList) {
253+
extensionInitialize(extension);
250254
}
251255
}
252256

253-
private void extensionInitialize(DiscoveryNode extensionNode) {
257+
private void extensionInitialize(DiscoveryExtension extension) {
254258
final CountDownLatch inProgressLatch = new CountDownLatch(1);
255259
final TransportResponseHandler<InitializeExtensionsResponse> extensionResponseHandler = new TransportResponseHandler<
256260
InitializeExtensionsResponse>() {
@@ -285,11 +289,11 @@ public String executor() {
285289
};
286290
try {
287291
logger.info("Sending extension request type: " + REQUEST_EXTENSION_ACTION_NAME);
288-
transportService.connectToNode(extensionNode, true);
292+
transportService.connectToNode(extension, true);
289293
transportService.sendRequest(
290-
extensionNode,
294+
extension,
291295
REQUEST_EXTENSION_ACTION_NAME,
292-
new InitializeExtensionsRequest(transportService.getLocalNode(), new ArrayList<DiscoveryExtension>(extensionsList)),
296+
new InitializeExtensionsRequest(transportService.getLocalNode(), extension),
293297
extensionResponseHandler
294298
);
295299
inProgressLatch.await(100, TimeUnit.SECONDS);

server/src/test/java/org/opensearch/extensions/ExtensionsOrchestratorTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Map;
3636
import java.util.Objects;
3737
import java.util.concurrent.TimeUnit;
38+
import java.util.stream.Collectors;
3839

3940
import org.apache.logging.log4j.Level;
4041
import org.apache.logging.log4j.LogManager;
@@ -217,6 +218,43 @@ public void testExtensionsDiscovery() throws Exception {
217218
assertEquals(expectedExtensionsList, extensionsOrchestrator.extensionsList);
218219
}
219220

221+
public void testNonUniqueExtensionsDiscovery() throws Exception {
222+
Path extensionDir = createTempDir();
223+
224+
List<String> nonUniqueYmlLines = extensionsYmlLines.stream()
225+
.map(s -> s.replace("uniqueid2", "uniqueid1"))
226+
.collect(Collectors.toList());
227+
Files.write(extensionDir.resolve("extensions.yml"), nonUniqueYmlLines, StandardCharsets.UTF_8);
228+
229+
ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir);
230+
231+
List<DiscoveryExtension> expectedExtensionsList = new ArrayList<DiscoveryExtension>();
232+
233+
expectedExtensionsList.add(
234+
new DiscoveryExtension(
235+
"firstExtension",
236+
"uniqueid1",
237+
"uniqueid1",
238+
"myIndependentPluginHost1",
239+
"127.0.0.0",
240+
new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300),
241+
new HashMap<String, String>(),
242+
Version.fromString("3.0.0"),
243+
new PluginInfo(
244+
"firstExtension",
245+
"Fake description 1",
246+
"0.0.7",
247+
Version.fromString("3.0.0"),
248+
"14",
249+
"fakeClass1",
250+
new ArrayList<String>(),
251+
false
252+
)
253+
)
254+
);
255+
assertEquals(expectedExtensionsList, extensionsOrchestrator.extensionsList);
256+
}
257+
220258
public void testNonAccessibleDirectory() throws Exception {
221259
AccessControlException e = expectThrows(
222260

0 commit comments

Comments
 (0)