Skip to content

Commit de30fd9

Browse files
feat(executionplans): implement policy-resolved execution plans
1 parent 705c7c4 commit de30fd9

42 files changed

Lines changed: 2596 additions & 144 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/adr/0003-policy-resolved-execution-plan.md

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ handlers, middleware, and provider execution code.
4040
The runtime object is derived from:
4141

4242
- the matched persisted execution plan version
43+
- process-level feature configuration for the running gateway instance
4344
- request facts captured from ingress
4445
- request-scoped resolution results such as endpoint metadata and model
4546
resolution
@@ -145,8 +146,12 @@ The first required persistence surface is `audit_logs`.
145146

146147
- `execution_plan_version_id`
147148

148-
This id is sufficient for request explainability because the referenced
149-
execution-plan row is immutable.
149+
This id identifies the immutable execution-plan version selected for the
150+
request.
151+
152+
Process-level feature switches may still disable parts of that plan for a given
153+
deployment. In other words, the matched plan remains traceable, but effective
154+
runtime behavior can also depend on deployment configuration.
150155

151156
The first slice does not require storing the same field in `usage`.
152157

@@ -205,6 +210,14 @@ V1 semantics:
205210
- later steps start only after the previous step fully completes
206211
- if `features.guardrails` is `false`, the guardrails array is ignored
207212
- `ref` must point to an existing named guardrail managed by the gateway
213+
- process-level feature configuration is a hard upper bound over plan features
214+
- the effective runtime feature value is `process_enabled AND plan_enabled`
215+
- if a process-level feature switch is disabled, the corresponding plan field is
216+
ignored by that process
217+
218+
This preserves 12-factor operational control. Operators can disable gateway
219+
features for one deployment through environment-backed process configuration
220+
without rewriting persisted execution plans.
208221

209222
To preserve immutability, omitted feature flags may be accepted at authoring
210223
time, but they must be resolved to explicit booleans before an immutable plan
@@ -217,7 +230,10 @@ In other words:
217230
defaults
218231

219232
This prevents the same immutable execution plan version from changing behavior
220-
later if process-wide defaults drift.
233+
later because authoring-time defaults drift.
234+
235+
Process-level hard-disable switches remain allowed to suppress features at
236+
runtime for a given deployment.
221237

222238
### 8. Future Evolution
223239

internal/app/app.go

Lines changed: 148 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"net/http"
1111
"sync"
12+
"time"
1213

