feat(executionplans): implement policy-resolved execution plans #180

SantiagoDePolonia merged 5 commits into main from
Conversation
Caution: Review failed. The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID:
📒 Files selected for processing (2)
📝 Walkthrough

Adds an execution-plans subsystem (persistence, compilation, in-memory service, registry) and threads policy resolution into the request flow; request-scoped plans now gate audit, usage, caching, and guardrails, and execution-plan version IDs are persisted to logs and batch records.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant HTTP as HTTP Server
    participant PlanMW as ExecutionPlan MW
    participant Service as ExecPlan Service
    participant Store as ExecPlan Store
    participant Compiler as Compiler
    participant Audit as Audit MW
    participant Handler as Request Handler
    Client->>HTTP: POST /v1/...
    HTTP->>PlanMW: ExecutionPlanningWithResolverAndPolicy(ctx)
    PlanMW->>Service: Match(selector)
    Service->>Store: ListActive()
    Store-->>Service: Active versions
    Service->>Compiler: Compile(version)
    Compiler->>Service: CompiledPlan (Policy + Pipeline)
    Service-->>PlanMW: ResolvedExecutionPolicy
    PlanMW->>HTTP: attach plan to context
    HTTP->>Audit: auditEnabledForContext(ctx)
    alt audit enabled
        Audit->>Handler: PopulateRequestData / capture
    else
        Audit->>Handler: skip
    end
    Handler->>Handler: gate cache/audit/usage/guardrails on plan features
    Handler-->>Client: Response
