
fix(embedder): resolve PR #607 review — task_type per-op, EmbeddingHandler relocation, multimodal feature flag #2

Closed

chethanuk wants to merge 3 commits into main from resolve

Conversation

@chethanuk (Owner) commented Mar 15, 2026

Summary

  • Chunk 1 — embed_query() task_type per-operation: Added embed_query() to DenseEmbedderBase (delegates to embed()); GeminiDenseEmbedder overrides with dedicated _query_config (RETRIEVAL_QUERY) while embed() uses _index_config (RETRIEVAL_DOCUMENT). Updated hierarchical_retriever.py and memory_deduplicator.py to call embed_query(). Deprecated task_type config field (still accepted).
  • Chunk 2 — EmbeddingHandler rename + relocate: Moved TextEmbeddingHandler from collection_schemas.py to queuefs/embedding_handler.py, renamed to EmbeddingHandler. Backward-compat alias kept. Removed 2 constant-testing unit tests; consolidated 2 duplicate fallback tests into one @pytest.mark.parametrize.
  • Chunk 3 — Multimodal feature flag + parts API + PDF guard + batch: All multimodal code ships gated behind enable_multimodal: bool = False. Vectorize gains parts: Optional[List[ContentPart]] + get_parts(). embed_multimodal rewrites to use parts API with pdfminer-six PDF page guard. embed_multimodal_batch/async_embed_multimodal_batch added to base class and Gemini (anyio semaphore). Flip enable_multimodal=True in April — no code swap needed.
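
The Chunk 1 split can be sketched roughly as follows. Class and config names mirror the PR description, but the bodies are illustrative stand-ins that record the task_type instead of calling the real Gemini client:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class EmbedResult:
    dense_vector: List[float]


class DenseEmbedderBase:
    def embed(self, text: str) -> EmbedResult:
        raise NotImplementedError

    # Chunk 1: the base class gains embed_query(), which simply delegates
    # to embed() so existing embedders keep working unchanged.
    def embed_query(self, text: str) -> EmbedResult:
        return self.embed(text)


class GeminiLikeEmbedder(DenseEmbedderBase):
    """Toy stand-in that records which task_type each call path used."""

    def __init__(self) -> None:
        self._index_config = {"task_type": "RETRIEVAL_DOCUMENT"}
        self._query_config = {"task_type": "RETRIEVAL_QUERY"}
        self.last_task_type: Optional[str] = None

    def _call_api(self, text: str, config: dict) -> EmbedResult:
        self.last_task_type = config["task_type"]
        return EmbedResult(dense_vector=[0.0])

    def embed(self, text: str) -> EmbedResult:
        # Indexing path keeps RETRIEVAL_DOCUMENT.
        return self._call_api(text, self._index_config)

    def embed_query(self, text: str) -> EmbedResult:
        # Query path switches to RETRIEVAL_QUERY per-operation.
        return self._call_api(text, self._query_config)
```

Because the default `embed_query()` delegates to `embed()`, embedders that never override it (and the deprecated `task_type` config) keep their old behavior.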

Issues Fixed (PR volcengine#607 review)

| # | Issue | Severity |
|---|-------|----------|
| 1 | embed_multimodal supported only 1 text + 1 media → now arbitrary parts[] sequences | High (gated) |
| 2 | No PDF page-count guard (Gemini max 6 pages) | High (gated) |
| 3 | TextEmbeddingHandler in wrong file with wrong name | Medium |
| 4 | No embed_multimodal_batch / async_embed_multimodal_batch | Medium (gated) |
| 6 | task_type baked at init → same type for index + query | Medium |

Test Plan

  • tests/unit/test_embedder_base.py — embed_query() base fallback
  • tests/unit/test_vectorize_parts.py — Vectorize.get_parts() (5 cases)
  • tests/unit/test_gemini_embedder.py — feature flag, task_type split, PDF guard, multi-part, batch
  • tests/misc/test_config_validation.py — task_type deprecation notice
  • 212 tests passing, 0 new failures
  • Integration: pytest tests/integration/test_gemini_e2e.py -m integration (requires GEMINI_API_KEY)

Summary by CodeRabbit

Release Notes

  • New Features

    • Multimodal embedding support for Gemini (images, audio, video, PDFs with automatic page validation)
    • Batch and async batch embedding processing capabilities
    • Query-specific embeddings with optimized routing
  • Refactor

    • Streamlined embedding handler implementation
    • Reorganized configuration structure
  • Tests

    • Added comprehensive multimodal embedding test coverage

- Add embed_query() default to DenseEmbedderBase (delegates to embed())
- GeminiDenseEmbedder overrides embed_query() using _query_config
  (RETRIEVAL_QUERY); embed() uses _index_config (RETRIEVAL_DOCUMENT)
- Update hierarchical_retriever + memory_deduplicator to call embed_query()
- Deprecate task_type config field (still accepted, no validation error)
- Add enable_multimodal: bool = False flag; supports_multimodal reflects it
- Add embed_multimodal_batch / async_embed_multimodal_batch to base class
- Add Gemini async_embed_multimodal_batch override (anyio semaphore)
- Rewrite embed_multimodal: parts API + pdfminer PDF guard (gated by flag)
- Fix PR volcengine#607 issues #1, #2, #4, #6
…n queuefs/

- Create openviking/storage/queuefs/embedding_handler.py with EmbeddingHandler
  (same logic, corrected class name + docstring)
- Replace TextEmbeddingHandler class body in collection_schemas.py with
  import + backward-compat alias TextEmbeddingHandler = EmbeddingHandler
- Update queue_manager.py to import EmbeddingHandler directly from queuefs
- Fixes PR volcengine#607 issue #3
… embedding

- Add ContentPart = Union[str, ModalContent] type alias
- Add parts: Optional[List[ContentPart]] field to Vectorize
- Add get_parts(): returns parts if set, else builds [text, media] from legacy fields
- Add multi-part integration tests (TestGeminiE2EMultipartEmbedding)
- Fixes PR volcengine#607 issue #1 (multi-part sequences)
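
The `get_parts()` fallback described in this commit can be sketched as below. The field shapes (e.g. `ModalContent` carrying a MIME type and path) are assumptions for illustration, not the exact `Vectorize` definition:

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class ModalContent:
    mime_type: str
    path: str


# ContentPart: either a plain text part or a media part.
ContentPart = Union[str, ModalContent]


@dataclass
class Vectorize:
    text: str = ""
    media: Optional[ModalContent] = None
    parts: Optional[List[ContentPart]] = None

    def get_parts(self) -> List[ContentPart]:
        # Explicit parts win; otherwise rebuild [text, media] from the
        # legacy fields so old callers keep producing the same sequence.
        if self.parts is not None:
            return list(self.parts)
        legacy: List[ContentPart] = []
        if self.text:
            legacy.append(self.text)
        if self.media is not None:
            legacy.append(self.media)
        return legacy
```

Embedders can then iterate one canonical `get_parts()` list whether the caller used the new `parts` API or the legacy `text`/`media` pair.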

coderabbitai bot commented Mar 15, 2026

📝 Walkthrough

This PR enhances the embedding infrastructure to support multimodal embeddings (images, PDFs, video), introduces batch and async batch processing methods to embedders, refactors embedding handlers, and updates embedder API usage across the codebase. The changes introduce an enable_multimodal feature flag for Gemini embedders, extend Vectorize to support explicit parts lists, and modernize embedding handler implementations.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Vectorize Enhancement | openviking/core/context.py | Added ContentPart type alias and parts parameter to Vectorize constructor; introduced get_parts() method to return canonical parts list (explicit parts, or constructed from text/media). |
| Embedder Base API | openviking/models/embedder/base.py | Added three public methods to DenseEmbedderBase: embed_query(text) for text queries, embed_multimodal_batch(vectorizes) for sequential batch processing, and async_embed_multimodal_batch(vectorizes) for concurrent batch processing. |
| Gemini Multimodal Implementation | openviking/models/embedder/gemini_embedders.py | Introduced enable_multimodal flag and multimodal embeddings support including PDF page validation, MIME type checking, and fallback-to-text logic. Added embed_query() using RETRIEVAL_QUERY task type, enhanced embed_multimodal() with parts handling and media validation, and async_embed_multimodal_batch() using anyio concurrency. Refactored config to separate _index_config, _query_config, and _embed_config. |
| Embedder API Migration | openviking/retrieve/hierarchical_retriever.py, openviking/session/memory_deduplicator.py | Updated embedder calls from embed(query) to embed_query(query) to use the new dedicated query embedding API. |
| Embedding Handler Refactoring | openviking/storage/collection_schemas.py, openviking/storage/queuefs/embedding_handler.py, openviking/storage/queuefs/queue_manager.py | Removed large TextEmbeddingHandler class and replaced with backward-compat alias to EmbeddingHandler. Introduced new EmbeddingHandler class that processes dequeued embedding messages with multimodal support, PDF page checks, and deterministic ID seeding. Updated queue manager to use EmbeddingHandler. |
| Configuration | openviking_cli/utils/config/embedding_config.py | Added enable_multimodal boolean field to EmbeddingModelConfig with default False, and propagated it to Gemini dense embedder constructor. |
| Unit Tests | tests/unit/test_embedder_base.py, tests/unit/test_gemini_embedder.py, tests/unit/test_vectorize_parts.py | Added tests for DenseEmbedderBase.embed_query() fallback behavior, Vectorize get_parts() with legacy and explicit modes, and comprehensive Gemini multimodal flag behavior including fallbacks, PDF validation, and error handling. |
| Integration & Validation Tests | tests/integration/test_gemini_e2e.py, tests/misc/test_config_validation.py | Added multimodal embedding end-to-end tests and config validation test for deprecated task_type field with deprecation notice. |
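
The enable_multimodal gating described for the Gemini embedder and the config can be sketched like this (method bodies are toy stand-ins for the real Gemini calls; only the flag-driven control flow is the point):

```python
class GeminiEmbedderFlagSketch:
    """Illustrative stand-in for the enable_multimodal feature flag."""

    def __init__(self, enable_multimodal: bool = False):
        # Default False: all multimodal code ships dark until the flag flips.
        self._enable_multimodal = enable_multimodal

    @property
    def supports_multimodal(self) -> bool:
        # supports_multimodal reflects the flag, so callers can branch on it.
        return self._enable_multimodal

    def embed(self, text: str) -> str:
        return f"text-embedding:{text}"

    def embed_multimodal(self, vectorize) -> str:
        if not self._enable_multimodal:
            # Flag off: quietly fall back to text-only embedding, so
            # flipping the flag later needs no call-site changes.
            return self.embed(getattr(vectorize, "text", ""))
        return f"multimodal-embedding:{vectorize.text}"
```

This is why the PR description can promise "flip enable_multimodal=True in April — no code swap needed": callers already route through `embed_multimodal()`, and only the fallback branch changes.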

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant GeminiDenseEmbedder
    participant VikingFS
    participant GeminiAPI
    
    Client->>GeminiDenseEmbedder: embed_multimodal(vectorize)<br/>(multimodal enabled)
    GeminiDenseEmbedder->>GeminiDenseEmbedder: get_parts() from vectorize
    
    alt Has media parts
        GeminiDenseEmbedder->>GeminiDenseEmbedder: Validate MIME types<br/>against whitelist
        
        alt PDF media
            GeminiDenseEmbedder->>VikingFS: Read PDF file bytes
            VikingFS-->>GeminiDenseEmbedder: PDF content
            GeminiDenseEmbedder->>GeminiDenseEmbedder: Count PDF pages
            
            alt Pages <= limit
                GeminiDenseEmbedder->>GeminiAPI: Request multimodal embed<br/>(parts: text+image+text)
            else Pages > limit
                GeminiDenseEmbedder->>GeminiDenseEmbedder: Fallback to text-only<br/>embedding
            end
        else Image/other media
            GeminiDenseEmbedder->>VikingFS: Read media file bytes
            VikingFS-->>GeminiDenseEmbedder: Media content
            GeminiDenseEmbedder->>GeminiAPI: Request multimodal embed
        end
    else No supported media
        GeminiDenseEmbedder->>GeminiDenseEmbedder: Fallback to text<br/>embedding only
    end
    
    alt API Success
        GeminiAPI-->>GeminiDenseEmbedder: Embedding result
    else API Transient Error
        GeminiDenseEmbedder->>GeminiDenseEmbedder: Fallback to text<br/>embedding
    end
    
    GeminiDenseEmbedder-->>Client: EmbedResult(dense_vector)
sequenceDiagram
    participant QueueManager
    participant EmbeddingHandler
    participant VikingFS
    participant Embedder
    participant VikingVectorIndexBackend
    participant Database
    
    QueueManager->>EmbeddingHandler: on_dequeue(data: Dict)
    EmbeddingHandler->>EmbeddingHandler: Lazy init embedder<br/>if needed
    
    alt Multimodal enabled & media provided
        EmbeddingHandler->>VikingFS: Read media file bytes
        VikingFS-->>EmbeddingHandler: File content
        EmbeddingHandler->>Embedder: embed_multimodal(vectorize)<br/>(text + media)
        
        alt Success
            Embedder-->>EmbeddingHandler: EmbedResult
        else Failure
            EmbeddingHandler->>Embedder: embed(text_only)<br/>fallback
            Embedder-->>EmbeddingHandler: EmbedResult
        end
    else Text-only
        EmbeddingHandler->>Embedder: embed(text)
        Embedder-->>EmbeddingHandler: EmbedResult
    end
    
    EmbeddingHandler->>EmbeddingHandler: Validate dense vector<br/>dimension
    EmbeddingHandler->>EmbeddingHandler: Generate deterministic<br/>ID from uri+account+level
    EmbeddingHandler->>VikingVectorIndexBackend: upsert(id, vectors, data)
    VikingVectorIndexBackend->>Database: Write embedding
    Database-->>VikingVectorIndexBackend: Success
    VikingVectorIndexBackend-->>EmbeddingHandler: Confirmation
    EmbeddingHandler-->>QueueManager: Success result
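
The "generate deterministic ID from uri+account+level" step in the handler diagram can be sketched as below. The exact key fields and separator are assumptions taken from the diagram's labels; the point is that re-embedding the same item produces the same ID, so the upsert replaces rather than duplicates:

```python
import uuid


def deterministic_id(uri: str, account: str, level: str) -> str:
    """Stable record ID derived from the item's identity fields.

    uuid5 hashes the seed deterministically, so the same
    (uri, account, level) triple always maps to the same ID.
    """
    seed = f"{uri}|{account}|{level}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, seed))
```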

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 Parts and embeddings, oh what a sight!
Multimodal magic takes flight!
PDFs and pictures in parts so neat,
Async batches complete the feat! 🌙✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation / Resolution |
|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 62.07%, below the required threshold of 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped — CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the three main changes: task_type per-operation handling, EmbeddingHandler relocation, and multimodal feature flag introduction. |


@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the embedding system by introducing per-operation task type configurations for Gemini embedders, allowing for distinct embedding strategies for queries versus documents. It also lays the groundwork for advanced multimodal embedding by adding a flexible parts API, PDF handling, and batch processing, all controlled by a feature flag for staged rollout. These changes improve the system's adaptability, modularity, and support for diverse content types.

Highlights

  • Per-Operation Task Type for Embeddings: Introduced an embed_query() method in DenseEmbedderBase that delegates to embed(). The GeminiDenseEmbedder now overrides this to use a dedicated RETRIEVAL_QUERY task type for queries, while embed() uses RETRIEVAL_DOCUMENT for indexing. Existing task_type configuration is now deprecated but still accepted for backward compatibility.
  • Embedding Handler Relocation and Renaming: The TextEmbeddingHandler class was moved from collection_schemas.py to a new file queuefs/embedding_handler.py and renamed to EmbeddingHandler. A backward-compatible alias TextEmbeddingHandler is maintained in the old location. Associated unit tests were consolidated.
  • Multimodal Embedding with Feature Flag: Implemented multimodal embedding capabilities, including support for arbitrary ContentPart sequences in the Vectorize object via a new parts API and get_parts() method. Multimodal code is gated behind an enable_multimodal feature flag (defaulting to False). A PDF page guard using pdfminer-six was added for Gemini embeddings, and batch multimodal embedding methods (embed_multimodal_batch, async_embed_multimodal_batch) were introduced.
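
The PDF page guard can be shown on its own as below. The real implementation counts pages with pdfminer-six (roughly `sum(1 for _ in pdfminer.pdfpage.PDFPage.get_pages(BytesIO(pdf_bytes)))`); here the counter is injected so the guard logic stays runnable offline, and the 6-page limit comes from the review notes:

```python
from typing import Callable, Optional

# Gemini's documented limit per the PR review; embedding larger PDFs
# falls back to text-only.
_GEMINI_PDF_MAX_PAGES = 6


def pdf_within_limit(
    pdf_bytes: bytes,
    count_pages: Callable[[bytes], Optional[int]],
) -> bool:
    """Return True only when the page count is known and within the limit.

    An unknown page count (counter returns None) is treated conservatively
    as over-limit, triggering the text-only fallback.
    """
    pages = count_pages(pdf_bytes)
    if pages is None:
        return False
    return pages <= _GEMINI_PDF_MAX_PAGES
```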
Changelog
  • openviking/core/context.py
    • Added Union to imports.
    • Defined ContentPart type for flexible content representation.
    • Modified Vectorize class to include an optional parts list in its constructor.
    • Implemented get_parts() method in Vectorize to retrieve content parts, prioritizing the parts list or constructing from legacy text and media fields.
  • openviking/models/embedder/base.py
    • Added embed_query method to DenseEmbedderBase for query-specific embeddings, defaulting to embed().
    • Introduced embed_multimodal_batch and async_embed_multimodal_batch methods for batch processing of multimodal content, with a default sequential implementation.
  • openviking/models/embedder/gemini_embedders.py
    • Added _GEMINI_PDF_MAX_PAGES constant and _count_pdf_pages utility for PDF page counting.
    • Modified GeminiDenseEmbedder constructor to accept an enable_multimodal flag.
    • Refactored __init__ to configure separate _index_config (for general embedding) and _query_config (specifically for queries).
    • Updated supports_multimodal property to reflect the _enable_multimodal flag.
    • Implemented embed_query to utilize the _query_config with RETRIEVAL_QUERY task type.
    • Rewrote embed_multimodal to process Vectorize objects using the new parts API, enforce PDF page limits, and filter unsupported media types.
    • Implemented async_embed_multimodal_batch using anyio.Semaphore for concurrent multimodal batch processing.
  • openviking/retrieve/hierarchical_retriever.py
    • Updated retrieve method to use embed_query() for embedding query strings.
  • openviking/session/memory_deduplicator.py
    • Updated _find_similar_memories method to use embed_query() for embedding candidate memory texts.
  • openviking/storage/collection_schemas.py
    • Removed the TextEmbeddingHandler class.
    • Added a backward-compatible alias TextEmbeddingHandler = EmbeddingHandler.
  • openviking/storage/queuefs/embedding_handler.py
    • Added a new file containing the EmbeddingHandler class, moved and renamed from collection_schemas.py.
    • Updated docstrings to reflect multimodal embedding capabilities.
  • openviking/storage/queuefs/queue_manager.py
    • Updated import statement for EmbeddingHandler to its new location.
    • Instantiated EmbeddingHandler instead of TextEmbeddingHandler for the embedding queue.
    • Updated logging message to reflect the new handler class name.
  • openviking_cli/utils/config/embedding_config.py
    • Modified task_type field description to mark it as deprecated and clarify its role for indexing.
    • Added enable_multimodal boolean field to control multimodal embedding, defaulting to False.
    • Updated _create_embedder to pass the enable_multimodal configuration to the GeminiDenseEmbedder.
  • tests/integration/test_gemini_e2e.py
    • Added TestGeminiE2EMultipartEmbedding class with tests for text-only and multi-part text-image-text embeddings.
  • tests/misc/test_config_validation.py
    • Added a test to confirm the deprecated task_type field is still accepted and its description includes a deprecation notice.
  • tests/unit/test_embedder_base.py
    • Added a new file with unit tests for DenseEmbedderBase, specifically verifying embed_query() defaults to embed().
  • tests/unit/test_gemini_embedder.py
    • Removed redundant constant-testing unit tests.
    • Updated test_supports_multimodal_true to check the enable_multimodal flag.
    • Added tests for embed() using RETRIEVAL_DOCUMENT by default and embed_query() using RETRIEVAL_QUERY.
    • Introduced tests for the enable_multimodal feature flag and its impact on embed_multimodal fallback behavior.
    • Consolidated fallback tests for no-media and unsupported MIME types into a single parameterized test.
    • Added tests for multi-part embeddings, PDF page limit enforcement, and handling of unknown PDF page counts.
  • tests/unit/test_vectorize_parts.py
    • Added a new file with unit tests for Vectorize.get_parts(), covering various scenarios including legacy fields, explicit parts, and empty inputs.
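
The bounded-concurrency batch pattern referenced throughout the changelog (the Gemini override uses anyio.Semaphore with anyio.to_thread.run_sync) has the same shape as this stdlib-asyncio sketch, written this way only so it is self-contained:

```python
import asyncio
from typing import Callable, List, Optional


async def bounded_batch(
    items: List[str],
    embed_one: Callable[[str], List[float]],
    max_concurrency: int = 4,
) -> List[List[float]]:
    # Cap in-flight embeds so a large batch can't exhaust the thread pool.
    sem = asyncio.Semaphore(max_concurrency)
    results: List[Optional[List[float]]] = [None] * len(items)

    async def worker(idx: int, item: str) -> None:
        async with sem:
            # The blocking embed call runs in a worker thread,
            # mirroring anyio.to_thread.run_sync in the real override.
            results[idx] = await asyncio.to_thread(embed_one, item)

    await asyncio.gather(*(worker(i, it) for i, it in enumerate(items)))
    # Every worker filled its slot; order matches the input order.
    return [r for r in results if r is not None]
```

Writing results by index preserves input order even though workers may finish out of order.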
Activity
  • Unit tests for embed_query() base fallback were executed and passed.
  • Unit tests for Vectorize.get_parts() covering 5 cases were executed and passed.
  • Unit tests for Gemini embedder, including feature flag, task_type split, PDF guard, multi-part, and batch functionality, were executed and passed.
  • Unit tests for task_type deprecation notice in config validation were executed and passed.
  • All 212 existing tests passed with 0 new failures.
  • Integration tests for test_gemini_e2e.py are pending manual execution (requires GEMINI_API_KEY).

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces significant enhancements to the embedding functionality. It refactors task_type handling to be per-operation, relocates and renames TextEmbeddingHandler for better organization, and adds comprehensive support for multimodal embeddings behind a feature flag, including a new parts API and batch processing methods. The changes are well-structured and supported by an extensive set of new and updated tests. My review focuses on improving the robustness of the new asynchronous batch embedding logic.

Comment on lines +305 to +313
```python
except RuntimeError as e:
    if "transient" in str(e).lower():
        raise
    logger.warning(
        f"async_embed_multimodal_batch item {idx} failed: {e}. "
        "Falling back to text embed."
    )
    text = getattr(v, "text", "")
    results[idx] = await anyio.to_thread.run_sync(self.embed, text)
```

Severity: medium

This except block introduces redundant fallback logic and makes the batch operation brittle.

The embed_multimodal method (called in the try block) already has its own internal fallback to text embedding for non-transient errors. If that internal fallback also fails, it raises a RuntimeError. This block catches that error and then attempts the exact same text embedding fallback again.

This second fallback attempt is not only redundant but also brittle. If it fails, the unhandled exception will cause the entire batch operation to terminate.

A more robust approach would be to catch the final RuntimeError from embed_multimodal, log it as an unrecoverable error for that specific item, and allow the rest of the batch to proceed. This would prevent one failed item from halting the entire process. For example:

```python
except RuntimeError as e:
    if "transient" in str(e).lower():
        # Propagate transient errors so the caller can retry the batch.
        raise
    # embed_multimodal already attempted a fallback.
    # This is an unrecoverable error for this item.
    logger.error(f"Unrecoverable error for item {idx} in async_embed_multimodal_batch: {e}. Skipping item.")
```

Note that this change would require adjusting the final return statement to handle cases where an item in results remains None (e.g., by filtering them out), which would alter the method's contract.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
openviking_cli/utils/config/embedding_config.py (1)

27-27: ⚠️ Potential issue | 🟡 Minor

Minor typo in field description.

"Access Key Secretfor" should be "Access Key Secret for" (missing space).

✏️ Fix

```diff
-    sk: Optional[str] = Field(default=None, description="Access Key Secretfor VikingDB API")
+    sk: Optional[str] = Field(default=None, description="Access Key Secret for VikingDB API")
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking_cli/utils/config/embedding_config.py` at line 27, Fix the typo in
the Field description for the sk variable in embedding_config.py: update the
description of sk: Optional[str] = Field(default=None, description="Access Key
Secretfor VikingDB API") to insert the missing space so it reads "...Access Key
Secret for VikingDB API...", ensuring the Field description string for sk is
corrected.
🧹 Nitpick comments (3)
tests/misc/test_config_validation.py (2)

238-251: Consider consolidating with test_gemini_task_type_field.

This test duplicates the config creation and task_type assertion from test_gemini_task_type_field (lines 227-235). The only new assertion is checking for "DEPRECATED" in the field description. Consider merging the deprecation check into the existing test to avoid redundancy.

♻️ Suggested consolidation

```diff
 def test_gemini_task_type_field():
     from openviking_cli.utils.config.embedding_config import EmbeddingModelConfig
     cfg = EmbeddingModelConfig(
         model="gemini-embedding-2-preview",
         provider="gemini",
         api_key="test-key",
         task_type="RETRIEVAL_DOCUMENT",
     )
     assert cfg.task_type == "RETRIEVAL_DOCUMENT"
+    # Verify deprecation notice is present in field description
+    field_info = EmbeddingModelConfig.model_fields["task_type"]
+    assert "DEPRECATED" in (field_info.description or "")
-
-
-def test_gemini_task_type_field_still_accepted_with_deprecation_notice():
-    """task_type is deprecated but must still be accepted — no validation error."""
-    from openviking_cli.utils.config.embedding_config import EmbeddingModelConfig
-    cfg = EmbeddingModelConfig(
-        model="gemini-embedding-2-preview",
-        provider="gemini",
-        api_key="test-key",
-        task_type="RETRIEVAL_DOCUMENT",
-    )
-    assert cfg.task_type == "RETRIEVAL_DOCUMENT"
-    # Field description must carry the deprecation notice
-    field_info = EmbeddingModelConfig.model_fields["task_type"]
-    assert "DEPRECATED" in (field_info.description or "")
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/misc/test_config_validation.py` around lines 238 - 251, The two tests
duplicate EmbeddingModelConfig construction and task_type assertion; consolidate
by removing test_gemini_task_type_field_still_accepted_with_deprecation_notice
and adding the deprecation-check assertion into the existing
test_gemini_task_type_field: after the existing cfg creation and assert
cfg.task_type == "RETRIEVAL_DOCUMENT", add the line that grabs field_info =
EmbeddingModelConfig.model_fields["task_type"] and assert "DEPRECATED" in
(field_info.description or "") so the single test covers both acceptance and
deprecation metadata while eliminating redundancy.

209-209: Move import to the top of the file.

The import pytest statement is placed mid-file after function definitions. Move it to the top with other imports (lines 6-11) for better organization and consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/misc/test_config_validation.py` at line 209, The stray "import pytest"
is placed mid-file; move that import up into the module import block alongside
the other imports at the top of tests/misc/test_config_validation.py so all
imports are grouped together; ensure there are no duplicate imports and run
tests to confirm nothing else relies on an import-order side-effect.
openviking/models/embedder/base.py (1)

163-169: Consider adding a concurrency limit or documenting the expectation.

The base async_embed_multimodal_batch spawns one thread per item via asyncio.to_thread with no concurrency bound. For large batches, this could overwhelm the thread pool. GeminiDenseEmbedder correctly overrides with a semaphore, but other subclasses might inadvertently use this unbounded default.

Consider either adding a default semaphore or documenting that subclasses should override for production use.

💡 Optional: Add default concurrency limit
```diff
     async def async_embed_multimodal_batch(
         self, vectorizes: List["Vectorize"]
     ) -> List["EmbedResult"]:
-        """Concurrent batch embed via asyncio.gather + thread pool."""
+        """Concurrent batch embed via asyncio.gather + thread pool.
+
+        Note: Subclasses should override with bounded concurrency for production.
+        """
         import asyncio
         tasks = [asyncio.to_thread(self.embed_multimodal, v) for v in vectorizes]
         return list(await asyncio.gather(*tasks))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking/models/embedder/base.py` around lines 163 - 169, The base
async_embed_multimodal_batch currently spawns one thread per Vectorize via
asyncio.to_thread with no bound; update async_embed_multimodal_batch to limit
concurrency (e.g., create an asyncio.Semaphore with a sane default like 10, wrap
the per-item to_thread call in a short helper coroutine that acquires/releases
the semaphore, and gather those helper coroutines) so large batches won't
exhaust the thread pool, or alternatively expand the method docstring to
explicitly state subclasses must override with their own semaphore/limit for
production; locate the method async_embed_multimodal_batch in the Embedder base
class and apply the semaphore pattern or docstring change there.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@openviking/models/embedder/gemini_embedders.py`:
- Around line 296-319: The current _embed_one coroutine can let non-RuntimeError
exceptions escape, leaving results[idx] as None; modify _embed_one to add a
broad except Exception as e handler after the existing RuntimeError branch that
logs the exception (include idx and exception) and then attempts the same text
fallback by calling self.embed via anyio.to_thread.run_sync (using getattr(v,
"text", "")). Ensure every control path assigns results[idx] (either the
multimodal result or the text fallback result); if the text fallback also fails,
catch that error, log it, and set results[idx] to a safe failure value (e.g., a
deterministic EmbedResult or a clearly documented placeholder) so the final
return [r for r in results] never contains None. Reference: _embed_one,
embed_multimodal, embed, and results.
- Around line 305-313: Replace the brittle string-match transient detection in
async_embed_multimodal_batch with a concrete error type or code: define a custom
exception class (e.g., TransientEmbeddingError subclassing RuntimeError) or add
a specific attribute/code to the error raised where the transient condition
originates, update that raising site to raise TransientEmbeddingError (or set
e.code = "TRANSIENT"), and then change the except block in
async_embed_multimodal_batch to check isinstance(e, TransientEmbeddingError) (or
check e.code == "TRANSIENT") and re-raise only for that case; otherwise log the
failure and fall back to text embedding by calling self.embed as before.

In `@openviking/storage/queuefs/embedding_handler.py`:
- Around line 178-203: The handler currently calls report_success() even when
self._vikingdb.upsert(inserted_data) returns an empty/falsy record_id (which
signals a rejected payload); update the logic after calling _vikingdb.upsert to
treat falsy record_id as a failure: if record_id is falsy, call
self.report_error(...) with a clear message (including inserted_data or
abstract), do not call self.report_success(), and return None; keep existing
success path only for truthy record_id. Reference symbols: _vikingdb.upsert,
record_id, inserted_data, report_error, report_success,
VikingVectorIndexBackend.upsert.
- Around line 64-68: The on_dequeue handler returns None for empty data without
signaling completion, leaving NamedQueue's in-progress counter inconsistent;
modify async def on_dequeue(self, ...) to call the queue completion API (e.g.,
self.report_error("Empty dequeue payload") or self.report_success()) before
returning when data is falsy so the NamedQueue in-progress state is decremented
and the queue remains consistent; locate the change in the on_dequeue function
and ensure the chosen call matches the intended semantics for empty payloads.
- Around line 149-157: The current check uses "if result.dense_vector:" which
lets missing or empty vectors slip through; change the logic in the embedding
handling flow (around result.dense_vector, inserted_data["vector"], and the
self._vector_dim validation) to treat absent or empty dense vectors as a hard
failure: explicitly check for None or zero-length (e.g., if result.dense_vector
is None or len(result.dense_vector) == 0), call self.report_error with a clear
message and return None, and only after that perform the length equality check
against self._vector_dim before assigning inserted_data["vector"].

---

Outside diff comments:
In `@openviking_cli/utils/config/embedding_config.py`:
- Line 27: Fix the typo in the Field description for the sk field in
embedding_config.py: sk: Optional[str] = Field(default=None,
description="Access Key Secretfor VikingDB API") is missing a space and
should read "...Access Key Secret for VikingDB API...".

---

Nitpick comments:
In `@openviking/models/embedder/base.py`:
- Around line 163-169: The base async_embed_multimodal_batch currently spawns
one thread per Vectorize via asyncio.to_thread with no bound; update
async_embed_multimodal_batch to limit concurrency (e.g., create an
asyncio.Semaphore with a sane default like 10, wrap the per-item to_thread call
in a short helper coroutine that acquires/releases the semaphore, and gather
those helper coroutines) so large batches won't exhaust the thread pool, or
alternatively expand the method docstring to explicitly state subclasses must
override with their own semaphore/limit for production; locate the method
async_embed_multimodal_batch in the Embedder base class and apply the semaphore
pattern or docstring change there.
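A minimal sketch of the semaphore pattern suggested above, using stdlib asyncio rather than the project's anyio wrapper; the helper name and the default limit of 10 are assumptions, not the base class's real signature:

```python
import asyncio
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def bounded_thread_map(
    fn: Callable[[U], T], items: List[U], limit: int = 10
) -> List[T]:
    """Run a blocking fn over items in worker threads, at most `limit` at once."""
    sem = asyncio.Semaphore(limit)

    async def _one(item: U) -> T:
        async with sem:  # released even if fn raises
            return await asyncio.to_thread(fn, item)

    # gather preserves input order regardless of completion order
    return list(await asyncio.gather(*(_one(i) for i in items)))
```

With this shape, a batch of thousands of Vectorize items queues behind the semaphore instead of spawning one thread each, so the default thread pool is never exhausted.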

In `@tests/misc/test_config_validation.py`:
- Around line 238-251: The two tests duplicate EmbeddingModelConfig construction
and task_type assertion; consolidate by removing
test_gemini_task_type_field_still_accepted_with_deprecation_notice and adding
the deprecation-check assertion into the existing test_gemini_task_type_field:
after the existing cfg creation and assert cfg.task_type ==
"RETRIEVAL_DOCUMENT", add the line that grabs field_info =
EmbeddingModelConfig.model_fields["task_type"] and assert "DEPRECATED" in
(field_info.description or "") so the single test covers both acceptance and
deprecation metadata while eliminating redundancy.
- Line 209: The stray "import pytest" is placed mid-file; move that import up
into the module import block alongside the other imports at the top of
tests/misc/test_config_validation.py so all imports are grouped together; ensure
there are no duplicate imports and run tests to confirm nothing else relies on
an import-order side-effect.
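The consolidated test could look roughly like the following. The config class here is a hand-rolled stand-in that mimics pydantic's `model_fields` metadata so the sketch runs standalone; the real EmbeddingModelConfig lives in the project and its field description is assumed, not quoted:

```python
class _FieldInfo:
    """Stand-in for pydantic's FieldInfo: just default + description."""

    def __init__(self, default=None, description=""):
        self.default = default
        self.description = description


class EmbeddingModelConfig:
    # Mimics pydantic's class-level model_fields mapping.
    model_fields = {
        "task_type": _FieldInfo(
            default=None,
            description="DEPRECATED: task_type is now chosen per operation.",
        )
    }

    def __init__(self, task_type=None):
        self.task_type = task_type


def test_gemini_task_type_field():
    # Legacy value is still accepted...
    cfg = EmbeddingModelConfig(task_type="RETRIEVAL_DOCUMENT")
    assert cfg.task_type == "RETRIEVAL_DOCUMENT"
    # ...and the field metadata advertises the deprecation.
    field_info = EmbeddingModelConfig.model_fields["task_type"]
    assert "DEPRECATED" in (field_info.description or "")
```

One test now covers both acceptance and deprecation metadata, which is what removing the duplicate test buys.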

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c79c44a7-6a94-4f89-9bc9-e8a71d3d0f5a

📥 Commits

Reviewing files that changed from the base of the PR and between 01bad0d and 8e01ab8.

📒 Files selected for processing (14)
  • openviking/core/context.py
  • openviking/models/embedder/base.py
  • openviking/models/embedder/gemini_embedders.py
  • openviking/retrieve/hierarchical_retriever.py
  • openviking/session/memory_deduplicator.py
  • openviking/storage/collection_schemas.py
  • openviking/storage/queuefs/embedding_handler.py
  • openviking/storage/queuefs/queue_manager.py
  • openviking_cli/utils/config/embedding_config.py
  • tests/integration/test_gemini_e2e.py
  • tests/misc/test_config_validation.py
  • tests/unit/test_embedder_base.py
  • tests/unit/test_gemini_embedder.py
  • tests/unit/test_vectorize_parts.py

Comment on lines +296 to +319
results: List[Optional[EmbedResult]] = [None] * len(vectorizes)
sem = anyio.Semaphore(self._max_concurrent_batches)

async def _embed_one(idx: int, v: "Vectorize") -> None:
async with sem:
try:
results[idx] = await anyio.to_thread.run_sync(
self.embed_multimodal, v
)
except RuntimeError as e:
if "transient" in str(e).lower():
raise
logger.warning(
f"async_embed_multimodal_batch item {idx} failed: {e}. "
"Falling back to text embed."
)
text = getattr(v, "text", "")
results[idx] = await anyio.to_thread.run_sync(self.embed, text)

async with anyio.create_task_group() as tg:
for idx, v in enumerate(vectorizes):
tg.start_soon(_embed_one, idx, v)

return [r for r in results] # type: ignore[return-value]

⚠️ Potential issue | 🟡 Minor

Potential None values in results list if exception escapes.

If an exception other than the handled RuntimeError occurs in _embed_one, the corresponding results[idx] remains None. The list comprehension at line 319 would then include None values, causing downstream issues.

🛡️ Suggested fix: add broader exception handling
         async def _embed_one(idx: int, v: "Vectorize") -> None:
             async with sem:
                 try:
                     results[idx] = await anyio.to_thread.run_sync(
                         self.embed_multimodal, v
                     )
                 except RuntimeError as e:
                     if "transient" in str(e).lower():
                         raise
                     logger.warning(
                         f"async_embed_multimodal_batch item {idx} failed: {e}. "
                         "Falling back to text embed."
                     )
                     text = getattr(v, "text", "")
                     results[idx] = await anyio.to_thread.run_sync(self.embed, text)
+                except Exception as e:
+                    logger.warning(
+                        f"async_embed_multimodal_batch item {idx} unexpected error: {e}. "
+                        "Falling back to text embed."
+                    )
+                    text = getattr(v, "text", "")
+                    results[idx] = await anyio.to_thread.run_sync(self.embed, text)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking/models/embedder/gemini_embedders.py` around lines 296 - 319, The
current _embed_one coroutine can let non-RuntimeError exceptions escape, leaving
results[idx] as None; modify _embed_one to add a broad except Exception as e
handler after the existing RuntimeError branch that logs the exception (include
idx and exception) and then attempts the same text fallback by calling
self.embed via anyio.to_thread.run_sync (using getattr(v, "text", "")). Ensure
every control path assigns results[idx] (either the multimodal result or the
text fallback result); if the text fallback also fails, catch that error, log
it, and set results[idx] to a safe failure value (e.g., a deterministic
EmbedResult or a clearly documented placeholder) so the final return [r for r in
results] never contains None. Reference: _embed_one, embed_multimodal, embed,
and results.

Comment on lines +305 to +313
except RuntimeError as e:
if "transient" in str(e).lower():
raise
logger.warning(
f"async_embed_multimodal_batch item {idx} failed: {e}. "
"Falling back to text embed."
)
text = getattr(v, "text", "")
results[idx] = await anyio.to_thread.run_sync(self.embed, text)

⚠️ Potential issue | 🟡 Minor

Fragile transient error detection via string matching.

The pattern "transient" in str(e).lower() is brittle — it depends on the specific wording in the error message raised at line 207. If that message changes, this detection breaks silently.

Consider using a custom exception type or checking for specific error codes instead.

💡 Suggested improvement
+class GeminiTransientError(RuntimeError):
+    """Transient API error that should be retried by caller."""
+    pass

 # In embed_multimodal:
         except APIError as e:
             if e.code in (429, 502, 503, 504):
-                raise RuntimeError(f"Gemini transient error (code={e.code}), caller should retry") from e
+                raise GeminiTransientError(f"Gemini transient error (code={e.code}), caller should retry") from e

 # In async_embed_multimodal_batch:
-                except RuntimeError as e:
-                    if "transient" in str(e).lower():
+                except GeminiTransientError:
                         raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking/models/embedder/gemini_embedders.py` around lines 305 - 313,
Replace the brittle string-match transient detection in
async_embed_multimodal_batch with a concrete error type or code: define a custom
exception class (e.g., TransientEmbeddingError subclassing RuntimeError) or add
a specific attribute/code to the error raised where the transient condition
originates, update that raising site to raise TransientEmbeddingError (or set
e.code = "TRANSIENT"), and then change the except block in
async_embed_multimodal_batch to check isinstance(e, TransientEmbeddingError) (or
check e.code == "TRANSIENT") and re-raise only for that case; otherwise log the
failure and fall back to text embedding by calling self.embed as before.

Comment on lines +64 to +68
async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Process dequeued message and add embedding vector(s)."""
if not data:
return None


⚠️ Potential issue | 🟠 Major

Always signal completion/error on empty dequeue payloads.

At Line 66-68, returning None without report_success()/report_error() can leave queue state inconsistent because NamedQueue increments in-progress before invoking handlers (see openviking/storage/queuefs/named_queue.py Line 219-229).

Proposed fix
 async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
     """Process dequeued message and add embedding vector(s)."""
     if not data:
-        return None
+        error_msg = "Empty dequeue payload"
+        logger.warning(error_msg)
+        self.report_error(error_msg, data)
+        return None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking/storage/queuefs/embedding_handler.py` around lines 64 - 68, The
on_dequeue handler returns None for empty data without signaling completion,
leaving NamedQueue's in-progress counter inconsistent; modify async def
on_dequeue(self, ...) to call the queue completion API (e.g.,
self.report_error("Empty dequeue payload") or self.report_success()) before
returning when data is falsy so the NamedQueue in-progress state is decremented
and the queue remains consistent; locate the change in the on_dequeue function
and ensure the chosen call matches the intended semantics for empty payloads.

Comment on lines +149 to +157
if result.dense_vector:
inserted_data["vector"] = result.dense_vector
# Validate vector dimension
if len(result.dense_vector) != self._vector_dim:
error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}"
logger.error(error_msg)
self.report_error(error_msg, data)
return None


⚠️ Potential issue | 🟠 Major

Treat missing/empty dense vectors as a hard failure.

At Line 149, if result.dense_vector: is too permissive: a missing dense vector can bypass validation and still continue to DB write logic.

Proposed fix
-                # Add dense vector
-                if result.dense_vector:
-                    inserted_data["vector"] = result.dense_vector
-                    # Validate vector dimension
-                    if len(result.dense_vector) != self._vector_dim:
-                        error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}"
-                        logger.error(error_msg)
-                        self.report_error(error_msg, data)
-                        return None
+                # Add dense vector (required)
+                if result.dense_vector is None:
+                    error_msg = "Missing dense vector from embedder result"
+                    logger.error(error_msg)
+                    self.report_error(error_msg, data)
+                    return None
+                inserted_data["vector"] = result.dense_vector
+                # Validate vector dimension
+                if len(result.dense_vector) != self._vector_dim:
+                    error_msg = (
+                        f"Dense vector dimension mismatch: expected {self._vector_dim}, "
+                        f"got {len(result.dense_vector)}"
+                    )
+                    logger.error(error_msg)
+                    self.report_error(error_msg, data)
+                    return None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking/storage/queuefs/embedding_handler.py` around lines 149 - 157, The
current check uses "if result.dense_vector:" which lets missing or empty vectors
slip through; change the logic in the embedding handling flow (around
result.dense_vector, inserted_data["vector"], and the self._vector_dim
validation) to treat absent or empty dense vectors as a hard failure: explicitly
check for None or zero-length (e.g., if result.dense_vector is None or
len(result.dense_vector) == 0), call self.report_error with a clear message and
return None, and only after that perform the length equality check against
self._vector_dim before assigning inserted_data["vector"].

Comment on lines +178 to +203
record_id = await self._vikingdb.upsert(inserted_data)
if record_id:
logger.debug(
f"Successfully wrote embedding to database: {record_id} abstract {inserted_data['abstract']} vector {inserted_data['vector'][:5]}"
)
except CollectionNotFoundError as db_err:
# During shutdown, queue workers may finish one dequeued item.
if self._vikingdb.is_closing:
logger.debug(f"Skip embedding write during shutdown: {db_err}")
self.report_success()
return None
logger.error(f"Failed to write to vector database: {db_err}")
self.report_error(str(db_err), data)
return None
except Exception as db_err:
if self._vikingdb.is_closing:
logger.debug(f"Skip embedding write during shutdown: {db_err}")
self.report_success()
return None
logger.error(f"Failed to write to vector database: {db_err}")
traceback.print_exc()
self.report_error(str(db_err), data)
return None

self.report_success()
return inserted_data

⚠️ Potential issue | 🟠 Major

Do not report success when upsert() returns an empty id.

At Line 178-203, record_id == "" still falls through to report_success(). VikingVectorIndexBackend.upsert() can return empty IDs for rejected payloads, so this currently risks silent data loss.

Proposed fix
                 record_id = await self._vikingdb.upsert(inserted_data)
-                if record_id:
-                    logger.debug(
-                        f"Successfully wrote embedding to database: {record_id} abstract {inserted_data['abstract']} vector {inserted_data['vector'][:5]}"
-                    )
+                if not record_id:
+                    error_msg = "Vector DB upsert returned empty id"
+                    logger.error(error_msg)
+                    self.report_error(error_msg, data)
+                    return None
+                logger.debug(
+                    "Successfully wrote embedding to database: id=%s uri=%s",
+                    record_id,
+                    inserted_data.get("uri"),
+                )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openviking/storage/queuefs/embedding_handler.py` around lines 178 - 203, The
handler currently calls report_success() even when
self._vikingdb.upsert(inserted_data) returns an empty/falsy record_id (which
signals a rejected payload); update the logic after calling _vikingdb.upsert to
treat falsy record_id as a failure: if record_id is falsy, call
self.report_error(...) with a clear message (including inserted_data or
abstract), do not call self.report_success(), and return None; keep existing
success path only for truthy record_id. Reference symbols: _vikingdb.upsert,
record_id, inserted_data, report_error, report_success,
VikingVectorIndexBackend.upsert.

@chethanuk chethanuk closed this Mar 16, 2026