fix: Improve hot-path instrumentation and low-risk streaming performance #157
SantiagoDePolonia merged 7 commits into main from
Conversation
📝 Walkthrough

This PR introduces Go pprof profiling support via configuration flags, implements hot-path performance benchmarking with deterministic guards, optimizes audit and usage logging through batched PostgreSQL inserts, refines streaming response handling to skip buffering for event streams, and centralizes model-interaction path detection in the core package.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
internal/auditlog/stream_wrapper.go (1)
101-109: ⚠️ Potential issue | 🟠 Major

**SSE framing/parser is too strict and can drop events while growing memory.**

Line 103 only matches `\n\n`, and line 125 only accepts `data: ` (with space). Valid CRLF-framed streams or `data:` lines can be missed, leaving data stuck in `pending` and causing unbounded growth.

🔧 Proposed hardening patch

```diff
+const maxPendingSSEBytes = 1 << 20 // 1 MiB safety cap
+
 func (w *StreamLogWrapper) processSSEData(data []byte) {
 	// Prepend any pending data from previous read
 	if len(w.pending) > 0 {
 		data = append(w.pending, data...)
 		w.pending = nil
 	}
+
+	// Accept both LF and CRLF framing.
+	data = bytes.ReplaceAll(data, []byte("\r\n"), []byte("\n"))
 	// Split on double newline (SSE event separator)
 	for {
 		idx := bytes.Index(data, []byte("\n\n"))
 		if idx == -1 {
 			// No complete event, save as pending
 			if len(data) > 0 {
-				w.pending = make([]byte, len(data))
-				copy(w.pending, data)
+				if len(data) > maxPendingSSEBytes {
+					data = data[len(data)-maxPendingSSEBytes:]
+				}
+				w.pending = append(w.pending[:0], data...)
 			}
 			return
 		}
@@
 func (w *StreamLogWrapper) processSSEEvent(event []byte) {
 	// Find the data line
 	lines := bytes.Split(event, []byte("\n"))
 	for _, line := range lines {
-		if bytes.HasPrefix(line, []byte("data: ")) {
-			jsonData := bytes.TrimPrefix(line, []byte("data: "))
+		if bytes.HasPrefix(line, []byte("data:")) {
+			jsonData := bytes.TrimPrefix(line, []byte("data:"))
+			jsonData = bytes.TrimPrefix(jsonData, []byte(" "))
 			// Skip [DONE] marker
 			if bytes.Equal(jsonData, []byte("[DONE]")) {
 				continue
 			}
 			w.parseEventJSON(jsonData)
 		}
 	}
 }
```

Also applies to: 125-126
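The tolerant framing described above can be exercised in isolation. The sketch below uses hypothetical helper names (`splitSSEEvents`, `dataPayload` are not functions from this repo) to show LF/CRLF-tolerant event splitting and `data:` parsing with an optional space:

```go
package main

import (
	"bytes"
	"fmt"
)

// splitSSEEvents normalizes CRLF framing to LF, then splits on the blank-line
// event separator. Bytes after the last complete event are returned as pending.
func splitSSEEvents(data []byte) (events [][]byte, pending []byte) {
	data = bytes.ReplaceAll(data, []byte("\r\n"), []byte("\n"))
	for {
		idx := bytes.Index(data, []byte("\n\n"))
		if idx == -1 {
			return events, data
		}
		events = append(events, data[:idx])
		data = data[idx+2:]
	}
}

// dataPayload extracts the payload of a "data:" line, accepting both
// "data:" and "data: " since the space after the colon is optional in SSE.
func dataPayload(line []byte) ([]byte, bool) {
	if !bytes.HasPrefix(line, []byte("data:")) {
		return nil, false
	}
	p := bytes.TrimPrefix(line, []byte("data:"))
	return bytes.TrimPrefix(p, []byte(" ")), true
}

func main() {
	events, pending := splitSSEEvents([]byte("data: one\r\n\r\ndata:two\n\ndata: par"))
	for _, ev := range events {
		if p, ok := dataPayload(ev); ok {
			fmt.Println(string(p)) // "one", then "two"
		}
	}
	fmt.Printf("pending=%q\n", pending) // the truncated third event stays pending
}
```

Note the CRLF-framed first event and the space-less `data:two` line are both recognized, which is exactly what the stricter original parser missed.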
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/auditlog/stream_wrapper.go` around lines 101 - 109, The SSE framing is too strict: update the loop that searches for the event separator to accept both "\n\n" and CRLF "\r\n\r\n" (e.g., check for either sequence or normalize CRLF to LF before splitting) and when parsing "data" lines accept both "data:" and "data: " (allow optional space and trim any trailing '\r'); update the logic that saves incomplete bytes into w.pending to reuse the existing slice capacity instead of always allocating a new slice (for example use w.pending = append(w.pending[:0], data...) or copy into a retained buffer) so partial events are not dropped and pending cannot grow unbounded; refer to the streaming loop and the w.pending field and the code that handles data line parsing to make these changes.

internal/usage/store_postgresql.go (1)
122-128: ⚠️ Potential issue | 🟠 Major

**Only start a transaction once the batch spans multiple `INSERT`s.**

After this refactor, anything up to `usageInsertMaxRowsPerQuery` rows is still a single multi-row `INSERT`, so batches of 10-4369 entries now pay `BEGIN`/`COMMIT` overhead without gaining atomicity. A single PostgreSQL statement is already atomic.

⚙️ Suggested threshold change
```diff
-	// For larger batches, use a transaction to ensure atomicity
-	// For smaller batches, use individual inserts without transaction overhead
-	if len(entries) < 10 {
+	// A single multi-row INSERT is already atomic. Only start a transaction
+	// when the batch must be split across multiple statements.
+	if len(entries) <= usageInsertMaxRowsPerQuery {
 		return s.writeBatchSmall(ctx, entries)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/usage/store_postgresql.go` around lines 122 - 128, The batch-size check currently switches to a transaction for any batch with len(entries) >= 10; change it to only start a transaction when the batch cannot fit in a single multi-row INSERT by comparing against usageInsertMaxRowsPerQuery instead: call writeBatchSmall(ctx, entries) when len(entries) <= usageInsertMaxRowsPerQuery (single-statement atomic insert) and call writeBatchLarge(ctx, entries) otherwise (requires BEGIN/COMMIT). Update the conditional and adjust the comment to reflect that single-statement inserts are atomic up to usageInsertMaxRowsPerQuery; keep the existing writeBatchSmall and writeBatchLarge call sites unchanged.

internal/auditlog/store_postgresql.go (1)
127-133: ⚠️ Potential issue | 🟠 Major

**Only start a transaction once the batch spans multiple `INSERT`s.**

After this refactor, anything up to `auditLogInsertMaxRowsPerQuery` rows is still a single multi-row `INSERT`, so batches of 10-4369 entries now pay `BEGIN`/`COMMIT` overhead without gaining atomicity. A single PostgreSQL statement is already atomic.

⚙️ Suggested threshold change
```diff
-	// For larger batches, use a transaction to ensure atomicity
-	// For smaller batches, use individual inserts without transaction overhead
-	if len(entries) < 10 {
+	// A single multi-row INSERT is already atomic. Only start a transaction
+	// when the batch must be split across multiple statements.
+	if len(entries) <= auditLogInsertMaxRowsPerQuery {
 		return s.writeBatchSmall(ctx, entries)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/auditlog/store_postgresql.go` around lines 127 - 133, The decision to start a transaction should be based on whether the batch requires multiple INSERT statements, not a fixed small-number threshold. Compute insertsNeeded = ceil(len(entries) / auditLogInsertMaxRowsPerQuery) and only call the transactional path (s.writeBatchLarge) when insertsNeeded > 1; otherwise call the non-transactional single-statement path (s.writeBatchSmall). Update the branching logic where the current len(entries) < 10 check lives so it uses auditLogInsertMaxRowsPerQuery and the computed insertsNeeded instead.
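The threshold rule both findings converge on can be sketched standalone. This assumes the 4369-row cap mentioned in the review, and `chunkBounds`/`needsTransaction` are illustrative names, not repo functions:

```go
package main

import "fmt"

const maxRowsPerQuery = 4369 // assumed value of the *InsertMaxRowsPerQuery constants

// chunkBounds returns the [start, end) ranges a batch of n rows splits into.
func chunkBounds(n, maxRows int) [][2]int {
	var bounds [][2]int
	for start := 0; start < n; start += maxRows {
		end := start + maxRows
		if end > n {
			end = n
		}
		bounds = append(bounds, [2]int{start, end})
	}
	return bounds
}

// needsTransaction is true only when the batch spans multiple INSERT
// statements; a single multi-row INSERT is already atomic on its own.
func needsTransaction(n, maxRows int) bool {
	return len(chunkBounds(n, maxRows)) > 1
}

func main() {
	fmt.Println(needsTransaction(4369, maxRowsPerQuery)) // false: one statement suffices
	fmt.Println(needsTransaction(4370, maxRowsPerQuery)) // true: two statements, wrap in BEGIN/COMMIT
}
```

With this rule, a batch of 10 entries no longer pays `BEGIN`/`COMMIT` overhead, and only genuinely multi-statement batches take the transactional path.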
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/auditlog/store_postgresql.go`:
- Around line 165-220: The batch-insert SQL created by buildAuditLogInsert
varies per batch size, which causes pgx to populate the per-connection
prepared-statement cache for every size; to fix this either (a) stop generating
variable SQL by making writeAuditLogInsertChunks/buildAuditLogInsert always emit
a fixed-size VALUES clause (e.g. always use auditLogInsertMaxRowsPerQuery
placeholders and pad smaller batches with NULLs) so the query string is
identical across chunks, or (b) keep the current builder but avoid caching by
changing the execution path in writeAuditLogInsertChunks to use a non-caching
execution mode (e.g. provide an ExecNoCache on auditLogBatchExecutor or
configure the underlying pgx Exec to use the simple/no-cache protocol) and call
that for the insert; update references to buildAuditLogInsert and
writeAuditLogInsertChunks accordingly.
In `@internal/server/http.go`:
- Around line 127-129: The pprof routes are being added to authSkipPaths
unconditionally when cfg.PprofEnabled is true, which exposes them even when
MasterKey is set; update the conditional around authSkipPaths (where cfg,
cfg.PprofEnabled and authSkipPaths are referenced) to only append "/debug/pprof"
and "/debug/pprof/*" when profiling is enabled AND no MasterKey is configured
(e.g., cfg.MasterKey is empty/zero), so pprof stays protected when a MasterKey
exists.
- Around line 284-295: The /debug/pprof symbol route in registerPprofRoutes only
registers GET which breaks pprof POST behavior; add a POST route for
"/debug/pprof/symbol" that uses the same wrapped handler as the GET (i.e. use
echo.POST("/debug/pprof/symbol",
echo.WrapHandler(http.HandlerFunc(httppprof.Symbol)))) so POST requests read the
body like the stdlib pprof; keep the existing GET mapping for query-string
behavior.
In `@internal/usage/store_postgresql.go`:
- Around line 133-135: The insert failure is being logged twice: remove the
local slog.Warn in the block that calls writeUsageInsertChunks so the function
returns the formatted error and lets the caller (internal/usage/logger.go
handling WriteBatch) perform logging; specifically, in the code that invokes
writeUsageInsertChunks with s.pool and entries (the block that currently does
slog.Warn("failed to insert usage batch", ... ) and returns fmt.Errorf(...)),
delete the slog.Warn call and only return the wrapped error. Apply the same
removal to the other duplicate location that logs before returning (the second
insert-batch error path).
---
Outside diff comments:
In `@internal/auditlog/store_postgresql.go`:
- Around line 127-133: The decision to start a transaction should be based on
whether the batch requires multiple INSERT statements, not a fixed small-number
threshold. Compute insertsNeeded = ceil(len(entries) /
auditLogInsertMaxRowsPerQuery) and only call the transactional path
(s.writeBatchLarge) when insertsNeeded > 1; otherwise call the non-transactional
single-statement path (s.writeBatchSmall). Update the branching logic where the
current len(entries) < 10 check lives so it uses auditLogInsertMaxRowsPerQuery
and the computed insertsNeeded instead.
In `@internal/auditlog/stream_wrapper.go`:
- Around line 101-109: The SSE framing is too strict: update the loop that
searches for the event separator to accept both "\n\n" and CRLF "\r\n\r\n"
(e.g., check for either sequence or normalize CRLF to LF before splitting) and
when parsing "data" lines accept both "data:" and "data: " (allow optional space
and trim any trailing '\r'); update the logic that saves incomplete bytes into
w.pending to reuse the existing slice capacity instead of always allocating a
new slice (for example use w.pending = append(w.pending[:0], data...) or copy
into a retained buffer) so partial events are not dropped and pending cannot
grow unbounded; refer to the streaming loop and the w.pending field and the code
that handles data line parsing to make these changes.
In `@internal/usage/store_postgresql.go`:
- Around line 122-128: The batch-size check currently switches to a transaction
for any batch with len(entries) >= 10; change it to only start a transaction
when the batch cannot fit in a single multi-row INSERT by comparing against
usageInsertMaxRowsPerQuery instead: call writeBatchSmall(ctx, entries) when
len(entries) <= usageInsertMaxRowsPerQuery (single-statement atomic insert) and
call writeBatchLarge(ctx, entries) otherwise (requires BEGIN/COMMIT). Update the
conditional and adjust the comment to reflect that single-statement inserts are
atomic up to usageInsertMaxRowsPerQuery; keep the existing writeBatchSmall and
writeBatchLarge call sites unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: de19681e-7380-4a66-b581-fa4305c0e456
📒 Files selected for processing (27)
- .github/workflows/test.yml
- .pre-commit-config.yaml
- Makefile
- config/config.example.yaml
- config/config.go
- config/config_helpers_test.go
- config/config_test.go
- internal/app/app.go
- internal/auditlog/auditlog_test.go
- internal/auditlog/constants.go
- internal/auditlog/middleware.go
- internal/auditlog/store_postgresql.go
- internal/auditlog/store_postgresql_test.go
- internal/auditlog/stream_wrapper.go
- internal/core/request_snapshot.go
- internal/core/request_snapshot_test.go
- internal/server/http.go
- internal/server/http_test.go
- internal/server/request_snapshot.go
- internal/server/request_snapshot_test.go
- internal/server/security_test.go
- internal/usage/store_postgresql.go
- internal/usage/store_postgresql_test.go
- internal/usage/stream_wrapper.go
- internal/usage/stream_wrapper_test.go
- tests/perf/README.md
- tests/perf/hotpath_test.go
💤 Files with no reviewable changes (2)
- internal/auditlog/constants.go
- internal/usage/stream_wrapper.go
```go
func writeAuditLogInsertChunks(ctx context.Context, exec auditLogBatchExecutor, entries []*LogEntry) error {
	for start := 0; start < len(entries); start += auditLogInsertMaxRowsPerQuery {
		end := min(start+auditLogInsertMaxRowsPerQuery, len(entries))
		query, args := buildAuditLogInsert(entries[start:end])
		if _, err := exec.Exec(ctx, query, args...); err != nil {
			return fmt.Errorf("batch chunk [%d:%d): %w", start, end, err)
		}
	}
	return nil
}

func buildAuditLogInsert(entries []*LogEntry) (string, []any) {
	var builder strings.Builder
	builder.Grow(len(auditLogInsertPrefix) + len(auditLogInsertSuffix) + len(entries)*auditLogInsertColumnCount*4)
	builder.WriteString(auditLogInsertPrefix)

	args := make([]any, 0, len(entries)*auditLogInsertColumnCount)
	placeholder := 1

	for i, entry := range entries {
		if i > 0 {
			builder.WriteString(", ")
		}
		builder.WriteByte('(')
		for col := 0; col < auditLogInsertColumnCount; col++ {
			if col > 0 {
				builder.WriteString(", ")
			}
			builder.WriteByte('$')
			builder.WriteString(strconv.Itoa(placeholder))
			placeholder++
		}
		builder.WriteByte(')')

		dataJSON := marshalLogData(entry.Data, entry.ID)
		args = append(args,
			entry.ID,
			entry.Timestamp,
			entry.DurationNs,
			entry.Model,
			entry.ResolvedModel,
			entry.Provider,
			entry.AliasUsed,
			entry.StatusCode,
			entry.RequestID,
			entry.ClientIP,
			entry.Method,
			entry.Path,
			entry.Stream,
			entry.ErrorType,
			dataJSON,
		)
	}

	builder.WriteString(auditLogInsertSuffix)
	return builder.String(), args
}
```

```diff
-	// If any inserts failed, rollback and return error (consistent with writeBatchSmall)
-	if len(errs) > 0 {
-		return fmt.Errorf("failed to insert %d of %d audit logs: %w", len(errs), len(entries), errors.Join(errs...))
-	}
-	if err := tx.Commit(ctx); err != nil {
-		return fmt.Errorf("failed to commit transaction: %w", err)
-	}
```
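The placeholder-numbering loop in `buildAuditLogInsert` can be exercised in a reduced form. In this sketch, `buildValuesClause` is an illustrative stand-in; the real function uses `auditLogInsertColumnCount` columns and wraps the clause in fixed prefix/suffix strings:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildValuesClause reproduces the core of buildAuditLogInsert: one
// parenthesized group per row, numbering $N placeholders sequentially
// across the entire batch.
func buildValuesClause(rows, cols int) string {
	var b strings.Builder
	placeholder := 1
	for i := 0; i < rows; i++ {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteByte('(')
		for c := 0; c < cols; c++ {
			if c > 0 {
				b.WriteString(", ")
			}
			b.WriteByte('$')
			b.WriteString(strconv.Itoa(placeholder))
			placeholder++
		}
		b.WriteByte(')')
	}
	return b.String()
}

func main() {
	fmt.Println(buildValuesClause(2, 3)) // ($1, $2, $3), ($4, $5, $6)
}
```

Because the clause length depends on the row count, every distinct batch size yields a distinct SQL string, which is what the prepared-statement-cache finding later in this review flags.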
🧩 Analysis chain
🏁 Script executed:
# First, let's find and read the full file to understand the context
cat -n internal/auditlog/store_postgresql.go | head -300Repository: ENTERPILOT/GOModel
Length of output: 9677
🏁 Script executed:
# Search for pgx pool configuration to see if caching is enabled or disabled
rg -t go "pgxpool|Config|PrepareCache" --Repository: ENTERPILOT/GOModel
Length of output: 50374
🏁 Script executed:
# Look at the PostgreSQL pool configuration more closely
cat -n internal/storage/postgresql.go | head -100Repository: ENTERPILOT/GOModel
Length of output: 2426
🏁 Script executed:
# Check if there's any explicit prepared statement cache configuration
rg -t go "PrepareCache|PoolConfig|StatementCache" internal/Repository: ENTERPILOT/GOModel
Length of output: 44
🏁 Script executed:
# Check how queries are executed and if there's any explicit mode settings
rg -t go "QueryExecMode|Exec\(" internal/auditlog/store_postgresql.go -A 3 -B 3Repository: ENTERPILOT/GOModel
Length of output: 1546
🏁 Script executed:
# Search for pgx documentation or comments about prepared statement behavior
rg -t go "pgx" internal/storage/postgresql.go -B 5 -A 5 | head -40Repository: ENTERPILOT/GOModel
Length of output: 1013
🏁 Script executed:
# Let me verify the exact behavior by checking how often small vs large batches are used
rg -t go "WriteBatch|writeBatchSmall|writeBatchLarge" internal/auditlog/ -B 2 -A 2Repository: ENTERPILOT/GOModel
Length of output: 7621
🏁 Script executed:
# Let me verify pgx's default behavior for prepared statements
web_search "pgx v5 prepared statement cache default QueryExecMode"Repository: ENTERPILOT/GOModel
Length of output: 112
🌐 Web query:
pgx v5 prepared statement cache default QueryExecMode behavior
💡 Result:
In pgx v5, the default QueryExecMode for prepared statement cache behavior is QueryExecModeCacheStatement. This mode caches prepared statements per connection, enabling automatic preparation and reuse for repeated queries to improve performance. It is particularly suitable for pgxpool in transaction pooling mode with PgBouncer, as PgBouncer can hijack and rename these prepared statements (e.g., to PGBOUNCER_XXX) for compatibility across pooled connections. This is confirmed in pgx v5 documentation and discussions: - pgx features include "Automatic statement preparation and caching". - GitHub issue #791: "For pgxpool v5 and transaction pooling, the most suitable mode is QueryExecModeCacheStatement (it's the default one)". - PgBouncer compatibility section references QueryExecMode, with CacheStatement recommended as default. QueryExecMode can be customized via ConnConfig.DefaultQueryExecMode (e.g., for CopyFrom operations), but defaults to caching behavior. Other modes like CacheDescribe, DescribeExec, or SimpleProtocol exist for specific cases (e.g., PgBouncer session pooling), but CacheStatement is standard for optimal caching.
Citations:
- 1: https://pkg.go.dev/github.com/jackc/pgx/v5@v5.8.0
- 2: https://pkg.go.dev/github.com/jackc/pgx/v5
- 3: Example for using prepared statements with pgx jackc/pgx#791
- 4: Add QueryExecMode to CopyFrom jackc/pgx#1412
- 5: https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool
Variable batch sizes create cache churn in pgx's prepared-statement cache.
`buildAuditLogInsert` generates a unique SQL string for each distinct batch size. With pgx's default `QueryExecModeCacheStatement` mode, each variation becomes a separate prepared statement in the per-connection cache. Small batches (<10 entries) are especially problematic: each size from 1-9 creates its own prepared statement. For large batches, only remainder chunks differ from the fixed 4369-row chunks, but even this adds cache pressure on the hot path.
Consider using fixed batch sizes throughout or switching to a non-caching query execution mode for these inserts.
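If the non-caching route is taken, pgx v5 exposes the execution mode both in code (`ConnConfig.DefaultQueryExecMode`) and, as sketched below, as a connection-string parameter. This is a hedged sketch with placeholder credentials; verify the exact parameter spelling against the pgx documentation before relying on it:

```
# Force the simple protocol for the whole pool so variable-shape INSERTs
# never populate the per-connection prepared-statement cache.
postgres://gomodel:secret@localhost:5432/gomodel?default_query_exec_mode=simple_protocol
```

Setting the mode pool-wide trades the prepared-statement round-trip savings of cached statements for predictable behavior on queries whose text varies per call.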
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/auditlog/store_postgresql.go` around lines 165 - 220, The
batch-insert SQL created by buildAuditLogInsert varies per batch size, which
causes pgx to populate the per-connection prepared-statement cache for every
size; to fix this either (a) stop generating variable SQL by making
writeAuditLogInsertChunks/buildAuditLogInsert always emit a fixed-size VALUES
clause (e.g. always use auditLogInsertMaxRowsPerQuery placeholders and pad
smaller batches with NULLs) so the query string is identical across chunks, or
(b) keep the current builder but avoid caching by changing the execution path in
writeAuditLogInsertChunks to use a non-caching execution mode (e.g. provide an
ExecNoCache on auditLogBatchExecutor or configure the underlying pgx Exec to use
the simple/no-cache protocol) and call that for the insert; update references to
buildAuditLogInsert and writeAuditLogInsertChunks accordingly.
Summary

- Adds `perf-check`/`perf-bench` targets and opt-in `pprof`

Included changes

- `tests/perf` hot-path benchmarks plus CI and pre-commit perf thresholds
- `pprof` mounting behind config/env
- Streaming `ResponseWriter` path that skips buffering for event streams
- `core.IsModelInteractionPath` used directly as the route-classification owner

Verification

- `make perf-check`
- `go test ./internal/auditlog ./internal/server`
- `go test ./internal/usage ./internal/server`
- `go test ./...`

Summary by CodeRabbit
New Features
Performance Improvements
Quality Assurance