```
```mermaid
sequenceDiagram
    participant Service as ExecutionPlan Service
    participant Store as Store
    participant DB as Database
    participant Compiler as Compiler
    participant Registry as Guardrails Registry
    Service->>Store: ListActive()
    Store->>DB: SELECT active versions
    DB-->>Store: rows
    Store-->>Service: versions
    Service->>Compiler: Compile(version)
    Compiler->>Compiler: Apply process feature caps
    alt Guardrails enabled
        Compiler->>Registry: BuildPipeline(steps)
        Registry-->>Compiler: Pipeline + Hash
    end
    Compiler-->>Service: CompiledPlan
    Service->>Service: atomic replace snapshot
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 12
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/app/app.go`:
- Around line 189-192: The hardcoded refresh interval time.Minute used when
calling executionplans.NewWithSharedStorage and executionplans.New should be
made configurable: add a duration field (e.g., ExecutionPlanRefreshInterval
time.Duration) to the application config struct (and load a sane default like 1m
if unspecified), parse it from env/flags/config loader, then replace the literal
time.Minute in the calls to executionplans.NewWithSharedStorage(ctx,
sharedExecutionPlanStorage, executionPlanCompiler, time.Minute) and
executionplans.New(ctx, appCfg, executionPlanCompiler, time.Minute) with the new
config value so operators can tune the execution plan refresh interval.
In `@internal/auditlog/auditlog_test.go`:
- Around line 636-641: The test currently uses core.DefaultExecutionFeatures()
which makes the Audit flag implicit; update the test setup that constructs the
core.ExecutionPlan (the call to core.WithExecutionPlan with a
core.ResolvedExecutionPolicy) to use an explicit Features value with Audit set
to true (instead of core.DefaultExecutionFeatures()) so the test always
exercises version-ID persistence regardless of package defaults; locate the
instantiation of core.ExecutionPlan / core.ResolvedExecutionPolicy in the
auditlog_test and replace the Features argument with an explicit features struct
(with Audit: true).
In `@internal/auditlog/reader_sqlite.go`:
- Around line 98-99: The rows.Scan call that reads execution_plan_version_id
into e.ExecutionPlanVersionID must handle NULLs; change the scan target for
execution_plan_version_id to a nullable type (e.g., sql.NullString or *string)
in the scan call and then set e.ExecutionPlanVersionID accordingly (assign empty
string or nil as your struct expects) both where rows.Scan is used (the scan at
rows.Scan(&e.ID, &ts, ... &e.ExecutionPlanVersionID, ... ) and the later scan
around line ~305); update any variable names (e.g., executionPlanVersionNS or
execPlanPtr) and the post-scan assignment so NULL rows do not cause scan
conversion errors.
In `@internal/executionplans/service.go`:
- Around line 217-221: The context cancel created by refreshCtx, refreshCancel
:= context.WithTimeout(ctx, 30*time.Second) should be deferred immediately after
creation to guarantee cleanup even if s.Refresh(refreshCtx) panics; move the
call to refreshCancel() into a defer (i.e., defer refreshCancel() right after
the WithTimeout line) and remove the later explicit refreshCancel() invocation
so resources are always released regardless of panic in s.Refresh.
In `@internal/executionplans/store_mongodb.go`:
- Around line 106-120: Wrap the deactivation (UpdateMany) and insertion
(InsertOne) in a MongoDB multi-document transaction so both succeed or both are
rolled back: start a session (client.StartSession or
s.collection.Database().Client().StartSession), use session.WithTransaction (or
explicit StartTransaction/CommitTransaction/AbortTransaction) and run the
UpdateMany and InsertOne calls inside that transaction using the session
context; handle mongo.ErrNoDocuments and unique-index errors by
aborting/returning the wrapped error from the transaction, and ensure the same
session context is passed to s.collection.UpdateMany and s.collection.InsertOne
to avoid leaving the scope without an active plan when input.Activate is true
(the unique index on (scope_key, active=true) must be respected).
In `@internal/executionplans/store_postgresql.go`:
- Around line 98-110: The version allocator using `SELECT COALESCE(MAX(version),
0) + 1` is racy for concurrent Create calls; after starting the transaction
(`tx, err := s.pool.Begin(ctx)`) take a per-scope lock before computing
`nextVersion` (e.g. call `pg_advisory_xact_lock(hashtext(scopeKey))` or
`pg_try_advisory_xact_lock` using `scopeKey`’s hash) and then run the `SELECT
... FROM execution_plan_versions` query to guarantee serialization, or
alternatively add retry logic around the insert to catch the unique `(scope_key,
version)` violation and retry the whole transaction a few times; ensure the
lock/acquire or retry is applied in the same code path that computes
`nextVersion` and performs the insert so `scopeKey`, `nextVersion`, `tx`, and
`execution_plan_versions` are the referenced symbols.
In `@internal/executionplans/store_sqlite.go`:
- Around line 94-106: The allocation of nextVersion using tx.QueryRowContext
with `SELECT COALESCE(MAX(version), 0) + 1 FROM execution_plan_versions WHERE
scope_key = ?` is racy and can cause unique constraint violations; change the
transaction start to an immediate/serialized transaction (e.g., use a BEGIN
IMMEDIATE equivalent instead of s.db.BeginTx) so the MAX+1 allocation is
serialized, or implement retry logic around the insert (catch unique constraint
on (scope_key, version) from the insert and retry the read/insert loop). Update
the logic in store_sqlite.go around the BeginTx/tx creation and the
QueryRowContext + insert path (execution_plan_versions / scope_key allocation)
to use one of these approaches so concurrent creates cannot collide.
In `@internal/executionplans/types.go`:
- Around line 64-80: normalizeScope currently allows provider/model to contain
':' which causes scopeKey to collide (both scopeKey and normalizeScope are
involved); fix by rejecting any ':' in Scope.Provider and Scope.Model during
normalizeScope (trimmed values) and return a validation error (e.g., "scope
fields cannot contain ':'") so keys remain unambiguous; alternatively, if you
prefer to preserve colons, update scopeKey to encode each component (e.g.,
URL-encode or base64) before concatenation so provider/model are
distinguished—implement one of these fixes and ensure tests for scopeKey
uniqueness are updated accordingly.
In `@internal/responsecache/simple.go`:
- Around line 196-203: The nil-plan branch was inverted: change the early check
so that when plan == nil the function returns true (skip caching) instead of
false; update the block containing plan == nil so it returns true, keeping the
subsequent checks using plan.CacheEnabled(), plan.Mode,
core.ExecutionModeTranslated and plan.Resolution unchanged to prevent
cross-provider cache contamination.
In `@internal/server/execution_policy.go`:
- Around line 18-20: The call to resolver.Match can return arbitrary
storage/cache errors that must be normalized before returning; update the error
handling around resolver.Match (where policy, err := resolver.Match(selector) is
called) to check whether err is already a core.GatewayError and return it
unchanged if so, otherwise wrap the error in a core.GatewayError using the
appropriate client-facing category (e.g., "provider_error") and include a short
contextual message about policy resolution so downstream code always receives a
GatewayError.
In `@internal/server/native_batch_service.go`:
- Around line 155-179: storeExecutionPlanForBatch currently propagates errors
returned from applyExecutionPolicy (and ultimately resolver.Match) directly to
callers; update the error handling here so any non-core.GatewayError returned by
applyExecutionPolicy is wrapped into a core.GatewayError before being returned.
Concretely, after calling applyExecutionPolicy(plan, s.executionPolicyResolver,
selector) in storeExecutionPlanForBatch, check the returned error and if it is
not already a core.GatewayError, wrap or convert it to one (using your project's
core.GatewayError constructor/wrapper) and return that wrapped error so clients
always receive a core.GatewayError from resolver.Match()/applyExecutionPolicy.
In `@internal/server/native_batch_support.go`:
- Around line 89-129: determineBatchExecutionSelector currently repeats the
BatchItemRequestedModelSelector/Normalize/ResolveModel loop that another helper
uses to pick the batch provider, causing an extra O(n) pass; refactor so
provider selection and execution-plan selection are derived in a single pass
(either by changing determineBatchExecutionSelector to accept/return the
provider choice or by creating a shared helper that walks req.Requests once and
returns both the resolved common model (or "" if mixed/empty) and the
providerType/selection info). Keep references to
BatchItemRequestedModelSelector, Normalize, and resolver.ResolveModel in that
single walk, and return the combined result (core.ExecutionPlanSelector plus the
provider decision) instead of performing the same resolution twice.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 52742121-209d-428e-8ae8-577047259359
📒 Files selected for processing (42)
docs/adr/0003-policy-resolved-execution-plan.md, internal/app/app.go, internal/auditlog/auditlog.go, internal/auditlog/auditlog_test.go, internal/auditlog/middleware.go, internal/auditlog/reader_mongodb.go, internal/auditlog/reader_postgresql.go, internal/auditlog/reader_sqlite.go, internal/auditlog/store_mongodb.go, internal/auditlog/store_postgresql.go, internal/auditlog/store_postgresql_test.go, internal/auditlog/store_sqlite.go, internal/batch/store.go, internal/core/execution_plan.go, internal/executionplans/compiler.go, internal/executionplans/compiler_test.go, internal/executionplans/factory.go, internal/executionplans/service.go, internal/executionplans/service_test.go, internal/executionplans/store.go, internal/executionplans/store_helpers.go, internal/executionplans/store_mongodb.go, internal/executionplans/store_postgresql.go, internal/executionplans/store_sqlite.go, internal/executionplans/types.go, internal/guardrails/planned_executor.go, internal/guardrails/registry.go, internal/responsecache/middleware_test.go, internal/responsecache/simple.go, internal/server/execution_plan_helpers.go, internal/server/execution_plan_helpers_test.go, internal/server/execution_policy.go, internal/server/handlers.go, internal/server/handlers_test.go, internal/server/http.go, internal/server/model_validation.go, internal/server/model_validation_test.go, internal/server/native_batch_service.go, internal/server/native_batch_support.go, internal/server/passthrough_support.go, internal/server/translated_inference_service.go, internal/server/translated_inference_service_test.go
internal/server/execution_policy.go
Outdated
```go
policy, err := resolver.Match(selector)
if err != nil {
	return err
```
Wrap policy-resolution failures before returning them.
resolver.Match can emit arbitrary storage/cache errors, and Line 20 forwards them unchanged through the request path. Normalize non-core.GatewayError failures here and preserve existing gateway errors as-is so policy-resolution outages cannot bypass the gateway error contract.
As per coding guidelines **/*.go: All errors returned to clients must be instances of core.GatewayError with typed client-facing categories: provider_error, rate_limit_error, invalid_request_error, authentication_error, and not_found_error.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/server/execution_policy.go` around lines 18 - 20, The call to
resolver.Match can return arbitrary storage/cache errors that must be normalized
before returning; update the error handling around resolver.Match (where policy,
err := resolver.Match(selector) is called) to check whether err is already a
core.GatewayError and return it unchanged if so, otherwise wrap the error in a
core.GatewayError using the appropriate client-facing category (e.g.,
"provider_error") and include a short contextual message about policy resolution
so downstream code always receives a GatewayError.
```go
func (s *nativeBatchService) storeExecutionPlanForBatch(
	c *echo.Context,
	req *core.BatchRequest,
	providerType string,
) (*core.ExecutionPlan, error) {
	plan := cloneCurrentExecutionPlan(c)
	if plan == nil {
		return nil, nil
	}
	plan.Mode = core.ExecutionModeNativeBatch
	plan.ProviderType = strings.TrimSpace(providerType)

	if s.executionPolicyResolver != nil {
		selector, err := determineBatchExecutionSelector(s.provider, s.modelResolver, req, providerType)
		if err != nil {
			return nil, err
		}
		if err := applyExecutionPolicy(plan, s.executionPolicyResolver, selector); err != nil {
			return nil, err
		}
	}

	storeExecutionPlan(c, plan)
	return plan, nil
}
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Verify applyExecutionPolicy and determineBatchExecutionSelector return core.GatewayError
ast-grep --pattern $'func applyExecutionPolicy($$$) ($$$, error) {
  $$$
}'
ast-grep --pattern $'func determineBatchExecutionSelector($$$) ($$$, error) {
  $$$
}'
```

Repository: ENTERPILOT/GOModel
Length of output: 3328

🏁 Script executed:

```bash
# Find applyExecutionPolicy function definition
rg "func applyExecutionPolicy" -A 30
```

Repository: ENTERPILOT/GOModel
Length of output: 1905

🏁 Script executed:

```bash
# Verify core.NewInvalidRequestError is a core.GatewayError
rg "type InvalidRequestError|func NewInvalidRequestError" --type go -A 5
```

Repository: ENTERPILOT/GOModel
Length of output: 804

🏁 Script executed:

```bash
# Check the core package for GatewayError definition
fd "core" -t d | head -5
```

Repository: ENTERPILOT/GOModel
Length of output: 76

🏁 Script executed:

```bash
# Find RequestExecutionPolicyResolver interface definition
rg "type RequestExecutionPolicyResolver|interface.*{" -A 10 | grep -A 10 "RequestExecutionPolicyResolver"
```

Repository: ENTERPILOT/GOModel
Length of output: 221

🏁 Script executed:

```bash
# Find implementations of Match method that handle RequestExecutionPolicyResolver
rg "func.*Match.*selector.*core.ExecutionPlanSelector" -A 15
```

Repository: ENTERPILOT/GOModel
Length of output: 2268

🏁 Script executed:

```bash
# Find matchCompiled method definition and implementation
rg "func.*matchCompiled" -A 20
```

Repository: ENTERPILOT/GOModel
Length of output: 1481
Wrap errors from applyExecutionPolicy's resolver as core.GatewayError.
determineBatchExecutionSelector correctly returns core.NewInvalidRequestError() instances. However, applyExecutionPolicy (line 172) returns errors directly from resolver.Match(), which can return non-typed errors (e.g., fmt.Errorf() from matchCompiled). Since these errors flow to clients through handleError, ensure the resolver returns core.GatewayError instances or wrap the error before returning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/server/native_batch_service.go` around lines 155 - 179,
storeExecutionPlanForBatch currently propagates errors returned from
applyExecutionPolicy (and ultimately resolver.Match) directly to callers; update
the error handling here so any non-core.GatewayError returned by
applyExecutionPolicy is wrapped into a core.GatewayError before being returned.
Concretely, after calling applyExecutionPolicy(plan, s.executionPolicyResolver,
selector) in storeExecutionPlanForBatch, check the returned error and if it is
not already a core.GatewayError, wrap or convert it to one (using your project's
core.GatewayError constructor/wrapper) and return that wrapped error so clients
always receive a core.GatewayError from resolver.Match()/applyExecutionPolicy.
# Conflicts:
#	internal/server/translated_inference_service.go
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/app/app.go (1)
458-472: ⚠️ Potential issue | 🟠 Major

Close execution plans before aliases when storage may be shared.

`firstSharedStorage(...)` can select `aliasResult.Storage` for `executionplans.NewWithSharedStorage(...)`, but `Shutdown()` currently closes `a.aliases` first. That can tear down the shared storage underneath the execution-plan service while its refresher / `Close()` path is still running. The response-cache init failure path already closes execution plans before aliases; shutdown should use the same order.

♻️ Minimal reorder

```diff
-	// 3. Close aliases subsystem.
-	if a.aliases != nil {
-		if err := a.aliases.Close(); err != nil {
-			slog.Error("aliases close error", "error", err)
-			errs = append(errs, fmt.Errorf("aliases close: %w", err))
-		}
-	}
-
-	// 4. Close execution plans subsystem.
-	if a.executionPlans != nil {
-		if err := a.executionPlans.Close(); err != nil {
-			slog.Error("execution plans close error", "error", err)
-			errs = append(errs, fmt.Errorf("execution plans close: %w", err))
-		}
-	}
+	// 3. Close execution plans subsystem.
+	if a.executionPlans != nil {
+		if err := a.executionPlans.Close(); err != nil {
+			slog.Error("execution plans close error", "error", err)
+			errs = append(errs, fmt.Errorf("execution plans close: %w", err))
+		}
+	}
+
+	// 4. Close aliases subsystem.
+	if a.aliases != nil {
+		if err := a.aliases.Close(); err != nil {
+			slog.Error("aliases close error", "error", err)
+			errs = append(errs, fmt.Errorf("aliases close: %w", err))
+		}
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/app/app.go` around lines 458 - 472, Shutdown currently closes a.aliases before a.executionPlans which can tear down shared storage used by executionplans.NewWithSharedStorage; reorder the shutdown sequence so that a.executionPlans is closed before a.aliases (i.e., move the executionPlans close block above the aliases close block in Shutdown()), ensuring any errors are still logged/appended to errs exactly as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/executionplans/store_postgresql.go`:
- Around line 113-128: In createVersion on PostgreSQLStore, acquire a
transaction-scoped advisory lock on the scope before computing nextVersion to
serialize allocation for the same scopeKey: call tx.Exec with "SELECT
pg_advisory_xact_lock(hashtext($1))" (passing scopeKey) immediately after
beginning the tx and before the tx.QueryRow that selects
COALESCE(MAX(version),0)+1 from execution_plan_versions; this ensures concurrent
callers for the same scopeKey wait instead of racing and retrying
(alternatively, implement a SELECT ... FOR UPDATE on a dedicated per-scope row
if you maintain a scope table).
In `@internal/responsecache/middleware_test.go`:
- Around line 419-449: The test
TestSimpleCacheMiddleware_BypassesCacheWithoutExecutionPlan hard-codes that a
nil execution plan must bypass caching; update it to not assume that policy. In
the test that constructs mw via NewResponseCacheMiddlewareWithStore and
registers the handler, remove the assertions that X-Cache must be empty and that
callCount == 2; instead assert that the middleware's behavior is consistent
across repeated requests (i.e., the X-Cache header value is the same for both
responses and callCount is either 1 if cached or 2 if not), or parameterize the
test to explicitly exercise both expected runtime contracts; this keeps the test
from codifying a specific nil-plan policy while still validating determinism of
Middleware().
In `@internal/server/execution_policy.go`:
- Around line 39-48: The cloneCurrentExecutionPlan function performs a shallow
copy so cloned.Policy still points to the same *ResolvedExecutionPolicy as the
original; update cloneCurrentExecutionPlan to deep-copy the Policy pointer
(e.g., if existing.Policy != nil then copy the struct value and set
cloned.Policy = &copiedPolicy) so callers cannot accidentally mutate shared
policy state, or alternatively add a clear comment in cloneCurrentExecutionPlan
documenting the invariant that callers (like applyExecutionPolicy in
native_batch_service.go) must always replace the Policy pointer rather than
mutate it.
---
Outside diff comments:
In `@internal/app/app.go`:
- Around line 458-472: Shutdown currently closes a.aliases before
a.executionPlans which can tear down shared storage used by
executionplans.NewWithSharedStorage; reorder the shutdown sequence so that
a.executionPlans is closed before a.aliases (i.e., move the executionPlans close
block above the aliases close block in Shutdown()), ensuring any errors are
still logged/appended to errs exactly as before.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 213368c9-82a0-4b9e-b5f1-4e89e47a4d60
📒 Files selected for processing (22)
config/config.example.yaml, config/config.go, config/config_test.go, internal/app/app.go, internal/auditlog/auditlog_test.go, internal/auditlog/reader_postgresql.go, internal/auditlog/reader_sqlite.go, internal/auditlog/store_sqlite_test.go, internal/executionplans/service.go, internal/executionplans/store_mongodb.go, internal/executionplans/store_postgresql.go, internal/executionplans/store_sqlite.go, internal/executionplans/types.go, internal/executionplans/types_test.go, internal/responsecache/middleware_test.go, internal/responsecache/simple.go, internal/server/execution_policy.go, internal/server/execution_policy_test.go, internal/server/handlers_test.go, internal/server/native_batch_service.go, internal/server/native_batch_support.go, internal/server/translated_inference_service.go
```go
func (s *PostgreSQLStore) createVersion(ctx context.Context, input CreateInput, scopeKey, planHash string) (*Version, error) {
	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return nil, fmt.Errorf("begin execution plan transaction: %w", err)
	}
	defer func() {
		_ = tx.Rollback(ctx)
	}()

	var nextVersion int
	if err := tx.QueryRow(ctx,
		`SELECT COALESCE(MAX(version), 0) + 1 FROM execution_plan_versions WHERE scope_key = $1`,
		scopeKey,
	).Scan(&nextVersion); err != nil {
		return nil, fmt.Errorf("select next execution plan version: %w", err)
	}
```
🧹 Nitpick | 🔵 Trivial

Consider using SELECT ... FOR UPDATE or an advisory lock for high-contention scenarios.

While the retry loop handles races, under high write contention for the same scope_key the MAX(version)+1 pattern without row-level locking could lead to multiple retries. For write-heavy workloads on the same scope, consider `SELECT pg_advisory_xact_lock(hashtext($1))` before the MAX() query to serialize version allocation within the scope.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/executionplans/store_postgresql.go` around lines 113 - 128, In
createVersion on PostgreSQLStore, acquire a transaction-scoped advisory lock on
the scope before computing nextVersion to serialize allocation for the same
scopeKey: call tx.Exec with "SELECT pg_advisory_xact_lock(hashtext($1))"
(passing scopeKey) immediately after beginning the tx and before the tx.QueryRow
that selects COALESCE(MAX(version),0)+1 from execution_plan_versions; this
ensures concurrent callers for the same scopeKey wait instead of racing and
retrying (alternatively, implement a SELECT ... FOR UPDATE on a dedicated
per-scope row if you maintain a scope table).
```go
func TestSimpleCacheMiddleware_BypassesCacheWithoutExecutionPlan(t *testing.T) {
	store := cache.NewMapStore()
	defer store.Close()
	mw := NewResponseCacheMiddlewareWithStore(store, time.Hour)
	e := echo.New()
	e.Use(mw.Middleware())

	callCount := 0
	e.POST("/v1/chat/completions", func(c *echo.Context) error {
		callCount++
		return c.JSON(http.StatusOK, map[string]string{"result": "ok"})
	})

	body := []byte(`{"model":"gpt-4","messages":[{"role":"user","content":"hi"}]}`)
	for i := range 2 {
		req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
		req.Header.Set("Content-Type", "application/json")
		rec := httptest.NewRecorder()
		e.ServeHTTP(rec, req)
		if rec.Code != http.StatusOK {
			t.Fatalf("request %d: got status %d", i+1, rec.Code)
		}
		if got := rec.Header().Get("X-Cache"); got != "" {
			t.Fatalf("expected nil-plan request to bypass cache, got X-Cache=%q", got)
		}
	}

	if callCount != 2 {
		t.Fatalf("expected nil-plan requests to bypass cache, handler called %d times", callCount)
	}
}
```
Don't hard-code nil-plan bypass until the runtime contract is consistent.
This test asserts that missing execution plans must skip caching, but the translated inference path still treats a missing plan as cacheable. That makes cache behavior depend on which layer populated the plan and turns this into a codified inconsistency instead of a regression detector.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/responsecache/middleware_test.go` around lines 419 - 449, The test
TestSimpleCacheMiddleware_BypassesCacheWithoutExecutionPlan hard-codes that a
nil execution plan must bypass caching; update it to not assume that policy. In
the test that constructs mw via NewResponseCacheMiddlewareWithStore and
registers the handler, remove the assertions that X-Cache must be empty and that
callCount == 2; instead assert that the middleware's behavior is consistent
across repeated requests (i.e., the X-Cache header value is the same for both
responses and callCount is either 1 if cached or 2 if not), or parameterize the
test to explicitly exercise both expected runtime contracts; this keeps the test
from codifying a specific nil-plan policy while still validating determinism of
Middleware().
```go
func cloneCurrentExecutionPlan(c *echo.Context) *core.ExecutionPlan {
	if c == nil {
		return nil
	}
	if existing := core.GetExecutionPlan(c.Request().Context()); existing != nil {
		cloned := *existing
		return &cloned
	}
	return &core.ExecutionPlan{}
}
```
🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Check how cloneCurrentExecutionPlan result is used - does it mutate Policy fields or replace Policy?
rg -A 10 "cloneCurrentExecutionPlan" --type go | head -50
```

Repository: ENTERPILOT/GOModel
Length of output: 1502

🏁 Script executed:

```bash
# Find applyExecutionPolicy function definition and its implementation
rg -B 2 -A 20 "func applyExecutionPolicy" --type go
```

Repository: ENTERPILOT/GOModel
Length of output: 1513

🏁 Script executed:

```bash
# Check ExecutionPlan structure definition
rg -B 2 -A 10 "type ExecutionPlan struct" --type go
```

Repository: ENTERPILOT/GOModel
Length of output: 900

🏁 Script executed:

```bash
# Find ALL usages of cloneCurrentExecutionPlan to ensure we see complete picture
rg "cloneCurrentExecutionPlan" --type go
```

Repository: ENTERPILOT/GOModel
Length of output: 245
Shallow copy of ExecutionPlan shares mutable Policy pointer; confirm all callers replace rather than mutate it.
The shallow copy (`cloned := *existing`) means `cloned.Policy` references the same `*ResolvedExecutionPolicy` object as the original. The only current caller (native_batch_service.go) correctly calls applyExecutionPolicy, which replaces the entire Policy pointer rather than mutating its fields. However, document this invariant or consider a deep copy to prevent future callers from accidentally mutating shared state.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/server/execution_policy.go` around lines 39 - 48, The
cloneCurrentExecutionPlan function performs a shallow copy so cloned.Policy
still points to the same *ResolvedExecutionPolicy as the original; update
cloneCurrentExecutionPlan to deep-copy the Policy pointer (e.g., if
existing.Policy != nil then copy the struct value and set cloned.Policy =
&copiedPolicy) so callers cannot accidentally mutate shared policy state, or
alternatively add a clear comment in cloneCurrentExecutionPlan documenting the
invariant that callers (like applyExecutionPolicy in native_batch_service.go)
must always replace the Policy pointer rather than mutate it.
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/auditlog/middleware.go`:
- Around line 47-49: The early auditEnabledForContext(c.Request().Context())
check in the middleware rarely short-circuits because the execution plan is
typically not yet in the context (planning middleware runs later), so update the
middleware.go code around the if (!auditEnabledForContext...) branch to add a
concise comment stating this check is primarily for the uncommon case where an
upstream component (e.g., semantic enrichers) has already populated a plan with
Audit=false; reference the symbols auditEnabledForContext and next(c) and note
that the real gating occurs after next(c) (lines around the later check that
inspects the plan).
In `@tests/integration/setup_test.go`:
- Around line 144-153: The switch in resetIntegrationStorage silently ignores
unknown dbType values; add a default case to the switch in function
resetIntegrationStorage that calls t.Fatalf (or t.Fatalf-like test failure) with
a clear message including the unknown dbType (e.g., "unknown db type: %s") so
tests fail fast and indicate that storage reset was skipped.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: cabcd780-0386-4e66-93f7-3452855a2995
📒 Files selected for processing (9)
internal/auditlog/auditlog_test.go, internal/auditlog/entry_capture.go, internal/auditlog/middleware.go, internal/server/http.go, internal/server/model_validation.go, internal/server/passthrough_support.go, internal/server/translated_inference_service.go, tests/integration/main_test.go, tests/integration/setup_test.go