fix(embedder): resolve PR #607 review — task_type per-op, EmbeddingHandler relocation, multimodal feature flag
Conversation
- Add embed_query() default to DenseEmbedderBase (delegates to embed())
- GeminiDenseEmbedder overrides embed_query() using _query_config (RETRIEVAL_QUERY); embed() uses _index_config (RETRIEVAL_DOCUMENT)
- Update hierarchical_retriever + memory_deduplicator to call embed_query()
- Deprecate task_type config field (still accepted, no validation error)
- Add enable_multimodal: bool = False flag; supports_multimodal reflects it
- Add embed_multimodal_batch / async_embed_multimodal_batch to base class
- Add Gemini async_embed_multimodal_batch override (anyio semaphore)
- Rewrite embed_multimodal: parts API + pdfminer PDF guard (gated by flag)
- Fix PR volcengine#607 issues #1, #2, #4, #6
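The per-operation task_type split described above can be sketched as follows. The class and attribute names (`DenseEmbedderBase`, `embed_query`, `_query_config`, `_index_config`) come from the commit message; the `_call_api` stub stands in for the real Gemini call so the routing itself is visible.

```python
# Sketch only: a stub records which config each code path uses.
class DenseEmbedderBase:
    def embed(self, text: str):
        raise NotImplementedError

    def embed_query(self, text: str):
        # Default: queries embed the same way as documents.
        return self.embed(text)


class GeminiDenseEmbedder(DenseEmbedderBase):
    def __init__(self):
        self._index_config = {"task_type": "RETRIEVAL_DOCUMENT"}
        self._query_config = {"task_type": "RETRIEVAL_QUERY"}

    def _call_api(self, text: str, config: dict):
        # Stand-in for the real embed_content request.
        return {"text": text, "task_type": config["task_type"]}

    def embed(self, text: str):
        # Indexing path: documents are embedded for retrieval storage.
        return self._call_api(text, self._index_config)

    def embed_query(self, text: str):
        # Query path: overridden instead of delegating to embed().
        return self._call_api(text, self._query_config)
```

Callers such as the retriever then switch from `embed()` to `embed_query()` and pick up the query-side task type without any config change.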
…n queuefs/

- Create openviking/storage/queuefs/embedding_handler.py with EmbeddingHandler (same logic, corrected class name + docstring)
- Replace TextEmbeddingHandler class body in collection_schemas.py with import + backward-compat alias TextEmbeddingHandler = EmbeddingHandler
- Update queue_manager.py to import EmbeddingHandler directly from queuefs
- Fixes PR volcengine#607 issue #3
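The relocation-with-alias pattern from this commit is simple enough to show in miniature. The module paths mirror the PR description; the class body here is a placeholder, not the handler's real logic.

```python
# --- new home: openviking/storage/queuefs/embedding_handler.py ---
class EmbeddingHandler:
    """Dequeue messages, embed them, and upsert the resulting vectors."""


# --- old location: openviking/storage/collection_schemas.py ---
# In the real module this is an import rather than a same-file reference:
# from openviking.storage.queuefs.embedding_handler import EmbeddingHandler
TextEmbeddingHandler = EmbeddingHandler  # backward-compat alias for old imports
```

Existing code that imports `TextEmbeddingHandler` keeps working, while new code (queue_manager.py) imports `EmbeddingHandler` from its corrected location.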
… embedding

- Add ContentPart = Union[str, ModalContent] type alias
- Add parts: Optional[List[ContentPart]] field to Vectorize
- Add get_parts(): returns parts if set, else builds [text, media] from legacy fields
- Add multi-part integration tests (TestGeminiE2EMultipartEmbedding)
- Fixes PR volcengine#607 issue #1 (multi-part sequences)
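The `get_parts()` fallback logic above can be sketched like this. `ContentPart`, `parts`, and `Vectorize` are named in the commit; the `ModalContent` fields and the legacy `media` attribute are assumptions about the surrounding model.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class ModalContent:
    # Hypothetical shape of a media part; field names are assumptions.
    uri: str
    mime_type: str


ContentPart = Union[str, ModalContent]


@dataclass
class Vectorize:
    text: str = ""
    media: Optional[ModalContent] = None       # legacy single-media field
    parts: Optional[List[ContentPart]] = None  # new multi-part field

    def get_parts(self) -> List[ContentPart]:
        # Explicit parts win; otherwise rebuild [text, media] from legacy fields.
        if self.parts is not None:
            return self.parts
        legacy: List[ContentPart] = []
        if self.text:
            legacy.append(self.text)
        if self.media is not None:
            legacy.append(self.media)
        return legacy
```

This keeps old text+media payloads working while letting new callers pass arbitrary interleaved sequences such as `[text, image, text]`.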
📝 Walkthrough

This PR enhances the embedding infrastructure to support multimodal embeddings (images, PDFs, video), introduces batch and async batch processing methods to embedders, refactors embedding handlers, and updates embedder API usage across the codebase.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant GeminiDenseEmbedder
    participant VikingFS
    participant GeminiAPI
    Client->>GeminiDenseEmbedder: embed_multimodal(vectorize)<br/>(multimodal enabled)
    GeminiDenseEmbedder->>GeminiDenseEmbedder: get_parts() from vectorize
    alt Has media parts
        GeminiDenseEmbedder->>GeminiDenseEmbedder: Validate MIME types<br/>against whitelist
        alt PDF media
            GeminiDenseEmbedder->>VikingFS: Read PDF file bytes
            VikingFS-->>GeminiDenseEmbedder: PDF content
            GeminiDenseEmbedder->>GeminiDenseEmbedder: Count PDF pages
            alt Pages <= limit
                GeminiDenseEmbedder->>GeminiAPI: Request multimodal embed<br/>(parts: text+image+text)
            else Pages > limit
                GeminiDenseEmbedder->>GeminiDenseEmbedder: Fallback to text-only<br/>embedding
            end
        else Image/other media
            GeminiDenseEmbedder->>VikingFS: Read media file bytes
            VikingFS-->>GeminiDenseEmbedder: Media content
            GeminiDenseEmbedder->>GeminiAPI: Request multimodal embed
        end
    else No supported media
        GeminiDenseEmbedder->>GeminiDenseEmbedder: Fallback to text<br/>embedding only
    end
    alt API Success
        GeminiAPI-->>GeminiDenseEmbedder: Embedding result
    else API Transient Error
        GeminiDenseEmbedder->>GeminiDenseEmbedder: Fallback to text<br/>embedding
    end
    GeminiDenseEmbedder-->>Client: EmbedResult(dense_vector)
```
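The media-gating branches in the diagram above reduce to a small decision function. The whitelist contents, the page limit, and the `count_pdf_pages` callable are illustrative stand-ins (the PR uses a pdfminer-based page count); only the decision flow mirrors the diagram.

```python
# Hypothetical values; the real embedder defines its own whitelist and limit.
SUPPORTED_MIME_TYPES = {"image/png", "image/jpeg", "application/pdf"}
MAX_PDF_PAGES = 50


def choose_embed_mode(mime_type, count_pdf_pages=lambda: 0):
    """Return 'multimodal' or 'text-only' per the diagram's gates."""
    if mime_type not in SUPPORTED_MIME_TYPES:
        return "text-only"  # unsupported media: fall back to text embedding
    if mime_type == "application/pdf" and count_pdf_pages() > MAX_PDF_PAGES:
        return "text-only"  # oversized PDF: fall back to text embedding
    return "multimodal"
```

The page count is passed as a callable so the (potentially expensive) PDF read only happens on the PDF branch.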
```mermaid
sequenceDiagram
    participant QueueManager
    participant EmbeddingHandler
    participant VikingFS
    participant Embedder
    participant VikingVectorIndexBackend
    participant Database
    QueueManager->>EmbeddingHandler: on_dequeue(data: Dict)
    EmbeddingHandler->>EmbeddingHandler: Lazy init embedder<br/>if needed
    alt Multimodal enabled & media provided
        EmbeddingHandler->>VikingFS: Read media file bytes
        VikingFS-->>EmbeddingHandler: File content
        EmbeddingHandler->>Embedder: embed_multimodal(vectorize)<br/>(text + media)
        alt Success
            Embedder-->>EmbeddingHandler: EmbedResult
        else Failure
            EmbeddingHandler->>Embedder: embed(text_only)<br/>fallback
            Embedder-->>EmbeddingHandler: EmbedResult
        end
    else Text-only
        EmbeddingHandler->>Embedder: embed(text)
        Embedder-->>EmbeddingHandler: EmbedResult
    end
    EmbeddingHandler->>EmbeddingHandler: Validate dense vector<br/>dimension
    EmbeddingHandler->>EmbeddingHandler: Generate deterministic<br/>ID from uri+account+level
    EmbeddingHandler->>VikingVectorIndexBackend: upsert(id, vectors, data)
    VikingVectorIndexBackend->>Database: Write embedding
    Database-->>VikingVectorIndexBackend: Success
    VikingVectorIndexBackend-->>EmbeddingHandler: Confirmation
    EmbeddingHandler-->>QueueManager: Success result
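The "deterministic ID from uri+account+level" step above can be sketched as a content hash, so re-processing the same queue item upserts the same record instead of creating a duplicate. The separator and hash algorithm here are assumptions, not necessarily the handler's actual scheme.

```python
import hashlib


def deterministic_id(uri: str, account: str, level: str) -> str:
    # Join with a unit separator so ("a", "bc") and ("ab", "c") differ.
    key = "\x1f".join((uri, account, level))
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```

Because the ID is a pure function of the identifying fields, retries after a crash are idempotent at the vector-index layer.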
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: 2 passed, 1 failed (warning)
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the embedding system by introducing per-operation task type configurations for Gemini embedders, allowing for distinct embedding strategies for queries versus documents. It also lays the groundwork for advanced multimodal embedding by adding a flexible parts API, PDF handling, and batch processing, all controlled by a feature flag for staged rollout. These changes improve the system's adaptability, modularity, and support for diverse content types.

Highlights
Code Review
This pull request introduces significant enhancements to the embedding functionality. It refactors task_type handling to be per-operation, relocates and renames TextEmbeddingHandler for better organization, and adds comprehensive support for multimodal embeddings behind a feature flag, including a new parts API and batch processing methods. The changes are well-structured and supported by an extensive set of new and updated tests. My review focuses on improving the robustness of the new asynchronous batch embedding logic.
```python
except RuntimeError as e:
    if "transient" in str(e).lower():
        raise
    logger.warning(
        f"async_embed_multimodal_batch item {idx} failed: {e}. "
        "Falling back to text embed."
    )
    text = getattr(v, "text", "")
    results[idx] = await anyio.to_thread.run_sync(self.embed, text)
```
This except block introduces redundant fallback logic and makes the batch operation brittle.
The embed_multimodal method (called in the try block) already has its own internal fallback to text embedding for non-transient errors. If that internal fallback also fails, it raises a RuntimeError. This block catches that error and then attempts the exact same text embedding fallback again.
This second fallback attempt is not only redundant but also brittle. If it fails, the unhandled exception will cause the entire batch operation to terminate.
A more robust approach would be to catch the final RuntimeError from embed_multimodal, log it as an unrecoverable error for that specific item, and allow the rest of the batch to proceed. This would prevent one failed item from halting the entire process. For example:
```python
except RuntimeError as e:
    if "transient" in str(e).lower():
        # Propagate transient errors so the caller can retry the batch.
        raise
    # embed_multimodal already attempted a fallback.
    # This is an unrecoverable error for this item.
    logger.error(f"Unrecoverable error for item {idx} in async_embed_multimodal_batch: {e}. Skipping item.")
```

Note that this change would require adjusting the final return statement to handle cases where an item in results remains None (e.g., by filtering them out), which would alter the method's contract.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
openviking_cli/utils/config/embedding_config.py (1)
27-27: ⚠️ Potential issue | 🟡 Minor — Minor typo in field description.
"Access Key Secretfor" should be "Access Key Secret for" (missing space).
✏️ Fix
```diff
- sk: Optional[str] = Field(default=None, description="Access Key Secretfor VikingDB API")
+ sk: Optional[str] = Field(default=None, description="Access Key Secret for VikingDB API")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openviking_cli/utils/config/embedding_config.py` at line 27, Fix the typo in the Field description for the sk variable in embedding_config.py: update the description of sk: Optional[str] = Field(default=None, description="Access Key Secretfor VikingDB API") to insert the missing space so it reads "...Access Key Secret for VikingDB API...", ensuring the Field description string for sk is corrected.
🧹 Nitpick comments (3)
tests/misc/test_config_validation.py (2)
238-251: Consider consolidating with test_gemini_task_type_field.

This test duplicates the config creation and task_type assertion from test_gemini_task_type_field (lines 227-235). The only new assertion is checking for "DEPRECATED" in the field description. Consider merging the deprecation check into the existing test to avoid redundancy.

♻️ Suggested consolidation
```diff
 def test_gemini_task_type_field():
     from openviking_cli.utils.config.embedding_config import EmbeddingModelConfig

     cfg = EmbeddingModelConfig(
         model="gemini-embedding-2-preview",
         provider="gemini",
         api_key="test-key",
         task_type="RETRIEVAL_DOCUMENT",
     )
     assert cfg.task_type == "RETRIEVAL_DOCUMENT"
+    # Verify deprecation notice is present in field description
+    field_info = EmbeddingModelConfig.model_fields["task_type"]
+    assert "DEPRECATED" in (field_info.description or "")
-
-
-def test_gemini_task_type_field_still_accepted_with_deprecation_notice():
-    """task_type is deprecated but must still be accepted — no validation error."""
-    from openviking_cli.utils.config.embedding_config import EmbeddingModelConfig
-    cfg = EmbeddingModelConfig(
-        model="gemini-embedding-2-preview",
-        provider="gemini",
-        api_key="test-key",
-        task_type="RETRIEVAL_DOCUMENT",
-    )
-    assert cfg.task_type == "RETRIEVAL_DOCUMENT"
-    # Field description must carry the deprecation notice
-    field_info = EmbeddingModelConfig.model_fields["task_type"]
-    assert "DEPRECATED" in (field_info.description or "")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/misc/test_config_validation.py` around lines 238 - 251, The two tests duplicate EmbeddingModelConfig construction and task_type assertion; consolidate by removing test_gemini_task_type_field_still_accepted_with_deprecation_notice and adding the deprecation-check assertion into the existing test_gemini_task_type_field: after the existing cfg creation and assert cfg.task_type == "RETRIEVAL_DOCUMENT", add the line that grabs field_info = EmbeddingModelConfig.model_fields["task_type"] and assert "DEPRECATED" in (field_info.description or "") so the single test covers both acceptance and deprecation metadata while eliminating redundancy.
209-209: Move import to the top of the file.

The import pytest statement is placed mid-file after function definitions. Move it to the top with other imports (lines 6-11) for better organization and consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/misc/test_config_validation.py` at line 209, The stray "import pytest" is placed mid-file; move that import up into the module import block alongside the other imports at the top of tests/misc/test_config_validation.py so all imports are grouped together; ensure there are no duplicate imports and run tests to confirm nothing else relies on an import-order side-effect.

openviking/models/embedder/base.py (1)
163-169: Consider adding a concurrency limit or documenting the expectation.

The base async_embed_multimodal_batch spawns one thread per item via asyncio.to_thread with no concurrency bound. For large batches, this could overwhelm the thread pool. GeminiDenseEmbedder correctly overrides with a semaphore, but other subclasses might inadvertently use this unbounded default.

Consider either adding a default semaphore or documenting that subclasses should override for production use.
💡 Optional: Add default concurrency limit
```diff
 async def async_embed_multimodal_batch(
     self, vectorizes: List["Vectorize"]
 ) -> List["EmbedResult"]:
-    """Concurrent batch embed via asyncio.gather + thread pool."""
+    """Concurrent batch embed via asyncio.gather + thread pool.
+
+    Note: Subclasses should override with bounded concurrency for production.
+    """
     import asyncio

     tasks = [asyncio.to_thread(self.embed_multimodal, v) for v in vectorizes]
     return list(await asyncio.gather(*tasks))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openviking/models/embedder/base.py` around lines 163 - 169, The base async_embed_multimodal_batch currently spawns one thread per Vectorize via asyncio.to_thread with no bound; update async_embed_multimodal_batch to limit concurrency (e.g., create an asyncio.Semaphore with a sane default like 10, wrap the per-item to_thread call in a short helper coroutine that acquires/releases the semaphore, and gather those helper coroutines) so large batches won't exhaust the thread pool, or alternatively expand the method docstring to explicitly state subclasses must override with their own semaphore/limit for production; locate the method async_embed_multimodal_batch in the Embedder base class and apply the semaphore pattern or docstring change there.
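The bounded default the nitpick asks for can be sketched with stdlib asyncio alone. The function name and the limit of 10 are illustrative, not the codebase's API; the pattern (semaphore wrapping `asyncio.to_thread`) is what the comment suggests.

```python
import asyncio


async def bounded_embed_batch(embed_one, items, limit: int = 10):
    """Fan out blocking embed calls to threads, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def _run(item):
        async with sem:  # caps in-flight threads at `limit`
            return await asyncio.to_thread(embed_one, item)

    # gather preserves input order regardless of completion order.
    return list(await asyncio.gather(*(_run(i) for i in items)))
```

A subclass override (like the Gemini one) can still replace this with an anyio task group; the semaphore bound is the part that matters.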
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@openviking/models/embedder/gemini_embedders.py`:
- Around line 296-319: The current _embed_one coroutine can let non-RuntimeError
exceptions escape, leaving results[idx] as None; modify _embed_one to add a
broad except Exception as e handler after the existing RuntimeError branch that
logs the exception (include idx and exception) and then attempts the same text
fallback by calling self.embed via anyio.to_thread.run_sync (using getattr(v,
"text", "")). Ensure every control path assigns results[idx] (either the
multimodal result or the text fallback result); if the text fallback also fails,
catch that error, log it, and set results[idx] to a safe failure value (e.g., a
deterministic EmbedResult or a clearly documented placeholder) so the final
return [r for r in results] never contains None. Reference: _embed_one,
embed_multimodal, embed, and results.
- Around line 305-313: Replace the brittle string-match transient detection in
async_embed_multimodal_batch with a concrete error type or code: define a custom
exception class (e.g., TransientEmbeddingError subclassing RuntimeError) or add
a specific attribute/code to the error raised where the transient condition
originates, update that raising site to raise TransientEmbeddingError (or set
e.code = "TRANSIENT"), and then change the except block in
async_embed_multimodal_batch to check isinstance(e, TransientEmbeddingError) (or
check e.code == "TRANSIENT") and re-raise only for that case; otherwise log the
failure and fall back to text embedding by calling self.embed as before.
In `@openviking/storage/queuefs/embedding_handler.py`:
- Around line 178-203: The handler currently calls report_success() even when
self._vikingdb.upsert(inserted_data) returns an empty/falsy record_id (which
signals a rejected payload); update the logic after calling _vikingdb.upsert to
treat falsy record_id as a failure: if record_id is falsy, call
self.report_error(...) with a clear message (including inserted_data or
abstract), do not call self.report_success(), and return None; keep existing
success path only for truthy record_id. Reference symbols: _vikingdb.upsert,
record_id, inserted_data, report_error, report_success,
VikingVectorIndexBackend.upsert.
- Around line 64-68: The on_dequeue handler returns None for empty data without
signaling completion, leaving NamedQueue's in-progress counter inconsistent;
modify async def on_dequeue(self, ...) to call the queue completion API (e.g.,
self.report_error("Empty dequeue payload") or self.report_success()) before
returning when data is falsy so the NamedQueue in-progress state is decremented
and the queue remains consistent; locate the change in the on_dequeue function
and ensure the chosen call matches the intended semantics for empty payloads.
- Around line 149-157: The current check uses "if result.dense_vector:" which
lets missing or empty vectors slip through; change the logic in the embedding
handling flow (around result.dense_vector, inserted_data["vector"], and the
self._vector_dim validation) to treat absent or empty dense vectors as a hard
failure: explicitly check for None or zero-length (e.g., if result.dense_vector
is None or len(result.dense_vector) == 0), call self.report_error with a clear
message and return None, and only after that perform the length equality check
against self._vector_dim before assigning inserted_data["vector"].
---
Outside diff comments:
In `@openviking_cli/utils/config/embedding_config.py`:
- Line 27: Fix the typo in the Field description for the sk variable in
embedding_config.py: update the description of sk: Optional[str] =
Field(default=None, description="Access Key Secretfor VikingDB API") to insert
the missing space so it reads "...Access Key Secret for VikingDB API...",
ensuring the Field description string for sk is corrected.
---
Nitpick comments:
In `@openviking/models/embedder/base.py`:
- Around line 163-169: The base async_embed_multimodal_batch currently spawns
one thread per Vectorize via asyncio.to_thread with no bound; update
async_embed_multimodal_batch to limit concurrency (e.g., create an
asyncio.Semaphore with a sane default like 10, wrap the per-item to_thread call
in a short helper coroutine that acquires/releases the semaphore, and gather
those helper coroutines) so large batches won't exhaust the thread pool, or
alternatively expand the method docstring to explicitly state subclasses must
override with their own semaphore/limit for production; locate the method
async_embed_multimodal_batch in the Embedder base class and apply the semaphore
pattern or docstring change there.
In `@tests/misc/test_config_validation.py`:
- Around line 238-251: The two tests duplicate EmbeddingModelConfig construction
and task_type assertion; consolidate by removing
test_gemini_task_type_field_still_accepted_with_deprecation_notice and adding
the deprecation-check assertion into the existing test_gemini_task_type_field:
after the existing cfg creation and assert cfg.task_type ==
"RETRIEVAL_DOCUMENT", add the line that grabs field_info =
EmbeddingModelConfig.model_fields["task_type"] and assert "DEPRECATED" in
(field_info.description or "") so the single test covers both acceptance and
deprecation metadata while eliminating redundancy.
- Line 209: The stray "import pytest" is placed mid-file; move that import up
into the module import block alongside the other imports at the top of
tests/misc/test_config_validation.py so all imports are grouped together; ensure
there are no duplicate imports and run tests to confirm nothing else relies on
an import-order side-effect.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c79c44a7-6a94-4f89-9bc9-e8a71d3d0f5a
📒 Files selected for processing (14)
- openviking/core/context.py
- openviking/models/embedder/base.py
- openviking/models/embedder/gemini_embedders.py
- openviking/retrieve/hierarchical_retriever.py
- openviking/session/memory_deduplicator.py
- openviking/storage/collection_schemas.py
- openviking/storage/queuefs/embedding_handler.py
- openviking/storage/queuefs/queue_manager.py
- openviking_cli/utils/config/embedding_config.py
- tests/integration/test_gemini_e2e.py
- tests/misc/test_config_validation.py
- tests/unit/test_embedder_base.py
- tests/unit/test_gemini_embedder.py
- tests/unit/test_vectorize_parts.py
```python
results: List[Optional[EmbedResult]] = [None] * len(vectorizes)
sem = anyio.Semaphore(self._max_concurrent_batches)

async def _embed_one(idx: int, v: "Vectorize") -> None:
    async with sem:
        try:
            results[idx] = await anyio.to_thread.run_sync(
                self.embed_multimodal, v
            )
        except RuntimeError as e:
            if "transient" in str(e).lower():
                raise
            logger.warning(
                f"async_embed_multimodal_batch item {idx} failed: {e}. "
                "Falling back to text embed."
            )
            text = getattr(v, "text", "")
            results[idx] = await anyio.to_thread.run_sync(self.embed, text)

async with anyio.create_task_group() as tg:
    for idx, v in enumerate(vectorizes):
        tg.start_soon(_embed_one, idx, v)

return [r for r in results]  # type: ignore[return-value]
```
Potential None values in results list if exception escapes.
If an exception other than the handled RuntimeError occurs in _embed_one, the corresponding results[idx] remains None. The list comprehension at line 319 would then include None values, causing downstream issues.
🛡️ Suggested fix: add broader exception handling
```diff
 async def _embed_one(idx: int, v: "Vectorize") -> None:
     async with sem:
         try:
             results[idx] = await anyio.to_thread.run_sync(
                 self.embed_multimodal, v
             )
         except RuntimeError as e:
             if "transient" in str(e).lower():
                 raise
             logger.warning(
                 f"async_embed_multimodal_batch item {idx} failed: {e}. "
                 "Falling back to text embed."
             )
             text = getattr(v, "text", "")
             results[idx] = await anyio.to_thread.run_sync(self.embed, text)
+        except Exception as e:
+            logger.warning(
+                f"async_embed_multimodal_batch item {idx} unexpected error: {e}. "
+                "Falling back to text embed."
+            )
+            text = getattr(v, "text", "")
+            results[idx] = await anyio.to_thread.run_sync(self.embed, text)
```
+ results[idx] = await anyio.to_thread.run_sync(self.embed, text)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| results: List[Optional[EmbedResult]] = [None] * len(vectorizes) | |
| sem = anyio.Semaphore(self._max_concurrent_batches) | |
| async def _embed_one(idx: int, v: "Vectorize") -> None: | |
| async with sem: | |
| try: | |
| results[idx] = await anyio.to_thread.run_sync( | |
| self.embed_multimodal, v | |
| ) | |
| except RuntimeError as e: | |
| if "transient" in str(e).lower(): | |
| raise | |
| logger.warning( | |
| f"async_embed_multimodal_batch item {idx} failed: {e}. " | |
| "Falling back to text embed." | |
| ) | |
| text = getattr(v, "text", "") | |
| results[idx] = await anyio.to_thread.run_sync(self.embed, text) | |
| async with anyio.create_task_group() as tg: | |
| for idx, v in enumerate(vectorizes): | |
| tg.start_soon(_embed_one, idx, v) | |
| return [r for r in results] # type: ignore[return-value] | |
| results: List[Optional[EmbedResult]] = [None] * len(vectorizes) | |
| sem = anyio.Semaphore(self._max_concurrent_batches) | |
| async def _embed_one(idx: int, v: "Vectorize") -> None: | |
| async with sem: | |
| try: | |
| results[idx] = await anyio.to_thread.run_sync( | |
| self.embed_multimodal, v | |
| ) | |
| except RuntimeError as e: | |
| if "transient" in str(e).lower(): | |
| raise | |
| logger.warning( | |
| f"async_embed_multimodal_batch item {idx} failed: {e}. " | |
| "Falling back to text embed." | |
| ) | |
| text = getattr(v, "text", "") | |
| results[idx] = await anyio.to_thread.run_sync(self.embed, text) | |
| except Exception as e: | |
| logger.warning( | |
| f"async_embed_multimodal_batch item {idx} unexpected error: {e}. " | |
| "Falling back to text embed." | |
| ) | |
| text = getattr(v, "text", "") | |
| results[idx] = await anyio.to_thread.run_sync(self.embed, text) | |
| async with anyio.create_task_group() as tg: | |
| for idx, v in enumerate(vectorizes): | |
| tg.start_soon(_embed_one, idx, v) | |
| return [r for r in results] # type: ignore[return-value] |
```python
except RuntimeError as e:
    if "transient" in str(e).lower():
        raise
    logger.warning(
        f"async_embed_multimodal_batch item {idx} failed: {e}. "
        "Falling back to text embed."
    )
    text = getattr(v, "text", "")
    results[idx] = await anyio.to_thread.run_sync(self.embed, text)
```
Fragile transient error detection via string matching.
The pattern "transient" in str(e).lower() is brittle — it depends on the specific wording in the error message raised at line 207. If that message changes, this detection breaks silently.
Consider using a custom exception type or checking for specific error codes instead.
💡 Suggested improvement
```diff
+class GeminiTransientError(RuntimeError):
+    """Transient API error that should be retried by caller."""
+    pass

 # In embed_multimodal:
         except APIError as e:
             if e.code in (429, 502, 503, 504):
-                raise RuntimeError(f"Gemini transient error (code={e.code}), caller should retry") from e
+                raise GeminiTransientError(f"Gemini transient error (code={e.code}), caller should retry") from e

 # In async_embed_multimodal_batch:
-        except RuntimeError as e:
-            if "transient" in str(e).lower():
+        except GeminiTransientError:
             raise
```
Verify each finding against the current code and only fix it if needed.
In `@openviking/models/embedder/gemini_embedders.py` around lines 305 - 313,
Replace the brittle string-match transient detection in
async_embed_multimodal_batch with a concrete error type or code: define a custom
exception class (e.g., TransientEmbeddingError subclassing RuntimeError) or add
a specific attribute/code to the error raised where the transient condition
originates, update that raising site to raise TransientEmbeddingError (or set
e.code = "TRANSIENT"), and then change the except block in
async_embed_multimodal_batch to check isinstance(e, TransientEmbeddingError) (or
check e.code == "TRANSIENT") and re-raise only for that case; otherwise log the
failure and fall back to text embedding by calling self.embed as before.
```python
async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Process dequeued message and add embedding vector(s)."""
    if not data:
        return None
```
Always signal completion/error on empty dequeue payloads.
At Line 66-68, returning None without report_success()/report_error() can leave queue state inconsistent because NamedQueue increments in-progress before invoking handlers (see openviking/storage/queuefs/named_queue.py Line 219-229).
Proposed fix
```diff
 async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
     """Process dequeued message and add embedding vector(s)."""
     if not data:
-        return None
+        error_msg = "Empty dequeue payload"
+        logger.warning(error_msg)
+        self.report_error(error_msg, data)
+        return None
```
+ return None📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: | |
| """Process dequeued message and add embedding vector(s).""" | |
| if not data: | |
| return None | |
| async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: | |
| """Process dequeued message and add embedding vector(s).""" | |
| if not data: | |
| error_msg = "Empty dequeue payload" | |
| logger.warning(error_msg) | |
| self.report_error(error_msg, data) | |
| return None |
```python
if result.dense_vector:
    inserted_data["vector"] = result.dense_vector
    # Validate vector dimension
    if len(result.dense_vector) != self._vector_dim:
        error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}"
        logger.error(error_msg)
        self.report_error(error_msg, data)
        return None
```
Treat missing/empty dense vectors as a hard failure.
At Line 149, if result.dense_vector: is too permissive: a missing dense vector can bypass validation and still continue to DB write logic.
Proposed fix
```diff
-        # Add dense vector
-        if result.dense_vector:
-            inserted_data["vector"] = result.dense_vector
-            # Validate vector dimension
-            if len(result.dense_vector) != self._vector_dim:
-                error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}"
-                logger.error(error_msg)
-                self.report_error(error_msg, data)
-                return None
+        # Add dense vector (required)
+        if result.dense_vector is None:
+            error_msg = "Missing dense vector from embedder result"
+            logger.error(error_msg)
+            self.report_error(error_msg, data)
+            return None
+        inserted_data["vector"] = result.dense_vector
+        # Validate vector dimension
+        if len(result.dense_vector) != self._vector_dim:
+            error_msg = (
+                f"Dense vector dimension mismatch: expected {self._vector_dim}, "
+                f"got {len(result.dense_vector)}"
+            )
+            logger.error(error_msg)
+            self.report_error(error_msg, data)
+            return None
```
+ return None🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@openviking/storage/queuefs/embedding_handler.py` around lines 149-157: the
current check uses `if result.dense_vector:`, which lets missing or empty vectors
slip through. Change the logic in the embedding handling flow (around
`result.dense_vector`, `inserted_data["vector"]`, and the `self._vector_dim`
validation) to treat absent or empty dense vectors as a hard failure: explicitly
check for None or zero length (e.g., `if result.dense_vector is None or
len(result.dense_vector) == 0`), call `self.report_error` with a clear message and
return None, and only after that perform the length equality check against
`self._vector_dim` before assigning `inserted_data["vector"]`.
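The stricter check can be sketched as a standalone helper; the function name and the error-string return convention here are illustrative, not the handler's real API:

```python
from typing import List, Optional

def validate_dense_vector(vector: Optional[List[float]], expected_dim: int) -> Optional[str]:
    """Return an error message if the vector is unusable, else None.

    None and zero-length vectors are hard failures, as the review suggests;
    only a vector that exists and matches the expected dimension passes.
    """
    if vector is None or len(vector) == 0:
        return "Missing dense vector from embedder result"
    if len(vector) != expected_dim:
        return f"Dense vector dimension mismatch: expected {expected_dim}, got {len(vector)}"
    return None
```

In the handler this would translate to calling `report_error` and returning early whenever the helper yields a message.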
```python
try:
    record_id = await self._vikingdb.upsert(inserted_data)
    if record_id:
        logger.debug(
            f"Successfully wrote embedding to database: {record_id} abstract {inserted_data['abstract']} vector {inserted_data['vector'][:5]}"
        )
except CollectionNotFoundError as db_err:
    # During shutdown, queue workers may finish one dequeued item.
    if self._vikingdb.is_closing:
        logger.debug(f"Skip embedding write during shutdown: {db_err}")
        self.report_success()
        return None
    logger.error(f"Failed to write to vector database: {db_err}")
    self.report_error(str(db_err), data)
    return None
except Exception as db_err:
    if self._vikingdb.is_closing:
        logger.debug(f"Skip embedding write during shutdown: {db_err}")
        self.report_success()
        return None
    logger.error(f"Failed to write to vector database: {db_err}")
    traceback.print_exc()
    self.report_error(str(db_err), data)
    return None

self.report_success()
return inserted_data
```
Do not report success when upsert() returns an empty id.
At lines 178-203, `record_id == ""` still falls through to `report_success()`. `VikingVectorIndexBackend.upsert()` can return empty IDs for rejected payloads, so this currently risks silent data loss.
Proposed fix

```diff
 record_id = await self._vikingdb.upsert(inserted_data)
-if record_id:
-    logger.debug(
-        f"Successfully wrote embedding to database: {record_id} abstract {inserted_data['abstract']} vector {inserted_data['vector'][:5]}"
-    )
+if not record_id:
+    error_msg = "Vector DB upsert returned empty id"
+    logger.error(error_msg)
+    self.report_error(error_msg, data)
+    return None
+logger.debug(
+    "Successfully wrote embedding to database: id=%s uri=%s",
+    record_id,
+    inserted_data.get("uri"),
+)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```python
try:
    record_id = await self._vikingdb.upsert(inserted_data)
    if not record_id:
        error_msg = "Vector DB upsert returned empty id"
        logger.error(error_msg)
        self.report_error(error_msg, data)
        return None
    logger.debug(
        "Successfully wrote embedding to database: id=%s uri=%s",
        record_id,
        inserted_data.get("uri"),
    )
except CollectionNotFoundError as db_err:
    # During shutdown, queue workers may finish one dequeued item.
    if self._vikingdb.is_closing:
        logger.debug(f"Skip embedding write during shutdown: {db_err}")
        self.report_success()
        return None
    logger.error(f"Failed to write to vector database: {db_err}")
    self.report_error(str(db_err), data)
    return None
except Exception as db_err:
    if self._vikingdb.is_closing:
        logger.debug(f"Skip embedding write during shutdown: {db_err}")
        self.report_success()
        return None
    logger.error(f"Failed to write to vector database: {db_err}")
    traceback.print_exc()
    self.report_error(str(db_err), data)
    return None

self.report_success()
return inserted_data
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@openviking/storage/queuefs/embedding_handler.py` around lines 178-203: the
handler currently calls `report_success()` even when
`self._vikingdb.upsert(inserted_data)` returns an empty/falsy `record_id` (which
signals a rejected payload). Update the logic after calling `_vikingdb.upsert` to
treat a falsy `record_id` as a failure: if `record_id` is falsy, call
`self.report_error(...)` with a clear message (including `inserted_data` or the
abstract), do not call `self.report_success()`, and return None; keep the existing
success path only for a truthy `record_id`. Reference symbols: `_vikingdb.upsert`,
`record_id`, `inserted_data`, `report_error`, `report_success`,
`VikingVectorIndexBackend.upsert`.
Summary
- `embed_query()` / `task_type` per-operation: Added `embed_query()` to `DenseEmbedderBase` (delegates to `embed()`); `GeminiDenseEmbedder` overrides with a dedicated `_query_config` (RETRIEVAL_QUERY) while `embed()` uses `_index_config` (RETRIEVAL_DOCUMENT). Updated `hierarchical_retriever.py` and `memory_deduplicator.py` to call `embed_query()`. Deprecated the `task_type` config field (still accepted).
- `EmbeddingHandler` rename + relocate: Moved `TextEmbeddingHandler` from `collection_schemas.py` to `queuefs/embedding_handler.py` and renamed it to `EmbeddingHandler`. Backward-compat alias kept. Removed 2 constant-testing unit tests; consolidated 2 duplicate fallback tests into one `@pytest.mark.parametrize`.
- Multimodal feature flag: `enable_multimodal: bool = False`. `Vectorize` gains `parts: Optional[List[ContentPart]]` + `get_parts()`. `embed_multimodal` rewritten to use the parts API with a pdfminer-six PDF page guard. `embed_multimodal_batch` / `async_embed_multimodal_batch` added to the base class and Gemini (anyio semaphore). Flip `enable_multimodal=True` in April — no code swap needed.

Issues Fixed (PR volcengine#607 review)
- `embed_multimodal` handled only 1 text + 1 media → now arbitrary `parts[]` sequences
- `TextEmbeddingHandler` in wrong file with wrong name
- Missing `embed_multimodal_batch` / `async_embed_multimodal_batch`
- `task_type` baked at init → same type for index + query

Test Plan
- `tests/unit/test_embedder_base.py` — `embed_query()` base fallback
- `tests/unit/test_vectorize_parts.py` — `Vectorize.get_parts()` (5 cases)
- `tests/unit/test_gemini_embedder.py` — feature flag, task_type split, PDF guard, multi-part, batch
- `tests/misc/test_config_validation.py` — `task_type` deprecation notice
- `pytest tests/integration/test_gemini_e2e.py -m integration` (requires `GEMINI_API_KEY`)

Summary by CodeRabbit
Release Notes
New Features
Refactor
Tests
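As a sketch of what the parametrized `Vectorize.get_parts()` cases might look like, assuming the field names from the PR summary (`ModalContent` is a stub here, and the shape mirrors the `@pytest.mark.parametrize` consolidation mentioned above — not the repository's actual test code):

```python
from dataclasses import dataclass
from typing import List, Optional, Union

import pytest

@dataclass
class ModalContent:
    # Stand-in for the real media content type.
    uri: str

ContentPart = Union[str, ModalContent]

@dataclass
class Vectorize:
    text: Optional[str] = None
    media: Optional[ModalContent] = None
    parts: Optional[List[ContentPart]] = None

    def get_parts(self) -> List[ContentPart]:
        # Explicit parts take precedence; otherwise rebuild [text, media]
        # from the legacy fields, skipping whichever is unset.
        if self.parts is not None:
            return self.parts
        legacy: List[ContentPart] = []
        if self.text:
            legacy.append(self.text)
        if self.media is not None:
            legacy.append(self.media)
        return legacy

@pytest.mark.parametrize(
    "vectorize, expected",
    [
        (Vectorize(parts=["a", "b"]), ["a", "b"]),
        (Vectorize(text="hi"), ["hi"]),
        (Vectorize(media=ModalContent(uri="x.png")), [ModalContent(uri="x.png")]),
        (Vectorize(text="hi", media=ModalContent(uri="x.png")), ["hi", ModalContent(uri="x.png")]),
        (Vectorize(), []),
    ],
)
def test_get_parts(vectorize, expected):
    assert vectorize.get_parts() == expected
```

The five parameter rows correspond to the "5 cases" called out in the test plan: explicit parts, text only, media only, both legacy fields, and the empty fallback.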