Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions openviking/models/embedder/jina_embedders.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,32 @@ class JinaDenseEmbedder(DenseEmbedderBase):
"""Jina AI Dense Embedder Implementation

Uses Jina AI embedding API via OpenAI-compatible client.
Supports task-specific embeddings and Matryoshka dimension reduction.
Supports task-specific embeddings (non-symmetric) and Matryoshka dimension reduction.

Jina models are non-symmetric by default and require the 'task' parameter to distinguish
between query and document embeddings. This is different from official OpenAI models,
which are symmetric and do not support the input_type parameter.

Example:
>>> embedder = JinaDenseEmbedder(
>>> # Query embedding
>>> query_embedder = JinaDenseEmbedder(
... model_name="jina-embeddings-v5-text-small",
... api_key="jina_xxx",
... dimension=512,
... task="retrieval.query"
... context="query"
... )
>>> result = embedder.embed("Hello world")
>>> print(len(result.dense_vector))
>>> query_vector = query_embedder.embed("search query")
>>> print(len(query_vector.dense_vector))
512

>>> # Document embedding
>>> doc_embedder = JinaDenseEmbedder(
... model_name="jina-embeddings-v5-text-small",
... api_key="jina_xxx",
... dimension=512,
... context="document"
... )
>>> doc_vector = doc_embedder.embed("document content")
"""

