feat(hooks): add before_response_emit hook for output policies #39203

zeroaltitude wants to merge 3 commits into openclaw:main
Conversation
Adds the `before_llm_call` modifying hook that fires before every LLM API call within the agent loop. Plugins can inspect/modify the message context, filter the tool list, or block the call entirely.

Key design decisions:

- First-writer-wins for messages/systemPrompt (security plugins can't be overridden by lower-priority handlers)
- Intersection latch for tools (a later handler can only narrow, never widen, the tool allowlist)
- `BeforeLlmCallBlockError` sentinel class for graceful blocking without surfacing errors to the user
- streamFn wrapper pattern (outermost wrapper, same context as agentLoop)

Also moves `clearInternalHooks()` from server-startup.ts to server.impl.ts so plugin hooks registered during `loadGatewayPlugins` are preserved when bundled hooks load later.

Files:

- src/plugins/types.ts: BeforeLlmCallEvent/Result types
- src/plugins/hooks.ts: runBeforeLlmCall runner
- src/agents/pi-embedded-runner/run/hook-stream-wrapper.ts: streamFn wrapper
- src/agents/pi-embedded-runner/run/attempt.ts: wrapper setup + iteration tracking
- src/gateway/server-startup.ts, server.impl.ts: clearInternalHooks fix

Tests: 7 stream wrapper + 8 merge semantics = 15 tests
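The two merge rules above (first-writer-wins for the prompt, intersection latch for the tool allowlist) can be sketched in a few lines. This is a minimal illustration under assumed types; `mergeBeforeLlmCallResults` and its shapes are hypothetical, not the actual openclaw API:

```typescript
// Hypothetical result shape for a before_llm_call handler (illustrative only).
type BeforeLlmCallResult = {
  systemPrompt?: string; // replacement system prompt
  tools?: string[]; // tool allowlist
};

// Merge handler results in priority order (highest-priority handler first).
function mergeBeforeLlmCallResults(
  results: BeforeLlmCallResult[],
): BeforeLlmCallResult {
  const merged: BeforeLlmCallResult = {};
  for (const r of results) {
    // First-writer-wins: once a higher-priority handler sets the prompt,
    // later handlers cannot override it.
    if (merged.systemPrompt === undefined && r.systemPrompt !== undefined) {
      merged.systemPrompt = r.systemPrompt;
    }
    // Intersection latch: a later handler can only narrow the allowlist.
    if (r.tools !== undefined) {
      merged.tools =
        merged.tools === undefined
          ? [...r.tools]
          : merged.tools.filter((t) => r.tools!.includes(t));
    }
  }
  return merged;
}

const merged = mergeBeforeLlmCallResults([
  { systemPrompt: "security preamble", tools: ["read", "write"] },
  { systemPrompt: "ignored", tools: ["write", "exec"] }, // can only narrow
]);
```

Here the second handler's prompt is ignored, and its tool list only narrows the first handler's allowlist down to `["write"]`.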
Adds the `after_llm_call` modifying hook — fires after the LLM response is received, before tool execution. Plugins can block all tool calls or filter specific tool calls by ID.

## Key design: deterministic Promise-based gate

Unlike the previous implementation, which used a synchronous mutable ref populated via `.then()` from a subscription callback (racy, best-effort), this uses a Promise-based gate that's set synchronously from the streamFn wrapper. The wrapper runs inside agentLoop's async context, so the Promise is guaranteed to exist before `executeToolCalls()` starts.

The tool wrapper (pi-tools.before-tool-call.ts) awaits the gate Promise:

- First tool call blocks until the hook resolves
- Subsequent calls in the same turn get the cached result instantly
- Turn boundaries clear the gate via the turn_start subscription

This eliminates:

- Monotonic sequence counters
- Staleness detection logic
- The "best-effort" caveat

## Architecture

1. streamFn wrapper intercepts response completion (done/error event)
2. Extracts tool calls from the final message
3. Fires runAfterLlmCall, stores the Promise in the gate (synchronously)
4. Tool wrapper awaits the gate before tool.execute()

Files:

- src/plugins/types.ts: AfterLlmCallEvent/Result types
- src/plugins/hooks.ts: runAfterLlmCall runner
- src/agents/pi-embedded-runner/run/hook-stream-wrapper.ts: response interception
- src/agents/pi-embedded-runner/run/after-llm-call-gate.ts: Promise-based gate
- src/agents/pi-tools.before-tool-call.ts: async gate check

Tests: 9 gate + 4 merge semantics + 7 stream wrapper = 20 new tests
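The gate pattern described above reduces to a small sketch. This is an illustrative reduction with simplified names (`setGate`, `shouldRunTool`, `clearGate`); the real module is after-llm-call-gate.ts and its exports may differ:

```typescript
// Decision produced by the after_llm_call hook for one turn (illustrative).
type GateDecision = { blocked: boolean; blockedToolCallIds?: Set<string> };

// Per-session gate: one pending-or-resolved Promise per session ID.
const gates = new Map<string, Promise<GateDecision>>();

// Called synchronously from the streamFn wrapper, so the Promise exists
// before executeToolCalls() starts.
function setGate(sessionId: string, decision: Promise<GateDecision>): void {
  gates.set(sessionId, decision);
}

// Tool wrapper: the first call awaits the hook; later calls in the same
// turn await an already-resolved Promise and return instantly.
async function shouldRunTool(
  sessionId: string,
  toolCallId: string,
): Promise<boolean> {
  const gate = gates.get(sessionId);
  if (!gate) return true; // no after_llm_call hook ran this turn
  const decision = await gate;
  if (decision.blocked) return false; // all tool calls blocked
  return !decision.blockedToolCallIds?.has(toolCallId);
}

// Cleared at turn boundaries (the turn_start subscription).
function clearGate(sessionId: string): void {
  gates.delete(sessionId);
}
```

Because `setGate` runs before any tool executes, there is no window in which `shouldRunTool` can observe a stale decision, which is what removes the sequence counters and staleness checks.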
Adds the `before_response_emit` modifying hook — fires before the final assistant response is delivered. Plugins can modify, redact, or block the response entirely.

Key features:

- Single-message modification via `content`
- Multi-turn modification via `allContent` (full PII redaction across tool-loop iterations)
- Run-scoped rewrites (only current-run messages, not prior history)
- Compaction-safe fallback (tail-based scan when preRunMessageCount is invalidated)
- Session history scrubbing on block (clears all current-run assistant content parts)

Merge semantics:

- content/allContent: first-writer-wins, mutually exclusive
- block: one-way latch
- blockReason: first-writer-wins

Files:

- src/plugins/types.ts: BeforeResponseEmitEvent/Result types
- src/plugins/hooks.ts: runBeforeResponseEmit runner
- src/agents/pi-embedded-runner/run/hook-response-emit.ts: implementation
- src/agents/pi-embedded-runner/run/attempt.ts: integration

Tests: 26 response emit + 4 merge semantics = 30 new tests

Total: 58 tests across all 3 PRs
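The merge semantics listed above could look like the following sketch. The runner internals are hypothetical; only the four field names and the stated rules come from the PR description:

```typescript
// Hypothetical handler result shape (field names from the hook contract).
type BeforeResponseEmitResult = {
  content?: string;
  allContent?: string[];
  block?: boolean;
  blockReason?: string;
};

// Merge handler results in priority order.
function mergeResponseEmitResults(
  results: BeforeResponseEmitResult[],
): BeforeResponseEmitResult {
  const merged: BeforeResponseEmitResult = {};
  for (const r of results) {
    // content/allContent: first-writer-wins and mutually exclusive.
    // Whichever field the highest-priority handler sets latches the shape.
    if (merged.content === undefined && merged.allContent === undefined) {
      if (r.allContent !== undefined) merged.allContent = r.allContent;
      else if (r.content !== undefined) merged.content = r.content;
    }
    // block: one-way latch. Once true, later handlers cannot unset it.
    if (r.block) merged.block = true;
    // blockReason: first-writer-wins.
    if (merged.blockReason === undefined && r.blockReason !== undefined) {
      merged.blockReason = r.blockReason;
    }
  }
  return merged;
}
```

Under these rules a lower-priority handler can never undo a redaction or un-block a response, only add its own block.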
Closing this PR because the author has more than 10 active PRs in this repo. Please reduce the active PR queue and reopen or resubmit once it is back under the limit. You can close your own PRs to get back under the limit.
**Greptile Summary**

This PR adds the `before_response_emit` hook. Key items to address before merge:
Confidence Score: 3/5
**Gate not cleared on session end — memory leak**

The `gates` map in after-llm-call-gate.ts is a module-level singleton. `clearAfterLlmCallGate` is called at `turn_start` (to clear the *previous* turn's gate before a new one) and inside `wrapStreamFnWithHooks` before setting a fresh gate. Neither of these paths fires after the *final* turn of a session.

Because this `finally` block only calls `hookIterationUnsub?.()` (which just unsubscribes the `turn_start` listener) and `unsubscribe()`, the resolved `Promise<GateDecision>` entry for the last turn is left in the map indefinitely. For long-running servers handling many sessions with unique session IDs, these entries accumulate forever.
The fix is one extra call here:
```typescript
  );
}
try {
  hookIterationUnsub?.();
  if (params.sessionId && hookRunner?.hasHooks("after_llm_call")) {
    const { clearAfterLlmCallGate } = await import("./after-llm-call-gate.js");
    clearAfterLlmCallGate(params.sessionId);
  }
  unsubscribe();
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agents/pi-embedded-runner/run/attempt.ts
Line: 2063-2064
How can I resolve this? If you propose a fix, please make it concise.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 22a884db65
```typescript
// Scope to only current-run messages so we never corrupt prior history.
// Uses tail-based scan (last N assistant messages) which is compaction-safe.
const runMessages = getRunScopedMessages(activeSession.messages, assistantTexts.length);
```
**Use assistant-message count for run scoping**

`assistantTexts.length` is treated as the number of assistant turns here, but in block-reply/chunking flows `assistantTexts` is populated per emitted chunk (see src/agents/pi-embedded-subscribe.ts, where each chunk is pushed), so a single assistant message can increment it multiple times. In that case `getRunScopedMessages()` cannot find enough assistant messages and returns empty, which drives the block path into the fallback that clears all assistant content in the session, wiping prior history unexpectedly whenever `before_response_emit` blocks a chunked response.
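One direction for the fix the reviewer suggests is to count distinct assistant messages rather than emitted chunks. This sketch is illustrative; `messageId` is a hypothetical field, not necessarily what pi-embedded-subscribe.ts tracks:

```typescript
// Several chunks can belong to one assistant message in block-reply/chunking
// flows, so dedupe by the originating message before counting.
type EmittedChunk = { messageId: string; text: string };

function countAssistantMessages(chunks: EmittedChunk[]): number {
  return new Set(chunks.map((c) => c.messageId)).size;
}
```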
```typescript
  return { blocked: false } as GateDecision;
});

gates.set(sessionId, gatePromise);
```
**Clear gate entries when sessions finish**

This stores a per-session promise in a process-wide `Map`, but entries are only removed on a later `turn_start` or another LLM stream for the same session. If a session ends after a tool-using turn and is never resumed, its gate stays resident forever. Over time (many unique session IDs), this creates unbounded retained entries (promises plus allowlist sets) in the gateway process, so the gate should be explicitly cleared during run/session teardown.
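A sketch of the explicit teardown being asked for, assuming a run-scoped `finally` block like the one in attempt.ts (all names here are illustrative):

```typescript
// Process-wide gate map, as described in the review comment above.
const gates = new Map<string, Promise<{ blocked: boolean }>>();

async function runAttempt(sessionId: string): Promise<void> {
  gates.set(sessionId, Promise.resolve({ blocked: false }));
  try {
    // ... agent loop turns would execute here ...
  } finally {
    // Explicit teardown: without this delete, one resolved Promise per
    // unique session ID is retained for the lifetime of the gateway process.
    gates.delete(sessionId);
  }
}
```

Putting the delete in `finally` guarantees cleanup even when the run throws, which a `turn_start`-only clearing path cannot.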
## TL;DR

Adds the `before_response_emit` modifying hook — fires before the final assistant response is delivered. Plugins can modify, redact, or block the response.

Split from #33916 (3 of 3). Depends on #39200 (`after_llm_call`) — merge that first.

## Features
### Single-message modification

### Multi-turn modification (full PII redaction)

Rewrites all assistant messages from the current run — not just the last one.

### Blocking

Clears all current-run assistant content parts from session history.

### Run-scoped rewrites

The hook only modifies messages from the current run. Prior history (messages before the user prompt that triggered this run) is untouched.

Compaction-safe: if auto-compaction invalidates the pre-run message count during the run, the handler falls back to a tail-based scan that finds assistant messages from the end of the snapshot.
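The tail-based fallback can be sketched like this. It is a hypothetical implementation of `getRunScopedMessages` inferred from the description above, not the actual source:

```typescript
// Minimal message shape for the sketch.
type Message = { role: "user" | "assistant" | "tool"; content: string };

// Walk the session snapshot from the end until the last `assistantCount`
// assistant messages are covered. This stays correct even when compaction
// has invalidated preRunMessageCount, because it never uses an absolute
// index into the history.
function getRunScopedMessages(
  messages: Message[],
  assistantCount: number,
): Message[] {
  let remaining = assistantCount;
  let start = messages.length;
  for (let i = messages.length - 1; i >= 0 && remaining > 0; i--) {
    if (messages[i].role === "assistant") remaining--;
    start = i;
  }
  // If the history holds fewer assistant messages than expected, return
  // nothing rather than touching prior history.
  return remaining > 0 ? [] : messages.slice(start);
}
```

Note the scan keeps interleaved tool/user messages inside the run window; only messages before the Nth-from-last assistant message are excluded.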
## Hook contract

| Field | Type |
| --- | --- |
| `content` | `string` |
| `allContent` | `string[]` |
| `block` | `boolean` |
| `blockReason` | `string` |

## Testing
`pnpm check` clean

## AI-Assisted Development 🤖
Developed with AI assistance (Claude/OpenClaw agent "Tank"). All changes reviewed and directed by Eddie Abrams.