
Add sandbox plugin for composite indexing execution engine #20909

Merged
Bukhtawar merged 6 commits into opensearch-project:main from alchemist51:ciee
Mar 29, 2026

Conversation

Contributor

@alchemist51 alchemist51 commented Mar 18, 2026

Description

This PR introduces the composite-engine sandbox plugin that implements the CompositeIndexingExecutionEngine — the orchestration layer for multi-format indexing described in RFC #20644.

The composite engine enables an index to write documents to multiple storage formats (e.g., Lucene + Parquet) simultaneously through a single IndexingExecutionEngine interface. Format plugins register via the ExtensiblePlugin SPI, and the composite engine delegates writes, refresh, and file management to each per-format engine.

Note: the ExtensiblePlugin SPI model is used temporarily. Once the Dataformat Registry is introduced, this model can be removed.

What's included

New sandbox plugin: sandbox/plugins/composite-engine

  • CompositeEnginePlugin — ExtensiblePlugin entry point that discovers DataFormatPlugin implementations at node bootstrap, validates index settings, and creates the composite engine. Registers three index settings:
    • index.composite.enabled (default false)
    • index.composite.primary_data_format (default "lucene")
    • index.composite.secondary_data_formats (default [])
  • CompositeIndexingExecutionEngine — Orchestrates indexing across a primary and zero or more secondary per-format engines. Handles writer creation, refresh (flush all writers → build segments → delegate per-format refresh), file deletion, and document input creation.
  • CompositeDataFormat — A DataFormat wrapper over the constituent formats. Uses Long.MIN_VALUE priority so concrete formats take precedence.
  • CompositeDocumentInput — Broadcasts addField, setRowId, and other metadata operations to all per-format DocumentInput instances. Releases the writer back to the pool on close().
  • CompositeWriter — Delegates addDoc, flush, sync, and close to each per-format writer (primary first, then secondaries). Implements Lock for pool checkout semantics.
  • CompositeDataFormatWriterPool — Thread-safe pool of CompositeWriter instances with lock-based checkout/release and a checkoutAll for flush.
  • RowIdGenerator — Generates monotonically increasing row IDs for cross-format document synchronization within a writer's segment scope.
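A generator with the contract described above can be sketched with an AtomicLong. This is a hypothetical illustration, not the PR's implementation; the reset-per-segment behavior is an assumption inferred from "within a writer's segment scope":

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: monotonically increasing row IDs shared across
// per-format DocumentInputs, so the same document receives the same
// row ID in every format within a writer's segment scope.
public class RowIdGeneratorSketch {
    private final AtomicLong next = new AtomicLong(0);

    // Returns the next row ID; thread-safe and strictly increasing.
    public long nextRowId() {
        return next.getAndIncrement();
    }

    // Assumption: row IDs restart when a new segment scope begins.
    public void reset() {
        next.set(0);
    }
}
```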

New sandbox lib: sandbox/libs/composite-engine-lib

  • ConcurrentQueue — Striped concurrent queue using thread-affinity hashing to reduce contention across concurrent indexing threads.
  • LockableConcurrentQueue — Extends ConcurrentQueue with tryLock-based polling so writers can be checked out without blocking.
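The two queue ideas above — striping by thread affinity, plus tryLock-based polling that skips stripes held by other threads — can be combined in a minimal sketch. The class below is an illustration of the technique under those assumptions, not the library's actual code:

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a striped queue: each thread is hashed to a "home" stripe to
// reduce contention, and polling uses tryLock so a poller never blocks on
// a stripe another thread currently holds.
public class StripedQueueSketch<T> {
    private final int stripes;
    private final ArrayDeque<T>[] queues;
    private final ReentrantLock[] locks;

    @SuppressWarnings("unchecked")
    public StripedQueueSketch(int stripes) {
        this.stripes = stripes;
        this.queues = new ArrayDeque[stripes];
        this.locks = new ReentrantLock[stripes];
        for (int i = 0; i < stripes; i++) {
            queues[i] = new ArrayDeque<>();
            locks[i] = new ReentrantLock();
        }
    }

    // Thread-affinity hashing: the same thread tends to hit the same stripe.
    private int stripeFor() {
        return (int) (Thread.currentThread().getId() % stripes);
    }

    public void offer(T item) {
        int s = stripeFor();
        locks[s].lock();
        try {
            queues[s].addLast(item);
        } finally {
            locks[s].unlock();
        }
    }

    // Non-blocking poll: start at the home stripe, skip locked stripes.
    public T poll() {
        int start = stripeFor();
        for (int i = 0; i < stripes; i++) {
            int s = (start + i) % stripes;
            if (locks[s].tryLock()) {
                try {
                    T item = queues[s].pollFirst();
                    if (item != null) {
                        return item;
                    }
                } finally {
                    locks[s].unlock();
                }
            }
        }
        return null; // all stripes empty or busy
    }
}
```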

How format plugins integrate

Format plugins (e.g., Parquet) extend this plugin by:

  1. Declaring extendedPlugins = ['composite-engine'] in their build.gradle
  2. Implementing DataFormatPlugin
  3. Relying on the ExtensiblePlugin SPI to discover them automatically during node bootstrap
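The SPI handshake above can be sketched as follows. DataFormatPlugin's actual method signatures are not shown in this PR description, so the interface here is a hypothetical stand-in for illustration only:

```java
// Hypothetical sketch of a format plugin implementing the SPI surface.
public class FormatPluginSketch {
    // Stand-in for the real DataFormatPlugin SPI (assumed shape).
    public interface DataFormatPlugin {
        String formatName();
    }

    // A Parquet-style plugin implements the SPI; at node bootstrap the
    // composite engine receives all discovered implementations and
    // registers them by format name.
    public static class ParquetDataFormatPlugin implements DataFormatPlugin {
        @Override
        public String formatName() {
            return "parquet";
        }
    }
}
```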

Related issues

Resolves part of #20876

Check List

  • New functionality includes testing
  • New functionality has been documented
  • All classes are annotated with @ExperimentalApi
  • No BWC tests required (sandbox/experimental)
  • Commits are signed off (DCO)

Contributor

github-actions bot commented Mar 18, 2026

PR Reviewer Guide 🔍

(Review updated until commit a37d2ef)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Add concurrent-queue library (ConcurrentQueue, LockableConcurrentQueue, LockablePool)

Relevant files:

  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/ConcurrentQueue.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/Lockable.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java
  • libs/concurrent-queue/src/main/java/org/opensearch/common/queue/package-info.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/ConcurrentQueueTests.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockableConcurrentQueueTests.java
  • libs/concurrent-queue/src/test/java/org/opensearch/common/queue/LockablePoolTests.java
  • libs/concurrent-queue/src/jmh/java/org/opensearch/common/queue/LockableConcurrentQueueBenchmark.java
  • libs/concurrent-queue/src/jmh/java/org/opensearch/common/queue/LockablePoolBenchmark.java

Sub-PR theme: Add composite-engine sandbox plugin with CompositeIndexingExecutionEngine

Relevant files:

  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java
  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDocumentInputTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java
  • sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java
  • sandbox/plugins/composite-engine/README.md

⚡ Recommended focus areas for review

Null Return

The refresh method returns null when no new segments are produced (line 219). Callers must handle null explicitly, which is error-prone. Consider returning an empty RefreshResult instead.

if (newSegmentList.isEmpty()) {
    logger.debug("No new segments produced from flush");
    return null;
}
Partial Failure Risk

In refresh, writers are flushed and closed one by one. If writer.flush() throws an IOException for one writer, the remaining writers are never closed, potentially leaking resources. There is no try/finally or error-handling around the flush+close loop.

for (CompositeWriter writer : dataFormatWriters) {
    FileInfos fileInfos = writer.flush();
    Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
    boolean hasFiles = false;
    for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
        logger.debug(
            "Writer gen={} flushed format=[{}] files={}",
            writer.getWriterGeneration(),
            entry.getKey().name(),
            entry.getValue().files()
        );
        segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
        hasFiles = true;
    }
    writer.close();
    if (hasFiles) {
        newSegmentList.add(segmentBuilder.build());
    }
}

Race Condition

In checkoutAll, the pool iterates items and locks them, but between taking the iteration snapshot and entering the synchronized block, items could be removed or added by other threads. Additionally, items removed from the items set but still present in availableItems could lead to inconsistency if availableItems.remove fails silently.

Partial Write Inconsistency

In addDoc, if writing to a secondary format fails, the document has already been written to the primary format. There is no rollback mechanism, which can lead to data inconsistency between primary and secondary formats.

public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
    if (state.get() != WriterState.ACTIVE) {
        throw new IllegalStateException("Cannot add document to writer in state " + state.get());
    }
    // Write to primary first
    WriteResult primaryResult = primaryWriter.addDoc(doc.getPrimaryInput());
    switch (primaryResult) {
        case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryFormat.name());
        case WriteResult.Failure f -> {
            logger.debug("Failed to add document in primary format [{}]", primaryFormat.name());
            return primaryResult;
        }
    }

    // Then write to each secondary — keyed lookup by DataFormat (equals/hashCode based on name)
    Map<DataFormat, DocumentInput<?>> secondaryInputs = doc.getSecondaryInputs();
    for (Map.Entry<DataFormat, DocumentInput<?>> inputEntry : secondaryInputs.entrySet()) {
        DataFormat format = inputEntry.getKey();
        Writer<DocumentInput<?>> writer = secondaryWritersByFormat.get(format);
        if (writer == null) {
            logger.warn("No writer found for secondary format [{}], skipping", format.name());
            continue;
        }
        WriteResult result = writer.addDoc(inputEntry.getValue());
        switch (result) {
            case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", format.name());
            case WriteResult.Failure f -> {
                logger.debug("Failed to add document in secondary format [{}]", format.name());
                return result;
            }
        }
    }

    return primaryResult;
}

TODO Comment

The getDataFormat() method returns null with a TODO comment indicating the design is incomplete. Callers that do not expect null may encounter NullPointerExceptions.

public DataFormat getDataFormat() {
    // TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now
    return null;
}

Contributor

github-actions bot commented Mar 18, 2026

PR Code Suggestions ✨

Latest suggestions up to a37d2ef

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Return checked-out items in locked state

Items that are locked but not checked out (e.g., because isRegistered returns false)
are unlocked in the finally block, but items that are successfully checked out are
also unlocked there — meaning checked-out items are returned to callers in an
unlocked state, contrary to the method's contract of returning locked items. The
item.unlock() in finally should only apply to items that were not checked out.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java [88-109]

 public List<T> checkoutAll() {
     ensureOpen();
     List<T> lockedItems = new ArrayList<>();
     List<T> checkedOutItems = new ArrayList<>();
     for (T item : this) {
         item.lock();
         lockedItems.add(item);
     }
     synchronized (this) {
         for (T item : lockedItems) {
+            boolean checkedOut = false;
             try {
                 if (isRegistered(item) && items.remove(item)) {
                     availableItems.remove(item);
                     checkedOutItems.add(item);
+                    checkedOut = true;
                 }
             } finally {
-                item.unlock();
+                if (!checkedOut) {
+                    item.unlock();
+                }
             }
         }
     }
     return Collections.unmodifiableList(checkedOutItems);
 }
Suggestion importance[1-10]: 8


Why: The current finally block unconditionally unlocks all items including those successfully checked out, meaning callers receive unlocked items contrary to the expected contract. The fix correctly only unlocks items that were not checked out, which is a correctness bug.

Medium
Ensure writer is closed on flush failure

If writer.flush() throws an IOException, writer.close() will never be called,
leaking resources. The writer.close() call should be placed in a finally block (or
use try-with-resources) to ensure cleanup even on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [197-215]

 for (CompositeWriter writer : dataFormatWriters) {
-    FileInfos fileInfos = writer.flush();
-    ...
-    writer.close();
-    if (hasFiles) {
-        newSegmentList.add(segmentBuilder.build());
+    try {
+        FileInfos fileInfos = writer.flush();
+        Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
+        boolean hasFiles = false;
+        for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
+            logger.debug(
+                "Writer gen={} flushed format=[{}] files={}",
+                writer.getWriterGeneration(),
+                entry.getKey().name(),
+                entry.getValue().files()
+            );
+            segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
+            hasFiles = true;
+        }
+        if (hasFiles) {
+            newSegmentList.add(segmentBuilder.build());
+        }
+    } finally {
+        writer.close();
     }
 }
Suggestion importance[1-10]: 7


Why: If writer.flush() throws an IOException, writer.close() is never called, causing a resource leak. Wrapping the flush logic in a try-finally block ensures cleanup regardless of failure.

Medium
Close all writers even if one fails

If primaryWriter.close() throws an IOException, the secondary writers will never be
closed, leaking resources. Each close call should be wrapped so that all writers are
closed even if one fails, with exceptions suppressed onto the first.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java [165-170]

 @Override
 public void close() throws IOException {
-    primaryWriter.close();
+    IOException firstException = null;
+    try {
+        primaryWriter.close();
+    } catch (IOException e) {
+        firstException = e;
+    }
     for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
-        writer.close();
+        try {
+            writer.close();
+        } catch (IOException e) {
+            if (firstException == null) {
+                firstException = e;
+            } else {
+                firstException.addSuppressed(e);
+            }
+        }
+    }
+    if (firstException != null) {
+        throw firstException;
     }
 }
Suggestion importance[1-10]: 7


Why: If primaryWriter.close() throws, secondary writers are never closed, leaking resources. The improved code ensures all writers are closed with exceptions properly suppressed, consistent with the pattern already used in deleteFiles.

Medium
Avoid returning null from refresh method

Returning null from refresh() when no new segments are produced may cause
NullPointerExceptions in callers that don't expect a null RefreshResult. Consider
returning an empty or no-op RefreshResult (e.g., wrapping the existing segments)
instead of null to maintain a consistent contract.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [217-220]

 if (newSegmentList.isEmpty()) {
     logger.debug("No new segments produced from flush");
-    return null;
+    return new RefreshResult(refreshedSegments);
 }
Suggestion importance[1-10]: 6


Why: Returning null from refresh() when no new segments are produced can cause NullPointerException in callers. Returning a RefreshResult with existing segments is a safer contract, though the impact depends on how callers handle the null case.

Low
Verify interface vs class inheritance change

Changing MockDataFormat from implementing an interface (implements DataFormat) to
extending a class (extends DataFormat) is a significant semantic change. If
DataFormat is an interface, this will cause a compilation error. If it is an
abstract class, verify that all abstract methods are still properly overridden and
that the test behavior remains correct.

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java [180]

-static class MockDataFormat extends DataFormat {
+static class MockDataFormat implements DataFormat {
Suggestion importance[1-10]: 6


Why: The PR intentionally changes MockDataFormat from implements DataFormat to extends DataFormat, suggesting DataFormat was changed from an interface to an abstract class in this PR. The suggestion raises a valid concern about verifying this semantic change is correct, but the improved_code reverts to the old code rather than validating the new approach, making it somewhat contradictory to the PR's intent.

Low

Previous suggestions

Suggestions up to commit fc6e33a
Category | Suggestion | Impact
Possible issue
Preserve lock state for checked-out items

Items that are locked but not checked out (e.g., because isRegistered returns false)
are unlocked in the finally block, but items that are successfully checked out are
also unlocked there — meaning checked-out items are returned to callers in an
unlocked state. The callers in refresh() expect checked-out items to remain locked
(they call writer.setFlushPending() and writer.flush() without re-locking). The
item.unlock() in the finally block should only apply to items that were not checked
out.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java [88-109]

-public List<T> checkoutAll() {
-    ensureOpen();
-    List<T> lockedItems = new ArrayList<>();
-    List<T> checkedOutItems = new ArrayList<>();
-    for (T item : this) {
-        item.lock();
-        lockedItems.add(item);
-    }
-    synchronized (this) {
-        for (T item : lockedItems) {
-            try {
-                if (isRegistered(item) && items.remove(item)) {
-                    availableItems.remove(item);
-                    checkedOutItems.add(item);
-                }
-            } finally {
+synchronized (this) {
+    for (T item : lockedItems) {
+        boolean checkedOut = false;
+        try {
+            if (isRegistered(item) && items.remove(item)) {
+                availableItems.remove(item);
+                checkedOutItems.add(item);
+                checkedOut = true;
+            }
+        } finally {
+            if (!checkedOut) {
                 item.unlock();
             }
         }
     }
-    return Collections.unmodifiableList(checkedOutItems);
 }
Suggestion importance[1-10]: 8


Why: The finally block unconditionally unlocks all items including checked-out ones, but callers like refresh() in CompositeIndexingExecutionEngine expect checked-out items to remain locked when returned. This is a correctness bug where items are returned unlocked despite callers assuming they are locked.

Medium
Ensure all writers are closed on failure

If primaryWriter.close() throws an IOException, the secondary writers will never be
closed, causing resource leaks. Use a try-finally or collect all exceptions to
ensure all writers are closed regardless of failures.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java [165-170]

 @Override
 public void close() throws IOException {
-    primaryWriter.close();
+    IOException firstException = null;
+    try {
+        primaryWriter.close();
+    } catch (IOException e) {
+        firstException = e;
+    }
     for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
-        writer.close();
+        try {
+            writer.close();
+        } catch (IOException e) {
+            if (firstException == null) {
+                firstException = e;
+            } else {
+                firstException.addSuppressed(e);
+            }
+        }
+    }
+    if (firstException != null) {
+        throw firstException;
     }
 }
Suggestion importance[1-10]: 7


Why: If primaryWriter.close() throws, secondary writers are never closed, causing resource leaks. The suggested try-finally pattern with suppressed exceptions mirrors the existing deleteFiles pattern in the same codebase and is a valid fix.

Medium
Avoid returning null from refresh method

Returning null from refresh() when no new segments are produced may cause a
NullPointerException in callers that don't check for null. Consider returning an
empty RefreshResult (e.g., new RefreshResult(refreshInput.existingSegments()))
instead to maintain a consistent non-null contract.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [217-220]

 if (newSegmentList.isEmpty()) {
     logger.debug("No new segments produced from flush");
-    return null;
+    return new RefreshResult(refreshInput.existingSegments());
 }
Suggestion importance[1-10]: 6


Why: Returning null from refresh() could cause NullPointerException in callers that don't null-check the result. Returning an empty RefreshResult would be safer and more consistent, though the impact depends on how callers handle the return value.

Low
Verify correct use of extends vs implements

Changing MockDataFormat from implementing an interface to extending a class may
break compilation if DataFormat is an interface rather than an abstract class.
Verify that DataFormat is indeed an abstract class before using extends. If
DataFormat is an interface, revert this change back to implements DataFormat.

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/DefaultPlanExecutorTests.java [180]

-static class MockDataFormat extends DataFormat {
+static class MockDataFormat implements DataFormat {
Suggestion importance[1-10]: 6


Why: The PR changes MockDataFormat from implements DataFormat to extends DataFormat, which is a meaningful semantic change. If DataFormat is an interface, this would cause a compilation error. The suggestion correctly identifies this potential issue, though the improved_code reverts to the old behavior rather than confirming the new one is correct.

Low
General
Remove duplicate Javadoc comment

There is a duplicate Javadoc comment block in DataFormatPlugin.java. The first
Javadoc block should be removed to avoid confusion and redundant documentation.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java [16-25]

 /**
  * Plugin interface for providing custom data format implementations.
  * Plugins implement this to register their data format (e.g., Parquet, Lucene)
  * with the DataFormatRegistry during node bootstrap.
  *
  * @opensearch.experimental
  */
-/**
- * Plugin interface for providing custom data format implementations.
- * Plugins implement this to register their data format (e.g., Parquet, Lucene)
Suggestion importance[1-10]: 4


Why: There are two identical Javadoc comment blocks in DataFormatPlugin.java, which is redundant and confusing. Removing the duplicate improves code clarity.

Low
Suggestions up to commit 80dd650
Category | Suggestion | Impact
Possible issue
Handle resource cleanup on secondary input creation failure

If an exception occurs while creating secondary document inputs, the primary input
and writer are never cleaned up, causing resource leaks. Wrap the secondary input
creation in a try-catch block to ensure proper cleanup of the primary input and
writer release on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [252-263]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
     DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        try {
+            primaryInput.close();
+        } catch (Exception suppressed) {
+            e.addSuppressed(suppressed);
+        }
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 8


Why: This is a valid error handling suggestion that addresses a real resource leak vulnerability. If secondary input creation fails, the primaryInput and writer would not be cleaned up, causing resource exhaustion. The suggested try-catch with proper cleanup is appropriate and important for production robustness.

Medium
General
Handle writer close failures gracefully

If writer.close() throws an exception, the writer is left in an inconsistent state
and subsequent operations may fail. Wrap the close operation in a try-catch to log
the error and continue flushing remaining writers, or ensure all writers are
properly closed even if one fails.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [191-209]

 // Flush each writer to disk and build segments from the file infos
     for (CompositeWriter writer : dataFormatWriters) {
         FileInfos fileInfos = writer.flush();
         Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
         boolean hasFiles = false;
         for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
             logger.debug(
                 "Writer gen={} flushed format=[{}] files={}",
                 writer.getWriterGeneration(),
                 entry.getKey().name(),
                 entry.getValue().files()
             );
             segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
             hasFiles = true;
         }
-        writer.close();
+        try {
+            writer.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close writer gen={}", writer.getWriterGeneration(), e);
+        }
         if (hasFiles) {
             newSegmentList.add(segmentBuilder.build());
         }
     }
Suggestion importance[1-10]: 6


Why: This is a reasonable error handling suggestion that prevents a single writer close failure from disrupting the entire refresh operation. Wrapping the close in try-catch with logging allows remaining writers to be processed, improving resilience. However, error handling suggestions typically score lower than functional improvements.

Low
Suggestions up to commit 2081844
Category | Suggestion | Impact
Possible issue
Prevent writer pool resource leak on exception

If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput()
throws an exception, the writer is never released back to the pool, causing a
resource leak. Wrap the document input creation in a try-catch to ensure the writer
is always released on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [251-262]

 @Override
 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 8


Why: This is a critical resource management issue. If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput() throws an exception, the writer is never released back to the pool, causing a permanent resource leak. The suggested try-catch-finally pattern correctly ensures the writer is always released, which is essential for maintaining pool integrity and preventing thread starvation in production.

Medium
Add null safety to equals and hashCode

The equals() and hashCode() methods rely on name() which could be null or blank,
potentially causing inconsistent behavior when used as Map keys. Add null/blank
checks to ensure robust equality semantics and prevent silent bugs in collections.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java [49-59]

 @Override
 public final boolean equals(Object o) {
     if (this == o) return true;
     if (o instanceof DataFormat == false) return false;
-    return Objects.equals(name(), ((DataFormat) o).name());
+    String thisName = name();
+    String otherName = ((DataFormat) o).name();
+    if (thisName == null || otherName == null) return false;
+    return thisName.equals(otherName);
 }
 
 @Override
 public final int hashCode() {
-    return Objects.hashCode(name());
+    String n = name();
+    return n == null ? 0 : n.hashCode();
 }
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a potential issue where name() could return null, causing NullPointerException or inconsistent behavior when DataFormat is used as a Map key. However, the PR code already shows that DataFormat is used as a Map key in CompositeWriter and CompositeDocumentInput, and the codebase appears to enforce non-null names (e.g., validation in CompositeEnginePlugin.loadExtensions). The improvement is defensive but may be unnecessary given the existing validation patterns.

Low
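For context on the tradeoff, here is a standalone check (plain JDK, not OpenSearch code) of how `Objects.equals` and `Objects.hashCode` behave with nulls: the original code treats two null names as equal with a consistent hash of 0, while the suggested rewrite would treat them as unequal.

```java
import java.util.Objects;

// Minimal check of the JDK baseline the original equals/hashCode relies on:
// Objects.equals(null, null) is true and Objects.hashCode(null) is 0, so
// null names already behave consistently, if permissively.
public class NullNameEquality {
    public static void main(String[] args) {
        System.out.println(Objects.equals(null, null)); // two null names compare equal
        System.out.println(Objects.hashCode(null));     // null name hashes to 0
    }
}
```

So the suggestion trades the JDK's permissive but consistent null handling for an explicit rejection of null names; given the non-null validation in `CompositeEnginePlugin.loadExtensions`, either choice is internally consistent.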
General
Ensure all writers are processed despite close failures

If writer.close() throws an exception, the writer is not properly cleaned up and
subsequent writers may not be flushed. Wrap the close operation in a try-finally
block to ensure all writers are processed even if one fails.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [189-208]

     // Flush each writer to disk and build segments from the file infos
     for (CompositeWriter writer : dataFormatWriters) {
         FileInfos fileInfos = writer.flush();
         Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration());
         boolean hasFiles = false;
         for (Map.Entry<DataFormat, WriterFileSet> entry : fileInfos.writerFilesMap().entrySet()) {
             logger.debug(
                 "Writer gen={} flushed format=[{}] files={}",
                 writer.getWriterGeneration(),
                 entry.getKey().name(),
                 entry.getValue().files()
             );
             segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue());
             hasFiles = true;
         }
-        writer.close();
-        if (hasFiles) {
-            newSegmentList.add(segmentBuilder.build());
+        try {
+            writer.close();
+        } finally {
+            if (hasFiles) {
+                newSegmentList.add(segmentBuilder.build());
+            }
         }
     }
Suggestion importance[1-10]: 5

__

Why: While the suggestion identifies a potential issue where writer.close() could throw an exception, the proposed fix has a logic flaw. Moving the segment addition into a finally block means segments would be added even if the close fails, which could result in corrupted segment metadata. A better approach would be to ensure all writers are closed (possibly with exception suppression) before adding segments, rather than the suggested try-finally structure.

Low
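The close-all-with-suppression approach the review hints at can be sketched as follows (illustrative names, not the PR's actual code): every writer gets a `close()` attempt, and the first failure propagates with later failures attached as suppressed exceptions, mirroring try-with-resources semantics.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch: close every writer even if an earlier close() throws, collecting
// failures instead of aborting on the first one.
public class CloseAll {
    static void closeAll(List<Closeable> writers) throws IOException {
        IOException first = null;
        for (Closeable w : writers) {
            try {
                w.close();
            } catch (IOException e) {
                if (first == null) first = e; else first.addSuppressed(e);
            }
        }
        if (first != null) throw first; // propagate after all closes were attempted
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        List<Closeable> writers = new ArrayList<>();
        writers.add(() -> { closed.add("a"); throw new IOException("a failed"); });
        writers.add(() -> closed.add("b"));
        try {
            closeAll(writers);
        } catch (IOException expected) {
            // "a failed" propagates, but "b" was still closed
        }
        System.out.println(closed);
    }
}
```

This avoids the flaw the reviewer notes in the proposed try-finally: segments are never built from a writer whose close failed mid-loop, yet no writer is skipped.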
Suggestions up to commit 0267a41
Category | Suggestion | Impact
Possible issue
Fix race condition in counter increment ordering

The entry is unlocked before the counter is incremented, creating a race window
where a concurrent lockAndPoll thread can acquire the entry and observe a stale
counter value, causing it to exit the retry loop prematurely and return null even
though an entry was just added. The counter should be incremented before unlocking
the entry so that any thread spinning in lockAndPoll sees the updated count before
the entry becomes acquirable.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
     queue.add(entry);
+    addAndUnlockCounter.incrementAndGet();
     entry.unlock();
-    addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 8

__

Why: This is a genuine race condition: unlocking the entry before incrementing the counter means a lockAndPoll thread could acquire the entry and see the old counter value, causing it to exit the retry loop prematurely and return null. Incrementing the counter before unlock() closes this window.

Medium
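The ordering argument can be demonstrated with a minimal sketch (hypothetical names; the real LockableConcurrentQueue API is richer): once the entry becomes acquirable via unlock, any observer is guaranteed to also see the bumped counter.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the producer-side ordering fix: bump the counter BEFORE
// unlocking, so a consumer that finds the entry acquirable cannot
// observe a stale counter value and bail out of its retry loop.
public class AddUnlockOrdering {
    static final Queue<ReentrantLock> queue = new ConcurrentLinkedQueue<>();
    static final AtomicInteger addAndUnlockCounter = new AtomicInteger();

    static void addAndUnlock(ReentrantLock entry) {
        queue.add(entry);
        addAndUnlockCounter.incrementAndGet(); // counter first...
        entry.unlock();                        // ...then make the entry acquirable
    }

    public static void main(String[] args) {
        ReentrantLock entry = new ReentrantLock();
        entry.lock(); // entry starts out locked, as when checked out
        int before = addAndUnlockCounter.get();
        addAndUnlock(entry);
        // By the time the entry can be re-acquired, the counter is already bumped.
        System.out.println(entry.tryLock() && addAndUnlockCounter.get() == before + 1);
    }
}
```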
Fix checked-out items being incorrectly unlocked

Items that are locked but fail the isRegistered check (e.g., concurrently removed)
are unlocked in the finally block, but items that are successfully checked out are
also unlocked in the same finally block — meaning checked-out items are returned to
callers in an unlocked state. The item.unlock() in the finally block should only be
called for items that were NOT successfully checked out.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java [88-109]

 public List<T> checkoutAll() {
     ensureOpen();
     List<T> lockedItems = new ArrayList<>();
     List<T> checkedOutItems = new ArrayList<>();
     for (T item : this) {
         item.lock();
         lockedItems.add(item);
     }
     synchronized (this) {
         for (T item : lockedItems) {
+            boolean checkedOut = false;
             try {
                 if (isRegistered(item) && items.remove(item)) {
                     availableItems.remove(item);
                     checkedOutItems.add(item);
+                    checkedOut = true;
                 }
             } finally {
-                item.unlock();
+                if (!checkedOut) {
+                    item.unlock();
+                }
             }
         }
     }
     return Collections.unmodifiableList(checkedOutItems);
 }
Suggestion importance[1-10]: 8

__

Why: The finally block unconditionally calls item.unlock() for all items, including those successfully checked out. This means callers receive items in an unlocked state, violating the contract that checkoutAll returns locked items. The fix correctly only unlocks items that were not checked out.

Medium
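The corrected contract can be sketched in isolation (simplified, hypothetical stand-ins for the pool's `isRegistered`/`items` bookkeeping): only items that failed the checkout are unlocked, so the caller receives items that still hold their lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: unlock only items that were NOT checked out, so checkoutAll()
// hands the caller items with the lock still held.
public class CheckoutAllSketch {
    public static void main(String[] args) {
        List<ReentrantLock> pool = new ArrayList<>(List.of(new ReentrantLock(), new ReentrantLock()));
        List<ReentrantLock> checkedOut = new ArrayList<>();
        for (ReentrantLock item : new ArrayList<>(pool)) {
            item.lock();
            boolean removed = pool.remove(item); // stands in for isRegistered + items.remove
            if (removed) {
                checkedOut.add(item);            // keep the lock held for the caller
            } else {
                item.unlock();                   // only non-checked-out items are unlocked
            }
        }
        // the caller receives items that are still locked
        System.out.println(checkedOut.size() == 2 && checkedOut.get(0).isLocked());
    }
}
```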
Prevent writer pool leak on document input creation failure

If primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput()
throws an exception, the checked-out writer is never returned to the pool, causing a
permanent pool leak. The document input creation should be wrapped in a try-catch
that releases the writer back to the pool on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [251-262]

 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 7

__

Why: If primaryEngine.newDocumentInput() or any secondary engine call throws, the checked-out writer is never returned to the pool. The suggested try-catch correctly releases the writer on failure, preventing a pool resource leak.

Medium
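The checkout-with-rollback pattern behind this fix can be reduced to a small sketch (illustrative names, not the engine's real API): if anything throws after the resource was checked out, it goes back to the pool before the exception propagates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of release-on-failure: a writer checked out of the pool must be
// returned if building the composite input throws, or the pool leaks.
public class CheckoutRollback {
    static final Deque<Object> pool = new ArrayDeque<>();

    static Object checkoutAndBuild(boolean failBuild) {
        Object writer = pool.pop(); // checked out of the pool
        try {
            if (failBuild) throw new IllegalStateException("newDocumentInput failed");
            return writer; // normally wrapped in an input that releases on close()
        } catch (RuntimeException e) {
            pool.push(writer); // rollback: return the writer before rethrowing
            throw e;
        }
    }

    public static void main(String[] args) {
        pool.push(new Object());
        try {
            checkoutAndBuild(true);
        } catch (IllegalStateException expected) {
            // the failure propagated, but the writer went back to the pool
        }
        System.out.println(pool.size());
    }
}
```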
General
Remove duplicate Javadoc comment block

The Javadoc comment for DataFormatPlugin is duplicated — the same comment block
appears twice consecutively. The duplicate comment block should be removed to keep
the file clean.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java [16-27]

 /**
  * Plugin interface for providing custom data format implementations.
  * Plugins implement this to register their data format (e.g., Parquet, Lucene)
  * with the DataFormatRegistry during node bootstrap.
  *
  * @opensearch.experimental
  */
-/**
- * Plugin interface for providing custom data format implementations.
- * Plugins implement this to register their data format (e.g., Parquet, Lucene)
- * with the DataFormatRegistry during node bootstrap.
Suggestion importance[1-10]: 3

__

Why: The Javadoc comment is indeed duplicated in the PR diff (lines 16-22 and 23-27 contain the same content). This is a minor code quality issue with no functional impact.

Low
Suggestions up to commit 5c20337
Category | Suggestion | Impact
Possible issue
Fix checked-out items being unlocked before return

Items that are locked but fail the isRegistered check (e.g., concurrently removed)
are unlocked in the finally block, but items that were successfully checked out are
also unlocked there — meaning all checked-out items are unlocked before being
returned to the caller. The caller receives items that are no longer locked,
defeating the purpose of checkoutAll. Items added to checkedOutItems should not be
unlocked.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java [88-109]

 public List<T> checkoutAll() {
     ensureOpen();
     List<T> lockedItems = new ArrayList<>();
     List<T> checkedOutItems = new ArrayList<>();
     for (T item : this) {
         item.lock();
         lockedItems.add(item);
     }
     synchronized (this) {
         for (T item : lockedItems) {
-            try {
-                if (isRegistered(item) && items.remove(item)) {
-                    availableItems.remove(item);
-                    checkedOutItems.add(item);
-                }
-            } finally {
+            if (isRegistered(item) && items.remove(item)) {
+                availableItems.remove(item);
+                checkedOutItems.add(item);
+            } else {
                 item.unlock();
             }
         }
     }
     return Collections.unmodifiableList(checkedOutItems);
 }
Suggestion importance[1-10]: 9

__

Why: This is a critical correctness bug — the finally block unconditionally unlocks all items including those added to checkedOutItems, so the caller receives items that are already unlocked, completely defeating the purpose of checkoutAll. The improved code correctly only unlocks items that were not checked out.

High
Prevent writer pool leak on document input creation failure

If primaryEngine.newDocumentInput() or any secondary newDocumentInput() throws an
exception, the checked-out writer is never returned to the pool, causing a permanent
pool leak. The writer checkout and document input creation should be wrapped in a
try/finally block to ensure the writer is released on failure.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [251-262]

 public CompositeDocumentInput newDocumentInput() {
     CompositeWriter writer = writerPool.getAndLock();
-    DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-    Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
-    for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
-        secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+    try {
+        DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
+            secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
+        }
+        return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
+            assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
+            writerPool.releaseAndUnlock(writer);
+        });
+    } catch (Exception e) {
+        writerPool.releaseAndUnlock(writer);
+        throw e;
     }
-    return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap, () -> {
-        assert writer.getState() == CompositeWriter.WriterState.ACTIVE : "CompositeWriter is not ACTIVE, state=" + writer.getState();
-        writerPool.releaseAndUnlock(writer);
-    });
 }
Suggestion importance[1-10]: 8

__

Why: This is a valid and important bug fix — if primaryEngine.newDocumentInput() or any secondary engine's newDocumentInput() throws, the writer checked out from writerPool is never returned, causing a permanent pool leak that would degrade or halt indexing.

Medium
Fix race condition in counter increment ordering

The entry is unlocked before the counter is incremented. A thread in lockAndPoll
could observe the entry as available (via tryLock), poll it, and then the counter
increment would cause a spurious retry loop. The counter should be incremented
before unlocking so that any waiting lockAndPoll caller sees the updated count only
after the entry is truly available.

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockableConcurrentQueue.java [74-78]

 public void addAndUnlock(T entry) {
     queue.add(entry);
+    addAndUnlockCounter.incrementAndGet();
     entry.unlock();
-    addAndUnlockCounter.incrementAndGet();
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that incrementing the counter before unlocking ensures lockAndPoll callers see the updated count only after the entry is truly available, preventing a subtle race condition where the entry is visible but the counter hasn't been updated yet.

Medium
General
Remove duplicate Javadoc comment

The Javadoc comment for DataFormatPlugin is duplicated — the same block comment
appears twice consecutively. The duplicate should be removed to keep the
documentation clean.

server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java [16-27]

 /**
  * Plugin interface for providing custom data format implementations.
  * Plugins implement this to register their data format (e.g., Parquet, Lucene)
  * with the DataFormatRegistry during node bootstrap.
  *
  * @opensearch.experimental
  */
-/**
- * Plugin interface for providing custom data format implementations.
- * Plugins implement this to register their data format (e.g., Parquet, Lucene)
- * with the DataFormatRegistry during node bootstrap.
Suggestion importance[1-10]: 3

__

Why: The duplicate Javadoc block is a real issue in the PR diff, but it's a minor documentation problem that doesn't affect functionality.

Low

@github-actions

❌ Gradle check result for 392e1fd: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Persistent review updated to latest commit b3f4e8a

@github-actions

❌ Gradle check result for b3f4e8a: FAILURE


@github-actions

Persistent review updated to latest commit ae4560a

@github-actions

Persistent review updated to latest commit 03e124c

@github-actions

❌ Gradle check result for 03e124c: FAILURE


@github-actions

Persistent review updated to latest commit 82ffb04

@github-actions

Persistent review updated to latest commit dc8d029

@github-actions

❌ Gradle check result for dc8d029: FAILURE


@github-actions

Persistent review updated to latest commit 6687b5d

@github-actions

❌ Gradle check result for 6687b5d: FAILURE


@github-actions

github-actions bot commented Mar 19, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 8f87d7e.

Path | Line | Severity | Description
server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java | 13 | low | Duplicate Javadoc comment block — the same plugin interface documentation appears twice in the file. Likely a copy-paste artifact with no security impact, but anomalous enough to note as it could obscure intentional modifications to the interface contract.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions

Persistent review updated to latest commit 238416a

@alchemist51 added the Indexing (Indexing, Bulk Indexing and anything related to indexing) label Mar 19, 2026
@github-actions

Persistent review updated to latest commit 61cd8e4

@github-actions

❌ Gradle check result for 61cd8e4: FAILURE


@github-actions

Persistent review updated to latest commit 2e5d2fa

@github-actions

Persistent review updated to latest commit f685976

@github-actions

Persistent review updated to latest commit 5214646

@github-actions

Persistent review updated to latest commit 166e609

@github-actions

Failed to generate code suggestions for PR

@github-actions

Failed to generate code suggestions for PR

@github-actions

✅ Gradle check result for 3a9ddb3: SUCCESS

alchemist51 and others added 5 commits March 29, 2026 00:22
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@github-actions

Failed to generate code suggestions for PR

@github-actions

❌ Gradle check result for 2404098: FAILURE


@github-actions

Failed to generate code suggestions for PR

@github-actions

✅ Gradle check result for e93d27a: SUCCESS

@github-actions

Persistent review updated to latest commit fc6e33a

@github-actions

❌ Gradle check result for fc6e33a: null


Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
@github-actions

Persistent review updated to latest commit a37d2ef

@github-actions

✅ Gradle check result for a37d2ef: SUCCESS

Labels

Indexing (Indexing, Bulk Indexing and anything related to indexing), lucene