def __init__(
Expand All @@ -42,7 +56,9 @@ def __init__(
api_key: Optional[str] = None,
api_base: Optional[str] = None,
dimension: Optional[int] = None,
task: Optional[str] = None,
context: Optional[str] = None,
query_param: str = "retrieval.query",
document_param: str = "retrieval.passage",
late_chunking: Optional[bool] = None,
config: Optional[Dict[str, Any]] = None,
):
Expand All @@ -53,9 +69,15 @@ def __init__(
api_key: API key, required
api_base: API base URL, defaults to https://api.jina.ai/v1
dimension: Dimension for Matryoshka reduction, optional
task: Task type for task-specific embeddings, optional.
Valid values: retrieval.query, retrieval.passage,
text-matching, classification, separation
context: Embedding context, either 'query' or 'document'. Jina models are
non-symmetric by default; task is always sent unless context is None.
Pass None to disable task (e.g. for symmetric deployments via OpenAI
compatible endpoint).
query_param: Task value for query-side embeddings. Defaults to 'retrieval.query'.
Override for models with different task naming conventions.
document_param: Task value for document-side embeddings. Defaults to
'retrieval.passage'. Override for models with different task
naming conventions.
late_chunking: Enable late chunking via extra_body, optional
config: Additional configuration dict

Expand All @@ -67,7 +89,12 @@ def __init__(
self.api_key = api_key
self.api_base = api_base or "https://api.jina.ai/v1"
self.dimension = dimension
self.task = task
if context == "query":
self.task: Optional[str] = query_param
elif context == "document":
self.task = document_param
else:
self.task = None
self.late_chunking = late_chunking

if not self.api_key:
Expand Down
93 changes: 82 additions & 11 deletions openviking/models/embedder/openai_embedders.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@


class OpenAIDenseEmbedder(DenseEmbedderBase):
"""OpenAI Dense Embedder Implementation
"""OpenAI-Compatible Dense Embedder Implementation

Supports OpenAI embedding models such as text-embedding-3-small, text-embedding-3-large, etc.
Supports OpenAI embedding models (e.g., text-embedding-3-small, text-embedding-3-large)
and OpenAI-compatible third-party models that support non-symmetric embeddings.

Note: Official OpenAI models are symmetric and do not support the input_type parameter.
Non-symmetric mode (context='query'/'document') is only supported by OpenAI-compatible
third-party models (e.g., BGE-M3, Jina, Cohere, etc.) that implement the input_type parameter.

Example:
>>> # Symmetric mode (official OpenAI models)
>>> embedder = OpenAIDenseEmbedder(
... model_name="text-embedding-3-small",
... api_key="sk-xxx",
Expand All @@ -32,6 +38,17 @@ class OpenAIDenseEmbedder(DenseEmbedderBase):
>>> result = embedder.embed("Hello world")
>>> print(len(result.dense_vector))
1536

>>> # Non-symmetric mode (OpenAI-compatible third-party models)
>>> query_embedder = OpenAIDenseEmbedder(
... model_name="bge-m3",
... api_key="your-api-key",
... api_base="https://your-api-endpoint.com/v1",
... context="query",
... query_param="query",
... document_param="passage"
... )
>>> query_vector = query_embedder.embed("search query")
"""

def __init__(
Expand All @@ -40,28 +57,66 @@ def __init__(
api_key: Optional[str] = None,
api_base: Optional[str] = None,
dimension: Optional[int] = None,
context: Optional[str] = None,
query_param: Optional[str] = None,
document_param: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
max_tokens: Optional[int] = None,
):
"""Initialize OpenAI Dense Embedder
"""Initialize OpenAI-Compatible Dense Embedder

Args:
model_name: OpenAI model name, defaults to text-embedding-3-small
model_name: Model name. For official OpenAI models (e.g., text-embedding-3-small),
use symmetric mode (context=None, query_param=None, document_param=None).
For OpenAI-compatible third-party models (e.g., BGE-M3, Jina, Cohere), use
non-symmetric mode with context='query'/'document'.
api_key: API key, if None will read from env vars (OPENVIKING_EMBEDDING_API_KEY or OPENAI_API_KEY)
api_base: API base URL, optional
api_base: API base URL, optional. Required for third-party OpenAI-compatible APIs.
dimension: Dimension (if model supports), optional
context: Embedding context, either 'query' or 'document'. When both query_param
and document_param are None (the default), the model is treated as symmetric
and no input_type is sent to the API. Set query_param and/or document_param
to enable non-symmetric mode. Only supported by OpenAI-compatible third-party
models, not official OpenAI models.
query_param: The input_type value for query-side embeddings, e.g. 'query' or
'search_query'. Defaults to 'query' when non-symmetric mode is active.
Setting this (or document_param) activates non-symmetric mode.
Only supported by OpenAI-compatible third-party models.
document_param: The input_type value for document-side embeddings, e.g. 'passage'
or 'document'. Defaults to 'passage' when non-symmetric mode is
active. Setting this (or query_param) activates non-symmetric mode.
Only supported by OpenAI-compatible third-party models.
config: Additional configuration dict
max_tokens: Maximum token count per embedding request, None to use default (8000)

Raises:
ValueError: If api_key is not provided and env vars are not set

Note:
Official OpenAI models (e.g., text-embedding-3-small, text-embedding-3-large) are
symmetric and do not support the input_type parameter. Non-symmetric mode is only
supported by OpenAI-compatible third-party models (e.g., BGE-M3, Jina, Cohere) that
implement the input_type parameter.
"""
super().__init__(model_name, config, max_tokens=max_tokens)

self.api_key = api_key
self.api_base = api_base
self.dimension = dimension

# Symmetric by default: only activate input_type if user explicitly sets either value
non_symmetric = query_param is not None or document_param is not None
if not non_symmetric:
self.input_type: Optional[str] = None
elif context == "query":
self.input_type = query_param if query_param is not None else "query"
elif context == "document":
self.input_type = document_param if document_param is not None else "passage"
else:
self.input_type = None

if not self.api_key:
raise ValueError("api_key is required")
# Allow missing api_key when api_base is set (e.g. local OpenAI-compatible servers)
if not self.api_key and not self.api_base:
raise ValueError("api_key is required (or set api_base for local servers)")
Expand Down Expand Up @@ -134,6 +189,18 @@ def _usage_value(key: str, default: int = 0) -> int:
output_tokens,
)

def _build_extra_body(self) -> Optional[Dict[str, Any]]:
"""Build extra_body dict for OpenAI-compatible parameters

Returns:
Dict containing input_type if non-symmetric mode is active.
Only supported by OpenAI-compatible third-party models.
"""
extra_body = {}
if self.input_type is not None:
extra_body["input_type"] = self.input_type
return extra_body if extra_body else None

def _embed_single(self, text: str) -> EmbedResult:
"""Perform raw embedding without chunking logic.

Expand All @@ -147,9 +214,11 @@ def _embed_single(self, text: str) -> EmbedResult:
RuntimeError: When API call fails
"""
try:
kwargs = {"input": text, "model": self.model_name}
if self.dimension:
kwargs["dimensions"] = self.dimension
kwargs: Dict[str, Any] = {"input": text, "model": self.model_name}

extra_body = self._build_extra_body()
if extra_body:
kwargs["extra_body"] = extra_body

response = self.client.embeddings.create(**kwargs)
self._update_telemetry_token_usage(response)
Expand Down Expand Up @@ -211,9 +280,11 @@ def embed_batch(self, texts: List[str]) -> List[EmbedResult]:

if short_texts:
try:
kwargs = {"input": short_texts, "model": self.model_name}
if self.dimension:
kwargs["dimensions"] = self.dimension
kwargs: Dict[str, Any] = {"input": short_texts, "model": self.model_name}

extra_body = self._build_extra_body()
if extra_body:
kwargs["extra_body"] = extra_body

response = self.client.embeddings.create(**kwargs)
self._update_telemetry_token_usage(response)
Expand Down
21 changes: 15 additions & 6 deletions openviking/service/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(
)

# Initialize embedder
self._embedder = config.embedding.get_embedder()
self._embedder = config.embedding.get_query_embedder()
logger.info(
f"Initialized embedder (dim {config.embedding.dimension}, sparse {self._embedder.is_sparse})"
)
Expand Down Expand Up @@ -132,15 +132,15 @@ def _init_storage(
logger.warning("AGFS client not initialized, skipping queue manager")

# Initialize VikingDBManager with QueueManager
self._vikingdb_manager = VikingDBManager(
vectordb_config=config.vectordb, queue_manager=self._queue_manager
)
self._vikingdb_manager = VikingDBManager(vectordb_config=config.vectordb, queue_manager=self._queue_manager)

# Configure queues if QueueManager is available
if self._queue_manager:
self._queue_manager.setup_standard_queues(self._vikingdb_manager)

# Initialize TransactionManager
# Initialize TransactionManager (fail-fast if AGFS missing)
if self._agfs_client is None:
raise RuntimeError("AGFS client not initialized for TransactionManager")
self._transaction_manager = init_transaction_manager(agfs=self._agfs_client)

@property
Expand Down Expand Up @@ -236,8 +236,15 @@ async def initialize(self) -> None:
enable_recorder = os.environ.get("OPENVIKING_ENABLE_RECORDER", "").lower() == "true"

# Create context collection
if self._vikingdb_manager is None:
raise RuntimeError("VikingDBManager not initialized")
await init_context_collection(self._vikingdb_manager)

if self._agfs_client is None:
raise RuntimeError("AGFS client not initialized")
if self._embedder is None:
raise RuntimeError("Embedder not initialized")

self._viking_fs = init_viking_fs(
agfs=self._agfs_client,
query_embedder=self._embedder,
Expand All @@ -261,7 +268,9 @@ async def initialize(self) -> None:
)

# Initialize processors
self._resource_processor = ResourceProcessor(vikingdb=self._vikingdb_manager)
self._resource_processor = ResourceProcessor(
vikingdb=self._vikingdb_manager,
)
self._skill_processor = SkillProcessor(vikingdb=self._vikingdb_manager)
self._session_compressor = SessionCompressor(vikingdb=self._vikingdb_manager)

Expand Down
3 changes: 2 additions & 1 deletion openviking/session/memory_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def __init__(
):
"""Initialize deduplicator."""
self.vikingdb = vikingdb
self.embedder = self.vikingdb.get_embedder()
config = get_openviking_config()
self.embedder = config.embedding.get_query_embedder()

async def deduplicate(
self,
Expand Down
9 changes: 6 additions & 3 deletions openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ async def init_context_collection(storage) -> bool:
config = get_openviking_config()
name = config.storage.vectordb.name
vector_dim = config.embedding.dimension
schema = CollectionSchemas.context_collection(name, vector_dim)
return await storage.create_collection(name, schema)
if not name:
raise ValueError("Vector DB collection name is required")
collection_name = name
schema = CollectionSchemas.context_collection(collection_name, vector_dim)
return await storage.create_collection(collection_name, schema)


class TextEmbeddingHandler(DequeueHandlerBase):
Expand Down Expand Up @@ -162,7 +165,7 @@ def __init__(self, vikingdb: VikingVectorIndexBackend):

def _initialize_embedder(self, config: "OpenVikingConfig"):
"""Initialize the embedder instance from config."""
self._embedder = config.embedding.get_embedder()
self._embedder = config.embedding.get_document_embedder()

@classmethod
def _merge_request_stats(
Expand Down
Loading