1314
"gomodel/config"
1415
"gomodel/internal/admin"
@@ -17,6 +18,7 @@ import (
1718
"gomodel/internal/auditlog"
1819
"gomodel/internal/batch"
1920
"gomodel/internal/core"
21+
"gomodel/internal/executionplans"
2022
"gomodel/internal/guardrails"
2123
"gomodel/internal/providers"
2224
"gomodel/internal/responsecache"
@@ -28,13 +30,14 @@ import (
2830
// App represents the main application with all its dependencies.
2931
// It provides centralized lifecycle management for all components.
3032
type App struct {
31-
config *config.Config
32-
providers *providers.InitResult
33-
audit *auditlog.Result
34-
usage *usage.Result
35-
batch *batch.Result
36-
aliases *aliases.Result
37-
server *server.Server
33+
config *config.Config
34+
providers *providers.InitResult
35+
audit *auditlog.Result
36+
usage *usage.Result
37+
batch *batch.Result
38+
aliases *aliases.Result
39+
executionPlans *executionplans.Result
40+
server *server.Server
3841

3942
shutdownMu sync.Mutex
4043
shutdown bool
@@ -156,8 +159,10 @@ func New(ctx context.Context, cfg Config) (*App, error) {
156159
var provider core.RoutableProvider = app.providers.Router
157160
var translatedRequestPatcher server.TranslatedRequestPatcher
158161
var batchRequestPreparers []server.BatchRequestPreparer
159-
if appCfg.Guardrails.Enabled {
160-
pipeline, err := buildGuardrailsPipeline(appCfg.Guardrails)
162+
featureCaps := runtimeExecutionFeatureCaps(appCfg)
163+
var guardrailRegistry *guardrails.Registry
164+
if featureCaps.Guardrails {
165+
guardrailRegistry, err = buildGuardrailRegistry(appCfg.Guardrails)
161166
if err != nil {
162167
var (
163168
aliasCloseErr error
@@ -175,14 +180,49 @@ func New(ctx context.Context, cfg Config) (*App, error) {
175180
}
176181
return nil, fmt.Errorf("failed to build guardrails: %w", err)
177182
}
178-
if pipeline.Len() > 0 {
179-
translatedRequestPatcher = guardrails.NewRequestPatcher(pipeline)
183+
}
184+
185+
var executionPlanResult *executionplans.Result
186+
sharedExecutionPlanStorage := firstSharedStorage(auditResult.Storage, usageResult.Storage, batchResult.Storage, aliasResult.Storage)
187+
executionPlanCompiler := executionplans.NewCompilerWithFeatureCaps(guardrailRegistry, featureCaps)
188+
if sharedExecutionPlanStorage != nil {
189+
executionPlanResult, err = executionplans.NewWithSharedStorage(ctx, sharedExecutionPlanStorage, executionPlanCompiler, time.Minute)
190+
} else {
191+
executionPlanResult, err = executionplans.New(ctx, appCfg, executionPlanCompiler, time.Minute)
192+
}
193+
if err != nil {
194+
closeErr := errors.Join(app.aliases.Close(), app.batch.Close(), app.usage.Close(), app.audit.Close(), app.providers.Close())
195+
if closeErr != nil {
196+
return nil, fmt.Errorf("failed to initialize execution plans: %w (also: close error: %v)", err, closeErr)
197+
}
198+
return nil, fmt.Errorf("failed to initialize execution plans: %w", err)
199+
}
200+
defaultExecutionPlan := defaultExecutionPlanInput(appCfg)
201+
if err := executionPlanResult.Service.EnsureDefaultGlobal(ctx, defaultExecutionPlan); err != nil {
202+
closeErr := errors.Join(executionPlanResult.Close(), app.aliases.Close(), app.batch.Close(), app.usage.Close(), app.audit.Close(), app.providers.Close())
203+
if closeErr != nil {
204+
return nil, fmt.Errorf("failed to seed execution plans: %w (also: close error: %v)", err, closeErr)
205+
}
206+
return nil, fmt.Errorf("failed to seed execution plans: %w", err)
207+
}
208+
if err := executionPlanResult.Service.Refresh(ctx); err != nil {
209+
closeErr := errors.Join(executionPlanResult.Close(), app.aliases.Close(), app.batch.Close(), app.usage.Close(), app.audit.Close(), app.providers.Close())
210+
if closeErr != nil {
211+
return nil, fmt.Errorf("failed to load execution plans: %w (also: close error: %v)", err, closeErr)
212+
}
213+
return nil, fmt.Errorf("failed to load execution plans: %w", err)
214+
}
215+
app.executionPlans = executionPlanResult
216+
217+
if featureCaps.Guardrails {
218+
if guardrailRegistry != nil && guardrailRegistry.Len() > 0 {
219+
translatedRequestPatcher = guardrails.NewPlannedRequestPatcher(executionPlanResult.Service)
180220
if appCfg.Guardrails.EnableForBatchProcessing {
181-
batchRequestPreparers = append(batchRequestPreparers, guardrails.NewBatchPreparer(provider, pipeline))
221+
batchRequestPreparers = append(batchRequestPreparers, guardrails.NewPlannedBatchPreparer(provider, executionPlanResult.Service))
182222
}
183223
slog.Info(
184224
"guardrails enabled",
185-
"count", pipeline.Len(),
225+
"count", guardrailRegistry.Len(),
186226
"enable_for_batch_processing", appCfg.Guardrails.EnableForBatchProcessing,
187227
)
188228
}
@@ -208,6 +248,7 @@ func New(ctx context.Context, cfg Config) (*App, error) {
208248
UsageLogger: usageResult.Logger,
209249
PricingResolver: providerResult.Registry,
210250
ModelResolver: app.aliases.Service,
251+
ExecutionPolicyResolver: executionPlanResult.Service,
211252
TranslatedRequestPatcher: translatedRequestPatcher,
212253
GuardrailsHash: guardrailsHash,
213254
BatchRequestPreparer: batchRequestPreparer,
@@ -260,16 +301,20 @@ func New(ctx context.Context, cfg Config) (*App, error) {
260301
rcm, err := responsecache.NewResponseCacheMiddleware(appCfg.Cache.Response, cfg.AppConfig.RawProviders)
261302
if err != nil {
262303
var (
263-
aliasCloseErr error
264-
batchCloseErr error
304+
executionPlansCloseErr error
305+
aliasCloseErr error
306+
batchCloseErr error
265307
)
308+
if app.executionPlans != nil {
309+
executionPlansCloseErr = app.executionPlans.Close()
310+
}
266311
if app.aliases != nil {
267312
aliasCloseErr = app.aliases.Close()
268313
}
269314
if app.batch != nil {
270315
batchCloseErr = app.batch.Close()
271316
}
272-
closeErr := errors.Join(aliasCloseErr, batchCloseErr, app.usage.Close(), app.audit.Close(), app.providers.Close())
317+
closeErr := errors.Join(executionPlansCloseErr, aliasCloseErr, batchCloseErr, app.usage.Close(), app.audit.Close(), app.providers.Close())
273318
if closeErr != nil {
274319
return nil, fmt.Errorf("failed to initialize response cache: %w (also: close error: %v)", err, closeErr)
275320
}
@@ -401,7 +446,7 @@ func (a *App) Shutdown(ctx context.Context) error {
401446
}
402447
}
403448

404-
// 2. Close providers (stops background refresh and cache)
449+
// 2. Close providers (stops model refresh and provider-owned resources)
405450
if a.providers != nil {
406451
if err := a.providers.Close(); err != nil {
407452
slog.Error("providers close error", "error", err)
@@ -417,23 +462,31 @@ func (a *App) Shutdown(ctx context.Context) error {
417462
}
418463
}
419464

420-
// 4. Close batch store (flushes pending entries)
465+
// 4. Close execution plans subsystem.
466+
if a.executionPlans != nil {
467+
if err := a.executionPlans.Close(); err != nil {
468+
slog.Error("execution plans close error", "error", err)
469+
errs = append(errs, fmt.Errorf("execution plans close: %w", err))
470+
}
471+
}
472+
473+
// 5. Close batch store (flushes pending entries)
421474
if a.batch != nil {
422475
if err := a.batch.Close(); err != nil {
423476
slog.Error("batch store close error", "error", err)
424477
errs = append(errs, fmt.Errorf("batch close: %w", err))
425478
}
426479
}
427480

428-
// 5. Close usage tracking (flushes pending entries)
481+
// 6. Close usage tracking (flushes pending entries)
429482
if a.usage != nil {
430483
if err := a.usage.Close(); err != nil {
431484
slog.Error("usage logger close error", "error", err)
432485
errs = append(errs, fmt.Errorf("usage close: %w", err))
433486
}
434487
}
435488

436-
// 6. Close audit logging (flushes pending logs)
489+
// 7. Close audit logging (flushes pending logs)
437490
if a.audit != nil {
438491
if err := a.audit.Close(); err != nil {
439492
slog.Error("audit logger close error", "error", err)
@@ -542,20 +595,26 @@ func initAdmin(auditStorage, usageStorage storage.Storage, registry *providers.M
542595
return adminHandler, dashHandler, nil
543596
}
544597

545-
// buildGuardrailsPipeline creates a guardrails pipeline from configuration.
546-
func buildGuardrailsPipeline(cfg config.GuardrailsConfig) (*guardrails.Pipeline, error) {
547-
pipeline := guardrails.NewPipeline()
598+
func buildGuardrailRegistry(cfg config.GuardrailsConfig) (*guardrails.Registry, error) {
599+
registry := guardrails.NewRegistry()
548600

549601
for i, rule := range cfg.Rules {
550602
g, err := buildGuardrail(rule)
551603
if err != nil {
552604
return nil, fmt.Errorf("guardrail rule #%d (%q): %w", i, rule.Name, err)
553605
}
554-
pipeline.Add(g, rule.Order)
606+
if err := registry.Register(g, responsecache.GuardrailRuleDescriptor{
607+
Type: rule.Type,
608+
Order: rule.Order,
609+
Mode: effectiveSystemPromptMode(rule.SystemPrompt.Mode),
610+
Content: rule.SystemPrompt.Content,
611+
}); err != nil {
612+
return nil, fmt.Errorf("register guardrail %q: %w", rule.Name, err)
613+
}
555614
slog.Info("guardrail registered", "name", rule.Name, "type", rule.Type, "order", rule.Order)
556615
}
557616

558-
return pipeline, nil
617+
return registry, nil
559618
}
560619

561620
// buildGuardrail creates a single Guardrail instance from a rule config.
@@ -566,10 +625,7 @@ func buildGuardrail(rule config.GuardrailRuleConfig) (guardrails.Guardrail, erro
566625

567626
switch rule.Type {
568627
case "system_prompt":
569-
mode := guardrails.SystemPromptMode(rule.SystemPrompt.Mode)
570-
if mode == "" {
571-
mode = guardrails.SystemPromptInject
572-
}
628+
mode := guardrails.SystemPromptMode(effectiveSystemPromptMode(rule.SystemPrompt.Mode))
573629
return guardrails.NewSystemPromptGuardrail(rule.Name, mode, rule.SystemPrompt.Content)
574630

575631
default:
@@ -590,9 +646,71 @@ func computeGuardrailsHashFromConfig(cfg config.GuardrailsConfig) string {
590646
Name: r.Name,
591647
Type: r.Type,
592648
Order: r.Order,
593-
Mode: r.SystemPrompt.Mode,
649+
Mode: effectiveSystemPromptMode(r.SystemPrompt.Mode),
594650
Content: r.SystemPrompt.Content,
595651
}
596652
}
597653
return responsecache.ComputeGuardrailsHash(rules)
598654
}
655+
656+
func effectiveSystemPromptMode(mode string) string {
657+
resolved := guardrails.SystemPromptMode(mode)
658+
if resolved == "" {
659+
return string(guardrails.SystemPromptInject)
660+
}
661+
return string(resolved)
662+
}
663+
664+
func defaultExecutionPlanInput(cfg *config.Config) executionplans.CreateInput {
665+
payload := executionplans.Payload{
666+
SchemaVersion: 1,
667+
Features: executionplans.FeatureFlags{
668+
Cache: responseCacheConfigured(cfg.Cache.Response),
669+
Audit: cfg.Logging.Enabled,
670+
Usage: cfg.Usage.Enabled,
671+
Guardrails: cfg.Guardrails.Enabled && len(cfg.Guardrails.Rules) > 0,
672+
},
673+
}
674+
if payload.Features.Guardrails {
675+
payload.Guardrails = make([]executionplans.GuardrailStep, 0, len(cfg.Guardrails.Rules))
676+
for _, rule := range cfg.Guardrails.Rules {
677+
payload.Guardrails = append(payload.Guardrails, executionplans.GuardrailStep{
678+
Ref: rule.Name,
679+
Step: rule.Order,
680+
})
681+
}
682+
}
683+
684+
return executionplans.CreateInput{
685+
Scope: executionplans.Scope{},
686+
Activate: true,
687+
Name: "default-global",
688+
Description: "Bootstrapped from runtime configuration",
689+
Payload: payload,
690+
}
691+
}
692+
693+
func runtimeExecutionFeatureCaps(cfg *config.Config) core.ExecutionFeatures {
694+
if cfg == nil {
695+
return core.ExecutionFeatures{}
696+
}
697+
return core.ExecutionFeatures{
698+
Cache: responseCacheConfigured(cfg.Cache.Response),
699+
Audit: cfg.Logging.Enabled,
700+
Usage: cfg.Usage.Enabled,
701+
Guardrails: cfg.Guardrails.Enabled,
702+
}
703+
}
704+
705+
func responseCacheConfigured(cfg config.ResponseCacheConfig) bool {
706+
return (cfg.Simple.Redis != nil && cfg.Simple.Redis.URL != "") || config.SemanticCacheActive(&cfg.Semantic)
707+
}
708+
709+
func firstSharedStorage(candidates ...storage.Storage) storage.Storage {
710+
for _, candidate := range candidates {
711+
if candidate != nil {
712+
return candidate
713+
}
714+
}
715+
return nil
716+
}

internal/auditlog/auditlog.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ type LogEntry struct {
3838
DurationNs int64 `json:"duration_ns" bson:"duration_ns"`
3939

4040
// Core fields (indexed for queries)
41-
Model string `json:"model" bson:"model"`
42-
ResolvedModel string `json:"resolved_model,omitempty" bson:"resolved_model,omitempty"`
43-
Provider string `json:"provider" bson:"provider"`
44-
AliasUsed bool `json:"alias_used,omitempty" bson:"alias_used,omitempty"`
45-
StatusCode int `json:"status_code" bson:"status_code"`
41+
Model string `json:"model" bson:"model"`
42+
ResolvedModel string `json:"resolved_model,omitempty" bson:"resolved_model,omitempty"`
43+
Provider string `json:"provider" bson:"provider"`
44+
AliasUsed bool `json:"alias_used,omitempty" bson:"alias_used,omitempty"`
45+
ExecutionPlanVersionID string `json:"execution_plan_version_id,omitempty" bson:"execution_plan_version_id,omitempty"`
46+
StatusCode int `json:"status_code" bson:"status_code"`
4647

4748
// Extracted fields for efficient filtering (indexed in relational DBs)
4849
RequestID string `json:"request_id,omitempty" bson:"request_id,omitempty"`

0 commit comments

Comments
 (0)