Make embeddings configurable#678
Conversation
Add EMBEDDING_MODEL_CONFIG__DIMENSIONS_MODE (auto|always|never) controlling
whether the dimensions= parameter is forwarded on OpenAI embeddings.create
calls. auto (default) sends it when the operator explicitly set
EMBEDDING_VECTOR_DIMENSIONS and the configured model is not on the
known-rejecting allowlist (currently text-embedding-ada-002).
The provenance check (was VECTOR_DIMENSIONS explicitly set?) lives as
EmbeddingSettings.resolve_send_dimensions() because it needs access to
model_fields_set, which the standalone resolver does not have. The
resolved boolean is passed into _EmbeddingClient at construction time;
the client never inspects mode or provenance.
Also pins cloudevents <2.0 — 2.0.0 reorganized the package and dropped
cloudevents.conversion and cloudevents.http, which src/telemetry/emitter.py
imports. The original `>=1.12.0` constraint allowed the broken 2.0 resolve.
With the pin, the imports resolve cleanly and the basedpyright warning
cascade (37+ warnings about unknown types) disappears.
Drive-by cleanups (all unnecessary cast/ignore comments flagged by
basedpyright after the cloudevents downgrade):
- vector_store/lancedb.py, tests/conftest.py, and
tests/deriver/test_vector_reconciliation.py — drop dead pyright ignores
- sdks/python/src/honcho/http/{async_,}client.py — drop unnecessary
cast(datetime, ...) (parsedate_to_datetime already returns datetime)
- vector_store/turbopuffer.py — cast(Any, rows) for the upsert_rows
TypedDict that the SDK exposes but our row builder doesn't satisfy
- tests/test_datetime_parsing.py — ignore reportArgumentType on the
test that deliberately passes wrong types to assert raises
…alidator
Add src/startup/embedding_validator.py that introspects the actual pgvector
column dim at boot and refuses to start if it does not match
EMBEDDING_VECTOR_DIMENSIONS. Runs after the DB pool is up and before the
embedding client is constructed, in both src/main.py (FastAPI lifespan) and
src/deriver/__main__.py.
Implementation details:
- Schema-qualified pg_attribute join through pg_class/pg_namespace respects
DB.SCHEMA rather than relying on search_path
- Bounded retry (3 attempts, 1s backoff) for transient introspection failure,
then fail-closed with "could not validate embedding schema" — uncertainty
is not a green light to serve traffic
- External-store sampler (turbopuffer, lancedb) enumerates workspaces from
the application DB and probes their lazy-created namespaces; current
per-namespace probe is a no-op stub since the SDKs do not expose
uniform dim introspection — full enumeration is left to
`configure_embeddings --report` in Phase 3
Atomic guard swap: deletes the old dim-vs-MIGRATED config validator (which
forbade non-1536 pgvector unless MIGRATED=True) in the same commit as the
new runtime validator. There is no release window where non-1536 pgvector
can start unprotected. The 9 dual-write branches that use VECTOR_STORE.MIGRATED
remain untouched and load-bearing for legacy-tenant backend swaps.
VECTOR_STORE_DIMENSIONS deprecation: drop the "must match" raise; in
propagate_namespace, check model_fields_set and emit logger.warning +
DeprecationWarning (DeprecationWarning alone is filtered by Python's default
config and would not reach operators). Always overwrite with
EMBEDDING.VECTOR_DIMENSIONS regardless.
Test changes:
- tests/test_models_vector_dim.py: Phase 1's VECTOR_STORE_TYPE=lancedb +
MIGRATED=true escape hatches removed; the test now passes on plain
EMBEDDING_VECTOR_DIMENSIONS=768
- tests/llm/test_model_config.py: the two tests asserting the old guards
replaced with tests for the new deprecation + acceptance behavior
- tests/startup/test_embedding_validator.py: 10 new tests — dim assertion
logic (pass/mismatch/missing/unbounded/non-public-schema), fail-closed
retry budget, real-test-DB pass, real-DB ALTER-then-validate, deprecation
warning capture, non-1536 + pgvector + MIGRATED=false at config time
Adds scripts/configure_embeddings.py alongside the other one-off scripts
(provision_db, migrate_db, generate_jwt_secret, etc.). Invoked as
`uv run python scripts/configure_embeddings.py` — same convention as the
existing scripts in that directory, including the sys.path shim that
lets src.* imports resolve when run directly.
Bootstrap step for self-hosted installs at a non-default
EMBEDDING_VECTOR_DIMENSIONS — runs between `alembic upgrade head` and
starting the API/deriver.
pgvector ALTER safety (single transaction):
- LOCK TABLE {schema}.documents, {schema}.message_embeddings IN ACCESS
EXCLUSIVE MODE — closes the TOCTOU window between population check
and ALTER
- COUNT(*) WHERE embedding IS NOT NULL on both tables; refuse with a
non-zero exit if either is populated (ALTER ... USING NULL would
silently wipe those vectors)
- Snapshot HNSW index DDL from pg_indexes; drop, ALTER, recreate from
the captured DDL so operator-set HNSW params (m, ef_construction)
survive the round trip
External vector stores (turbopuffer, lancedb) are never created or
modified — namespaces are per-workspace and lazy-created on first write.
The --report mode enumerates workspaces and collections from the
application DB, derives the expected namespaces via
get_vector_namespace(), and prints a per-namespace status table.
CLI modes (mutually exclusive):
- (default) interactive: print plan, prompt to confirm
- --dry-run: print plan and exit 0 without touching the DB
- --yes: apply without prompt
- --report: print external-store namespace inventory and exit
Also updates src/startup/embedding_validator.py error-message paths and
docs/v3/contributing/configuration.mdx invocations to point at the new
script location.
Tests cover plan no-op, plan needs-alter, plan raises on missing column,
ALTER + HNSW round-trip, refuse-when-populated (monkeypatched count to
avoid wiring the full workspace/peer/collection/document FK chain just
to land one vector row), and idempotency.
Document the supported way to change EMBEDDING_VECTOR_DIMENSIONS or EMBEDDING_MODEL_CONFIG__MODEL on a Honcho deployment: provision a new deployment at the desired configuration, replay source data out of band, cut over at the application layer. The page explains the asymmetry: - Dimension is machine-enforced as immutable. The startup validator introspects pg_attribute and crashes the API/deriver on mismatch. - Model is operator-owned. There is no persistent metadata recording which model produced each vector, so a same-dim model swap is silently undetectable — flagged with a Warning callout. Also documents the truncation edge case (text-embedding-3-large truncated to 1536 with EMBEDDING_VECTOR_DIMENSIONS left at default) and the DIMENSIONS_MODE=always mitigation, plus a pointer that storage-backend swap (VECTOR_STORE_MIGRATED + reconciler) is a distinct operation unaffected by this work. Registers the page in docs/docs.json under the Self-Hosting nav group and cross-links from configuration.mdx.
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughMake embedding vector dimensions configurable; enforce configured dims at startup via pgvector/external-store probes; add a pgvector ALTER migration script; conditionally forward OpenAI ChangesEmbedding Dimensions Configuration & Validation
Sequence DiagramsequenceDiagram
participant App as Application Startup
participant Validator as validate_embedding_schema
participant PostgreSQL as PostgreSQL (pgvector)
participant VectorStore as External Vector Store
App->>Validator: invoke during lifespan startup
Validator->>PostgreSQL: query embedding column atttypmod (retry on transient errors)
alt pgvector dims match config
PostgreSQL-->>Validator: atttypmod matches
Validator-->>App: success — proceed to initialize caches and serve
else pgvector dims mismatch
PostgreSQL-->>Validator: atttypmod differs
Validator-->>App: raise StartupValidationError (includes remediation)
App->>App: abort startup
end
alt external store configured (turbopuffer/lancedb)
Validator->>VectorStore: probe sampled workspace namespaces for declared dims
VectorStore-->>Validator: namespace dimensions or None
alt mismatch found
Validator-->>App: raise StartupValidationError
App->>App: abort startup
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/deriver/__main__.py (1)
59-70:⚠️ Potential issue | 🟠 Major | ⚡ Quick winEnsure telemetry is always shut down when schema validation fails.
validate_embedding_schema(engine)runs before thetry/finally, so a validation exception skipsshutdown_telemetry(). Wrap validation in the sametry/finallyblock asmain().Proposed fix
async def run_deriver(): """Run the deriver with proper telemetry lifecycle management.""" # Initialize async telemetry (CloudEvents emitter) await initialize_telemetry_async() - - # Fail fast if the embedding schema does not match settings — same gate - # the API runs in its lifespan. - await validate_embedding_schema(engine) - try: + # Fail fast if the embedding schema does not match settings — same gate + # the API runs in its lifespan. + await validate_embedding_schema(engine) await main() finally: # Shutdown telemetry (flush CloudEvents buffer) await shutdown_telemetry()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/deriver/__main__.py` around lines 59 - 70, Validation runs before the try/finally so telemetry may not be shut down on validation failure; move the await validate_embedding_schema(engine) call inside the same try/finally that awaits main(), keeping initialize_telemetry_async() before the try, so the sequence is: await initialize_telemetry_async(), then try: await validate_embedding_schema(engine); await main(); finally: await shutdown_telemetry(). This ensures initialize_telemetry_async, validate_embedding_schema, main, and shutdown_telemetry are executed in the correct order and that shutdown_telemetry() always runs.
🧹 Nitpick comments (2)
src/startup/embedding_validator.py (2)
39-43: ⚡ Quick winPrefer the shared exception hierarchy instead of introducing a new local runtime error.
StartupValidationErrorshould map to a project exception fromsrc/exceptions.py(or subclass one) to keep error handling semantics consistent across services.As per coding guidelines:
src/**/*.py: "Use custom exceptions defined in src/exceptions.py with specific exception types (ResourceNotFoundException, ValidationException, etc.)".🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/startup/embedding_validator.py` around lines 39 - 43, The new StartupValidationError class should inherit from the project's shared exception hierarchy instead of RuntimeError: import the appropriate exception (e.g., ValidationException) from src.exceptions and change class StartupValidationError(RuntimeError) to subclass that shared type (or subclass ValidationException) so existing error handling recognizes it; update any references to StartupValidationError accordingly so it uses the shared exception semantics.
17-25: ⚡ Quick winUse the project logging helper for contextual startup logs.
This file currently initializes a stdlib logger directly. Please switch to the shared logging utility so startup validation logs include consistent context and formatting.
As per coding guidelines:
src/**/*.py: "Use proper logging with context via src/utils/logging.py instead of print statements".🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/startup/embedding_validator.py` around lines 17 - 25, Replace the module-level stdlib logger with the project's logging helper: remove logger = logging.getLogger(__name__) and import the shared logger factory from src.utils.logging (e.g., get_logger or equivalent) then initialize logger via that helper (e.g., logger = get_logger(__name__)) so that the startup validation in embedding_validator.py uses the project's contextual formatter and settings; update the import block accordingly and ensure any existing log calls in this module continue to use the new logger variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/v3/contributing/changing-embeddings.mdx`:
- Line 12: Typo: hyphenate "out-of-band" for consistency — update the sentence
"Replay or re-embed your data into it out of band." to use the hyphenated form
"out-of-band" (look for the exact sentence in
docs/v3/contributing/changing-embeddings.mdx or the string "Replay or re-embed
your data into it out of band.").
In `@scripts/configure_embeddings.py`:
- Around line 134-139: The SQL builds embed-related queries by interpolating
schema/table/index identifiers directly (see _count_non_null_embeddings and the
other dynamic-SQL locations around lines 181-183 and 202-217); validate each
identifier against a strict whitelist/regex (e.g. only alphanumerics and
underscores) and then use a single quoting helper to produce a properly quoted
identifier (or use SQLAlchemy's IdentifierPreparer / quoted_name) before
composing the text() query so you never insert raw user-controlled strings into
the f-strings; update every place that constructs SQL with schema/table/index
(including the COUNT query in _count_non_null_embeddings and the other dynamic
queries) to call the validator/quoting helper first.
In `@tests/llm/test_model_config.py`:
- Around line 281-292: The test
test_app_settings_accepts_non_1536_with_any_vector_store_configuration currently
claims "any vector store configuration" but the combos list omits "turbopuffer";
update the combos tuple in that test to include ("turbopuffer", True) and
("turbopuffer", False) so all three backends are exercised, or alternatively
rename the test to accurately describe the covered backends (e.g.,
"...pgvector_and_lancedb_only") and adjust the docstring to match; modify the
combos variable referenced in the test to reflect whichever option you choose.
---
Outside diff comments:
In `@src/deriver/__main__.py`:
- Around line 59-70: Validation runs before the try/finally so telemetry may not
be shut down on validation failure; move the await
validate_embedding_schema(engine) call inside the same try/finally that awaits
main(), keeping initialize_telemetry_async() before the try, so the sequence is:
await initialize_telemetry_async(), then try: await
validate_embedding_schema(engine); await main(); finally: await
shutdown_telemetry(). This ensures initialize_telemetry_async,
validate_embedding_schema, main, and shutdown_telemetry are executed in the
correct order and that shutdown_telemetry() always runs.
---
Nitpick comments:
In `@src/startup/embedding_validator.py`:
- Around line 39-43: The new StartupValidationError class should inherit from
the project's shared exception hierarchy instead of RuntimeError: import the
appropriate exception (e.g., ValidationException) from src.exceptions and change
class StartupValidationError(RuntimeError) to subclass that shared type (or
subclass ValidationException) so existing error handling recognizes it; update
any references to StartupValidationError accordingly so it uses the shared
exception semantics.
- Around line 17-25: Replace the module-level stdlib logger with the project's
logging helper: remove logger = logging.getLogger(__name__) and import the
shared logger factory from src.utils.logging (e.g., get_logger or equivalent)
then initialize logger via that helper (e.g., logger = get_logger(__name__)) so
that the startup validation in embedding_validator.py uses the project's
contextual formatter and settings; update the import block accordingly and
ensure any existing log calls in this module continue to use the new logger
variable.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: da77d40a-2e72-44c8-be8c-eae4935eb21e
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (26)
docs/docs.jsondocs/v3/contributing/changing-embeddings.mdxdocs/v3/contributing/configuration.mdxpyproject.tomlscripts/configure_embeddings.pysdks/python/src/honcho/http/async_client.pysdks/python/src/honcho/http/client.pysrc/config.pysrc/deriver/__main__.pysrc/embedding_client.pysrc/main.pysrc/models.pysrc/startup/__init__.pysrc/startup/embedding_validator.pysrc/vector_store/lancedb.pysrc/vector_store/turbopuffer.pytests/conftest.pytests/deriver/test_vector_reconciliation.pytests/llm/test_embedding_client.pytests/llm/test_model_config.pytests/scripts/__init__.pytests/scripts/test_configure_embeddings.pytests/startup/__init__.pytests/startup/test_embedding_validator.pytests/test_datetime_parsing.pytests/test_models_vector_dim.py
- Turbopuffer attribute type for a vector column is `[N]f32` / `[N]f16` /
`[N]i8`, not `f32_vector(N)` as the earlier probe assumed. The earlier
regex returned None for the real SDK format, so existing Turbopuffer
namespaces would have been reported as "missing" instead of validated
for mismatch. Regex switched to `\[(\d+)\]` which is the
vendor-stable shape. Test cases rewritten to lock the actual format.
- docs/v3/contributing/configuration.mdx had a contradictory pair of
bullets: 223 said explicit 1536 makes `auto` forward dimensions=, 224
said `auto` would skip the parameter because 1536 is the default.
Operators reading both would (rightly) conclude they need `always`
even when `auto` would work. Rewrote both bullets so:
- `auto` is provenance-driven (explicit-set, not non-default-value).
- `always` is positioned as defense-in-depth for config layers that
might strip explicit default-valued envs, not the only path for
same-as-default truncation.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@scripts/configure_embeddings.py`:
- Around line 291-297: The branch in _emit_report prints " --report has no
effect with VECTOR_STORE_TYPE=pgvector" unconditionally, causing an unwanted
message during normal runs; change _emit_report to accept a boolean flag (e.g.,
report_requested) or otherwise detect explicit report mode and only print that
message when report_requested is true, then update callers such as _async_main
to pass the report flag when the user requested --report so normal/no-op
pgvector runs remain silent.
In `@src/startup/embedding_validator.py`:
- Around line 183-188: The loop currently only probes workspace-derived message
namespaces (using workspace_names, store.get_vector_namespace, and
_probe_namespace_dim) and appends mismatches to mismatches; extend this to also
iterate the collection-derived document namespaces (the document inventory from
the report path) and probe each document namespace with _probe_namespace_dim the
same way, appending any namespace/actual_dim pairs to mismatches so document
namespaces are validated against target_dim as well. Ensure you reference the
same store.get_vector_namespace call or equivalent for document namespaces and
reuse target_dim and mismatches to collect failures.
In `@src/vector_store/lancedb.py`:
- Around line 381-384: The function that scans schema for the "vector" field
currently returns None when it cannot recover a fixed dimension, which lets a
malformed existing table be treated like a missing namespace; change this to
raise VectorStoreError when a schema is present but no field.name == "vector"
with a type.list_size is found (i.e., if schema is truthy but dimension
extraction fails), while preserving the existing None return only when the
table/schema is truly absent; update the function that contains the for field in
schema loop accordingly and add the VectorStoreError import/usage.
In `@src/vector_store/turbopuffer.py`:
- Around line 333-337: The code that inspects the schema's vector attribute (the
block using schema.get("vector"), vector_attr, and re.search on
vector_attr.type) must not return None after exists() has already succeeded;
instead raise an explicit exception (e.g., ValueError or a custom
MalformedNamespaceError) when the "vector" attribute is missing or its type
string is unparsable so the caller surfaces the bad namespace. Update the logic
in the function containing that block to throw an exception with a clear message
including the namespace identifier and the offending type string (or that the
attribute is missing) rather than returning None, and ensure callers of this
function will propagate or handle that exception appropriately.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c95416f2-f6df-4fc6-a814-88295bfa607b
📒 Files selected for processing (10)
.env.templateconfig.toml.exampledocs/v3/contributing/configuration.mdxscripts/configure_embeddings.pysrc/startup/embedding_validator.pysrc/vector_store/__init__.pysrc/vector_store/lancedb.pysrc/vector_store/turbopuffer.pytests/vector_store/__init__.pytests/vector_store/test_namespace_dim_probes.py
✅ Files skipped from review due to trivial changes (4)
- config.toml.example
- tests/vector_store/test_namespace_dim_probes.py
- .env.template
- docs/v3/contributing/configuration.mdx
| # disposed" warnings during tests. | ||
| asyncio.run(engine.dispose()) | ||
|
|
||
|
|
There was a problem hiding this comment.
nit: can we just call `await engine.dispose() at the end of _async_main?
| + " This script only configures empty tables. Re-embed out-of-band" | ||
| + " into a fresh deployment, then cut over." | ||
| ) |
There was a problem hiding this comment.
error message seems like it got cut off
| async def _introspect_pgvector_dims_with_retry( | ||
| engine: AsyncEngine, schema: str | ||
| ) -> dict[str, int]: |
There was a problem hiding this comment.
nit: could use tenacity here
| for workspace_name in workspace_names: | ||
| namespace = store.get_vector_namespace("message", workspace_name) | ||
| actual_dim = await _probe_namespace_dim(store, namespace) |
There was a problem hiding this comment.
should we "probe" the documents namespaces as well?
CodeRabbit + Rajat review feedback. All actionable items addressed except two false-positives (responded on PR). Bug fixes: - deriver telemetry leak: validator was called outside try/finally so shutdown_telemetry() did not run on validation failure. Moved inside. - _emit_report printed "no effect with pgvector" unconditionally, including from implicit post-apply calls. Added is_report_mode flag; only print on explicit --report. - LanceDB and Turbopuffer probes returned None when the namespace existed but its schema was malformed (no vector field / unparseable type string), silently bucketing real corruption as "missing" (lazy-create) and letting it pass the startup validator. Now raise VectorStoreError with actionable diagnostics; None remains valid only for "namespace does not exist." - Startup validator only sampled message namespaces; added a parallel Collection-row sample so document namespaces are probed too, with the same dim assertion. Mirrors the --report path. Hygiene: - StartupValidationError now subclasses HonchoException so existing exception handlers recognize it. ValidationException is @Final and has 422 request-validation semantics that would be misleading here. - scripts/configure_embeddings.py main() no longer spins up two event loops. engine.dispose() moved into a try/finally inside _async_main so cleanup runs in the same loop as the pipeline. - Replaced hand-rolled retry loop with tenacity.AsyncRetrying; same fail-closed semantics, less code, before_sleep_log for visibility. - Added _validate_identifier() defense-in-depth: DB.SCHEMA and HNSW index names are regex-checked against [A-Za-z_][A-Za-z0-9_]* before SQL interpolation. Operator config + DB catalog are not user input under the current threat model, but the constraint is cheap to gate. Test + docs: - test_app_settings_accepts_non_1536_with_any_vector_store_configuration now actually exercises turbopuffer (was missing); supplies a dummy TURBOPUFFER_API_KEY to satisfy the model_validator. - changing-embeddings.mdx: hyphenated "out-of-band" per reviewer style.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@scripts/configure_embeddings.py`:
- Around line 412-418: _in _async_main, catch VectorStoreError raised by await
_run_pipeline (which may propagate from VectorStore.probe_namespace_dim) so the
CLI exits cleanly: change the try to try/except VectorStoreError as e, write a
single-line message to stderr (e.g., via sys.stderr.write or print(...,
file=sys.stderr)) and return a non-zero int (e.g., 1) instead of letting the
exception bubble; keep the existing finally block that awaits engine.dispose()
so cleanup still runs. Ensure you import VectorStoreError and sys if not already
present and reference _async_main, _run_pipeline,
VectorStore.probe_namespace_dim(), and engine.dispose() where applicable.
- Around line 421-424: Add an early validation in _run_pipeline to check
settings.EMBEDDING.VECTOR_DIMENSIONS against the HNSW limit (2000) and fail fast
with a clear error message; after retrieving target_dim and before proceeding
(and after _validate_identifier(schema, ...)), if target_dim > 2000 raise/exit
with a descriptive error indicating HNSW indexing supports at most 2000
dimensions and instruct the user to reduce EMBEDDING.VECTOR_DIMENSIONS to <=2000
so the later ALTER to recreate HNSW indexes won't fail.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b3adc736-97b9-4fc0-b051-a7902529248b
📒 Files selected for processing (7)
docs/v3/contributing/changing-embeddings.mdxscripts/configure_embeddings.pysrc/deriver/__main__.pysrc/startup/embedding_validator.pysrc/vector_store/lancedb.pysrc/vector_store/turbopuffer.pytests/llm/test_model_config.py
✅ Files skipped from review due to trivial changes (1)
- docs/v3/contributing/changing-embeddings.mdx
🚧 Files skipped from review as they are similar to previous changes (4)
- src/deriver/main.py
- src/vector_store/turbopuffer.py
- src/startup/embedding_validator.py
- tests/llm/test_model_config.py
| async def _async_main(args: argparse.Namespace) -> int: | ||
| try: | ||
| return await _run_pipeline(args) | ||
| finally: | ||
| # Dispose inside the same event loop so cleanup doesn't spin up a | ||
| # second loop just to await engine.dispose(). | ||
| await engine.dispose() |
There was a problem hiding this comment.
Handle vector-store probe failures as a clean CLI error.
If --report hits a malformed or unreadable external namespace, VectorStore.probe_namespace_dim() can raise VectorStoreError and this entrypoint will currently dump a traceback out of asyncio.run(). Catch it here and return a non-zero exit code with a single stderr line instead.
🛠️ Proposed fix
from src.config import settings # noqa: E402
from src.db import engine # noqa: E402
+from src.exceptions import VectorStoreError # noqa: E402
from src.models import Collection, Workspace # noqa: E402
from src.vector_store import VectorStore # noqa: E402
...
async def _async_main(args: argparse.Namespace) -> int:
try:
return await _run_pipeline(args)
+ except VectorStoreError as exc:
+ print(f"error: {exc}", file=sys.stderr)
+ return 1
finally:
# Dispose inside the same event loop so cleanup doesn't spin up a
# second loop just to await engine.dispose().
await engine.dispose()As per coding guidelines, "Use explicit error handling with appropriate exception types".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@scripts/configure_embeddings.py` around lines 412 - 418, _in _async_main,
catch VectorStoreError raised by await _run_pipeline (which may propagate from
VectorStore.probe_namespace_dim) so the CLI exits cleanly: change the try to
try/except VectorStoreError as e, write a single-line message to stderr (e.g.,
via sys.stderr.write or print(..., file=sys.stderr)) and return a non-zero int
(e.g., 1) instead of letting the exception bubble; keep the existing finally
block that awaits engine.dispose() so cleanup still runs. Ensure you import
VectorStoreError and sys if not already present and reference _async_main,
_run_pipeline, VectorStore.probe_namespace_dim(), and engine.dispose() where
applicable.
| async def _run_pipeline(args: argparse.Namespace) -> int: | ||
| target_dim = settings.EMBEDDING.VECTOR_DIMENSIONS | ||
| schema = settings.DB.SCHEMA | ||
| _validate_identifier(schema, kind="DB.SCHEMA") |
There was a problem hiding this comment.
❓ Verification inconclusive
What is the maximum supported dimension for the pgvector vector type?
The maximum supported dimension for the pgvector vector type is 16,000.[1][2][3] This is defined by VECTOR_MAX_DIM 16000 in the source code src/vector.h, and enforced in functions like CheckDim in src/vector.c, which errors if dim > VECTOR_MAX_DIM.[2][3] The official README confirms: "Vectors can have up to 16,000 dimensions."[1] Note that while storage supports up to 16,000 dimensions, HNSW indexing is limited to 2,000 dimensions for vector (use halfvec for up to 4,000 or bit for up to 64,000).[1][4]
Citations:
- 1: https://github.com/pgvector/pgvector
- 2: https://github.com/pgvector/pgvector/blob/35ab919b/src/vector.h
- 3: https://github.com/pgvector/pgvector/blob/35ab919b/src/vector.c
- 4: https://github.com/pgvector/pgvector/blob/master/src/hnsw.h
Script executed:
# Find settings files and vector store configuration
fd -t f "settings" --max-depth 3 | head -20Repository: plastic-labs/honcho
Repository: plastic-labs/honcho
Exit code: 0
Script executed:
# Search for pgvector configuration and indexing strategy
rg -i "pgvector|hnsw|vector.*index" --type py --max-count 10Repository: plastic-labs/honcho
Repository: plastic-labs/honcho
Exit code: 0
stdout:
tests/utils/test_agent_tools.py: # Force pgvector queries since test documents are created directly in postgres
tests/startup/test_embedding_validator.py: _assert_pgvector_dims_match, # pyright: ignore[reportPrivateUsage]
tests/startup/test_embedding_validator.py:# pgvector stores the declared dim directly in atttypmod (no VARHDRSZ offset).
tests/startup/test_embedding_validator.py:def test_assert_pgvector_dims_match_passes_when_all_dims_align() -> None:
tests/startup/test_embedding_validator.py: _assert_pgvector_dims_match(
tests/startup/test_embedding_validator.py:def test_assert_pgvector_dims_match_raises_on_dim_mismatch() -> None:
tests/startup/test_embedding_validator.py: _assert_pgvector_dims_match(
tests/startup/test_embedding_validator.py:def test_assert_pgvector_dims_match_lists_all_missing_columns() -> None:
tests/startup/test_embedding_validator.py: _assert_pgvector_dims_match(
tests/startup/test_embedding_validator.py:def test_assert_pgvector_dims_match_raises_on_unbounded_typmod() -> None:
tests/startup/test_embedding_validator.py: _assert_pgvector_dims_match(
tests/scripts/test_configure_embeddings.py: _apply_pgvector_alter, # pyright: ignore[reportPrivateUsage]
tests/scripts/test_configure_embeddings.py: _build_pgvector_plan, # pyright: ignore[reportPrivateUsage]
tests/scripts/test_configure_embeddings.py:async def _hnsw_indexes(db_engine: AsyncEngine) -> set[str]:
tests/scripts/test_configure_embeddings.py: AND indexdef ILIKE '%USING hnsw%'
tests/scripts/test_configure_embeddings.py: plan = await _build_pgvector_plan(db_engine, target_dim=1536, schema="public")
tests/scripts/test_configure_embeddings.py: plan = await _build_pgvector_plan(db_engine, target_dim=768, schema="public")
tests/scripts/test_configure_embeddings.py: await _build_pgvector_plan(db_engine, target_dim=1536, schema="no_such_schema")
tests/scripts/test_configure_embeddings.py:async def test_apply_alters_dims_and_recreates_hnsw_indexes(
tests/scripts/test_configure_embeddings.py: # well below pgvector's 2000-dim HNSW limit.
tests/scripts/test_configure_embeddings.py: before_indexes = await _hnsw_indexes(db_engine)
tests/conftest.py: logger.info("Attempting to create pgvector extension...")
tests/conftest.py: logger.info("pgvector extension created successfully.")
tests/conftest.py: "Failed to create pgvector extension. Make sure it's installed on the PostgreSQL server."
tests/llm/test_model_config.py: combos: list[tuple[Literal["pgvector", "turbopuffer", "lancedb"], bool]] = [
tests/llm/test_model_config.py: ("pgvector", True),
tests/llm/test_model_config.py: ("pgvector", False),
tests/alembic/revisions/test_66e63cf2cf77_add_indexes_to_documents_table.py:"""Hooks for revision 66e63cf2cf77 (documents HNSW index)."""
tests/alembic/revisions/test_66e63cf2cf77_add_indexes_to_documents_table.py:def prepare_documents_hnsw(verifier: MigrationVerifier) -> None:
tests/alembic/revisions/test_66e63cf2cf77_add_indexes_to_documents_table.py: verifier.assert_indexes_not_exist([("documents", "idx_documents_embedding_hnsw")])
tests/alembic/revisions/test_66e63cf2cf77_add_indexes_to_documents_table.py:def verify_documents_hnsw(verifier: MigrationVerifier) -> None:
tests/alembic/revisions/test_66e63cf2cf77_add_indexes_to_documents_table.py: verifier.assert_indexes_exist([("documents", "idx_documents_embedding_hnsw")])
tests/alembic/revisions/test_baa22cad81e2_standardize_constraint_names.py: ("documents", "idx_documents_embedding_hnsw"),
tests/alembic/revisions/test_baa22cad81e2_standardize_constraint_names.py: ("message_embeddings", "idx_message_embeddings_embedding_hnsw"),
tests/alembic/revisions/test_baa22cad81e2_standardize_constraint_names.py: ("documents", "ix_documents_embedding_hnsw"),
tests/alembic/revisions/test_baa22cad81e2_standardize_constraint_names.py: ("message_embeddings", "ix_message_embeddings_embedding_hnsw"),
tests/alembic/revisions/test_917195d9b5e9_add_messageembedding_table.py: ("message_embeddings", "idx_message_embeddings_embedding_hnsw"),
src/config.py: """Settings for vector store (pgvector, Turbopuffer, or LanceDB)."""
src/config.py: TYPE: Literal["pgvector", "turbopuffer", "lancedb"] = "pgvector"
src/crud/session.py: # Fetch all MessageEmbedding records to build vector IDs with {message_id}_{chunk_index}
src/crud/message.py: # True when: TYPE=pgvector OR still migrating (dual-write to both stores)
src/crud/message.py: settings.VECTOR_STORE.TYPE == "pgvector"
src/crud/message.py: # If no external vector store (pgvector-only mode), mark as synced immediately
src/crud/message.py:async def _search_messages_pgvector(
src/crud/message.py: """Run semantic message search against pgvector-backed embeddings."""
src/crud/message.py: # pgvector path: cosine distance in SQL
src/crud/message.py: # produce duplicate rows; we deduplicate in Python to preserve HNSW
src/crud/message.py: if settings.VECTOR_STORE.TYPE != "pgvector" and settings.VECTOR_STORE.MIGRATED:
src/crud/message.py: snippets = await _search_messages_pgvector(
src/vector_store/__init__.py: Returns None if TYPE='pgvector' since pgvector operations happen via ORM directly.
src/vector_store/__init__.py: The external vector store instance, or None if using pgvector (ORM handles it).
src/vector_store/__init__.py: if settings.VECTOR_STORE.TYPE == "pgvector":
src/crud/document.py:def _uses_pgvector() -> bool:
src/crud/document.py: """Check whether queries should go through pgvector (DB-only) path."""
src/crud/document.py: settings.VECTOR_STORE.TYPE == "pgvector" or not settings.VECTOR_STORE.MIGRATED
src/crud/document.py: or None when the pgvector (DB-only) path should be used instead.
src/crud/document.py: if _uses_pgvector():
src/crud/document.py:async def _query_documents_pgvector(
src/crud/document.py: """pgvector similarity search — pure DB operation."""
src/crud/document.py: if _uses_pgvector():
src/crud/document.py: # pgvector path — pure DB, open a short session if none provided
src/crud/document.py: return await _query_documents_pgvector(
src/startup/embedding_validator.py:not match the physical pgvector schema. Replaces an earlier config-time guard
src/startup/embedding_validator.py: dims = await _introspect_pgvector_dims_with_retry(engine, schema)
src/startup/embedding_validator.py: _assert_pgvector_dims_match(dims, schema=schema, target_dim=target_dim)
src/startup/embedding_validator.py:async def _introspect_pgvector_dims_with_retry(
src/startup/embedding_validator.py: return await _introspect_pgvector_dims_once(engine, schema)
src/startup/embedding_validator.py:async def _introspect_pgvector_dims_once(
src/startup/embedding_validator.py:def _assert_pgvector_dims_match(
src/startup/embedding_validator.py: # pgvector stores the declared dim directly in atttypmod (no VARHDRSZ).
src/startup/embedding_validator.py: # Settings said TYPE != pgvector but the store could not be created.
src/models.py:from pgvector.sqlalchemy import Vector
src/models.py: # HNSW index on embedding column for efficient similarity search
src/models.py: "ix_message_embeddings_embedding_hnsw",
src/models.py: postgresql_using="hnsw",
src/models.py: # HNSW index on embedding column
src/models.py: "ix_documents_embedding_hnsw",
src/models.py: postgresql_using="hnsw", # HNSW index type
src/models.py: postgresql_with={"m": 16, "ef_construction": 64}, # HNSW parameters
src/main.py: # pgvector columns, the process refuses to start rather than silently
src/db.py: # Install pgvector extension if it doesn't exist
src/reconciler/sync_vectors.py: # True when using pgvector OR during migration (dual-write to both stores)
src/reconciler/sync_vectors.py: settings.VECTOR_STORE.TYPE == "pgvector" or not settings.VECTOR_STORE.MIGRATED
src/reconciler/sync_vectors.py: # True when using pgvector OR during migration (dual-write to both stores)
src/reconciler/sync_vectors.py: settings.VECTOR_STORE.TYPE == "pgvector" or not settings.VECTOR_STORE.MIGRATED
src/reconciler/sync_vectors.py:async def _cleanup_soft_deleted_documents_pgvector(
src/reconciler/sync_vectors.py: # Hard delete directly (no vector store cleanup needed in pgvector mode)
src/reconciler/sync_vectors.py: logger.debug(f"Cleaned up {len(doc_ids)} soft-deleted documents (pgvector mode)")
src/reconciler/sync_vectors.py:async def _cleanup_pgvector_batch(
src/reconciler/sync_vectors.py: Clean up a single batch of soft-deleted documents in pgvector-only mode.
src/reconciler/sync_vectors.py: async with tracked_db("reconciliation_pgvector_cleanup") as db:
scripts/configure_embeddings.py:"""Configure pgvector schema dim to match EMBEDDING_VECTOR_DIMENSIONS.
scripts/configure_embeddings.py:class _PgvectorPlan:
scripts/configure_embeddings.py:# pgvector phase
scripts/configure_embeddings.py:async def _introspect_pgvector(conn: AsyncConnection, schema: str) -> dict[str, int]:
scripts/configure_embeddings.py: pgvector stores the declared dim directly in ``atttypmod`` (no VARHDRSZ).
scripts/configure_embeddings.py:async def _build_pgvector_plan(
scripts/configure_embeddings.py:) -> _PgvectorPlan:
scripts/configure_embeddings.py: current = await _introspect_pgvector(conn, schema)
scripts/configure_embeddings.py: return _PgvectorPlan(
scripts/configure_embeddings.py:async def _fetch_hnsw_index_defs(
src/utils/search.py:def _uses_pgvector_message_search() -> bool:
src/utils/search.py: settings.VECTOR_STORE.TYPE == "pgvector" or not settings.VECTOR_STORE.MIGRATED
src/utils/search.py:async def _semantic_search_pgvector(
src/utils/search.py: Perform semantic message search using pgvector in Postgres.
src/utils/search.py: # produce duplicate rows; we deduplicate in Python to preserve HNSW
src/utils/search.py: if not _uses_pgvector_message_search():
src/utils/search.py: if _uses_pgvector_message_search():
src/utils/search.py: semantic_results = await _semantic_search_pgvector(
migrations/versions/119a52b73c60_support_external_embeddings.py:from pgvector.sqlalchemy import Vector
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py:"""add hnsw index to documents table
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: # Create HNSW index on the embedding column for the documents table for cosine distance
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: f"Creating HNSW index idx_documents_embedding_hnsw on {schema}.documents table"
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: CREATE INDEX idx_documents_embedding_hnsw ON {schema}.documents
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: USING hnsw (embedding vector_cosine_ops)
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: f"HNSW index idx_documents_embedding_hnsw created on {schema}.documents table"
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: f"Error creating HNSW index idx_documents_embedding_hnsw on {schema}.documents table: {e}"
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: f"Dropping HNSW index idx_documents_embedding_hnsw from {schema}.documents table"
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: op.execute(text(f"DROP INDEX IF EXISTS {schema}.idx_documents_embedding_hnsw;"))
migrations/versions/66e63cf2cf77_add_indexes_to_documents_table.py: f"HNSW index idx_documents_embedding_hnsw dropped from {schema}.documents table"
migrations/versions/917195d9b5e9_add_messageembedding_table.py:from pgvector.sqlalchemy import Vector
migrations/versions/917195d9b5e9_add_messageembedding_table.py: # Create HNSW index for vector similarity search
migrations/versions/917195d9b5e9_add_messageembedding_table.py: CREATE INDEX idx_message_embeddings_embedding_hnsw
migrations/versions/917195d9b5e9_add_messageembedding_table.py: USING hnsw (embedding vector_cosine_ops)
migrations/versions/917195d9b5e9_add_messageembedding_table.py: "idx_message_embeddings_embedding_hnsw",
migrations/versions/a1b2c3d4e5f6_initial_schema.py:from pgvector.sqlalchemy import Vector
migrations/versions/baa22cad81e2_standardize_constraint_names.py: if index_exists("documents", "idx_documents_embedding_hnsw", inspector):
migrations/versions/baa22cad81e2_standardize_constraint_names.py: f'ALTER INDEX {schema}."idx_documents_embedding_hnsw" RENAME TO "ix_documents_embedding_hnsw"'
migrations/versions/baa22cad81e2_standardize_constraint_names.py: "message_embeddings", "idx_message_embeddings_embedding_hnsw", inspector
migrations/versions/baa22cad81e2_standardize_constraint_names.py: f'ALTER INDEX {schema}."idx_message_embeddings_embedding_hnsw" RENAME TO "ix_message_embeddings_embedding_hnsw"'
migrations/versions/baa22cad81e2_standardize_constraint_names.py: f'ALTER INDEX {schema}."ix_message_embeddings_embedding_hnsw" RENAME TO "idx_message_embeddings_embedding_hnsw"'
migrations/versions/baa22cad81e2_standardize_constraint_names.py: f'ALTER INDEX {schema}."ix_documents_embedding_hnsw" RENAME TO "idx_documents_embedding_hnsw"'
migrations/env.py: # Install pgvector extension if it doesn't exist
Script executed:
# Look at the configure_embeddings.py file to understand how target_dim is used
fd -t f "configure_embeddings.py" -x cat -n {}Repository: plastic-labs/honcho
Repository: plastic-labs/honcho
Exit code: 0
stdout:
1 """Phase 3: configure_embeddings script tests."""
2
3 from __future__ import annotations
4
5 from collections.abc import AsyncGenerator
6 from contextlib import asynccontextmanager
7
8 import pytest
9 from sqlalchemy import text
10 from sqlalchemy.ext.asyncio import AsyncEngine
11
12 from scripts.configure_embeddings import (
13 _apply_pgvector_alter, # pyright: ignore[reportPrivateUsage]
14 _build_pgvector_plan, # pyright: ignore[reportPrivateUsage]
15 )
16
17 # ---------------------------------------------------------------------------
18 # Fixtures
19 # ---------------------------------------------------------------------------
20
21
22 `@asynccontextmanager`
23 async def _restore_schema_to(db_engine: AsyncEngine, dim: int) -> AsyncGenerator[None]:
24 """ALTER both embedding columns back to ``dim`` on exit so this test
25 leaves the shared test DB in a consistent state for subsequent tests."""
26 try:
27 yield
28 finally:
29 async with db_engine.begin() as conn:
30 for table in ("documents", "message_embeddings"):
31 await conn.execute(
32 text(
33 f"ALTER TABLE {table} ALTER COLUMN embedding"
34 + f" TYPE vector({dim}) USING NULL"
35 )
36 )
37
38
39 async def _current_dims(db_engine: AsyncEngine) -> dict[str, int]:
40 async with db_engine.connect() as conn:
41 result = await conn.execute(
42 text(
43 """
44 SELECT c.relname AS table_name, a.atttypmod AS typmod
45 FROM pg_attribute a
46 JOIN pg_class c ON a.attrelid = c.oid
47 JOIN pg_namespace n ON c.relnamespace = n.oid
48 WHERE n.nspname = 'public'
49 AND c.relname = ANY(:tables)
50 AND a.attname = 'embedding'
51 """
52 ),
53 {"tables": ["documents", "message_embeddings"]},
54 )
55 return {row.table_name: row.typmod for row in result}
56
57
58 async def _hnsw_indexes(db_engine: AsyncEngine) -> set[str]:
59 async with db_engine.connect() as conn:
60 result = await conn.execute(
61 text(
62 """
63 SELECT indexname
64 FROM pg_indexes
65 WHERE schemaname = 'public'
66 AND tablename IN ('documents', 'message_embeddings')
67 AND indexdef ILIKE '%USING hnsw%'
68 """
69 )
70 )
71 return {row.indexname for row in result}
72
73
74 # ---------------------------------------------------------------------------
75 # Plan
76 # ---------------------------------------------------------------------------
77
78
79 `@pytest.mark.asyncio`
80 async def test_plan_no_alter_needed_when_dims_already_match(
81 db_engine: AsyncEngine,
82 ) -> None:
83 plan = await _build_pgvector_plan(db_engine, target_dim=1536, schema="public")
84 assert plan.needs_alter is False
85 assert plan.current_dims == {"documents": 1536, "message_embeddings": 1536}
86
87
88 `@pytest.mark.asyncio`
89 async def test_plan_needs_alter_when_target_differs(db_engine: AsyncEngine) -> None:
90 plan = await _build_pgvector_plan(db_engine, target_dim=768, schema="public")
91 assert plan.needs_alter is True
92
93
94 `@pytest.mark.asyncio`
95 async def test_plan_raises_on_missing_column(db_engine: AsyncEngine) -> None:
96 with pytest.raises(SystemExit, match="required vector columns missing"):
97 await _build_pgvector_plan(db_engine, target_dim=1536, schema="no_such_schema")
98
99
100 # ---------------------------------------------------------------------------
101 # Apply
102 # ---------------------------------------------------------------------------
103
104
105 `@pytest.mark.asyncio`
106 async def test_apply_alters_dims_and_recreates_hnsw_indexes(
107 db_engine: AsyncEngine,
108 ) -> None:
109 # 768 is the canonical "small" dim used in non-1536 deployments and is
110 # well below pgvector's 2000-dim HNSW limit.
111 target = 768
112 async with _restore_schema_to(db_engine, dim=1536):
113 before_indexes = await _hnsw_indexes(db_engine)
114 assert before_indexes, "test fixture should have HNSW indexes pre-alter"
115
116 plan = await _build_pgvector_plan(db_engine, target_dim=target, schema="public")
117 assert plan.needs_alter is True
118 await _apply_pgvector_alter(db_engine, plan)
119
120 after_dims = await _current_dims(db_engine)
121 assert after_dims == {"documents": target, "message_embeddings": target}
122
123 after_indexes = await _hnsw_indexes(db_engine)
124 assert (
125 after_indexes == before_indexes
126 ), "HNSW indexes should be recreated with the same names"
127
128
129 `@pytest.mark.asyncio`
130 async def test_apply_refuses_when_embeddings_populated(
131 db_engine: AsyncEngine,
132 monkeypatch: pytest.MonkeyPatch,
133 ) -> None:
134 """ALTER ... USING NULL would silently wipe non-null embeddings, so the
135 pre-check must abort the transaction before any destructive action.
136
137 We patch the count helper to simulate populated tables rather than wire
138 up the full FK chain of workspace/peer/collection/document just to land
139 one vector row.
140 """
141
142 async def fake_count(_conn: object, _schema: str, table: str) -> int:
143 return 7 if table == "documents" else 0
144
145 monkeypatch.setattr(
146 "scripts.configure_embeddings._count_non_null_embeddings",
147 fake_count,
148 )
149
150 async with _restore_schema_to(db_engine, dim=1536):
151 plan = await _build_pgvector_plan(db_engine, target_dim=768, schema="public")
152 with pytest.raises(
153 SystemExit, match="refusing to ALTER populated embedding tables"
154 ):
155 await _apply_pgvector_alter(db_engine, plan)
156
157 # The SystemExit aborts the transaction; nothing should have changed.
158 dims_after_refuse = await _current_dims(db_engine)
159 assert dims_after_refuse == {
160 "documents": 1536,
161 "message_embeddings": 1536,
162 }
163
164
165 # ---------------------------------------------------------------------------
166 # Idempotency
167 # ---------------------------------------------------------------------------
168
169
170 `@pytest.mark.asyncio`
171 async def test_idempotent_apply_is_a_noop(db_engine: AsyncEngine) -> None:
172 """Build plan twice with the matching dim — second call should still
173 return needs_alter=False without raising or making any changes."""
174 plan_a = await _build_pgvector_plan(db_engine, target_dim=1536, schema="public")
175 plan_b = await _build_pgvector_plan(db_engine, target_dim=1536, schema="public")
176 assert plan_a.needs_alter is False
177 assert plan_b.needs_alter is False
178 assert plan_a.current_dims == plan_b.current_dims
1 """Configure pgvector schema dim to match EMBEDDING_VECTOR_DIMENSIONS.
2
3 Usage::
4
5 uv run python scripts/configure_embeddings.py # interactive
6 uv run python scripts/configure_embeddings.py --dry-run # print intent, no DB write
7 uv run python scripts/configure_embeddings.py --yes # apply without prompt
8 uv run python scripts/configure_embeddings.py --report # full external-store inventory
9
10 The bootstrap sequence for a self-hosted install is:
11
12 1. alembic upgrade head # creates default vector(1536) schema
13 2. uv run python scripts/configure_embeddings.py # ALTER columns to target dim
14 3. start the API and deriver # validators refuse to start on mismatch
15
16 Existing 1536 deployments need no action — step 2 is a no-op when settings
17 already match the schema.
18
19 This script never creates or modifies external-store namespaces. Turbopuffer
20 and LanceDB namespaces are per-workspace and lazy-created on first write by
21 application code; their dim is implicitly pinned at that point. Use
22 ``--report`` to enumerate existing namespaces against the configured dim.
23 """
24
25 from __future__ import annotations
26
27 import argparse
28 import asyncio
29 import logging
30 import os
31 import re
32 import sys
33 from dataclasses import dataclass
34
35 # Match the path-shim convention used by the other scripts in this directory
36 # so `src.*` imports resolve when the script is run directly.
37 _PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
38 if _PROJECT_ROOT not in sys.path:
39 sys.path.insert(0, _PROJECT_ROOT)
40
41 from sqlalchemy import select, text # noqa: E402
42 from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine # noqa: E402
43
44 from src.config import settings # noqa: E402
45 from src.db import engine # noqa: E402
46 from src.models import Collection, Workspace # noqa: E402
47 from src.vector_store import VectorStore # noqa: E402
48
49 logger = logging.getLogger(__name__)
50
51 _EMBEDDING_TABLES: tuple[str, ...] = ("documents", "message_embeddings")
52
53 # Defense-in-depth for the dynamic SQL paths in this script. The values we
54 # interpolate (schema from settings, index names from pg_indexes, table
55 # names from a hardcoded constant) are not user input under any threat
56 # model we currently care about, but validating once at the top of the
57 # pipeline keeps the constraint explicit and the surface tight.
58 _SAFE_IDENT_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
59
60
61 def _validate_identifier(name: str, *, kind: str) -> None:
62 if not _SAFE_IDENT_PATTERN.fullmatch(name):
63 raise SystemExit(
64 f"error: refusing to interpolate {kind} {name!r} into SQL —"
65 + " expected a SQL identifier of the form [A-Za-z_][A-Za-z0-9_]*."
66 + " Reconfigure your environment and re-run."
67 )
68
69
70 # ---------------------------------------------------------------------------
71 # Result types
72 # ---------------------------------------------------------------------------
73
74
75 `@dataclass`(frozen=True)
76 class _PgvectorPlan:
77 target_dim: int
78 schema: str
79 current_dims: dict[str, int]
80 needs_alter: bool
81
82
83 `@dataclass`(frozen=True)
84 class _NamespaceRecord:
85 """A single row in the --report output."""
86
87 namespace: str
88 status: str # one of: "ok", "missing", "mismatch"
89 actual_dim: int | None
90 target_dim: int
91
92
93 # ---------------------------------------------------------------------------
94 # pgvector phase
95 # ---------------------------------------------------------------------------
96
97
98 async def _introspect_pgvector(conn: AsyncConnection, schema: str) -> dict[str, int]:
99 """Return ``{table_name: atttypmod}`` for embedding columns in ``schema``.
100
101 Tables not present in the result dict are absent from the schema.
102 pgvector stores the declared dim directly in ``atttypmod`` (no VARHDRSZ).
103 """
104 query = text(
105 """
106 SELECT c.relname AS table_name, a.atttypmod AS typmod
107 FROM pg_attribute a
108 JOIN pg_class c ON a.attrelid = c.oid
109 JOIN pg_namespace n ON c.relnamespace = n.oid
110 WHERE n.nspname = :schema
111 AND c.relname = ANY(:tables)
112 AND a.attname = 'embedding'
113 """
114 )
115 result = await conn.execute(
116 query,
117 {"schema": schema, "tables": list(_EMBEDDING_TABLES)},
118 )
119 return {row.table_name: row.typmod for row in result}
120
121
122 async def _build_pgvector_plan(
123 engine: AsyncEngine, target_dim: int, schema: str
124 ) -> _PgvectorPlan:
125 """Build a plan describing what (if anything) the script will change."""
126 async with engine.connect() as conn:
127 current = await _introspect_pgvector(conn, schema)
128
129 missing = set(_EMBEDDING_TABLES) - current.keys()
130 if missing:
131 listing = ", ".join(sorted(f"{schema}.{t}.embedding" for t in missing))
132 raise SystemExit(
133 f"error: required vector columns missing: {listing}."
134 + " Run `alembic upgrade head` first."
135 )
136 for table, typmod in current.items():
137 if typmod == -1:
138 raise SystemExit(
139 f"error: {schema}.{table}.embedding has no declared vector"
140 + " dimension (unbounded typmod). Drop and recreate the column"
141 + " or restore from a versioned migration before re-running."
142 )
143
144 needs_alter = any(typmod != target_dim for typmod in current.values())
145 return _PgvectorPlan(
146 target_dim=target_dim,
147 schema=schema,
148 current_dims=current,
149 needs_alter=needs_alter,
150 )
151
152
153 async def _count_non_null_embeddings(
154 conn: AsyncConnection, schema: str, table: str
155 ) -> int:
156 query = text(
157 f'SELECT COUNT(*) AS n FROM "{schema}"."{table}" WHERE embedding IS NOT NULL'
158 )
159 result = await conn.execute(query)
160 row = result.first()
161 return int(row.n) if row is not None else 0
162
163
164 async def _fetch_hnsw_index_defs(
165 conn: AsyncConnection, schema: str
166 ) -> list[tuple[str, str]]:
167 """Return ``(index_name, CREATE INDEX ...)`` for HNSW indices on the
168 embedding columns. We re-CREATE them after the ALTER using these exact
169 definitions, preserving operator-set params (m, ef_construction, etc.)."""
170 query = text(
171 """
172 SELECT indexname AS name, indexdef AS ddl
173 FROM pg_indexes
174 WHERE schemaname = :schema
175 AND tablename = ANY(:tables)
176 AND indexdef ILIKE '%USING hnsw%'
177 """
178 )
179 result = await conn.execute(
180 query, {"schema": schema, "tables": list(_EMBEDDING_TABLES)}
181 )
182 return [(row.name, row.ddl) for row in result]
183
184
185 async def _apply_pgvector_alter(engine: AsyncEngine, plan: _PgvectorPlan) -> None:
186 """ALTER the embedding columns to ``plan.target_dim`` in a single
187 transaction. Refuses to proceed if any non-null embeddings exist.
188
189 Sequence (inside the transaction):
190 1. LOCK TABLE ... IN ACCESS EXCLUSIVE MODE — closes the TOCTOU window
191 between the population check and the ALTER.
192 2. SELECT COUNT(embedding IS NOT NULL) per table — refuse if any > 0.
193 3. Save HNSW index definitions, then DROP them (cannot ALTER under HNSW).
194 4. ALTER ... ALTER COLUMN embedding TYPE vector(N) USING NULL.
195 5. Recreate HNSW indices from saved definitions.
196 """
197 async with engine.begin() as conn:
198 # Step 1: lock both tables for the duration of the transaction.
199 for table in _EMBEDDING_TABLES:
200 await conn.execute(
201 text(f'LOCK TABLE "{plan.schema}"."{table}" IN ACCESS EXCLUSIVE MODE')
202 )
203
204 # Step 2: population check.
205 counts: dict[str, int] = {}
206 for table in _EMBEDDING_TABLES:
207 counts[table] = await _count_non_null_embeddings(conn, plan.schema, table)
208 populated = {t: n for t, n in counts.items() if n > 0}
209 if populated:
210 detail = ", ".join(f"{t}: {n} rows" for t, n in sorted(populated.items()))
211 raise SystemExit(
212 f"error: refusing to ALTER populated embedding tables ({detail})."
213 + " This script only configures empty tables. Re-embed out-of-band"
214 + " into a fresh deployment, then cut over."
215 )
216
217 # Step 3: snapshot + drop HNSW indices.
218 index_defs = await _fetch_hnsw_index_defs(conn, plan.schema)
219 for index_name, _ddl in index_defs:
220 _validate_identifier(index_name, kind="HNSW index name")
221 logger.info("dropping HNSW index %s", index_name)
222 await conn.execute(text(f'DROP INDEX "{plan.schema}"."{index_name}"'))
223
224 # Step 4: ALTER columns.
225 for table in _EMBEDDING_TABLES:
226 logger.info(
227 "altering %s.%s.embedding to vector(%d)",
228 plan.schema,
229 table,
230 plan.target_dim,
231 )
232 await conn.execute(
233 text(
234 f'ALTER TABLE "{plan.schema}"."{table}"'
235 + f" ALTER COLUMN embedding TYPE vector({plan.target_dim})"
236 + " USING NULL"
237 )
238 )
239
240 # Step 5: recreate HNSW indices from the saved definitions.
241 for index_name, ddl in index_defs:
242 logger.info("recreating HNSW index %s", index_name)
243 await conn.execute(text(ddl))
244
245
246 # ---------------------------------------------------------------------------
247 # External-store report
248 # ---------------------------------------------------------------------------
249
250
251 async def _enumerate_workspaces(conn: AsyncConnection) -> list[str]:
252 """All workspace names, ordered by creation. Uses the ORM so
253 ``Base.metadata.schema`` (configured from ``DB.SCHEMA``) is honored —
254 non-public schema deployments must not sample the wrong table."""
255 stmt = select(Workspace.name).order_by(Workspace.created_at)
256 result = await conn.execute(stmt)
257 return [row[0] for row in result]
258
259
260 async def _enumerate_collections(
261 conn: AsyncConnection,
262 ) -> list[tuple[str, str, str]]:
263 """Every (workspace_name, observer, observed) triple that has a row in
264 the collections table — these are the document namespaces that could
265 exist in an external store."""
266 stmt = select(Collection.workspace_name, Collection.observer, Collection.observed)
267 result = await conn.execute(stmt)
268 return [(row[0], row[1], row[2]) for row in result]
269
270
271 async def _build_external_namespace_inventory(
272 engine: AsyncEngine,
273 ) -> list[tuple[str, str]]:
274 """Return ``(namespace_type, namespace_name)`` pairs for every namespace
275 that should exist based on the application DB. Message namespaces are
276 derived per workspace, document namespaces per collection row.
277 """
278 from src.vector_store import get_external_vector_store
279
280 store = get_external_vector_store()
281 if store is None:
282 return []
283
284 async with engine.connect() as conn:
285 workspace_names = await _enumerate_workspaces(conn)
286 collection_keys = await _enumerate_collections(conn)
287
288 pairs: list[tuple[str, str]] = []
289 for workspace_name in workspace_names:
290 pairs.append(("message", store.get_vector_namespace("message", workspace_name)))
291 for workspace_name, observer, observed in collection_keys:
292 pairs.append(
293 (
294 "document",
295 store.get_vector_namespace(
296 "document", workspace_name, observer=observer, observed=observed
297 ),
298 )
299 )
300 return pairs
301
302
303 async def _probe_namespace_dim(store: VectorStore, namespace: str) -> int | None:
304 """Return the namespace's declared dim, or ``None`` if the namespace
305 does not exist yet. Delegates to the store-specific probe."""
306 return await store.probe_namespace_dim(namespace)
307
308
309 async def _emit_report(
310 engine: AsyncEngine, target_dim: int, *, is_report_mode: bool
311 ) -> int:
312 """Print the per-namespace inventory and return an exit code. 0 on a
313 clean report (all matching or missing); non-zero on any mismatch.
314
315 ``is_report_mode=True`` means the operator explicitly invoked ``--report``
316 — only then do we print the "no effect with pgvector" notice. Implicit
317 post-apply calls from interactive/dry-run/yes mode stay silent when the
318 deployment is on pgvector.
319 """
320 if settings.VECTOR_STORE.TYPE == "pgvector":
321 if is_report_mode:
322 print("--report has no effect with VECTOR_STORE_TYPE=pgvector")
323 return 0
324
325 inventory = await _build_external_namespace_inventory(engine)
326 if not inventory:
327 print(
328 "no external namespaces to inventory"
329 + " (no workspaces/collections exist yet, or no external store configured)"
330 )
331 return 0
332
333 from src.vector_store import get_external_vector_store
334
335 store = get_external_vector_store()
336 if store is None:
337 print("no external store configured; nothing to report")
338 return 0
339
340 records: list[_NamespaceRecord] = []
341 for _ns_type, namespace in inventory:
342 actual = await _probe_namespace_dim(store, namespace)
343 if actual is None:
344 # Namespace has not been written to yet (lazy-create model).
345 status = "missing"
346 elif actual == target_dim:
347 status = "ok"
348 else:
349 status = "mismatch"
350 records.append(
351 _NamespaceRecord(
352 namespace=namespace,
353 status=status,
354 actual_dim=actual,
355 target_dim=target_dim,
356 )
357 )
358
359 width = max(len(r.namespace) for r in records)
360 print(f"{'namespace'.ljust(width)} status dim")
361 print(f"{'-' * width} --------- ------")
362 for r in records:
363 dim_str = "?" if r.actual_dim is None else str(r.actual_dim)
364 print(f"{r.namespace.ljust(width)} {r.status:<9} {dim_str}")
365
366 mismatches = [r for r in records if r.status == "mismatch"]
367 if mismatches:
368 print(
369 f"\nerror: {len(mismatches)} namespace(s) have dim != {target_dim}",
370 file=sys.stderr,
371 )
372 return 1
373 return 0
374
375
376 # ---------------------------------------------------------------------------
377 # CLI
378 # ---------------------------------------------------------------------------
379
380
381 def _build_parser() -> argparse.ArgumentParser:
382 parser = argparse.ArgumentParser(
383 prog="configure_embeddings",
384 description=(
385 "Configure pgvector schema dim to match EMBEDDING_VECTOR_DIMENSIONS."
386 ),
387 )
388 mode = parser.add_mutually_exclusive_group()
389 mode.add_argument(
390 "--dry-run",
391 action="store_true",
392 help="print intended changes and exit without touching the DB",
393 )
394 mode.add_argument(
395 "--yes",
396 action="store_true",
397 help="apply changes without an interactive prompt",
398 )
399 mode.add_argument(
400 "--report",
401 action="store_true",
402 help="print external-store namespace inventory and exit",
403 )
404 return parser
405
406
407 def _confirm(prompt: str) -> bool:
408 response = input(f"{prompt} [y/N]: ").strip().lower()
409 return response in {"y", "yes"}
410
411
412 async def _async_main(args: argparse.Namespace) -> int:
413 try:
414 return await _run_pipeline(args)
415 finally:
416 # Dispose inside the same event loop so cleanup doesn't spin up a
417 # second loop just to await engine.dispose().
418 await engine.dispose()
419
420
421 async def _run_pipeline(args: argparse.Namespace) -> int:
422 target_dim = settings.EMBEDDING.VECTOR_DIMENSIONS
423 schema = settings.DB.SCHEMA
424 _validate_identifier(schema, kind="DB.SCHEMA")
425
426 if args.report:
427 return await _emit_report(engine, target_dim, is_report_mode=True)
428
429 plan = await _build_pgvector_plan(engine, target_dim, schema)
430 if not plan.needs_alter:
431 print(
432 f"pgvector: {schema}.documents.embedding and"
433 + f" {schema}.message_embeddings.embedding already at dim {target_dim},"
434 + " skipping ALTER"
435 )
436 return await _emit_report(engine, target_dim, is_report_mode=False)
437
438 current_summary = ", ".join(
439 f"{schema}.{t}.embedding={plan.current_dims[t]}" for t in _EMBEDDING_TABLES
440 )
441 print(f"target dim: {target_dim}")
442 print(f"current: {current_summary}")
443 print("planned operations (single transaction):")
444 print(f" - LOCK TABLE {schema}.documents IN ACCESS EXCLUSIVE MODE")
445 print(f" - LOCK TABLE {schema}.message_embeddings IN ACCESS EXCLUSIVE MODE")
446 print(" - refuse if any non-null embeddings exist")
447 print(" - DROP existing HNSW indices on the embedding columns")
448 print(
449 f" - ALTER COLUMN embedding TYPE vector({target_dim}) USING NULL"
450 + " on both tables"
451 )
452 print(" - CREATE HNSW indices from snapshotted definitions")
453
454 if args.dry_run:
455 print("\n--dry-run: no changes applied")
456 return 0
457
458 if not args.yes and not _confirm("apply?"):
459 print("aborted")
460 return 1
461
462 await _apply_pgvector_alter(engine, plan)
463 print(f"\npgvector schema is now at dim {target_dim}")
464 return await _emit_report(engine, target_dim, is_report_mode=False)
465
466
467 def main(argv: list[str] | None = None) -> int:
468 logging.basicConfig(level=logging.INFO, format="%(message)s")
469 parser = _build_parser()
470 args = parser.parse_args(argv)
471 return asyncio.run(_async_main(args))
472
473
474 if __name__ == "__main__":
475 raise SystemExit(main())
Add early validation for HNSW-compatible embedding dimensions.
The pgvector vector type supports up to 16,000 dimensions, but HNSW indexing—used by this codebase—is limited to 2,000 dimensions. A config like 3,072 will fail late during the ALTER when attempting to recreate HNSW indexes, rather than producing a clear validation error upfront. (github.com)
🛠️ Proposed fix
+_HNSW_MAX_DIM = 2000
+
...
async def _run_pipeline(args: argparse.Namespace) -> int:
target_dim = settings.EMBEDDING.VECTOR_DIMENSIONS
+ if not 1 <= target_dim <= _HNSW_MAX_DIM:
+ raise SystemExit(
+ "error: EMBEDDING_VECTOR_DIMENSIONS must be between 1 and "
+ f"{_HNSW_MAX_DIM} for HNSW-indexed deployments"
+ )
schema = settings.DB.SCHEMA
_validate_identifier(schema, kind="DB.SCHEMA")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@scripts/configure_embeddings.py` around lines 421 - 424, Add an early
validation in _run_pipeline to check settings.EMBEDDING.VECTOR_DIMENSIONS
against the HNSW limit (2000) and fail fast with a clear error message; after
retrieving target_dim and before proceeding (and after
_validate_identifier(schema, ...)), if target_dim > 2000 raise/exit with a
descriptive error indicating HNSW indexing supports at most 2000 dimensions and
instruct the user to reduce EMBEDDING.VECTOR_DIMENSIONS to <=2000 so the later
ALTER to recreate HNSW indexes won't fail.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
tests/sdk_typescript/conftest.py (1)
97-121: ⚡ Quick winEnsure cleanup is guaranteed even when setup fails before startup.
try/finallystarts after some mutable setup state is created. If any pre-trystep raises, fixture state can leak (app.dependency_overrides,_ts_session_factory, monkeypatch) and cause cross-test flakiness. Wrap all setup/startup in the sametry/finally(or useExitStack) so teardown is deterministic.Proposed refactor
- mp = pytest.MonkeyPatch() - mp.setattr("src.main.validate_embedding_schema", _skip_validate) - - # Start the server - server = TestServer(app, port) - try: + mp = pytest.MonkeyPatch() + server: TestServer | None = None + try: + mp.setattr("src.main.validate_embedding_schema", _skip_validate) + server = TestServer(app, port) server.start() yield f"http://127.0.0.1:{port}" finally: # Cleanup - server.stop() - app.dependency_overrides.clear() - _ts_session_factory = None - mp.undo() + try: + if server is not None: + server.stop() + finally: + app.dependency_overrides.clear() + _ts_session_factory = None + mp.undo()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/sdk_typescript/conftest.py` around lines 97 - 121, The fixture currently sets up app.dependency_overrides, _ts_session_factory, and a MonkeyPatch (mp) before entering the try/finally that starts/stops TestServer, which can leak state if an exception occurs during setup; move all mutable setup (assigning app.dependency_overrides[get_db]=override_get_db, creating mp and calling mp.setattr for src.main.validate_embedding_schema, assigning _ts_session_factory, and instantiating server = TestServer(app, port)) inside the same try block (or use contextlib.ExitStack) so the finally always runs and always performs cleanup (server.stop(), app.dependency_overrides.clear(), _ts_session_factory = None, mp.undo()); ensure references to override_get_db, _skip_validate, mp, server, TestServer, and app.dependency_overrides are preserved and undone in the finally.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@tests/sdk_typescript/conftest.py`:
- Around line 97-121: The fixture currently sets up app.dependency_overrides,
_ts_session_factory, and a MonkeyPatch (mp) before entering the try/finally that
starts/stops TestServer, which can leak state if an exception occurs during
setup; move all mutable setup (assigning
app.dependency_overrides[get_db]=override_get_db, creating mp and calling
mp.setattr for src.main.validate_embedding_schema, assigning
_ts_session_factory, and instantiating server = TestServer(app, port)) inside
the same try block (or use contextlib.ExitStack) so the finally always runs and
always performs cleanup (server.stop(), app.dependency_overrides.clear(),
_ts_session_factory = None, mp.undo()); ensure references to override_get_db,
_skip_validate, mp, server, TestServer, and app.dependency_overrides are
preserved and undone in the finally.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: dd2abccd-b251-4350-a150-a8538ba29e4c
📒 Files selected for processing (1)
tests/sdk_typescript/conftest.py
Sync fork with upstream through 9f26fdd (v3.0.2 → v3.0.9). No conflicts; local CLAUDE.md (GitNexus block) auto-merged. Notable: base_url threading to LLM clients (plastic-labs#643), configurable embeddings (plastic-labs#678), deriver custom instructions (plastic-labs#609). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary by CodeRabbit
New Features
Documentation
Chores
Tests