feat: add SecOps security agent with rule engine, audit log, and ToolInvoker integration (#192)
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
Caution: Review failed. The pull request is closed.
📝 Walkthrough (Summary by CodeRabbit)

Adds a new SecOps security subsystem (models, config, rule engine, detectors, risk classifier, audit log, output scanner, action-type registry), wires action_type metadata into tools, and integrates pre-/post-tool interception into ToolInvoker and AgentEngine; includes extensive unit tests and gitleaks allowlist entries for security tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Invoker as ToolInvoker
    participant SecOps as SecOpsService
    participant Engine as RuleEngine
    participant Detectors as Detectors
    participant Approval as ApprovalStore
    participant Tool as Tool
    participant Scanner as OutputScanner
    Invoker->>SecOps: evaluate_pre_tool(SecurityContext)
    SecOps->>Engine: evaluate(SecurityContext)
    Engine->>Detectors: evaluate(rule, SecurityContext) (ordered)
    Detectors-->>Engine: SecurityVerdict (ALLOW / DENY / ESCALATE)
    Engine-->>SecOps: SecurityVerdict
    alt ESCALATE
        SecOps->>Approval: create_approval_item(...)
        Approval-->>SecOps: approval_id / error
    end
    alt DENY
        SecOps-->>Invoker: return denied ToolResult
    else ALLOW
        Invoker->>Tool: execute(...)
        Tool-->>Invoker: raw_output
        Invoker->>SecOps: scan_output(SecurityContext, raw_output)
        SecOps->>Scanner: scan(output)
        Scanner-->>SecOps: OutputScanResult (findings, redacted_content?)
        alt sensitive data found
            SecOps->>SecOps: record audit entry (output_scan)
            SecOps-->>Invoker: return redacted ToolResult
        else no sensitive data
            SecOps-->>Invoker: return normal ToolResult
        end
    end
```

Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Greptile Summary

This PR introduces a comprehensive SecOps security layer (~6,300 lines across 61 files) consisting of a pluggable rule engine, an append-only audit log, a post-execution output scanner, and full integration into the tool invocation pipeline.

Key findings:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Agent as AgentEngine
    participant Invoker as ToolInvoker
    participant SecOps as SecOpsService
    participant RE as RuleEngine
    participant Rules as Rules (Policy/Cred/Path/Destructive/Leak)
    participant Store as ApprovalStore
    participant Log as AuditLog
    participant Tool as Tool.execute()
    participant Scanner as OutputScanner
    Agent->>Invoker: invoke(tool_call)
    Invoker->>SecOps: evaluate_pre_tool(SecurityContext)
    SecOps->>RE: evaluate(context)
    RE->>Rules: evaluate(context) [in order]
    Rules-->>RE: SecurityVerdict | None
    alt DENY or ESCALATE
        RE-->>SecOps: SecurityVerdict (DENY/ESCALATE)
        opt ESCALATE
            SecOps->>Store: add(ApprovalItem)
            Store-->>SecOps: ok / error → convert to DENY
        end
        SecOps->>Log: record(AuditEntry)
        SecOps-->>Invoker: SecurityVerdict (DENY)
        Invoker-->>Agent: ToolResult(is_error=True)
    else ALLOW
        RE-->>SecOps: SecurityVerdict (ALLOW)
        SecOps->>Log: record(AuditEntry)
        SecOps-->>Invoker: SecurityVerdict (ALLOW)
        Invoker->>Tool: execute(arguments)
        Tool-->>Invoker: ToolExecutionResult
        Invoker->>SecOps: scan_output(context, output)
        SecOps->>Scanner: scan(output)
        Scanner-->>SecOps: OutputScanResult
        opt sensitive data found
            SecOps->>Log: record(AuditEntry [output_scan])
        end
        SecOps-->>Invoker: OutputScanResult
        Invoker-->>Agent: ToolResult (redacted if needed)
    end
```

Last reviewed commit: 7e50440
Pull request overview
Adds a SecOps security subsystem and integrates it into the tool execution pipeline to pre-check tool calls, audit decisions, and scan/redact sensitive tool outputs.
Changes:
- Introduces security domain models/config, rule engine + detectors, audit log, and output scanner.
- Wires `SecurityInterceptionStrategy` into `ToolInvoker` (pre-tool gate + post-tool scan/redaction) and `AgentEngine`.
- Expands the action-type taxonomy (`ActionType`, `ToolCategory`→`ActionType` defaults) and updates built-in tools + tests.
Reviewed changes
Copilot reviewed 56 out of 56 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| .gitleaks.toml | Allowlist fake secrets used in security tests. |
| src/ai_company/config/defaults.py | Adds security section to default config dict. |
| src/ai_company/config/schema.py | Adds SecurityConfig to RootConfig schema. |
| src/ai_company/core/enums.py | Expands ActionType to category:action taxonomy. |
| src/ai_company/engine/agent_engine.py | Builds SecOps interceptor and injects into ToolInvoker. |
| src/ai_company/observability/events/security.py | Adds security-domain event constants. |
| src/ai_company/observability/events/tool.py | Adds tool-level security/redaction events. |
| src/ai_company/security/__init__.py | Public re-exports for security subsystem. |
| src/ai_company/security/action_type_mapping.py | Default ToolCategory→ActionType mapping for tools. |
| src/ai_company/security/action_types.py | Action-type taxonomy registry + validation helpers. |
| src/ai_company/security/audit.py | Append-only in-memory audit log with querying/eviction. |
| src/ai_company/security/config.py | SecurityConfig + RuleEngineConfig + policy rule model. |
| src/ai_company/security/models.py | SecurityContext/Verdict/AuditEntry/OutputScanResult models. |
| src/ai_company/security/output_scanner.py | Scans/redacts sensitive patterns in tool output. |
| src/ai_company/security/protocol.py | SecurityInterceptionStrategy protocol for ToolInvoker. |
| src/ai_company/security/rules/__init__.py | Security rules package init. |
| src/ai_company/security/rules/_utils.py | Shared recursive string-walk utility for detectors. |
| src/ai_company/security/rules/credential_detector.py | Detects credential patterns in tool arguments. |
| src/ai_company/security/rules/data_leak_detector.py | Detects sensitive paths + PII in arguments. |
| src/ai_company/security/rules/destructive_op_detector.py | Detects destructive commands/ops in arguments. |
| src/ai_company/security/rules/engine.py | Rule engine orchestration (soft-allow + fail-closed per-rule). |
| src/ai_company/security/rules/path_traversal_detector.py | Detects traversal/null byte/encoding traversal patterns. |
| src/ai_company/security/rules/policy_validator.py | Hard-deny / auto-approve policy rule (soft-allow behavior). |
| src/ai_company/security/rules/protocol.py | Protocol for synchronous security rules. |
| src/ai_company/security/rules/risk_classifier.py | Default action_type→risk classification map. |
| src/ai_company/security/service.py | SecOps service implementing interception strategy + approvals + audit. |
| src/ai_company/tools/_git_base.py | Adds action_type override + strengthens git arg validation. |
| src/ai_company/tools/base.py | Adds action_type to BaseTool with category-derived default. |
| src/ai_company/tools/file_system/_base_fs_tool.py | Adds action_type override plumbing for FS tools. |
| src/ai_company/tools/file_system/delete_file.py | Sets action_type for delete tool. |
| src/ai_company/tools/file_system/edit_file.py | Sets action_type for edit tool. |
| src/ai_company/tools/file_system/list_directory.py | Sets action_type for list tool. |
| src/ai_company/tools/file_system/read_file.py | Sets action_type for read tool. |
| src/ai_company/tools/file_system/write_file.py | Sets action_type for write tool. |
| src/ai_company/tools/git_tools.py | Adds action_type to git tools; tightens clone schemes. |
| src/ai_company/tools/invoker.py | Integrates security interception + output scanning into invoke pipeline. |
| tests/unit/core/test_enums.py | Updates tests for expanded ActionType taxonomy. |
| tests/unit/observability/test_events.py | Ensures events.security module discovery. |
| tests/unit/security/__init__.py | Security unit test package marker. |
| tests/unit/security/rules/__init__.py | Security rules unit test package marker. |
| tests/unit/security/rules/test_credential_detector.py | Tests for credential detector rule. |
| tests/unit/security/rules/test_data_leak_detector.py | Tests for data leak detector rule. |
| tests/unit/security/rules/test_destructive_op_detector.py | Tests for destructive op detector rule. |
| tests/unit/security/rules/test_engine.py | Tests for rule engine behavior (deny/escalate/soft-allow). |
| tests/unit/security/rules/test_path_traversal_detector.py | Tests for traversal detector rule. |
| tests/unit/security/rules/test_policy_validator.py | Tests for policy validator rule behavior. |
| tests/unit/security/rules/test_risk_classifier.py | Tests for risk classifier defaults/overrides. |
| tests/unit/security/test_action_type_mapping.py | Tests ToolCategory→ActionType mapping completeness/immutability. |
| tests/unit/security/test_action_types.py | Tests ActionTypeRegistry/Category behavior. |
| tests/unit/security/test_audit.py | Tests AuditLog record/eviction/query behavior. |
| tests/unit/security/test_config.py | Tests security config model validation/immutability. |
| tests/unit/security/test_models.py | Tests security models validation/immutability/roundtrips. |
| tests/unit/security/test_output_scanner.py | Tests output scanner detection + redaction behavior. |
| tests/unit/security/test_service.py | Tests SecOps service: pre-tool eval, audit, escalation, scan_output. |
| tests/unit/tools/test_base_action_type.py | Tests BaseTool default/override action_type behavior. |
| tests/unit/tools/test_invoker_security.py | Tests ToolInvoker integration with security interception + redaction. |
```python
super().__init__(
    name="git_clone",
    action_type=ActionType.VCS_COMMIT,
    description=(
```
GitCloneTool is assigned action_type=ActionType.VCS_COMMIT, which misclassifies clone operations as commits. This will skew SecOps policy checks and risk classification (e.g., clone could be treated as a commit-level action). Please change to a more appropriate action type (or introduce a dedicated vcs:clone ActionType if needed).
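A minimal sketch of the suggested fix, using a toy stand-in for the real enum (the actual `ActionType` lives in `src/ai_company/core/enums.py`, and `VCS_CLONE` is an assumed new member, not something this PR defines):

```python
from enum import Enum


class ActionType(str, Enum):
    """Toy subset of the action-type taxonomy for illustration only."""

    VCS_COMMIT = "vcs:commit"
    VCS_CLONE = "vcs:clone"  # dedicated type so clones are not risk-scored as commits


def git_clone_action_type() -> str:
    # GitCloneTool would pass this instead of ActionType.VCS_COMMIT.
    return ActionType.VCS_CLONE.value
```

With a dedicated `vcs:clone` entry, the risk classifier and policy rules can treat clones (read-like) differently from commits (write-like).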
```python
@staticmethod
def get_category(action_type: str) -> str:
    """Extract the category prefix from an action type string.

    Args:
        action_type: A ``category:action`` string.

    Returns:
        The category prefix before the first ``:``.

    Raises:
        ValueError: If the string does not contain ``:``.
    """
    if ":" not in action_type:
        msg = f"Action type {action_type!r} must use 'category:action' format"
        raise ValueError(msg)
    return action_type.split(":", maxsplit=1)[0]
```
ActionTypeRegistry.get_category() only checks for the presence of ':'; strings like ':action' or 'category:' will currently return an empty/partial category instead of raising. This is inconsistent with the stricter validation used for custom_types in `__init__` and can let malformed action types slip through. Consider validating that action_type doesn't start or end with ':' here as well.
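One possible stricter variant, sketched as a free function (it also rejects multi-colon strings like `a:b:c`, which the original method tolerated):

```python
def get_category(action_type: str) -> str:
    """Sketch: require exactly one ':' with non-empty category and action parts."""
    parts = action_type.split(":")
    if len(parts) != 2 or not parts[0] or not parts[1]:
        msg = f"Action type {action_type!r} must use 'category:action' format"
        raise ValueError(msg)
    return parts[0]
```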
```python
def classify(self, action_type: str) -> ApprovalRiskLevel:
    """Return the risk level for an action type.

    Falls back to ``MEDIUM`` for unknown action types.

    Args:
        action_type: The ``category:action`` string.

    Returns:
        The assessed risk level.
    """
    result = self._risk_map.get(action_type)
    if result is None:
        logger.debug(
            SECURITY_RISK_FALLBACK,
            action_type=action_type,
            fallback="MEDIUM",
        )
        return ApprovalRiskLevel.MEDIUM
    return result
```
RiskClassifier currently falls back to MEDIUM for unknown action types, but DESIGN_SPEC.md D19 specifies unknown action types should default to HIGH (fail-safe). This impacts security posture and any downstream logic that uses risk_level when no rule triggers; please change the fallback (and the logged fallback value) to HIGH to match the spec.
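A sketch of the spec-aligned fallback, with a minimal stand-in for the real `ApprovalRiskLevel` enum (names assumed from the snippet above):

```python
from enum import Enum


class ApprovalRiskLevel(str, Enum):
    """Minimal stand-in for the real enum."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


class RiskClassifier:
    """Sketch of the fail-safe default the review asks for."""

    def __init__(self, risk_map: dict[str, ApprovalRiskLevel]) -> None:
        self._risk_map = dict(risk_map)

    def classify(self, action_type: str) -> ApprovalRiskLevel:
        # Per DESIGN_SPEC.md D19: unknown action types are treated as HIGH risk.
        return self._risk_map.get(action_type, ApprovalRiskLevel.HIGH)
```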
```python
def query(  # noqa: PLR0913
    self,
    *,
    agent_id: str | None = None,
    tool_name: str | None = None,
    verdict: str | None = None,
    risk_level: ApprovalRiskLevel | None = None,
    since: datetime | None = None,
    limit: int = 100,
) -> tuple[AuditEntry, ...]:
    """Query audit entries with optional filters.

    Filters are AND-combined. Results are returned newest-first,
    up to *limit* entries.

    Args:
        agent_id: Filter by agent ID.
        tool_name: Filter by tool name.
        verdict: Filter by verdict string.
        risk_level: Filter by risk level.
        since: Entries before this datetime are excluded.
        limit: Maximum results to return.

    Returns:
        Tuple of matching entries, newest first.
    """
    results: list[AuditEntry] = []
    for entry in reversed(self._entries):
        if agent_id is not None and entry.agent_id != agent_id:
            continue
        if tool_name is not None and entry.tool_name != tool_name:
            continue
        if verdict is not None and entry.verdict != verdict:
            continue
        if risk_level is not None and entry.risk_level != risk_level:
            continue
        if since is not None and entry.timestamp < since:
            continue
        results.append(entry)
        if len(results) >= limit:
            break
```
AuditLog.query() does not validate limit. With the current loop logic, limit=0 (or a negative value) will still return at least one entry, which is surprising and makes the limit parameter unreliable. Please validate limit >= 1 (raise ValueError or return an empty tuple) before iterating.
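A sketch of the boundary check, simplified to plain strings rather than `AuditEntry` objects; the point is that the `limit <= 0` guard runs before any row can be appended:

```python
def query_limited(entries: list[str], limit: int = 100) -> tuple[str, ...]:
    """Sketch of limit handling for AuditLog.query(): limit <= 0 returns nothing."""
    if limit <= 0:  # checked before iterating, so limit=0 can never return a row
        return ()
    results: list[str] = []
    for entry in reversed(entries):  # newest-first, mirroring the real method
        results.append(entry)
        if len(results) >= limit:
            break
    return tuple(results)
```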
src/ai_company/tools/invoker.py (outdated)
```python
    """Scan tool output for sensitive data (if interceptor is set).

    If sensitive data is found and redacted, returns a new
    ``ToolExecutionResult`` with the redacted content. Exceptions
    from the scanner are caught — the original result is returned
    to avoid destroying valid tool output.
    """
    if self._security_interceptor is None:
        return result
    if result.is_error:
        return result

    try:
        scan_result = await self._security_interceptor.scan_output(
            context,
            result.content,
        )
    except (MemoryError, RecursionError):
        raise
    except Exception:
        logger.exception(
            SECURITY_OUTPUT_SCAN_ERROR,
            tool_call_id=tool_call.id,
            tool_name=tool_call.name,
        )
        return result
```
_scan_output() returns the original (unredacted) tool output if the security interceptor's scan raises an exception. This can leak sensitive data and contradicts the stated "fail-closed" security posture. Consider fail-closing on scan failures too (e.g., return an error ToolExecutionResult / redact-all placeholder) or make the behavior explicitly configurable.
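One way to fail closed, sketched with a simplified synchronous stand-in (the real `_scan_output` is async and its result model is richer; all names here are illustrative):

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToolExecutionResult:
    """Minimal stand-in for the real result model."""

    content: str
    is_error: bool = False
    metadata: dict = field(default_factory=dict)


def scan_output_fail_closed(
    scan: Callable[[str], str],
    result: ToolExecutionResult,
) -> ToolExecutionResult:
    """Sketch: on scanner failure, redact instead of returning raw output."""
    try:
        redacted = scan(result.content)
    except (MemoryError, RecursionError):
        raise
    except Exception:
        # Fail closed: never let unscanned output through when the scanner breaks.
        return ToolExecutionResult(
            content="[REDACTED: output scan failed]",
            is_error=True,
            metadata={"output_redacted": True},
        )
    return ToolExecutionResult(content=redacted, metadata=dict(result.metadata))
```

Making this behavior configurable (fail-open for low-risk tools, fail-closed by default) is the alternative the comment suggests.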
```diff
     def _make_tool_invoker(
         self,
         identity: AgentIdentity,
     ) -> ToolInvoker | None:
-        """Create a ToolInvoker with permission checking, or None."""
+        """Create a ToolInvoker with permission checking and security.
+
+        Returns None if no tool registry is configured.
+        """
         if self._tool_registry is None:
             return None
         checker = ToolPermissionChecker.from_permissions(identity.tools)
-        return ToolInvoker(self._tool_registry, permission_checker=checker)
+        interceptor = self._make_security_interceptor()
+        return ToolInvoker(
+            self._tool_registry,
+            permission_checker=checker,
+            security_interceptor=interceptor,
+            agent_id=str(identity.id),
+        )
```
ToolInvoker now supports task_id for SecurityContext/AuditEntry, but AgentEngine only passes agent_id when constructing the ToolInvoker. This means security/audit records created via the engine will miss task_id. Please pass the current task_id into ToolInvoker (and adjust _make_tool_invoker signature/call site accordingly).
Actionable comments posted: 24
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/ai_company/tools/base.py (1)
Lines 82-114: ⚠️ Potential issue | 🟠 Major: Fail closed when `action_type` is invalid or unmapped.

SecOps makes its policy and risk decisions off this field. Right now an unmapped `ToolCategory` silently degrades to `code:read`, and an explicit blank override would also slip through, which turns a tool-registration mistake into a weaker security classification instead of surfacing it immediately.

🔒 Proposed fix
```diff
-        self._action_type = (
-            action_type
-            if action_type is not None
-            else str(
-                DEFAULT_CATEGORY_ACTION_MAP.get(
-                    category,
-                    ActionType.CODE_READ,
-                ),
-            )
-        )
+        if action_type is not None:
+            if not action_type.strip():
+                msg = "Tool action_type must not be empty or whitespace-only"
+                raise ValueError(msg)
+            self._action_type = action_type
+        else:
+            derived_action_type = DEFAULT_CATEGORY_ACTION_MAP.get(category)
+            if derived_action_type is None:
+                msg = f"No default action_type mapping defined for {category!s}"
+                raise ValueError(msg)
+            self._action_type = str(derived_action_type)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/tools/base.py` around lines 82 - 114, The constructor currently silently defaults an unmapped category (or accepts a blank explicit action_type) to ActionType.CODE_READ; change it to fail closed: if action_type is provided and is empty/whitespace, raise ValueError; if action_type is None, look up DEFAULT_CATEGORY_ACTION_MAP.get(category) and if the result is None raise ValueError about an unmapped ToolCategory, otherwise set self._action_type = str(lookup). Update the handling around DEFAULT_CATEGORY_ACTION_MAP, ActionType.CODE_READ, and self._action_type in the initializer to implement these checks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.gitleaks.toml:
- Around line 4-8: Remove the broad path allowlist entries from the [allowlist]
block in the gitleaks config and replace them with targeted allowlisting: delete
the two file-wide exemptions in the allowlist and instead add inline allow
comments (`#gitleaks:allow`) on the specific lines in the test files that contain
intentional test credentials or create .gitleaksignore entries using finding
fingerprints for those exact test values; update the config to no longer
suppress entire files so only the explicit, commented lines or fingerprinted
findings are ignored.
In `@src/ai_company/config/schema.py`:
- Around line 578-581: The RootConfig class docstring is missing the new
security field in its Attributes section; update the RootConfig docstring (the
Attributes list near the class definition) to add an entry for "security:
SecurityConfig - Security subsystem configuration" following the same wording
and style as the other fields so documentation stays consistent with the
security: SecurityConfig Field in the class.
In `@src/ai_company/core/enums.py`:
- Around line 371-406: The change to ActionType removed/migrated built-in
strings but didn’t provide a migration/normalization path, so older
SecurityContext.action_type values may be loaded and mis-classified by
RiskClassifier.classify(); add a normalization layer that maps legacy string
variants to the new ActionType members (e.g., a dict mapping legacy names ->
ActionType constants) and apply it when SecurityContext instances are
constructed/deserialized (or inside RiskClassifier.classify() before decision
logic) so that classify() receives canonical ActionType values; update
references to ActionType in the codebase to use the normalized value and ensure
unknown strings still fall back to the current MEDIUM behavior.
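A sketch of the normalization layer described above; the legacy strings and their targets are placeholders, since the actual pre-migration values are not shown in this review:

```python
# Placeholder mapping: real legacy values would come from the pre-migration enum.
_LEGACY_ACTION_TYPES: dict[str, str] = {
    "read": "code:read",
    "write": "code:write",
}


def normalize_action_type(value: str) -> str:
    """Map legacy action-type strings to the new category:action form.

    Unknown strings pass through unchanged, so the classifier's existing
    fallback behavior still applies to them.
    """
    return _LEGACY_ACTION_TYPES.get(value, value)
```

Applying this at deserialization time (or at the top of `RiskClassifier.classify()`) keeps persisted contexts classifiable after the taxonomy change.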
In `@src/ai_company/security/action_type_mapping.py`:
- Around line 13-30: DEFAULT_CATEGORY_ACTION_MAP currently provides single
default verbs for broad categories; remove unsafe defaults for mixed-semantics
categories so tools must declare action_type explicitly. Specifically, delete
entries for ToolCategory.DEPLOYMENT, ToolCategory.COMMUNICATION, and
ToolCategory.VERSION_CONTROL from DEFAULT_CATEGORY_ACTION_MAP and update the
BaseTool fallback behavior (where it reads DEFAULT_CATEGORY_ACTION_MAP) to treat
missing mapping as "no implicit action" — either raise a validation error or
return None so callers must set ActionType explicitly; keep safe defaults for
the remaining ToolCategory -> ActionType pairs.
In `@src/ai_company/security/action_types.py`:
- Around line 71-75: The loop over custom_types accepts malformed strings like
"a:b:c"; replace the current ad-hoc check with a centralized validator (e.g.,
validate_action_type or parse_action_type) that ensures ct.count(":") == 1 and
that both category and action (the parts from ct.split(":", 1)) are non-empty;
call this validator where custom_types are processed and where get_category() is
used (the other check around lines 132-135) so any malformed value logs via
logger.warning(SECURITY_CONFIG_LOADED, error=...) and raises ValueError with the
same message used now.
In `@src/ai_company/security/audit.py`:
- Around line 36-42: Replace the eviction event used for the validation error
with a more appropriate config/validation event: define a new constant (e.g.,
SECURITY_AUDIT_CONFIG_ERROR) alongside existing events and use it in the
logger.warning call that handles invalid max_entries instead of
SECURITY_AUDIT_EVICTION; update any imports/usages in audit.py where the
logger.warning is called for max_entries (the block referencing max_entries and
SECURITY_AUDIT_EVICTION) to emit SECURITY_AUDIT_CONFIG_ERROR so the event
semantic correctly reflects a configuration/validation error.
In `@src/ai_company/security/config.py`:
- Around line 77-85: Replace the hardcoded action-type strings in
hard_deny_action_types and auto_approve_action_types with the corresponding
ActionType enum constants to prevent typos and improve maintainability: import
ActionType and set the tuples using the enum values (e.g.,
ActionType.DEPLOY_PRODUCTION.value, ActionType.DB_ADMIN.value,
ActionType.ORG_FIRE.value for hard_deny_action_types and
ActionType.CODE_READ.value, ActionType.DOCS_WRITE.value for
auto_approve_action_types) so the variables remain tuple[str, ...] while
sourcing values from the ActionType enum.
In `@src/ai_company/security/rules/_utils.py`:
- Around line 20-32: The helper walk_string_values currently skips nested lists
and silently stops when _depth >= _MAX_RECURSION_DEPTH; update
walk_string_values to recursively descend into list items that are lists (not
only dicts/strs) and to treat reaching the depth cap as a fail-closed condition:
log a WARNING/ERROR with context (include the offending argument or path) and
raise a specific exception instead of returning silently; ensure you reference
the existing _MAX_RECURSION_DEPTH and _depth parameters and add logging before
raising so all callers (the four detectors) receive a clear error path.
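The described behavior can be sketched as follows; `_MAX_RECURSION_DEPTH` is an assumed cap and the real helper's signature may differ:

```python
from typing import Any, Iterator

_MAX_RECURSION_DEPTH = 20  # assumed cap; the real constant may differ


def walk_string_values(value: Any, _depth: int = 0) -> Iterator[str]:
    """Sketch: yield all strings, descending into dicts AND lists; fail closed at cap."""
    if _depth >= _MAX_RECURSION_DEPTH:
        # Fail closed instead of silently returning: callers must see the error.
        raise ValueError("argument nesting exceeds recursion cap; failing closed")
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for v in value.values():
            yield from walk_string_values(v, _depth + 1)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from walk_string_values(item, _depth + 1)
```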
In `@src/ai_company/security/rules/credential_detector.py`:
- Around line 22-66: CREDENTIAL_PATTERNS is currently exported but other
detectors use private constants; if it’s not intended for external use, rename
CREDENTIAL_PATTERNS to _CREDENTIAL_PATTERNS to match the convention used by
_DESTRUCTIVE_PATTERNS and _TRAVERSAL_PATTERNS, updating any internal references
(e.g., in this module and any tests) accordingly; if it is intended to be public
(e.g., reused by OutputScanner), instead leave the name and add a brief note in
the module docstring explaining its public contract and intended reuse to make
the API choice explicit.
In `@src/ai_company/security/rules/data_leak_detector.py`:
- Line 32: The finding label "AWS credentials" hard-codes a vendor name; rename
that tuple entry to a provider-neutral label (e.g., "cloud provider credentials"
or "provider credentials") while keeping the regex
re.compile(r"\.aws[/\\]credentials$") unchanged, and update any code paths that
reference the string literal "AWS credentials" (these are used in detector
reasons/logs) to use the new label so logs and outputs no longer embed a vendor
name.
- Around line 24-27: The regex rules for key files currently use the optional
suffix (?:\.pub)? which causes public keys to be flagged; update the patterns
associated with "RSA key file", "Ed25519 key file", "ECDSA key file", and "DSA
key file" to match only private key filenames (remove the optional \.pub part so
they match id_rsa, id_ed25519, id_ecdsa, id_dsa only—optionally add anchors like
^ and $ for exact matches) so that files like id_rsa.pub are not flagged as
secrets.
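The tightened pattern described above can be sketched like this (the helper name is illustrative; only the regex shape comes from the comment):

```python
import re

# Match private key basenames only; ".pub" suffixed files no longer match
# because the pattern is anchored at end-of-string.
_PRIVATE_KEY_FILES = re.compile(r"(?:^|[/\\])id_(?:rsa|ed25519|ecdsa|dsa)$")


def looks_like_private_key_path(path: str) -> bool:
    return _PRIVATE_KEY_FILES.search(path) is not None
```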
In `@src/ai_company/security/rules/destructive_op_detector.py`:
- Around line 53-57: The current rule tuple that uses
re.compile(r"\bgit\s+push\s+.*--force\b") (paired with the "git push --force"
label and SecurityVerdictType.ESCALATE) misses the short form "-f"; update that
regex to detect either "--force" or "-f" (for example use a pattern like
r"\bgit\s+push\b.*(?:--force\b|(?:\s|^)-f\b)" or
r"\bgit\s+push\b.*(?:--force\b|-f\b)"), and replace the existing tuple entry in
destructive_op_detector.py so force-pushes using the short flag are escalated as
well while preserving the same label and SecurityVerdictType.ESCALATE.
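The widened pattern suggested above can be sketched and exercised directly (the helper name is illustrative):

```python
import re

# Catches both the long --force flag and the short -f flag after "git push".
_FORCE_PUSH = re.compile(r"\bgit\s+push\b.*(?:--force\b|-f\b)")


def is_force_push(command: str) -> bool:
    return _FORCE_PUSH.search(command) is not None
```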
- Around line 26-29: The regex for detecting "rm -rf" variants can miss cases
like "sudo rm -rf" and exact "-rf" or "-fr" flags; update the tuple in
destructive_op_detector.py (the entry paired with SecurityVerdictType.DENY) to
use a regex anchored with a word boundary before "rm", allow an optional "sudo"
prefix, and match both flag orders ("-rf" and "-fr") (for example replace the
current re.compile with one that uses r"\b(?:sudo\s+)?rm\s+-(?:rf|fr)\b" or a
variant that also tolerates extra whitespace), so the detector reliably catches
common permutations without false negatives.
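The regex proposed in the prompt above can be sketched and sanity-checked like this (the helper name is illustrative):

```python
import re

# Optional "sudo" prefix, both "-rf" and "-fr" flag orders, word-boundary
# anchored so substrings like "firm -rf" do not match.
_RM_RF = re.compile(r"\b(?:sudo\s+)?rm\s+-(?:rf|fr)\b")


def is_destructive_rm(command: str) -> bool:
    return _RM_RF.search(command) is not None
```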
In `@src/ai_company/security/rules/protocol.py`:
- Around line 3-6: The module currently only imports SecurityContext and
SecurityVerdict under TYPE_CHECKING which makes them unavailable for runtime
introspection (get_type_hints); move or add real runtime imports for
SecurityContext and SecurityVerdict (not just under TYPE_CHECKING) so the
Protocol definitions decorated with `@runtime_checkable` can resolve those names
at runtime — update the top-level imports to include SecurityContext and
SecurityVerdict from ai_company.security.models (and adjust any references in
the Protocol class definitions around the methods referenced on lines 23-26) so
get_type_hints() and runtime checks succeed.
In `@src/ai_company/tools/git_tools.py`:
- Line 640: The action for git_clone is currently recorded as
ActionType.VCS_COMMIT which is incorrect; update the audit/action type in the
git_clone implementation to a dedicated read/clone action (e.g.,
ActionType.VCS_CLONE or ActionType.VCS_READ) instead of VCS_COMMIT, and ensure
any downstream code that interprets the action (audit logs, policy checks) will
accept the new enum value by adding it to the ActionType enum if missing and
adjusting references in functions that consume the action_type field (search for
git_clone and places that handle ActionType.VCS_COMMIT).
- Line 103: The git read-only tools (git_status, git_log, git_diff) are
incorrectly using ActionType.CODE_READ; change their action_type to the
read-only VCS action (use ActionType.VCS_READ) in each function’s action
creation. If the ActionType enum does not yet define a VCS_READ member, add
ActionType.VCS_READ to the enum (with an appropriate label like "vcs:read") and
replace the ActionType.CODE_READ usages in git_status, git_log, and git_diff
accordingly so these tools fall under the vcs:* classification.
In `@src/ai_company/tools/invoker.py`:
- Around line 162-175: _build_security_context currently constructs a
SecurityContext (using agent_id and task_id) outside the guarded _check_security
path which can raise uncaught NotBlankStr validation errors; move the creation
of SecurityContext into the try/guarded path in invoke() (the path that calls
_check_security) and reuse that single instance for both the security check and
the later invocation flow, ensuring agent_id and task_id are
validated/normalized to NotBlankStr | None at the system boundary (or explicitly
converted/checked before building SecurityContext) so any invalid/blank ids are
caught and cause a blocked tool result rather than an uncaught exception; update
all other call sites that build SecurityContext (references:
_build_security_context, _check_security, invoke, SecurityContext, agent_id,
task_id, NotBlankStr) to follow the same guarded-creation pattern.
- Around line 255-291: The post-tool output scanning currently skips when
result.is_error, swallows scanner exceptions and returns raw output when
has_sensitive_data is true but redacted_content is None; change the logic in the
method that calls self._security_interceptor.scan_output so it always attempts a
scan (remove the early "if result.is_error: return result" branch), and when
scan_output raises any Exception treat it as a closed failure: log
SECURITY_OUTPUT_SCAN_ERROR and return a ToolExecutionResult that redacts the
output (e.g., content="[REDACTED]" or empty), sets is_error=True and
metadata["output_redacted"]=True with redaction_findings noting the exception;
likewise if scan_result.has_sensitive_data is true but
scan_result.redacted_content is None, return a redacted ToolExecutionResult
(again setting output_redacted and redaction_findings) instead of passing raw
content; keep MemoryError/RecursionError re-raises and preserve references to
ToolExecutionResult, self._security_interceptor.scan_output, scan_result,
logger, SECURITY_OUTPUT_SCAN_ERROR and TOOL_OUTPUT_REDACTED to locate the code.
In `@tests/unit/security/rules/test_credential_detector.py`:
- Around line 36-84: The test data in
tests/unit/security/rules/test_credential_detector.py contains provider-branded
labels and sample values (e.g., "AWS access key", "GitHub PAT",
"aws_secret_access_key", "token ghp_...") which violates the no-vendor-names
rule; update the table of tuples to use provider-neutral labels and anonymized
sample strings (e.g., "cloud access key", "service secret key", "personal access
token", "generic_api_key_...") ensuring the same detection patterns are
preserved so tests exercise the same logic without vendor branding.
In `@tests/unit/security/rules/test_engine.py`:
- Around line 91-262: Add two unit tests: one that verifies a "soft-allow" (a
rule returning a soft-ALLOW verdict) does not short-circuit evaluation and
allows later rules to run (use _make_engine, _make_context and _StubRule
instances where the first returns a soft-allow verdict and a later rule returns
DENY or ESCALATE and assert both were called and final verdict matches later
rule), and another that verifies _safe_evaluate converts exceptions raised by a
rule into a DENY (create a _StubRule that raises in its evaluate path, put it
into _make_engine, call engine.evaluate and assert verdict.verdict ==
SecurityVerdictType.DENY and that the exception-producing rule was invoked).
Ensure tests assert evaluation_duration_ms is set where applicable.
In `@tests/unit/security/rules/test_risk_classifier.py`:
- Around line 185-194: The test test_all_action_types_are_mapped is ineffective
because RiskClassifier.classify(...) returns an ApprovalRiskLevel even for the
fallback path; change the assertion to verify the action type exists in the
classifier's explicit mapping rather than using isinstance. Concretely, get the
classifier's underlying mapping (e.g., RiskClassifier._default_risk_map or the
instance attribute classifier._risk_map / classifier.default_map used by
classify) and assert action_type is a key in that mapping, and optionally assert
classifier.classify(action_type) == classifier._default_risk_map[action_type] to
ensure the mapping returns the expected value.
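The difference between the two assertions can be seen with a toy classifier (the enum members, risk strings, and `_default_risk_map` attribute name here are illustrative, not the project's real values): because `classify` has a fallback, an `isinstance` check passes for every input, while a membership check on the explicit map fails for unmapped action types.

```python
from enum import Enum


class ActionType(Enum):  # hypothetical stand-in for the project's enum
    CODE_READ = "code_read"
    CODE_WRITE = "code_write"
    NET_CALL = "net_call"


class RiskClassifier:
    _default_risk_map = {
        ActionType.CODE_READ: "low",
        ActionType.CODE_WRITE: "medium",
        ActionType.NET_CALL: "high",
    }

    def classify(self, action_type: ActionType) -> str:
        # The fallback is why an isinstance() assertion can never fail.
        return self._default_risk_map.get(action_type, "critical")
```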
In `@tests/unit/security/test_audit.py`:
- Around line 269-275: Update the test and/or the AuditLog.query behavior so
limit <= 0 returns no rows: in the test function test_query_limit add cases
calling log.query(limit=0) and log.query(limit=-1) and assert both return 0
results (use AuditLog() and self._populate as before); if AuditLog.query
currently appends a match before enforcing the limit, change the implementation
in src/ai_company/security/audit.py (the AuditLog.query method) to enforce the
limit boundary check before any appends and ensure it treats limit <= 0 as "no
results" so the new tests pass.
In `@tests/unit/security/test_models.py`:
- Around line 297-317: The test test_creation_with_all_fields creates an
AuditEntry with verdict=SecurityVerdictType.DENY but sets approval_id, which is
only valid for ESCALATE; fix by removing the approval_id parameter and the
approval_id assertion from test_creation_with_all_fields in
tests/unit/security/test_models.py (or alternatively change verdict to ESCALATE
if the intent is to assert approval_id behavior), ensuring AuditEntry is
exercised only in a valid state consistent with the model documentation.
In `@tests/unit/tools/test_base_action_type.py`:
- Around line 118-124: The test is asserting that an empty string "" is accepted
as a valid action_type, but we should not treat blank overrides as valid; update
_ActionTypeTool to treat empty-string action_type the same as None (i.e.,
reject/normalize empty strings and fall back to the category default or raise a
ValueError) and adjust the test accordingly; specifically, modify the
_ActionTypeTool constructor/initializer (the logic that sets tool.action_type)
to check for action_type == "" and either set action_type to None (so downstream
code uses the category default) or raise a clear exception, then change
test_explicit_empty_string_is_accepted to expect the normalized/failing
behavior.
---
Outside diff comments:
In `@src/ai_company/tools/base.py`:
- Around line 82-114: The constructor currently silently defaults an unmapped
category (or accepts a blank explicit action_type) to ActionType.CODE_READ;
change it to fail closed: if action_type is provided and is empty/whitespace,
raise ValueError; if action_type is None, look up
DEFAULT_CATEGORY_ACTION_MAP.get(category) and if the result is None raise
ValueError about an unmapped ToolCategory, otherwise set self._action_type =
str(lookup). Update the handling around DEFAULT_CATEGORY_ACTION_MAP,
ActionType.CODE_READ, and self._action_type in the initializer to implement
these checks.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4550a9bd-0ff7-43f2-9c24-3a42f8bafb6c
📒 Files selected for processing (56)
.gitleaks.toml, src/ai_company/config/defaults.py, src/ai_company/config/schema.py, src/ai_company/core/enums.py, src/ai_company/engine/agent_engine.py, src/ai_company/observability/events/security.py, src/ai_company/observability/events/tool.py, src/ai_company/security/__init__.py, src/ai_company/security/action_type_mapping.py, src/ai_company/security/action_types.py, src/ai_company/security/audit.py, src/ai_company/security/config.py, src/ai_company/security/models.py, src/ai_company/security/output_scanner.py, src/ai_company/security/protocol.py, src/ai_company/security/rules/__init__.py, src/ai_company/security/rules/_utils.py, src/ai_company/security/rules/credential_detector.py, src/ai_company/security/rules/data_leak_detector.py, src/ai_company/security/rules/destructive_op_detector.py, src/ai_company/security/rules/engine.py, src/ai_company/security/rules/path_traversal_detector.py, src/ai_company/security/rules/policy_validator.py, src/ai_company/security/rules/protocol.py, src/ai_company/security/rules/risk_classifier.py, src/ai_company/security/service.py, src/ai_company/tools/_git_base.py, src/ai_company/tools/base.py, src/ai_company/tools/file_system/_base_fs_tool.py, src/ai_company/tools/file_system/delete_file.py, src/ai_company/tools/file_system/edit_file.py, src/ai_company/tools/file_system/list_directory.py, src/ai_company/tools/file_system/read_file.py, src/ai_company/tools/file_system/write_file.py, src/ai_company/tools/git_tools.py, src/ai_company/tools/invoker.py, tests/unit/core/test_enums.py, tests/unit/observability/test_events.py, tests/unit/security/__init__.py, tests/unit/security/rules/__init__.py, tests/unit/security/rules/test_credential_detector.py, tests/unit/security/rules/test_data_leak_detector.py, tests/unit/security/rules/test_destructive_op_detector.py, tests/unit/security/rules/test_engine.py, tests/unit/security/rules/test_path_traversal_detector.py, tests/unit/security/rules/test_policy_validator.py, tests/unit/security/rules/test_risk_classifier.py, tests/unit/security/test_action_type_mapping.py, tests/unit/security/test_action_types.py, tests/unit/security/test_audit.py, tests/unit/security/test_config.py, tests/unit/security/test_models.py, tests/unit/security/test_output_scanner.py, tests/unit/security/test_service.py, tests/unit/tools/test_base_action_type.py, tests/unit/tools/test_invoker_security.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: CI Pass
- GitHub Check: Agent
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649 native lazy annotations
Use PEP 758 except syntax: use `except A, B:` (no parentheses) — ruff enforces this on Python 3.14
All public functions must have type hints; mypy strict mode is enforced
Public classes and functions must have Google-style docstrings (enforced by ruff D rules)
Every module with business logic must include logging setup: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`. Never use `import logging` or `print()` in application code. Variable name must always be `logger`
Use structured logging with event name constants: `logger.info(EVENT_CONSTANT, key=value)` — never use string interpolation like `logger.info('msg %s', val)`. Import event constants directly from `ai_company.observability.events.<domain>`
All error paths must log at WARNING or ERROR with context before raising; all state transitions must log at INFO; DEBUG for object creation, internal flow, and entry/exit of key functions
Line length maximum is 88 characters (enforced by ruff)
Functions must be < 50 lines; files must be < 800 lines
Handle errors explicitly; never silently swallow exceptions
Validate at system boundaries: user input, external APIs, and config files. Do not validate unnecessarily at internal boundaries
Files:
src/ai_company/security/rules/_utils.py, src/ai_company/security/protocol.py, src/ai_company/security/rules/__init__.py, tests/unit/security/test_config.py, tests/unit/security/rules/__init__.py, src/ai_company/observability/events/tool.py, src/ai_company/config/schema.py, tests/unit/security/rules/test_credential_detector.py, src/ai_company/security/output_scanner.py, tests/unit/security/rules/test_policy_validator.py, src/ai_company/security/rules/credential_detector.py, src/ai_company/tools/file_system/read_file.py, src/ai_company/tools/file_system/write_file.py, tests/unit/security/test_audit.py, tests/unit/security/test_output_scanner.py, src/ai_company/security/service.py, src/ai_company/config/defaults.py, src/ai_company/security/rules/data_leak_detector.py, tests/unit/security/test_service.py, tests/unit/security/test_models.py, tests/unit/security/test_action_types.py, src/ai_company/tools/file_system/delete_file.py, src/ai_company/security/rules/policy_validator.py, tests/unit/security/rules/test_data_leak_detector.py, tests/unit/tools/test_base_action_type.py, src/ai_company/security/rules/path_traversal_detector.py, tests/unit/security/test_action_type_mapping.py, tests/unit/observability/test_events.py, src/ai_company/security/models.py, src/ai_company/security/rules/risk_classifier.py, tests/unit/security/rules/test_path_traversal_detector.py, src/ai_company/security/action_type_mapping.py, src/ai_company/security/config.py, src/ai_company/security/rules/destructive_op_detector.py, tests/unit/security/__init__.py, src/ai_company/security/__init__.py, tests/unit/tools/test_invoker_security.py, src/ai_company/security/rules/protocol.py, src/ai_company/observability/events/security.py, src/ai_company/tools/file_system/edit_file.py, tests/unit/security/rules/test_engine.py, src/ai_company/security/rules/engine.py, src/ai_company/tools/base.py, src/ai_company/core/enums.py, src/ai_company/security/action_types.py, src/ai_company/tools/file_system/list_directory.py, tests/unit/security/rules/test_destructive_op_detector.py, src/ai_company/tools/file_system/_base_fs_tool.py, src/ai_company/tools/git_tools.py, src/ai_company/engine/agent_engine.py, tests/unit/core/test_enums.py, src/ai_company/tools/invoker.py, src/ai_company/tools/_git_base.py, tests/unit/security/rules/test_risk_classifier.py, src/ai_company/security/audit.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py: Use Pydantic v2 with frozen models for config/identity; use separate mutable-via-copy models (with `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model
Use `@computed_field` for derived values instead of storing + validating redundant fields (e.g. `TokenUsage.total_tokens`); use `NotBlankStr` from `core.types` for all identifier/name fields—including optional (`NotBlankStr | None`) and tuple variants—instead of manual whitespace validators
Create new objects for immutability; never mutate existing ones. For non-Pydantic internal collections (registries, `BaseTool`), use `copy.deepcopy()` at construction + `MappingProxyType` for read-only enforcement. For `dict`/`list` fields in frozen Pydantic models, rely on `frozen=True` and `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`
Files:
src/ai_company/security/rules/_utils.py, src/ai_company/security/protocol.py, src/ai_company/security/rules/__init__.py, src/ai_company/observability/events/tool.py, src/ai_company/config/schema.py, src/ai_company/security/output_scanner.py, src/ai_company/security/rules/credential_detector.py, src/ai_company/tools/file_system/read_file.py, src/ai_company/tools/file_system/write_file.py, src/ai_company/security/service.py, src/ai_company/config/defaults.py, src/ai_company/security/rules/data_leak_detector.py, src/ai_company/tools/file_system/delete_file.py, src/ai_company/security/rules/policy_validator.py, src/ai_company/security/rules/path_traversal_detector.py, src/ai_company/security/models.py, src/ai_company/security/rules/risk_classifier.py, src/ai_company/security/action_type_mapping.py, src/ai_company/security/config.py, src/ai_company/security/rules/destructive_op_detector.py, src/ai_company/security/__init__.py, src/ai_company/security/rules/protocol.py, src/ai_company/observability/events/security.py, src/ai_company/tools/file_system/edit_file.py, src/ai_company/security/rules/engine.py, src/ai_company/tools/base.py, src/ai_company/core/enums.py, src/ai_company/security/action_types.py, src/ai_company/tools/file_system/list_directory.py, src/ai_company/tools/file_system/_base_fs_tool.py, src/ai_company/tools/git_tools.py, src/ai_company/engine/agent_engine.py, src/ai_company/tools/invoker.py, src/ai_company/tools/_git_base.py, src/ai_company/security/audit.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Use pytest markers: `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`. Minimum coverage 80% (enforced in CI). Async mode: `asyncio_mode = 'auto'` — no manual `@pytest.mark.asyncio` needed. Per-test timeout: 30 seconds. Use `pytest-xdist` via `-n auto` for parallelism. Prefer `@pytest.mark.parametrize` for testing similar cases
NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc. Vendor names may only appear in: (1) DESIGN_SPEC.md provider list, (2) `.claude/` files, (3) third-party import paths
Files:
tests/unit/security/test_config.py, tests/unit/security/rules/__init__.py, tests/unit/security/rules/test_credential_detector.py, tests/unit/security/rules/test_policy_validator.py, tests/unit/security/test_audit.py, tests/unit/security/test_output_scanner.py, tests/unit/security/test_service.py, tests/unit/security/test_models.py, tests/unit/security/test_action_types.py, tests/unit/security/rules/test_data_leak_detector.py, tests/unit/tools/test_base_action_type.py, tests/unit/security/test_action_type_mapping.py, tests/unit/observability/test_events.py, tests/unit/security/rules/test_path_traversal_detector.py, tests/unit/security/__init__.py, tests/unit/tools/test_invoker_security.py, tests/unit/security/rules/test_engine.py, tests/unit/security/rules/test_destructive_op_detector.py, tests/unit/core/test_enums.py, tests/unit/security/rules/test_risk_classifier.py
🧠 Learnings (6)
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.py : Every module with business logic must include logging setup: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`. Never use `import logging` or `print()` in application code. Variable name must always be `logger`
Applied to files:
src/ai_company/config/schema.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.py : Use structured logging with event name constants: `logger.info(EVENT_CONSTANT, key=value)` — never use string interpolation like `logger.info('msg %s', val)`. Import event constants directly from `ai_company.observability.events.<domain>`
Applied to files:
src/ai_company/config/schema.pysrc/ai_company/observability/events/security.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to tests/**/*.py : Use pytest markers: `pytest.mark.unit`, `pytest.mark.integration`, `pytest.mark.e2e`, `pytest.mark.slow`. Minimum coverage 80% (enforced in CI). Async mode: `asyncio_mode = 'auto'` — no manual `pytest.mark.asyncio` needed. Per-test timeout: 30 seconds. Use `pytest-xdist` via `-n auto` for parallelism. Prefer `pytest.mark.parametrize` for testing similar cases
Applied to files:
tests/unit/tools/test_base_action_type.pytests/unit/security/test_action_type_mapping.pytests/unit/security/rules/test_risk_classifier.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/**/*.py : Use Pydantic v2 with frozen models for config/identity; use separate mutable-via-copy models (with `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model
Applied to files:
src/ai_company/security/models.pysrc/ai_company/security/config.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/**/*.py : Create new objects for immutability; never mutate existing ones. For non-Pydantic internal collections (registries, `BaseTool`), use `copy.deepcopy()` at construction + `MappingProxyType` for read-only enforcement. For `dict`/`list` fields in frozen Pydantic models, rely on `frozen=True` and `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Applied to files:
src/ai_company/security/config.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/providers/**/*.py : All provider calls must go through `BaseCompletionProvider` which applies retry + rate limiting automatically. Never implement retry logic in driver subclasses or calling code. Set `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`. Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. `RetryExhaustedError` signals all retries failed—engine layer catches this for fallback chains. Rate limiter respects `RateLimitError.retry_after` from providers
Applied to files:
src/ai_company/engine/agent_engine.py
[allowlist]
paths = [
    '''tests/unit/security/rules/test_credential_detector\.py''',
    '''tests/unit/security/test_output_scanner\.py''',
]
🧩 Analysis chain
🌐 Web query:
What allowlist mechanisms does the current Gitleaks version support for suppressing only specific known test secrets—such as regex-scoped allowlists or inline gitleaks:allow annotations—instead of excluding an entire file?
💡 Result:
Current Gitleaks (v8.x) supports these fine‑grained allowlisting options to suppress specific known test secrets without excluding whole files:
- Inline suppression comment: add `#gitleaks:allow` (or the equivalent comment syntax in your language) on the same line as the test secret; Gitleaks will ignore that finding. There is also a CLI flag `--ignore-gitleaks-allow` to disable honoring these inline allow comments. [1]
- Per-finding ignore via fingerprint: create a repo-root `.gitleaksignore` file containing the finding Fingerprint value(s) to suppress only those exact findings. [1]
- Rule-scoped allowlists (`[[rules.allowlists]]`): attach one or more allowlists to a specific rule; a finding is ignored if any allowlist matches. Each allowlist can match on commits, paths, regexes, and stopwords, with a configurable `condition` (OR/AND) and `regexTarget` scoping (e.g., apply regexes to the extracted `secret` vs the full `match` vs the whole `line`). [2]
- Global allowlists (`[[allowlists]]`): same match primitives (commits/paths/regexes/stopwords) and `regexTarget` scoping, applied at higher precedence than rule allowlists; additionally, you can scope a shared allowlist to only certain rules using `targetRules` (so it is not truly global). [2]
Sources: Gitleaks upstream README/config docs. [1][2]
References
- Gitleaks README – `gitleaks:allow`, `.gitleaksignore`, and related flags (github.com)
- Gitleaks README – rule allowlists, global allowlists, `regexTarget`, `condition`, `targetRules` (github.com)
Citations:
🏁 Script executed:
#!/bin/bash
# Check for Gitleaks version specification in common places
echo "=== Checking Gitleaks version specification ==="
for file in package.json requirements.txt pyproject.toml .tool-versions Pipfile setup.py go.mod; do
if [ -f "$file" ]; then
echo "Found $file:"
if [[ "$file" == "package.json" ]]; then
jq '.devDependencies.gitleaks // .dependencies.gitleaks // empty' "$file" 2>/dev/null || grep -i gitleaks "$file"
else
grep -i gitleaks "$file"
fi
fi
done
echo ""
echo "=== Checking for gitleaks installation ==="
which gitleaks && gitleaks version 2>/dev/null || echo "gitleaks CLI not in PATH"

Repository: Aureliolo/ai-company
Length of output: 264
🏁 Script executed:
#!/bin/bash
# View the complete .gitleaks.toml configuration
echo "=== Full .gitleaks.toml content ==="
if [ -f ".gitleaks.toml" ]; then
cat -n .gitleaks.toml
else
echo ".gitleaks.toml not found at root"
find . -name ".gitleaks.toml" -o -name "gitleaks.toml" 2>/dev/null | head -5
fi

Repository: Aureliolo/ai-company
Length of output: 443
🏁 Script executed:
#!/bin/bash
# Examine the two test files mentioned in the allowlist
echo "=== Checking test files referenced in allowlist ==="
file1="tests/unit/security/rules/test_credential_detector.py"
file2="tests/unit/security/test_output_scanner.py"
for f in "$file1" "$file2"; do
if [ -f "$f" ]; then
echo ""
echo "=== $f (line count: $(wc -l < "$f")) ==="
# Show file size and first 100 lines to see what's being allowlisted
head -100 "$f"
else
echo "File not found: $f"
fi
done

Repository: Aureliolo/ai-company
Length of output: 6271
Use inline allowlisting for test credentials instead of blanket file exemptions.
Gitleaks v8.x supports narrower allowlisting mechanisms: add `#gitleaks:allow` comments on lines with intentional test credentials, or use `.gitleaksignore` with finding fingerprints. File-wide path allowlisting will suppress any future real secrets accidentally added to these files.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.gitleaks.toml around lines 4 - 8, Remove the broad path allowlist entries
from the [allowlist] block in the gitleaks config and replace them with targeted
allowlisting: delete the two file-wide exemptions in the allowlist and instead
add inline allow comments (`#gitleaks:allow`) on the specific lines in the test
files that contain intentional test credentials or create .gitleaksignore
entries using finding fingerprints for those exact test values; update the
config to no longer suppress entire files so only the explicit, commented lines
or fingerprinted findings are ignored.
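As a sketch of the narrower options the review recommends (the rule id and fixture-value pattern below are illustrative assumptions, not taken from this repo):

```toml
# Inline alternative: put `# gitleaks:allow` on the offending test line itself.

# Value-and-path-scoped global allowlist in .gitleaks.toml:
[[allowlists]]
targetRules = ["generic-api-key"]          # hypothetical rule id
condition = "AND"                          # every matcher must hit
regexTarget = "secret"
regexes = ['''test-fixture-[a-z0-9]+''']   # only the known fake values
paths = ['''tests/unit/security/.*\.py''']
```

With `condition = "AND"`, a finding is suppressed only when both the path and the extracted secret match, so a real credential accidentally committed to the same file would still be reported.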
```diff
 class ActionType(StrEnum):
-    """Convenience constants for common approval action types.
+    """Two-level action type taxonomy for security classification.
 
-    Models typically use ``NotBlankStr`` for ``action_type`` fields, so these
-    are optional helper constants and custom string values remain valid.
+    Used by autonomy presets (DESIGN_SPEC §12.2), SecOps validation
+    (§12.3), tiered timeout policies (§12.4), and progressive trust
+    (§11.3). Values follow a ``category:action`` naming convention.
+
+    Custom action type strings are also accepted by models that use
+    ``str`` for ``action_type`` fields — these enum members are
+    convenience constants for the built-in taxonomy.
     """
 
-    CODE_MERGE = "code_merge"
-    DEPLOYMENT = "deployment"
-    BUDGET_SPEND = "budget_spend"
-    EXTERNAL_COMMUNICATION = "external_communication"
-    HIRING = "hiring"
-    ARCHITECTURE_CHANGE = "architecture_change"
+    CODE_READ = "code:read"
+    CODE_WRITE = "code:write"
+    CODE_CREATE = "code:create"
+    CODE_DELETE = "code:delete"
+    CODE_REFACTOR = "code:refactor"
+    TEST_WRITE = "test:write"
+    TEST_RUN = "test:run"
+    DOCS_WRITE = "docs:write"
+    VCS_COMMIT = "vcs:commit"
+    VCS_PUSH = "vcs:push"
+    VCS_BRANCH = "vcs:branch"
+    DEPLOY_STAGING = "deploy:staging"
+    DEPLOY_PRODUCTION = "deploy:production"
+    COMMS_INTERNAL = "comms:internal"
+    COMMS_EXTERNAL = "comms:external"
+    BUDGET_SPEND = "budget:spend"
+    BUDGET_EXCEED = "budget:exceed"
+    ORG_HIRE = "org:hire"
+    ORG_FIRE = "org:fire"
+    ORG_PROMOTE = "org:promote"
+    DB_QUERY = "db:query"
+    DB_MUTATE = "db:mutate"
+    DB_ADMIN = "db:admin"
+    ARCH_DECIDE = "arch:decide"
```
Add a migration path for legacy action_type strings.
SecurityContext.action_type still accepts arbitrary str, and RiskClassifier.classify() falls back to MEDIUM for unknown values. Replacing the built-in values outright means older YAML policies or persisted action_type values can keep loading but be silently reclassified instead of matching their intended policy/risk branch. Please normalize legacy values on load or ship an explicit migration with this taxonomy change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/core/enums.py` around lines 371 - 406, The change to
ActionType removed/migrated built-in strings but didn’t provide a
migration/normalization path, so older SecurityContext.action_type values may be
loaded and mis-classified by RiskClassifier.classify(); add a normalization
layer that maps legacy string variants to the new ActionType members (e.g., a
dict mapping legacy names -> ActionType constants) and apply it when
SecurityContext instances are constructed/deserialized (or inside
RiskClassifier.classify() before decision logic) so that classify() receives
canonical ActionType values; update references to ActionType in the codebase to
use the normalized value and ensure unknown strings still fall back to the
current MEDIUM behavior.
```python
DEFAULT_CATEGORY_ACTION_MAP: Final[MappingProxyType[ToolCategory, ActionType]] = (
    MappingProxyType(
        {
            ToolCategory.FILE_SYSTEM: ActionType.CODE_WRITE,
            ToolCategory.CODE_EXECUTION: ActionType.CODE_WRITE,
            ToolCategory.VERSION_CONTROL: ActionType.VCS_COMMIT,
            ToolCategory.WEB: ActionType.COMMS_EXTERNAL,
            ToolCategory.DATABASE: ActionType.DB_QUERY,
            ToolCategory.TERMINAL: ActionType.CODE_WRITE,
            ToolCategory.DESIGN: ActionType.DOCS_WRITE,
            ToolCategory.COMMUNICATION: ActionType.COMMS_INTERNAL,
            ToolCategory.ANALYTICS: ActionType.CODE_READ,
            ToolCategory.DEPLOYMENT: ActionType.DEPLOY_STAGING,
            ToolCategory.MCP: ActionType.CODE_WRITE,
            ToolCategory.OTHER: ActionType.CODE_READ,
        }
    )
)
```
A single default verb per category is unsafe here.
BaseTool falls back to this map when a tool omits action_type, but several of these categories span very different risk levels. DEPLOYMENT -> deploy:staging can downgrade a production deploy, COMMUNICATION -> comms:internal can hide an external send, and VERSION_CONTROL -> vcs:commit collapses read/push/clone into the wrong bucket. Mixed-semantics categories should require an explicit action_type.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/action_type_mapping.py` around lines 13 - 30,
DEFAULT_CATEGORY_ACTION_MAP currently provides single default verbs for broad
categories; remove unsafe defaults for mixed-semantics categories so tools must
declare action_type explicitly. Specifically, delete entries for
ToolCategory.DEPLOYMENT, ToolCategory.COMMUNICATION, and
ToolCategory.VERSION_CONTROL from DEFAULT_CATEGORY_ACTION_MAP and update the
BaseTool fallback behavior (where it reads DEFAULT_CATEGORY_ACTION_MAP) to treat
missing mapping as "no implicit action" — either raise a validation error or
return None so callers must set ActionType explicitly; keep safe defaults for
the remaining ToolCategory -> ActionType pairs.
```python
def test_query_limit(self) -> None:
    log = AuditLog()
    self._populate(log)

    results = log.query(limit=2)

    assert len(results) == 2
```
Lock down the limit <= 0 boundary here.
src/ai_company/security/audit.py appends a match before it enforces limit, so a non-empty log currently returns one row for query(limit=0). This test only covers the happy path; add 0/negative cases and pin the intended behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/security/test_audit.py` around lines 269 - 275, Update the test
and/or the AuditLog.query behavior so limit <= 0 returns no rows: in the test
function test_query_limit add cases calling log.query(limit=0) and
log.query(limit=-1) and assert both return 0 results (use AuditLog() and
self._populate as before); if AuditLog.query currently appends a match before
enforcing the limit, change the implementation in
src/ai_company/security/audit.py (the AuditLog.query method) to enforce the
limit boundary check before any appends and ensure it treats limit <= 0 as "no
results" so the new tests pass.
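The intended enforcement can be sketched with a minimal stand-in (a plain list of dict entries; the real `AuditLog` internals differ):

```python
class AuditLog:
    """Minimal stand-in: query() validates the limit before collecting."""

    def __init__(self) -> None:
        self._entries: list[dict] = []

    def record(self, entry: dict) -> None:
        self._entries.append(entry)

    def query(self, *, limit: int = 100) -> list[dict]:
        if limit < 1:
            # Fail fast instead of returning a partial row for limit=0.
            raise ValueError(f"limit must be >= 1, got {limit}")
        results: list[dict] = []
        for entry in self._entries:
            if len(results) >= limit:  # checked BEFORE appending
                break
            results.append(entry)
        return results
```

Checking the bound before the append (and rejecting non-positive limits outright) pins the boundary behavior the test should then assert.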
Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a comprehensive security subsystem, including a SecOps security agent, audit logging, and output scanning, integrated into the tool execution pipeline. It enhances the system's ability to detect and prevent security risks associated with tool calls, ensuring sensitive data is protected and security policies are enforced. The changes include new configurations, models, rules, and services to support the new security features.

Highlights
Activity
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either
Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes
Code Review
This pull request introduces a comprehensive security subsystem, including a rule engine for pre-execution checks, an audit log, and post-execution output scanning, integrated into the ToolInvoker, along with a new ActionType taxonomy. However, several systematic bypasses and weaknesses were identified in the security detectors and their shared utilities. Specifically, the utility for extracting strings from tool arguments fails to recurse into nested lists, and regex patterns for destructive operations and path traversal are too restrictive, allowing for easy bypass. Additionally, there are two critical syntax errors in the exception handling within src/ai_company/tools/invoker.py that need to be addressed.
```python
    verdict = await self._security_interceptor.evaluate_pre_tool(
        context,
    )
except MemoryError, RecursionError:
```
```python
    context,
    result.content,
)
except MemoryError, RecursionError:
```
```python
elif isinstance(value, list):
    for item in value:
        if isinstance(item, str):
            yield item
        elif isinstance(item, dict):
            yield from walk_string_values(item, _depth=_depth + 1)
```
The walk_string_values utility function, which is used by all security detectors (Credential, Path Traversal, Destructive Op, Data Leak) to scan tool arguments, does not recursively process nested lists. It only checks for strings and dictionaries within a top-level list. If an attacker provides arguments containing strings inside nested lists (e.g., {"cmd": [["rm", "-rf", "/"]]}), these strings will be skipped by the detectors, leading to a systematic bypass of all security controls implemented by the SecOps agent.
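A fully recursive variant that also fails closed at the depth cap can be sketched as follows (the function signature is taken from the diff; the cap value and exception name are assumptions, not the real module):

```python
from collections.abc import Iterator
from typing import Any

_MAX_RECURSION_DEPTH = 20  # illustrative cap


class ScanDepthExceededError(ValueError):
    """Raised so callers can DENY instead of silently skipping a branch."""


def walk_string_values(value: Any, _depth: int = 0) -> Iterator[str]:
    """Yield every string reachable through nested dicts and lists."""
    if _depth > _MAX_RECURSION_DEPTH:
        raise ScanDepthExceededError(
            f"argument nesting exceeds {_MAX_RECURSION_DEPTH} levels"
        )
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from walk_string_values(item, _depth + 1)
    elif isinstance(value, (list, tuple)):
        for item in value:
            # Recurse unconditionally so {"cmd": [["rm", "-rf", "/"]]}
            # is not skipped the way str/dict-only list handling skips it.
            yield from walk_string_values(item, _depth + 1)
```

Because list items recurse through the same entry point as dict values, strings inside arbitrarily nested lists reach the detectors, and over-deep payloads raise instead of passing unscanned.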
```python
re.compile(r"\brm\s+-[a-zA-Z]*r[a-zA-Z]*f|rm\s+-[a-zA-Z]*f[a-zA-Z]*r"),
SecurityVerdictType.DENY,
```
```python
re.compile(
    r"\bDELETE\s+FROM\s+\w+\s*;",
    re.IGNORECASE,
),
SecurityVerdictType.ESCALATE,
```
```python
),
(
    "Windows UNC path",
    re.compile(r"^\\\\[^\\]"),
```
…Invoker integration (#40)

Implement the Security Operations meta-agent that validates every tool invocation before execution using a synchronous rule engine (~sub-ms).

- Replace 6-member ActionType enum with 24-member category:action taxonomy
- Add ActionTypeRegistry, ActionTypeCategory, and DEFAULT_CATEGORY_ACTION_MAP
- Add action_type property to BaseTool with category-derived defaults
- Set explicit action types on all file system and git tools
- Add SecurityContext, SecurityVerdict, AuditEntry, OutputScanResult models
- Add SecurityConfig, RuleEngineConfig, SecurityPolicyRule config models
- Add SecurityRule protocol and 5 rule detectors:
  - PolicyValidator (hard deny / auto approve fast path)
  - CredentialDetector (AWS keys, API keys, SSH keys, tokens)
  - PathTraversalDetector (../ sequences, null bytes, URL-encoded)
  - DestructiveOpDetector (rm -rf, DROP TABLE, git push --force)
  - DataLeakDetector (sensitive paths, PII patterns)
- Add RiskClassifier mapping action types to CRITICAL/HIGH/MEDIUM/LOW
- Add RuleEngine orchestrating ordered rule evaluation
- Add append-only AuditLog with query support and max_entries eviction
- Add OutputScanner for post-tool sensitive data detection and redaction
- Add SecurityInterceptionStrategy protocol
- Add SecOpsService coordinating rule engine, audit, scanner, and approval store escalation
- Integrate security check and output scan into ToolInvoker pipeline
- Wire SecOpsService into AgentEngine via _make_security_interceptor
- Add ToolSecurityDeniedError and security event constants
- Add .gitleaks.toml to allowlist test files with fake credentials
- Add 418 unit tests across 16 test files (5633 total, 0 failures)
Pre-reviewed by 10 agents, 69 findings addressed:

- Convert SecurityVerdictType to StrEnum for type safety
- Fix rule engine short-circuit: policy auto-approve no longer bypasses detection rules (soft-allow pattern)
- Apply fail-closed error handling: individual rule failures return DENY
- Extract shared walk_string_values() utility with depth-limited recursion
- Use copy.deepcopy at security boundaries instead of shallow dict()
- Add audit log eviction warning with total_recorded counter
- Fix AuditLog.query since parameter type (str → datetime)
- Add OutputScanResult model_validator for field consistency
- Share credential/PII patterns between detectors and output scanner
- Reuse SecurityContext in ToolInvoker (built once, shared)
- Wire RuleEngineConfig boolean flags to conditional detector inclusion
- Add control character check to git argument validation
- Fix credit card regex (remove incorrect Amex matching)
- DENY operations get CRITICAL risk level (was HIGH)
- Add 12 new security event constants for structured logging
- Remove dead ToolSecurityDeniedError class
- Update tests for all interface changes
Security fixes:

- Fix nested list scanning bypass in walk_string_values
- Fix false-positive `format` regex in destructive_op_detector
- Fix SSH public key false-positive in data_leak_detector
- Change output scan failure from fail-open to fail-closed
- Change RiskClassifier default from MEDIUM to HIGH (D19 fail-safe)
- Add approval_id validator (only allowed on ESCALATE)
- Add action_type format validation (must contain ':')
- Add disjoint action_type validator to SecurityConfig
- Add arguments_hash SHA-256 pattern constraint to AuditEntry
- Wrap audit/rule-engine/scan failures in try/except (non-fatal audit)

Improvements:

- Add VCS_READ action type for read-only VCS operations
- Pass task_id through to ToolInvoker security context
- Remove unused `tool` parameter from _check_security
- Add soft-allow rule semantics (policy_validator doesn't short-circuit)
- Add fail-closed error handling in RuleEngine._safe_evaluate
- Update DESIGN_SPEC.md project structure and action type taxonomy
- Update README.md and CLAUDE.md security package descriptions

Tests:

- Add test_utils.py for walk_string_values (flat, nested, depth limit)
- Add fail-closed tests (engine, service, invoker scan exception)
- Add soft-allow tests for policy_validator
- Add approval_id/OutputScanResult/SecurityContext validation tests
- Add audit total_recorded and query limit boundary tests
- Fix AuditEntry verdict type from StrEnum to Literal strings
- Fix arguments_hash test values to valid SHA-256 hex
- Update enum count, action type edge cases, risk classifier defaults
286cbbf to fe78e7e (Compare)
Actionable comments posted: 15
♻️ Duplicate comments (7)
tests/unit/security/rules/test_risk_classifier.py (1)
186-200: ⚠️ Potential issue | 🟡 Minor

This completeness check still won't catch missing default mappings.

`RiskClassifier.classify()` falls back to `ApprovalRiskLevel.HIGH` when a key is absent, so both assertions here still pass if a built-in `ActionType` drops out of the default map. Check membership against the classifier's explicit risk map instead of validating only the return type.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/rules/test_risk_classifier.py` around lines 186 - 200, The test test_all_action_types_are_mapped is vulnerable because RiskClassifier.classify() falls back to ApprovalRiskLevel.HIGH; update the test to assert each ActionType is present in the classifier's explicit mapping rather than only checking the returned value type. Concretely, for each action_type use RiskClassifier() to access its explicit map (e.g., classifier._default_risk_map or the attribute that stores the default mapping), assert action_type in that map, then assert the mapped value is a member of ApprovalRiskLevel; keep the existing assertions about membership but replace the implicit fallback check with the explicit-map membership check.src/ai_company/security/rules/_utils.py (1)
26-45: ⚠️ Potential issue | 🔴 Critical

The recursion cap is still bypassable and fail-open.

When a dict branch reaches `_MAX_RECURSION_DEPTH`, this just logs and returns, so strings nested past the cap are silently skipped. `_walk_value()` also never enforces the cap for list recursion, so deeply nested lists can recurse until Python raises. Apply the depth guard in `_walk_value()` for both dicts and lists, and surface a failure path so SecOps can deny instead of silently ignoring the branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/security/rules/_utils.py` around lines 26 - 45, The recursion depth guard must be enforced inside _walk_value and surface a failure instead of silently skipping branches: at the start of _walk_value check if _depth >= _MAX_RECURSION_DEPTH, log SECURITY_SCAN_DEPTH_EXCEEDED with depth and max_depth, and then raise a specific exception (e.g., RecursionDepthExceededError or ValueError with context) so callers can deny the request; also ensure both dict handling (where you call walk_string_values) and list handling perform the same depth check before recursing (i.e., only call walk_string_values or recurse into list items when _depth < _MAX_RECURSION_DEPTH) so deeply nested lists are not allowed to recurse past the cap.tests/unit/security/test_models.py (1)
330-346: ⚠️ Potential issue | 🟠 Major

`approval_id` should only accompany an escalation.

This case constructs `AuditEntry(verdict="deny", approval_id="apr-789")`. That either fails once the model enforces its contract or, worse, locks an invalid audit state into the suite. Use `verdict="escalate"` here or drop `approval_id` from this case.

🛠️ Minimal test fix
AuditEntry(verdict="deny", approval_id="apr-789"). That either fails once the model enforces its contract or, worse, locks an invalid audit state into the suite. Useverdict="escalate"here or dropapproval_idfrom this case.🛠️ Minimal test fix
- verdict="deny", + verdict="escalate",🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/test_models.py` around lines 330 - 346, The test constructs an AuditEntry with approval_id while using verdict="deny", which violates the model contract that approval_id only accompanies an escalation; update the test_creation_with_all_fields (the AuditEntry instantiation) to either remove the approval_id field or change verdict to "escalate" so the AuditEntry( ..., verdict=..., approval_id="apr-789") matches the allowed state—prefer changing verdict to "escalate" if you want to keep approval_id, otherwise drop approval_id from that test case.tests/unit/security/rules/test_data_leak_detector.py (1)
145-163: 🧹 Nitpick | 🔵 TrivialAdd public-key regression coverage to the clean-input matrix.
The detector used to over-match
id_rsa.pub/id_ed25519.pub, but this negative table never asserts those paths are allowed. Adding them here will keep the private-key fix from silently regressing.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/rules/test_data_leak_detector.py` around lines 145 - 163, The negative test matrix for the "arguments" parametrize is missing public-key filenames, so add entries to ensure public keys like id_rsa.pub and id_ed25519.pub are treated as safe: insert {"path": "/home/user/.ssh/id_rsa.pub"} and {"path": "/home/user/.ssh/id_ed25519.pub"} into the list of parameters and add corresponding ids (e.g., "public_rsa_key", "public_ed25519_key") so the parametrized test (the "arguments" parametrize block) asserts these public-key paths are allowed and guards against regressions.src/ai_company/security/action_type_mapping.py (1)
19-26:⚠️ Potential issue | 🟠 MajorRemove implicit defaults for mixed-semantics categories.
Because
BaseToolderivesaction_typefrom this map when a tool omits one, these entries can under-classify dangerous operations:VERSION_CONTROL -> vcs:commithidesvcs:push,COMMUNICATION -> comms:internalhides external sends, andDEPLOYMENT -> deploy:staginghides production deploys. Those categories should require an explicitaction_typeinstead of a single fallback.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/security/action_type_mapping.py` around lines 19 - 26, The mapping currently provides implicit fallbacks that under-classify risky operations; remove the entries for ToolCategory.VERSION_CONTROL, ToolCategory.COMMUNICATION, and ToolCategory.DEPLOYMENT from the action_type_mapping dict so BaseTool cannot silently derive an action_type for those mixed-semantics categories (symbols to change: action_type_mapping, ToolCategory.VERSION_CONTROL, ToolCategory.COMMUNICATION, ToolCategory.DEPLOYMENT). After removing them, ensure callers/constructors that rely on BaseTool.action_type require an explicit action_type for those categories (or surface a clear error) so tools must declare the precise ActionType instead of relying on a dangerous default.src/ai_company/tools/invoker.py (2)
335-347:⚠️ Potential issue | 🟠 Major
`_build_security_context()` can raise outside the fail-closed guard.

If `SecurityContext` construction fails (e.g., `NotBlankStr` validation rejects a blank `agent_id`), the exception propagates uncaught from `invoke()` instead of returning a blocked `ToolResult`. The context should be built inside `_check_security`'s `try` block so validation errors are caught and converted to fail-closed error results.

🛡️ Proposed fix
```diff
 async def _check_security(
     self,
     tool_call: ToolCall,
-    context: SecurityContext | None,
-) -> ToolResult | None:
+    tool: BaseTool,
+) -> tuple[ToolResult | None, SecurityContext | None]:
     """Run the security interceptor (if any) before execution.

-    Returns ``None`` if allowed, or a ``ToolResult(is_error=True)``
-    if denied or escalated.
+    Returns ``(None, context)`` if allowed, or
+    ``(ToolResult(is_error=True), None)`` if blocked.
     """
-    if self._security_interceptor is None or context is None:
-        return None
+    if self._security_interceptor is None:
+        return None, None
     try:
+        context = self._build_security_context(tool, tool_call)
         verdict = await self._security_interceptor.evaluate_pre_tool(
             context,
         )
     except (MemoryError, RecursionError):
         raise
     except Exception:
         ...
-        return ToolResult(...)
+        return ToolResult(...), None
     if verdict.verdict == SecurityVerdictType.ALLOW:
-        return None
+        return None, context
     ...
```
invoke():- security_context = ( - self._build_security_context(tool_or_error, tool_call) - if self._security_interceptor is not None - else None - ) - - security_error = await self._check_security( - tool_call, - security_context, - ) + security_error, security_context = await self._check_security( + tool_call, + tool_or_error, + ) if security_error is not None: return security_error🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/tools/invoker.py` around lines 335 - 347, The SecurityContext construction can raise before the fail-closed handling; move the call to _build_security_context(tool_or_error, tool_call) into the try block inside _check_security (or have _check_security itself construct the context) so any validation errors (e.g., from SecurityContext or NotBlankStr) are caught and converted into a blocked ToolResult; update invoke() to stop calling _build_security_context() preemptively and instead pass whatever minimal inputs _check_security needs (or let _check_security call _build_security_context) and ensure _check_security returns a ToolResult error on exceptions.
256-294: ⚠️ Potential issue | 🟠 Major

Sensitive data leak when `has_sensitive_data=True` but `redacted_content` is `None`.

When the scanner detects sensitive data but cannot provide a redacted version (`scan_result.has_sensitive_data=True` and `scan_result.redacted_content is None`), the original unredacted result is returned, leaking the sensitive data. This violates the fail-closed principle.

Additionally, skipping the scan for `result.is_error` outputs (lines 256-257) means secrets in error messages bypass SecOps entirely.

🔒 Proposed fix for fail-closed output scanning
```diff
 async def _scan_output(
     self,
     tool_call: ToolCall,
     result: ToolExecutionResult,
     context: SecurityContext,
 ) -> ToolExecutionResult:
     ...
     if self._security_interceptor is None:
         return result
-    if result.is_error:
-        return result
     try:
         scan_result = await self._security_interceptor.scan_output(
             context,
             result.content,
         )
     except (MemoryError, RecursionError):
         raise
     except Exception:
         logger.exception(
             SECURITY_OUTPUT_SCAN_ERROR,
             tool_call_id=tool_call.id,
             tool_name=tool_call.name,
         )
         return ToolExecutionResult(
             content="Output scan failed (fail-closed). Tool output withheld.",
             is_error=True,
             metadata={"output_scan_failed": True},
         )
-    if scan_result.has_sensitive_data and scan_result.redacted_content is not None:
+    if scan_result.has_sensitive_data:
+        if scan_result.redacted_content is None:
+            logger.warning(
+                TOOL_OUTPUT_REDACTED,
+                tool_call_id=tool_call.id,
+                tool_name=tool_call.name,
+                findings=scan_result.findings,
+                note="redaction unavailable — blocking output",
+            )
+            return ToolExecutionResult(
+                content="Tool output blocked: sensitive data detected.",
+                is_error=True,
+                metadata={
+                    "output_blocked": True,
+                    "redaction_findings": list(scan_result.findings),
+                },
+            )
         logger.warning(
             TOOL_OUTPUT_REDACTED,
             tool_call_id=tool_call.id,
```
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/tools/invoker.py` around lines 256 - 294, The output scanning currently skips on result.is_error and returns the original content when scan_result.has_sensitive_data is true but scan_result.redacted_content is None, causing a sensitive-data leak; update the logic in the invoker flow around self._security_interceptor.scan_output so that (1) you always call scan_output for all outputs (remove or change the early return on result.is_error) so error outputs are scanned too, (2) preserve existing exception handling for MemoryError and RecursionError, and (3) implement a fail-closed branch when scan_result.has_sensitive_data is True and scan_result.redacted_content is None by returning a ToolExecutionResult with is_error=True, a withheld message like "Output scan failed (fail-closed). Tool output withheld.", and metadata {"output_scan_failed": True} instead of returning the original result; reference symbols: _security_interceptor.scan_output, result.is_error, scan_result.has_sensitive_data, scan_result.redacted_content, and ToolExecutionResult.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@DESIGN_SPEC.md`:
- Around line 3021-3037: The DESIGN_SPEC.md security tree is out of sync with
the PR: it omits the new credential_detector.py and mischaracterizes
data_leak_detector.py; update the security directory listing and the adjacent
implementation-status text to match the actual module surface in this branch by
adding credential_detector.py, splitting the description of
data_leak_detector.py vs credential_detector.py (credential/secret detection vs
other data-leak concerns), and ensure entries for security/output_scanner.py,
security/service.py, and rules/* (engine.py, policy_validator.py,
risk_classifier.py, destructive_op_detector.py, data_leak_detector.py,
credential_detector.py, _utils.py) reflect current names and responsibilities so
the spec matches the merged implementation.
In `@src/ai_company/engine/agent_engine.py`:
- Around line 703-709: The AuditLog is being instantiated inside
_make_tool_invoker which is called per run, causing history to reset; create and
store a single AuditLog in AgentEngine.__init__ (e.g., self._audit_log) or
accept an injected AuditLog instance, then update _make_tool_invoker to pass
that shared AuditLog into SecOpsService instead of new AuditLog(); ensure
SecOpsService(...) in _make_tool_invoker uses self._audit_log so total_recorded
and filters persist across runs.
- Around line 672-677: The current check that returns None when
self._security_config is None lets ToolInvoker run without a
security_interceptor; change this to fail-closed: when self._security_config is
None, log the warning (leave logger.warning(SECURITY_DISABLED,...)) but then
raise a clear exception (e.g., SecurityConfigMissingError or RuntimeError) so
creation/usage of ToolInvoker is blocked, or alternatively instantiate a
deny-all interceptor (e.g., DenyAllSecurityInterceptor) and assign it to the
security_interceptor to ensure all requests are rejected; update references
around self._security_config and the code path that constructs
ToolInvoker/security_interceptor to enforce one of these behaviours.
In `@src/ai_company/security/action_types.py`:
- Around line 51-63: The module-level integrity checks currently use assert
(variables _extracted_categories, _enum_categories, _missing_in_enum,
_missing_in_map) which are skipped under Python -O; change each assert block to
an explicit runtime check: compute _missing_in_enum and _missing_in_map as
before and replace the two assert statements with if _missing_in_enum: raise
RuntimeError(f"ActionType categories missing from ActionTypeCategory:
{_missing_in_enum}") and similarly if _missing_in_map: raise
RuntimeError(f"ActionTypeCategory members missing from ActionType:
{_missing_in_map}") so the validation between _CATEGORY_MAP and
ActionTypeCategory always runs in production.
In `@src/ai_company/security/audit.py`:
- Around line 105-107: The query() method raises ValueError when limit < 1 but
does not log the error; add a warning log call (matching the __init__ pattern)
just before raising so the condition is logged with context (include the invalid
limit value and any relevant identifiers available in scope), using the existing
logger (e.g., self.logger or logger) to log at WARNING level before raising the
ValueError in query().
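A minimal log-then-raise shape; stdlib logging stands in for the project's get_logger and event constants:

```python
import logging

logger = logging.getLogger(__name__)  # real code: get_logger(__name__)


def query(limit: int) -> list:
    """Reject invalid limits, logging context before raising."""
    if limit < 1:
        # real code uses a structured event constant with key=value context
        logger.warning("audit query received invalid limit: %r", limit)
        raise ValueError(f"limit must be >= 1, got {limit}")
    return []
```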
In `@src/ai_company/security/rules/destructive_op_detector.py`:
- Around line 71-79: _scan_value currently returns on the first regex hit which
can miss higher-severity matches later in the same string; update _scan_value to
collect all matching pattern_name/verdict pairs by checking every (pattern_name,
pattern, verdict) in _DESTRUCTIVE_PATTERNS, then compute and return the single
most severe verdict (e.g., DENY > ESCALATE > ALLOW) along with an appropriate
pattern_name (choose the highest-severity pattern or combine names) instead of
exiting early; apply the same "collect-all-then-pick-most-severe" change to the
similar scanning code at the other site referenced (lines ~103-107) so
multi-match payloads like "DELETE ...; DROP DATABASE ..." are classified by the
worst-case verdict.
- Around line 41-45: The current "DELETE without WHERE" regex in re.compile only
matches simple unqualified table names; update the pattern used in the
re.compile call in destructive_op_detector.py (the "DELETE without WHERE" rule)
to also accept schema-qualified identifiers (e.g., schema.table), quoted
identifiers with double quotes, bracketed/escaped names, optional aliasing
(e.g., "users u" or "users AS u"), and possible trailing whitespace/semicolons
before the end or WHERE; modify the regex to allow identifier tokens like
(?:"[^"]+"|\w+)(?:\.(?:"[^"]+"|\w+))? and an optional alias group, and ensure
the negative lookahead still asserts no WHERE after the statement.
In `@src/ai_company/security/service.py`:
- Around line 196-209: The string literal "output_scan" used as the verdict in
the AuditEntry creation is a magic value; replace it with a named constant
(e.g., OUTPUT_SCAN_VERDICT) or add it as a member to SecurityVerdictType and use
that enum value instead. Update the AuditEntry instantiation in the block that
constructs entry (the AuditEntry call using id, timestamp, agent_id, task_id,
tool_name, tool_category, action_type, arguments_hash, verdict, risk_level,
reason, evaluation_duration_ms) to reference the new constant/enum member, and
add a short comment documenting that this verdict is audit-only to prevent
future typos and to make the value discoverable via grep.
- Around line 94-102: The log uses SECURITY_EVALUATE_START while warning about
unimplemented custom policies during construction; replace that event with a
config-related constant (e.g., SECURITY_CONFIG_WARNING) or an existing
configuration event to better reflect the timing and semantics. Update the
logger.warning call in the block that checks config.custom_policies to use the
new/appropriate event symbol (currently SECURITY_EVALUATE_START) and add or
import the new constant where events are defined so log analysis shows this as a
configuration-time warning rather than an evaluation start.
- Around line 136-150: Update the exception handling in the block that calls
self._rule_engine.evaluate(context) so MemoryError and RecursionError are
re-raised instead of being caught and converted into a DENY SecurityVerdict;
specifically, check for and re-raise these errors (e.g., "except (MemoryError,
RecursionError): raise") before the generic "except Exception:" so that
non-recoverable errors propagate, leaving the existing logger.exception and
fallback creation of SecurityVerdict unchanged for other exceptions.
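The reordered handlers, sketched with a plain dict standing in for SecurityVerdict:

```python
def evaluate_with_fallback(evaluate, context):
    """Fail closed on ordinary errors, but let fatal ones propagate."""
    try:
        return evaluate(context)
    except (MemoryError, RecursionError):
        # Non-recoverable: re-raise instead of converting into a DENY.
        # (PEP 758 style on Python 3.14 would drop the parentheses.)
        raise
    except Exception:
        # real code keeps logger.exception(...) here before the fallback
        return {"verdict": "deny", "reason": "rule engine error (fail-closed)"}
```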
In `@src/ai_company/tools/base.py`:
- Around line 111-115: The code silently maps unknown ToolCategory values to
ActionType.CODE_READ when setting self._action_type using
DEFAULT_CATEGORY_ACTION_MAP.get(...), which weakens SecOps; change this to
fail-closed by checking whether the category exists in
DEFAULT_CATEGORY_ACTION_MAP and raising a clear error (e.g., ValueError) if
missing (or alternatively substitute a conservative explicit default like
ActionType.NONE only after explicit decision), so locate the assignment to
self._action_type in the initializer and replace the current .get(...) fallback
behavior with an existence check that raises on missing mappings.
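An illustrative fail-closed lookup; the enum members and map contents are placeholders for the real ToolCategory, ActionType, and DEFAULT_CATEGORY_ACTION_MAP:

```python
from enum import Enum


class ToolCategory(Enum):
    FILE_SYSTEM = "file_system"
    DEPLOYMENT = "deployment"  # deliberately unmapped below


class ActionType(Enum):
    CODE_READ = "code:read"


DEFAULT_CATEGORY_ACTION_MAP = {ToolCategory.FILE_SYSTEM: ActionType.CODE_READ}


def resolve_action_type(category: ToolCategory) -> ActionType:
    """Raise instead of silently defaulting when a category has no mapping."""
    if category not in DEFAULT_CATEGORY_ACTION_MAP:
        # real code: logger.error(...) with context before raising
        raise ValueError(
            f"no default ActionType for category {category!r}; "
            "pass action_type explicitly"
        )
    return DEFAULT_CATEGORY_ACTION_MAP[category]
```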
- Around line 105-109: The setter that assigns self._action_type currently only
checks for the presence of ":" and must instead fully validate the
"category:action" form: ensure there is exactly one colon, both left and right
segments are non-empty, and reject strings with extra colons (e.g., ":" ,
"code:", "a:b:c"). In the same method where action_type is validated (the block
that assigns self._action_type), add a check that splits on ":" and confirms
len(parts)==2 and parts[0].strip() and parts[1].strip() are non-empty; before
raising ValueError, log the bad value and context at WARNING or ERROR (using the
module/class logger) and then raise the ValueError with the existing message.
Ensure the code still assigns self._action_type only when validation passes.
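An illustrative validator for the "category:action" form; logging and the exact error message should follow the module's existing conventions:

```python
def validate_action_type(value: str) -> tuple[str, str]:
    """Accept exactly one colon with non-empty segments on both sides."""
    parts = value.split(":")
    if len(parts) != 2 or not parts[0].strip() or not parts[1].strip():
        # real code: logger.warning(..., action_type=value) before raising
        raise ValueError(f"action_type must be 'category:action', got {value!r}")
    return parts[0].strip(), parts[1].strip()
```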
In `@tests/unit/security/rules/test_utils.py`:
- Around line 79-87: The test test_stops_at_max_depth should assert concrete
behavior instead of only checking the container type: call walk_string_values
with the deep nested structure and assert the returned list equals the expected
leaf values (e.g., ["leaf"]) and/or capture the warning emitted by
walk_string_values to assert a depth-limit warning was produced; update the
assertion in test_stops_at_max_depth to compare result == ["leaf"] and add a
warnings.catch_warnings/assert to verify the depth-limit warning from
walk_string_values is raised.
In `@tests/unit/security/test_service.py`:
- Around line 331-343: The test uses a vendor-specific literal "AWS access key"
and an AWS-style key which violates the no-vendor-names rule; update the
OutputScanResult instantiation and assertions in this test (and the similar
block around the other case) to use a vendor-neutral label (e.g., "provider
access key" or "example access key") and replace any AWS-style key literals in
the mocked scan inputs with a neutral example string so the calls to
OutputScanResult, service._test_output_scanner.scan, and the assertions still
validate the same behavior but without referencing a vendor name.
In `@tests/unit/tools/test_invoker_security.py`:
- Around line 380-400: The test currently assumes that when
OutputScanResult.has_sensitive_data is True but redacted_content is None the
original tool output is allowed; instead update the runtime logic in
ToolInvoker.invoke to "fail closed": detect when scan_result.has_sensitive_data
is True and scan_result.redacted_content is None (use
OutputScanResult.has_sensitive_data and OutputScanResult.redacted_content) and
either raise a security exception (e.g., SecurityViolationError) or replace the
output with a safe placeholder (e.g., "[REDACTED]") before returning; then
update the test test_sensitive_but_no_redacted_content_passes_through to expect
the new behavior (exception or redacted content) rather than the original
"executed: ls" string.
---
Duplicate comments:
In `@src/ai_company/security/action_type_mapping.py`:
- Around line 19-26: The mapping currently provides implicit fallbacks that
under-classify risky operations; remove the entries for
ToolCategory.VERSION_CONTROL, ToolCategory.COMMUNICATION, and
ToolCategory.DEPLOYMENT from the action_type_mapping dict so BaseTool cannot
silently derive an action_type for those mixed-semantics categories (symbols to
change: action_type_mapping, ToolCategory.VERSION_CONTROL,
ToolCategory.COMMUNICATION, ToolCategory.DEPLOYMENT). After removing them,
ensure callers/constructors that rely on BaseTool.action_type require an
explicit action_type for those categories (or surface a clear error) so tools
must declare the precise ActionType instead of relying on a dangerous default.
In `@src/ai_company/security/rules/_utils.py`:
- Around line 26-45: The recursion depth guard must be enforced inside
_walk_value and surface a failure instead of silently skipping branches: at the
start of _walk_value check if _depth >= _MAX_RECURSION_DEPTH, log
SECURITY_SCAN_DEPTH_EXCEEDED with depth and max_depth, and then raise a specific
exception (e.g., RecursionDepthExceededError or ValueError with context) so
callers can deny the request; also ensure both dict handling (where you call
walk_string_values) and list handling perform the same depth check before
recursing (i.e., only call walk_string_values or recurse into list items when
_depth < _MAX_RECURSION_DEPTH) so deeply nested lists are not allowed to recurse
past the cap.
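A sketch of the hard depth cap; the exception name, cap value, and event constant are illustrative:

```python
_MAX_RECURSION_DEPTH = 20


class RecursionDepthExceededError(ValueError):
    """Raised when a scanned payload nests deeper than the cap."""


def walk_string_values(value, _depth: int = 0):
    """Yield every string leaf, raising instead of silently skipping deep branches."""
    if _depth >= _MAX_RECURSION_DEPTH:
        # real code: logger.warning(SECURITY_SCAN_DEPTH_EXCEEDED, depth=..., max_depth=...)
        raise RecursionDepthExceededError(
            f"payload nesting exceeds max depth {_MAX_RECURSION_DEPTH}"
        )
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from walk_string_values(item, _depth + 1)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from walk_string_values(item, _depth + 1)
```

Raising lets the caller convert the failure into a DENY verdict rather than letting an attacker hide payloads below the scan horizon.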
In `@src/ai_company/tools/invoker.py`:
- Around line 335-347: The SecurityContext construction can raise before the
fail-closed handling; move the call to _build_security_context(tool_or_error,
tool_call) into the try block inside _check_security (or have _check_security
itself construct the context) so any validation errors (e.g., from
SecurityContext or NotBlankStr) are caught and converted into a blocked
ToolResult; update invoke() to stop calling _build_security_context()
preemptively and instead pass whatever minimal inputs _check_security needs (or
let _check_security call _build_security_context) and ensure _check_security
returns a ToolResult error on exceptions.
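The guarded shape, simplified: context construction happens inside the same try as evaluation, so validation errors also fail closed (a dict stands in for ToolResult):

```python
def check_security(build_context, evaluate, tool_call):
    """Build the context and evaluate it under one fail-closed guard."""
    try:
        context = build_context(tool_call)  # may raise on bad input
        return evaluate(context)            # may raise on engine errors
    except Exception as exc:
        # real code: logger.warning(...) with the tool_call details
        return {"is_error": True, "content": f"blocked: {exc}"}
```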
- Around line 256-294: The output scanning currently skips on result.is_error
and returns the original content when scan_result.has_sensitive_data is true but
scan_result.redacted_content is None, causing a sensitive-data leak; update the
logic in the invoker flow around self._security_interceptor.scan_output so that
(1) you always call scan_output for all outputs (remove or change the early
return on result.is_error) so error outputs are scanned too, (2) preserve
existing exception handling for MemoryError and RecursionError, and (3)
implement a fail-closed branch when scan_result.has_sensitive_data is True and
scan_result.redacted_content is None by returning a ToolExecutionResult with
is_error=True, a withheld message like "Output scan failed (fail-closed). Tool
output withheld.", and metadata {"output_scan_failed": True} instead of
returning the original result; reference symbols:
_security_interceptor.scan_output, result.is_error,
scan_result.has_sensitive_data, scan_result.redacted_content, and
ToolExecutionResult.
In `@tests/unit/security/rules/test_data_leak_detector.py`:
- Around line 145-163: The negative test matrix for the "arguments" parametrize
is missing public-key filenames, so add entries to ensure public keys like
id_rsa.pub and id_ed25519.pub are treated as safe: insert {"path":
"/home/user/.ssh/id_rsa.pub"} and {"path": "/home/user/.ssh/id_ed25519.pub"}
into the list of parameters and add corresponding ids (e.g., "public_rsa_key",
"public_ed25519_key") so the parametrized test (the "arguments" parametrize
block) asserts these public-key paths are allowed and guards against
regressions.
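The two new negative cases, plus a stand-in predicate showing why they should pass; in the real test module these would be ids in the existing "arguments" parametrize block:

```python
# New negative-matrix entries (dict argument, parametrize id).
NEW_SAFE_CASES = [
    ({"path": "/home/user/.ssh/id_rsa.pub"}, "public_rsa_key"),
    ({"path": "/home/user/.ssh/id_ed25519.pub"}, "public_ed25519_key"),
]


def looks_like_private_key(path: str) -> bool:
    """Stand-in for the detector: flag id_* key files unless they end in .pub."""
    name = path.rsplit("/", 1)[-1]
    return name.startswith("id_") and not name.endswith(".pub")
```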
In `@tests/unit/security/rules/test_risk_classifier.py`:
- Around line 186-200: The test test_all_action_types_are_mapped is vulnerable
because RiskClassifier.classify() falls back to ApprovalRiskLevel.HIGH; update
the test to assert each ActionType is present in the classifier's explicit
mapping rather than only checking the returned value type. Concretely, for each
action_type use RiskClassifier() to access its explicit map (e.g.,
classifier._default_risk_map or the attribute that stores the default mapping),
assert action_type in that map, then assert the mapped value is a member of
ApprovalRiskLevel; keep the existing assertions about membership but replace the
implicit fallback check with the explicit-map membership check.
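The strengthened assertion, shown against a stand-in classifier; the real attribute name (here `_default_risk_map`) must be checked against risk_classifier.py:

```python
from enum import Enum


class ActionType(Enum):
    CODE_READ = "code:read"
    CODE_WRITE = "code:write"


class ApprovalRiskLevel(Enum):
    LOW = "low"
    HIGH = "high"


class RiskClassifier:
    def __init__(self) -> None:
        self._default_risk_map = {
            ActionType.CODE_READ: ApprovalRiskLevel.LOW,
            ActionType.CODE_WRITE: ApprovalRiskLevel.HIGH,
        }

    def classify(self, action_type: ActionType) -> ApprovalRiskLevel:
        # The fallback that makes return-type-only assertions too weak.
        return self._default_risk_map.get(action_type, ApprovalRiskLevel.HIGH)


classifier = RiskClassifier()
# The stronger check: membership in the explicit map, which the fallback
# in classify() can never satisfy by accident.
for action_type in ActionType:
    assert action_type in classifier._default_risk_map
    assert isinstance(classifier._default_risk_map[action_type], ApprovalRiskLevel)
```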
In `@tests/unit/security/test_models.py`:
- Around line 330-346: The test constructs an AuditEntry with approval_id while
using verdict="deny", which violates the model contract that approval_id only
accompanies an escalation; update the test_creation_with_all_fields (the
AuditEntry instantiation) to either remove the approval_id field or change
verdict to "escalate" so the AuditEntry( ..., verdict=...,
approval_id="apr-789") matches the allowed state—prefer changing verdict to
"escalate" if you want to keep approval_id, otherwise drop approval_id from that
test case.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: bd6d2131-268a-4a31-95bf-bea8cfd806c7
📒 Files selected for processing (34)
CLAUDE.md
DESIGN_SPEC.md
README.md
src/ai_company/config/schema.py
src/ai_company/core/enums.py
src/ai_company/engine/agent_engine.py
src/ai_company/observability/events/security.py
src/ai_company/security/action_type_mapping.py
src/ai_company/security/action_types.py
src/ai_company/security/audit.py
src/ai_company/security/config.py
src/ai_company/security/models.py
src/ai_company/security/output_scanner.py
src/ai_company/security/rules/_utils.py
src/ai_company/security/rules/data_leak_detector.py
src/ai_company/security/rules/destructive_op_detector.py
src/ai_company/security/rules/engine.py
src/ai_company/security/rules/risk_classifier.py
src/ai_company/security/service.py
src/ai_company/tools/base.py
src/ai_company/tools/git_tools.py
src/ai_company/tools/invoker.py
tests/unit/config/conftest.py
tests/unit/core/test_enums.py
tests/unit/security/rules/test_data_leak_detector.py
tests/unit/security/rules/test_engine.py
tests/unit/security/rules/test_risk_classifier.py
tests/unit/security/rules/test_utils.py
tests/unit/security/test_action_types.py
tests/unit/security/test_audit.py
tests/unit/security/test_models.py
tests/unit/security/test_service.py
tests/unit/tools/test_base_action_type.py
tests/unit/tools/test_invoker_security.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (4)
**/*.md
📄 CodeRabbit inference engine (CLAUDE.md)
DESIGN_SPEC.md is MANDATORY reading before implementing any feature. The spec is the starting point for architecture, data models, and behavior. If implementation deviates from the spec, alert the user and explain why — user decides whether to proceed or update the spec. Do not silently diverge. When spec sections are referenced (e.g. 'Section 10.2'), read that section verbatim before coding. When approved deviations occur, update DESIGN_SPEC.md to reflect the new reality
Files:
README.md, CLAUDE.md, DESIGN_SPEC.md
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649 native lazy annotations
Use PEP 758 except syntax: use `except A, B:` (no parentheses) — ruff enforces this on Python 3.14
All public functions must have type hints; mypy strict mode is enforced
Public classes and functions must have Google-style docstrings (enforced by ruff D rules)
Every module with business logic must include logging setup: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`. Never use `import logging` or `print()` in application code. Variable name must always be `logger`
Use structured logging with event name constants: `logger.info(EVENT_CONSTANT, key=value)` — never use string interpolation like `logger.info('msg %s', val)`. Import event constants directly from `ai_company.observability.events.<domain>`
All error paths must log at WARNING or ERROR with context before raising; all state transitions must log at INFO; DEBUG for object creation, internal flow, and entry/exit of key functions
Line length maximum is 88 characters (enforced by ruff)
Functions must be < 50 lines; files must be < 800 lines
Handle errors explicitly; never silently swallow exceptions
Validate at system boundaries: user input, external APIs, and config files. Do not validate unnecessarily at internal boundaries
Files:
src/ai_company/security/rules/_utils.py, tests/unit/security/test_models.py, src/ai_company/security/rules/data_leak_detector.py, tests/unit/security/test_service.py, tests/unit/security/rules/test_data_leak_detector.py, src/ai_company/security/action_type_mapping.py, tests/unit/security/test_audit.py, tests/unit/security/rules/test_engine.py, src/ai_company/config/schema.py, src/ai_company/security/output_scanner.py, tests/unit/security/test_action_types.py, src/ai_company/security/service.py, src/ai_company/tools/invoker.py, src/ai_company/tools/base.py, tests/unit/tools/test_invoker_security.py, src/ai_company/security/rules/destructive_op_detector.py, src/ai_company/security/rules/engine.py, src/ai_company/security/config.py, src/ai_company/security/rules/risk_classifier.py, src/ai_company/security/audit.py, src/ai_company/engine/agent_engine.py, tests/unit/security/rules/test_risk_classifier.py, tests/unit/security/rules/test_utils.py, tests/unit/core/test_enums.py, tests/unit/tools/test_base_action_type.py, src/ai_company/security/models.py, src/ai_company/security/action_types.py, src/ai_company/core/enums.py, src/ai_company/tools/git_tools.py, tests/unit/config/conftest.py, src/ai_company/observability/events/security.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py: Use Pydantic v2 with frozen models for config/identity; use separate mutable-via-copy models (with `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model
Use `@computed_field` for derived values instead of storing + validating redundant fields (e.g. `TokenUsage.total_tokens`); use `NotBlankStr` from `core.types` for all identifier/name fields—including optional (`NotBlankStr | None`) and tuple variants—instead of manual whitespace validators
Create new objects for immutability; never mutate existing ones. For non-Pydantic internal collections (registries, `BaseTool`), use `copy.deepcopy()` at construction + `MappingProxyType` for read-only enforcement. For `dict`/`list` fields in frozen Pydantic models, rely on `frozen=True` and `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`
Files:
src/ai_company/security/rules/_utils.py, src/ai_company/security/rules/data_leak_detector.py, src/ai_company/security/action_type_mapping.py, src/ai_company/config/schema.py, src/ai_company/security/output_scanner.py, src/ai_company/security/service.py, src/ai_company/tools/invoker.py, src/ai_company/tools/base.py, src/ai_company/security/rules/destructive_op_detector.py, src/ai_company/security/rules/engine.py, src/ai_company/security/config.py, src/ai_company/security/rules/risk_classifier.py, src/ai_company/security/audit.py, src/ai_company/engine/agent_engine.py, src/ai_company/security/models.py, src/ai_company/security/action_types.py, src/ai_company/core/enums.py, src/ai_company/tools/git_tools.py, src/ai_company/observability/events/security.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Use pytest markers: `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`. Minimum coverage 80% (enforced in CI). Async mode: `asyncio_mode = 'auto'` — no manual `@pytest.mark.asyncio` needed. Per-test timeout: 30 seconds. Use `pytest-xdist` via `-n auto` for parallelism. Prefer `@pytest.mark.parametrize` for testing similar cases
NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc. Vendor names may only appear in: (1) DESIGN_SPEC.md provider list, (2) `.claude/` files, (3) third-party import paths
Files:
tests/unit/security/test_models.py, tests/unit/security/test_service.py, tests/unit/security/rules/test_data_leak_detector.py, tests/unit/security/test_audit.py, tests/unit/security/rules/test_engine.py, tests/unit/security/test_action_types.py, tests/unit/tools/test_invoker_security.py, tests/unit/security/rules/test_risk_classifier.py, tests/unit/security/rules/test_utils.py, tests/unit/core/test_enums.py, tests/unit/tools/test_base_action_type.py, tests/unit/config/conftest.py
🧠 Learnings (11)
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to tests/**/*.py : NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc. Vendor names may only appear in: (1) DESIGN_SPEC.md provider list, (2) `.claude/` files, (3) third-party import paths
Applied to files:
src/ai_company/security/rules/data_leak_detector.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.py : Every module with business logic must include logging setup: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`. Never use `import logging` or `print()` in application code. Variable name must always be `logger`
Applied to files:
src/ai_company/config/schema.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.py : Use structured logging with event name constants: `logger.info(EVENT_CONSTANT, key=value)` — never use string interpolation like `logger.info('msg %s', val)`. Import event constants directly from `ai_company.observability.events.<domain>`
Applied to files:
src/ai_company/config/schema.pysrc/ai_company/observability/events/security.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.py : Use PEP 758 except syntax: use `except A, B:` (no parentheses) — ruff enforces this on Python 3.14
Applied to files:
src/ai_company/tools/invoker.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.py : Handle errors explicitly; never silently swallow exceptions
Applied to files:
src/ai_company/tools/invoker.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/**/*.py : Use Pydantic v2 with frozen models for config/identity; use separate mutable-via-copy models (with `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model
Applied to files:
src/ai_company/security/config.pytests/unit/config/conftest.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/**/*.py : Create new objects for immutability; never mutate existing ones. For non-Pydantic internal collections (registries, `BaseTool`), use `copy.deepcopy()` at construction + `MappingProxyType` for read-only enforcement. For `dict`/`list` fields in frozen Pydantic models, rely on `frozen=True` and `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Applied to files:
src/ai_company/security/config.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/**/*.py : Use `computed_field` for derived values instead of storing + validating redundant fields (e.g. `TokenUsage.total_tokens`); use `NotBlankStr` from `core.types` for all identifier/name fields—including optional (`NotBlankStr | None`) and tuple variants—instead of manual whitespace validators
Applied to files:
src/ai_company/security/config.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to src/ai_company/providers/**/*.py : All provider calls must go through `BaseCompletionProvider` which applies retry + rate limiting automatically. Never implement retry logic in driver subclasses or calling code. Set `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`. Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. `RetryExhaustedError` signals all retries failed—engine layer catches this for fallback chains. Rate limiter respects `RateLimitError.retry_after` from providers
Applied to files:
src/ai_company/engine/agent_engine.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to tests/**/*.py : Use pytest markers: `pytest.mark.unit`, `pytest.mark.integration`, `pytest.mark.e2e`, `pytest.mark.slow`. Minimum coverage 80% (enforced in CI). Async mode: `asyncio_mode = 'auto'` — no manual `pytest.mark.asyncio` needed. Per-test timeout: 30 seconds. Use `pytest-xdist` via `-n auto` for parallelism. Prefer `pytest.mark.parametrize` for testing similar cases
Applied to files:
tests/unit/security/rules/test_risk_classifier.pytests/unit/tools/test_base_action_type.py
📚 Learning: 2026-03-09T20:32:32.549Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-09T20:32:32.549Z
Learning: Applies to **/*.md : DESIGN_SPEC.md is MANDATORY reading before implementing any feature. The spec is the starting point for architecture, data models, and behavior. If implementation deviates from the spec, alert the user and explain why — user decides whether to proceed or update the spec. Do not silently diverge. When spec sections are referenced (e.g. 'Section 10.2'), read that section verbatim before coding. When approved deviations occur, update DESIGN_SPEC.md to reflect the new reality
Applied to files:
DESIGN_SPEC.md
🧬 Code graph analysis (25)
src/ai_company/security/rules/_utils.py (1)
src/ai_company/observability/_logger.py (1)
get_logger(8-28)
tests/unit/security/test_models.py (2)
src/ai_company/core/enums.py (2)
ApprovalRiskLevel(442-448), ToolCategory(293-307)
src/ai_company/security/models.py (5)
AuditEntry(109-146), OutputScanResult(149-174), SecurityContext(70-100), SecurityVerdict(35-67), SecurityVerdictType(23-32)
tests/unit/security/test_service.py (9)
src/ai_company/core/enums.py (3)
ApprovalRiskLevel(442-448), ApprovalStatus(433-439), ToolCategory(293-307)
src/ai_company/security/audit.py (3)
AuditLog(19-132), count(125-127), entries(130-132)
src/ai_company/security/config.py (1)
SecurityConfig(68-112)
src/ai_company/security/models.py (4)
OutputScanResult(149-174), SecurityContext(70-100), SecurityVerdict(35-67), SecurityVerdictType(23-32)
src/ai_company/security/output_scanner.py (2)
OutputScanner(31-70), scan(34-70)
src/ai_company/security/rules/engine.py (2)
RuleEngine(32-174), evaluate(74-151)
src/ai_company/security/service.py (3)
SecOpsService(59-320), evaluate_pre_tool(104-173), scan_output(175-219)
tests/unit/security/rules/test_engine.py (3)
evaluate(35-37), evaluate(275-280), evaluate(299-304)
tests/unit/api/conftest.py (1)
approval_store(248-249)
src/ai_company/security/action_type_mapping.py (1)
src/ai_company/core/enums.py (2)
ActionType(371-407), ToolCategory(293-307)
tests/unit/security/test_audit.py (5)
src/ai_company/core/enums.py (2)
ApprovalRiskLevel(442-448), ToolCategory(293-307)
src/ai_company/security/audit.py (6)
AuditLog(19-132), count(125-127), record(49-69), entries(130-132), query(76-123), total_recorded(72-74)
src/ai_company/security/models.py (1)
AuditEntry(109-146)
src/ai_company/engine/parallel_models.py (1)
agent_id(79-81)
src/ai_company/tools/base.py (1)
action_type(134-136)
src/ai_company/config/schema.py (1)
src/ai_company/security/config.py (1)
SecurityConfig(68-112)
src/ai_company/security/output_scanner.py (2)
src/ai_company/observability/_logger.py (1)
get_logger(8-28)
src/ai_company/security/models.py (1)
OutputScanResult(149-174)
src/ai_company/security/service.py (15)
src/ai_company/core/approval.py (1)
ApprovalItem(24-96)
src/ai_company/observability/_logger.py (1)
get_logger(8-28)
src/ai_company/security/audit.py (2)
AuditLog(19-132), record(49-69)
src/ai_company/security/config.py (1)
SecurityConfig(68-112)
src/ai_company/security/models.py (4)
AuditEntry(109-146), OutputScanResult(149-174), SecurityVerdict(35-67), SecurityVerdictType(23-32)
src/ai_company/security/output_scanner.py (2)
OutputScanner(31-70), scan(34-70)
src/ai_company/security/rules/engine.py (2)
RuleEngine(32-174), evaluate(74-151)
src/ai_company/api/approval_store.py (1)
ApprovalStore(30-156)
src/ai_company/security/protocol.py (1)
evaluate_pre_tool(26-38)
src/ai_company/tools/base.py (2)
action_type(134-136), description(139-141)
src/ai_company/engine/parallel_models.py (2)
agent_id(79-81), task_id(87-89)
src/ai_company/security/rules/destructive_op_detector.py (1)
evaluate(95-130)
src/ai_company/security/rules/credential_detector.py (1)
evaluate(92-122)
src/ai_company/security/rules/path_traversal_detector.py (1)
evaluate(74-104)
src/ai_company/security/rules/policy_validator.py (1)
evaluate(57-98)
src/ai_company/tools/invoker.py (3)
src/ai_company/security/models.py (2)
SecurityContext(70-100), SecurityVerdictType(23-32)
src/ai_company/tools/base.py (5)
ToolExecutionResult(26-55), BaseTool(58-185), name(124-126), category(129-131), action_type(134-136)
src/ai_company/security/protocol.py (3)
SecurityInterceptionStrategy(18-54), evaluate_pre_tool(26-38), scan_output(40-54)
src/ai_company/tools/base.py (1)
src/ai_company/core/enums.py (1)
ActionType(371-407)
tests/unit/tools/test_invoker_security.py (2)
src/ai_company/security/models.py (4)
OutputScanResult(149-174), SecurityContext(70-100), SecurityVerdict(35-67), SecurityVerdictType(23-32)
src/ai_company/security/protocol.py (2)
evaluate_pre_tool(26-38)scan_output(40-54)
src/ai_company/security/rules/destructive_op_detector.py (5)
src/ai_company/core/enums.py (1)
ApprovalRiskLevel(442-448)
src/ai_company/observability/_logger.py (1)
get_logger(8-28)
src/ai_company/security/models.py (3)
SecurityContext(70-100), SecurityVerdict(35-67), SecurityVerdictType(23-32)
src/ai_company/security/rules/_utils.py (1)
walk_string_values(13-34)
src/ai_company/security/rules/engine.py (1)
evaluate(74-151)
src/ai_company/security/rules/engine.py (10)
src/ai_company/core/enums.py (1)
ApprovalRiskLevel(442-448)
src/ai_company/security/models.py (3)
SecurityContext(70-100), SecurityVerdict(35-67), SecurityVerdictType(23-32)
src/ai_company/security/rules/protocol.py (3)
SecurityRule(10-35), evaluate(23-35), name(19-21)
src/ai_company/security/rules/risk_classifier.py (2)
RiskClassifier(47-91), classify(71-91)
src/ai_company/security/rules/data_leak_detector.py (2)
evaluate(83-108), name(79-81)
src/ai_company/security/rules/destructive_op_detector.py (2)
evaluate(95-130), name(91-93)
src/ai_company/security/rules/credential_detector.py (2)
evaluate(92-122), name(88-90)
src/ai_company/security/rules/path_traversal_detector.py (2)
evaluate(74-104), name(70-72)
src/ai_company/security/rules/policy_validator.py (2)
evaluate(57-98), name(53-55)
src/ai_company/tools/base.py (2)
name(124-126), action_type(134-136)
src/ai_company/security/config.py (3)
src/ai_company/core/enums.py (2)
ActionType(371-407), ApprovalRiskLevel(442-448)
src/ai_company/security/models.py (2)
SecurityVerdictType(23-32), _check_action_type_format(92-100)
src/ai_company/security/rules/policy_validator.py (1)
name(53-55)
src/ai_company/security/rules/risk_classifier.py (1)
src/ai_company/core/enums.py (2)
ActionType(371-407)ApprovalRiskLevel(442-448)
src/ai_company/security/audit.py (4)
src/ai_company/core/enums.py (1)
ApprovalRiskLevel(442-448)src/ai_company/observability/_logger.py (1)
get_logger(8-28)src/ai_company/security/models.py (1)
AuditEntry(109-146)src/ai_company/engine/parallel_models.py (1)
agent_id(79-81)
src/ai_company/engine/agent_engine.py (11)
src/ai_company/security/output_scanner.py (1)
OutputScanner(31-70)src/ai_company/security/rules/credential_detector.py (1)
CredentialDetector(80-122)src/ai_company/security/rules/destructive_op_detector.py (1)
DestructiveOpDetector(82-130)src/ai_company/security/rules/engine.py (1)
RuleEngine(32-174)src/ai_company/security/rules/path_traversal_detector.py (1)
PathTraversalDetector(62-104)src/ai_company/security/rules/policy_validator.py (1)
PolicyValidator(23-98)src/ai_company/security/rules/risk_classifier.py (1)
RiskClassifier(47-91)src/ai_company/security/service.py (1)
SecOpsService(59-320)src/ai_company/tools/invoker.py (1)
ToolInvoker(71-665)src/ai_company/security/config.py (1)
SecurityConfig(68-112)src/ai_company/security/protocol.py (1)
SecurityInterceptionStrategy(18-54)
tests/unit/security/rules/test_risk_classifier.py (2)
src/ai_company/core/enums.py (2)
ActionType(371-407)ApprovalRiskLevel(442-448)src/ai_company/security/rules/risk_classifier.py (2)
RiskClassifier(47-91)classify(71-91)
tests/unit/security/rules/test_utils.py (1)
src/ai_company/security/rules/_utils.py (1)
walk_string_values(13-34)
tests/unit/core/test_enums.py (1)
src/ai_company/core/enums.py (1)
ActionType(371-407)
tests/unit/tools/test_base_action_type.py (2)
src/ai_company/core/enums.py (2)
ActionType(371-407)ToolCategory(293-307)src/ai_company/tools/base.py (5)
BaseTool(58-185)name(124-126)category(129-131)action_type(134-136)execute(167-185)
src/ai_company/security/models.py (2)
src/ai_company/core/enums.py (2)
ApprovalRiskLevel(442-448)ToolCategory(293-307)src/ai_company/security/config.py (1)
_check_action_type_format(36-45)
src/ai_company/security/action_types.py (2)
src/ai_company/core/enums.py (1)
ActionType(371-407)src/ai_company/observability/_logger.py (1)
get_logger(8-28)
src/ai_company/tools/git_tools.py (2)
src/ai_company/core/enums.py (1)
ActionType(371-407)src/ai_company/tools/base.py (1)
action_type(134-136)
tests/unit/config/conftest.py (1)
src/ai_company/security/config.py (1)
SecurityConfig(68-112)
🪛 LanguageTool
README.md
[typographical] ~36-~36: To join two clauses or introduce examples, consider using an em dash.
Context: ...only - Security/Approval System (M7) - SecOps agent with rule engine (soft-allo...
(DASH_RULE)
```python
if self._security_config is None:
    logger.warning(
        SECURITY_DISABLED,
        note="No SecurityConfig provided — all security checks skipped",
    )
    return None
```
Fail closed when SecurityConfig is missing.
Returning None here means ToolInvoker runs without a security_interceptor whenever a caller forgets to pass security_config. That makes SecOps opt-in and bypassable instead of a mandatory gatekeeper.
🔒 Minimal fail-closed change
```diff
 if self._security_config is None:
-    logger.warning(
+    logger.error(
         SECURITY_DISABLED,
-        note="No SecurityConfig provided — all security checks skipped",
+        note="SecurityConfig is required when tool execution is enabled",
     )
-    return None
+    msg = "SecurityConfig is required when tool execution is enabled"
+    raise ValueError(msg)
```

🤖 Prompt for AI Agents
+ raise ValueError(msg)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/agent_engine.py` around lines 672 - 677, The current
check that returns None when self._security_config is None lets ToolInvoker run
without a security_interceptor; change this to fail-closed: when
self._security_config is None, log the warning (leave
logger.warning(SECURITY_DISABLED,...)) but then raise a clear exception (e.g.,
SecurityConfigMissingError or RuntimeError) so creation/usage of ToolInvoker is
blocked, or alternatively instantiate a deny-all interceptor (e.g.,
DenyAllSecurityInterceptor) and assign it to the security_interceptor to ensure
all requests are rejected; update references around self._security_config and
the code path that constructs ToolInvoker/security_interceptor to enforce one of
these behaviours.
```python
def test_stops_at_max_depth(self) -> None:
    """Build a structure deeper than 20 levels — no crash."""
    data: dict[str, object] = {"val": "leaf"}
    for _ in range(25):
        data = {"nested": data}

    # Should not raise; values beyond max depth are silently skipped.
    result = list(walk_string_values(data))
    assert isinstance(result, list)
```
This depth-limit test doesn't validate the behavior.
list(walk_string_values(data)) is always a list, so this passes even if the recursion cap starts returning the wrong values or stops warning entirely. Assert the expected result and/or captured warning explicitly so the depth-limit contract is actually pinned.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/security/rules/test_utils.py` around lines 79 - 87, The test
test_stops_at_max_depth should assert concrete behavior instead of only checking
the container type: call walk_string_values with the deep nested structure and
assert the returned list equals the expected leaf values (e.g., ["leaf"]) and/or
capture the warning emitted by walk_string_values to assert a depth-limit
warning was produced; update the assertion in test_stops_at_max_depth to compare
result == ["leaf"] and add a warnings.catch_warnings/assert to verify the
depth-limit warning from walk_string_values is raised.
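The contract under discussion can be pinned with a toy walker. `walk_strings` and `MAX_DEPTH` below are illustrative stand-ins for the project's `walk_string_values`, assuming (as the review states) that values past the cap are silently dropped rather than yielded.

```python
from collections.abc import Iterator

MAX_DEPTH = 20  # assumed cap, mirroring the behavior described above


def walk_strings(obj: object, depth: int = 0) -> Iterator[str]:
    """Toy recursive walker that silently stops past MAX_DEPTH."""
    if depth > MAX_DEPTH:
        return
    if isinstance(obj, str):
        yield obj
    elif isinstance(obj, dict):
        for value in obj.values():
            yield from walk_strings(value, depth + 1)


data: dict[str, object] = {"val": "leaf"}
for _ in range(25):
    data = {"nested": data}

# The leaf sits 26 levels deep, past the cap, so nothing is yielded.
assert list(walk_strings(data)) == []
# A shallow structure still yields its strings.
assert list(walk_strings({"a": {"b": "x"}})) == ["x"]
```

Asserting the empty result (rather than `isinstance(result, list)`) is what actually pins the depth-limit behavior.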
```python
(
    "rm -rf",
    re.compile(r"\brm\s+-[a-zA-Z]*r[a-zA-Z]*f|rm\s+-[a-zA-Z]*f[a-zA-Z]*r"),
    SecurityVerdictType.DENY,
),
```
Missing \b word boundary in second rm -fr alternative
The first alternative correctly anchors with \b:
\brm\s+-[a-zA-Z]*r[a-zA-Z]*f
but the second alternative does not:
rm\s+-[a-zA-Z]*f[a-zA-Z]*r
Without \b, the second alternative matches inside any word ending in rm, e.g. charm -fr /tmp/something or worm -fa -r /data. While this produces a false-positive ESCALATE rather than a false-negative, it means legitimate tool calls could be blocked or escalated unnecessarily.
Suggested change:

```python
re.compile(r"\brm\s+-[a-zA-Z]*r[a-zA-Z]*f|\brm\s+-[a-zA-Z]*f[a-zA-Z]*r"),
```
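The effect of the missing anchor is easy to demonstrate with `re` directly:

```python
import re

# Current pattern: only the first alternative is anchored with \b.
unanchored = re.compile(r"\brm\s+-[a-zA-Z]*r[a-zA-Z]*f|rm\s+-[a-zA-Z]*f[a-zA-Z]*r")
# Proposed pattern: both alternatives anchored.
anchored = re.compile(r"\brm\s+-[a-zA-Z]*r[a-zA-Z]*f|\brm\s+-[a-zA-Z]*f[a-zA-Z]*r")

benign = "charm -fr /tmp/something"

# The unanchored second alternative fires inside "charm".
assert unanchored.search(benign) is not None
# Anchoring both alternatives removes the false positive...
assert anchored.search(benign) is None
# ...while real destructive commands still match.
assert anchored.search("rm -rf /data") is not None
assert anchored.search("rm -fr /data") is not None
```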
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/security/rules/destructive_op_detector.py
Line: 26-29
How can I resolve this? If you propose a fix, please make it concise.

```python
def _check_sensitive_paths(arguments: dict[str, object]) -> list[str]:
    """Find sensitive file paths in argument values (recursive)."""
    findings: list[str] = []
    for value in walk_string_values(arguments):
        for label, pattern in _SENSITIVE_PATHS:
            if pattern.search(value):
                findings.append(f"sensitive path detected ({label})")
                break
    return findings


def _check_pii(arguments: dict[str, object]) -> list[str]:
    """Find PII patterns in string argument values (recursive)."""
    findings: list[str] = []
    for value in walk_string_values(arguments):
        for name, pattern in PII_PATTERNS:
            if pattern.search(value):
                findings.append(name)
    return findings
```
Inconsistent early break between _check_sensitive_paths and _check_pii
_check_sensitive_paths uses break after the first pattern matches for a given string value (line 61), so only one finding is recorded per argument value even if it matches multiple sensitive-path patterns. _check_pii has no break, so all PII patterns are checked and every match is appended.
This means a value like "/etc/id_rsa.pem" records only one finding (RSA private key file), silently dropping the PEM certificate match. The reason inconsistency matters for auditing: the AuditEntry.reason field and downstream alert logic see incomplete findings, making it harder to determine the full extent of a policy violation.
If the intent is "one finding per value is enough to trigger DENY", both helpers should break; if the intent is "report all matched patterns", neither should.
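A toy version of the two helpers (with made-up labels and patterns standing in for `_SENSITIVE_PATHS` and `PII_PATTERNS`) shows how the early `break` changes what auditors see:

```python
import re

PATTERNS = [
    ("RSA private key file", re.compile(r"id_rsa")),
    ("PEM certificate", re.compile(r"\.pem\b")),
]


def findings_first_match_only(value: str) -> list[str]:
    """Mirrors _check_sensitive_paths: stops at the first matching pattern."""
    found: list[str] = []
    for label, pattern in PATTERNS:
        if pattern.search(value):
            found.append(label)
            break
    return found


def findings_all_matches(value: str) -> list[str]:
    """Mirrors _check_pii: records every matching pattern."""
    return [label for label, pattern in PATTERNS if pattern.search(value)]


value = "/etc/id_rsa.pem"
print(findings_first_match_only(value))  # ['RSA private key file']
print(findings_all_matches(value))       # ['RSA private key file', 'PEM certificate']
```

Whichever semantics are chosen, using the same shape in both helpers keeps `AuditEntry.reason` consistent.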
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/security/rules/data_leak_detector.py
Line: 54-72
How can I resolve this? If you propose a fix, please make it concise.
Pull request overview
Copilot reviewed 61 out of 61 changed files in this pull request and generated 8 comments.
```python
"""Scan tool output for sensitive data (if interceptor is set).

If sensitive data is found and redacted, returns a new
``ToolExecutionResult`` with the redacted content. Exceptions
from the scanner are caught — the original result is returned
to avoid destroying valid tool output.
"""
```
The _scan_output docstring says scanner exceptions return the original tool output, but the implementation returns an error result and withholds output (fail-closed). Update the docstring to match the current fail-closed behavior, or change the code to match the documented pass-through semantics.
```python
def __init__(
    self,
    *,
    rules: tuple[SecurityRule, ...],
    risk_classifier: RiskClassifier,
    config: RuleEngineConfig,
) -> None:
    """Initialize the rule engine.

    Args:
        rules: Ordered tuple of rules to evaluate.
        risk_classifier: Fallback risk classifier.
        config: Rule engine configuration.
    """
    self._rules = rules
    self._risk_classifier = risk_classifier
    self._config = config
```
RuleEngineConfig is stored on the engine (self._config) but never used, and max_argument_length is documented as limiting scanning without any enforcement. Either wire RuleEngineConfig into rule evaluation (e.g., truncate/skip overly long strings before passing to detectors) or remove the unused config plumbing to avoid misleading configuration knobs.
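If the knob is kept, one way to honor `max_argument_length` is to clamp oversized strings before they reach the detectors. `clamp_for_scan` is a hypothetical helper sketched for illustration, not part of the current code:

```python
def clamp_for_scan(value: str, max_len: int) -> str:
    """Truncate oversized strings before handing them to detectors."""
    if len(value) <= max_len:
        return value
    # Keep a marker so audit entries show the value was clamped.
    return value[:max_len] + "...[truncated]"


# With a hypothetical max_argument_length of 16:
assert clamp_for_scan("short", 16) == "short"
assert clamp_for_scan("x" * 100, 16) == "x" * 16 + "...[truncated]"
```

Note that truncation can hide a secret that straddles the cut point, so the limit should be generous relative to the longest credential pattern.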
```python
if config.custom_policies:
    logger.warning(
        SECURITY_EVALUATE_START,
        note=(
            "custom_policies configured but not yet "
            "evaluated — enforcement is not implemented"
        ),
        policy_count=len(config.custom_policies),
    )
```
custom_policies are exposed in SecurityConfig, but the service currently only logs that they are not evaluated (“enforcement is not implemented”). This is a functional gap relative to the intended policy system and can surprise operators who think custom policies are active. Either implement evaluation of custom_policies in the rule engine/service, or remove/clearly separate the config surface until it’s supported; also consider using a dedicated config-related event instead of SECURITY_EVALUATE_START for this warning.
```python
try:
    verdict = self._rule_engine.evaluate(context)
except Exception:
    logger.exception(
        SECURITY_INTERCEPTOR_ERROR,
        tool_name=context.tool_name,
        note="Rule engine evaluation failed (fail-closed)",
    )
    verdict = SecurityVerdict(
        verdict=SecurityVerdictType.DENY,
        reason="Rule engine evaluation failed (fail-closed)",
        risk_level=ApprovalRiskLevel.CRITICAL,
        evaluated_at=datetime.now(UTC),
        evaluation_duration_ms=0.0,
    )
```
evaluate_pre_tool catches Exception, which also catches MemoryError/RecursionError. This breaks the invoker’s contract that non-recoverable errors are re-raised and can mask OOM/recursion failures as a DENY verdict. Add an explicit except MemoryError, RecursionError: raise (and consider letting other BaseException subclasses propagate) before the broad exception handler.
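A sketch of the suggested handler shape, using a plain string in place of a real `SecurityVerdict`:

```python
def safe_evaluate(rule_fn):
    """Fail closed on recoverable errors; let fatal ones propagate."""
    try:
        return rule_fn()
    except (MemoryError, RecursionError):
        # Non-recoverable runtime failures must not be masked as DENY.
        raise
    except Exception:
        return "DENY"  # placeholder for a fail-closed SecurityVerdict


def blow_stack():
    raise RecursionError("too deep")


assert safe_evaluate(lambda: "ALLOW") == "ALLOW"
assert safe_evaluate(lambda: 1 / 0) == "DENY"

propagated = False
try:
    safe_evaluate(blow_stack)
except RecursionError:
    propagated = True
assert propagated
```

Ordering matters: the narrow `except (MemoryError, RecursionError): raise` clause must come before the broad `except Exception`, or it is never reached.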
```python
def _safe_evaluate(
    self,
    rule: SecurityRule,
    context: SecurityContext,
) -> SecurityVerdict | None:
    """Evaluate a single rule, catching exceptions (fail-closed)."""
    try:
        return rule.evaluate(context)
    except Exception:
        logger.exception(
            SECURITY_RULE_ERROR,
            rule_name=rule.name,
            tool_name=context.tool_name,
        )
        return SecurityVerdict(
            verdict=SecurityVerdictType.DENY,
            reason=f"Security rule {rule.name!r} failed (fail-closed)",
            risk_level=ApprovalRiskLevel.CRITICAL,
            matched_rules=(rule.name,),
            evaluated_at=datetime.now(UTC),
            evaluation_duration_ms=0.0,
        )
```
RuleEngine._safe_evaluate catches Exception, which includes MemoryError/RecursionError. That turns non-recoverable failures into a normal DENY verdict and can hide serious runtime problems. Re-raise MemoryError/RecursionError (and possibly other BaseException subclasses) before the generic exception handler.
```python
@model_validator(mode="after")
def _check_action_type_format(self) -> SecurityContext:
    """Validate that action_type uses ``category:action`` format."""
    if ":" not in self.action_type:
        msg = (
            f"action_type {self.action_type!r} must use "
            "'category:action' format (missing ':')"
        )
        raise ValueError(msg)
    return self
```
SecurityContext._check_action_type_format only checks for the presence of :. This allows invalid values like ":action", "category:", or "a:b:c" even though the docs describe a two-level category:action format and ActionTypeRegistry.get_category requires exactly one colon with non-empty parts. Tighten validation to require exactly one colon and non-empty category/action segments.
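A stricter check along the suggested lines can be built on `str.partition`. `validate_action_type` is a hypothetical helper illustrating the tightened rule, not the project's validator:

```python
def validate_action_type(value: str) -> str:
    """Require exactly one ':' with non-empty category and action parts."""
    category, sep, action = value.partition(":")
    if not sep or not category or not action or ":" in action:
        msg = f"action_type {value!r} must use 'category:action' format"
        raise ValueError(msg)
    return value


assert validate_action_type("code:read") == "code:read"
for bad in ("noseparator", ":action", "category:", "a:b:c"):
    try:
        validate_action_type(bad)
    except ValueError:
        pass
    else:
        raise AssertionError(f"{bad!r} should have been rejected")
```

The same predicate could back `SecurityContext`, `SecurityPolicyRule`, and `BaseTool` so all three agree with `ActionTypeRegistry.get_category`.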
src/ai_company/tools/base.py
Outdated
```python
if action_type is not None:
    if ":" not in action_type:
        msg = f"action_type {action_type!r} must use 'category:action' format"
        raise ValueError(msg)
    self._action_type = action_type
else:
    self._action_type = str(
        DEFAULT_CATEGORY_ACTION_MAP.get(
            category,
            ActionType.CODE_READ,
        ),
    )
```
BaseTool validates action_type with only ":" in action_type, so values like ":x", "x:", or "a:b:c" are accepted despite the intended two-level category:action taxonomy. Consider validating action_type.count(':') == 1 and rejecting empty segments for consistency with ActionTypeRegistry and SecurityContext expectations.
```python
@model_validator(mode="after")
def _check_action_type_format(self) -> SecurityPolicyRule:
    """Validate that action_types entries use ``category:action`` format."""
    for at in self.action_types:
        if ":" not in at:
            msg = (
                f"action_type {at!r} in policy {self.name!r} must use "
                "'category:action' format"
            )
            raise ValueError(msg)
    return self
```
SecurityPolicyRule._check_action_type_format only checks that each entry contains :, so invalid strings like "category:", ":action", or "a:b:c" pass validation. Since policies are meant to target two-level category:action values, enforce exactly one colon and non-empty segments (matching ActionTypeRegistry.get_category).
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/ai_company/tools/_git_base.py (1)
227-236: ⚠️ Potential issue | 🟡 Minor: Avoid logging raw rejected git args.

This branch logs untrusted `value` before any control-character sanitization. An input like `"-branch\nfake=1"` will take this path and can contaminate logs; use `repr(value)` or the same sanitization strategy as the control-character branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/tools/_git_base.py` around lines 227 - 236, The logger currently emits the raw untrusted `value` when rejecting git args (see the GIT_REF_INJECTION_BLOCKED log call in the branch that checks value.startswith("-")), which can leak control characters; update that log call to log a sanitized representation (e.g., use repr(value) or the same sanitize_control_chars(value) used elsewhere) instead of the raw `value`, and ensure the ToolExecutionResult return remains unchanged.
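The `repr()` route is enough to keep forged newlines out of a single log line:

```python
value = "-branch\nfake=1"

# Logging the raw value lets the embedded newline start a forged log line:
raw_log = f"rejected git arg: {value}"
assert "\n" in raw_log

# repr() escapes control characters, keeping the entry on one line:
safe_log = f"rejected git arg: {value!r}"
assert "\n" not in safe_log
assert "\\n" in safe_log
```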
♻️ Duplicate comments (16)
src/ai_company/security/action_types.py (1)
54-63: ⚠️ Potential issue | 🟡 Minor: Do not rely on `assert` for taxonomy integrity checks.

These guards disappear under `python -O`, so category drift between `ActionType` and `ActionTypeCategory` can ship unnoticed. Raise `RuntimeError` explicitly instead.

Proposed fix
```diff
 _missing_in_enum = _extracted_categories - _enum_categories
 _missing_in_map = _enum_categories - _extracted_categories
-assert not _missing_in_enum, (  # noqa: S101
-    f"ActionType categories missing from ActionTypeCategory: {_missing_in_enum}"
-)
-assert not _missing_in_map, (  # noqa: S101
-    f"ActionTypeCategory members missing from ActionType: {_missing_in_map}"
-)
+if _missing_in_enum:
+    msg = (
+        "ActionType categories missing from ActionTypeCategory: "
+        f"{_missing_in_enum}"
+    )
+    raise RuntimeError(msg)
+if _missing_in_map:
+    msg = (
+        "ActionTypeCategory members missing from ActionType: "
+        f"{_missing_in_map}"
+    )
+    raise RuntimeError(msg)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/security/action_types.py` around lines 54 - 63, The current integrity checks use assert (variables: _extracted_categories, _enum_categories, _missing_in_enum, _missing_in_map) which are removed under python -O; replace those assert statements with explicit runtime checks that raise RuntimeError with the same descriptive messages when _missing_in_enum or _missing_in_map are non-empty, referencing ActionTypeCategory, ActionType and _CATEGORY_MAP in the error text so category drift is always detected at import time.src/ai_company/security/audit.py (1)
105-107: ⚠️ Potential issue | 🟡 Minor: Log invalid `limit` before raising.

`query()` raises on `limit < 1` without any warning, unlike `__init__`. Add a warning with the invalid limit before the `ValueError`. As per coding guidelines, "All error paths must log at WARNING or ERROR with context before raising."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/security/audit.py` around lines 105 - 107, In query(), log the invalid limit at WARNING level before raising: call logging.getLogger(__name__).warning(...) (or the module's existing logger) with a message like "invalid limit in query(): %s" and include the limit value, then raise the same ValueError; this mirrors __init__'s behavior and ensures the error path is logged with context.src/ai_company/core/enums.py (1)
371-407: ⚠️ Potential issue | 🟠 Major: Preserve legacy `action_type` values during this taxonomy switch.

`SecurityContext.action_type` is still plain `str`, so older persisted/configured values can keep loading after this enum rewrite. Without a normalization layer, legacy strings like `"budget_spend"` or `"architecture_change"` will silently bypass their intended policy/risk branches and fall into unknown-value handling instead.
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/core/enums.py` around lines 371 - 407, SecurityContext.action_type was changed to use the new ActionType taxonomy (ActionType enum) but legacy plain-string values (e.g., "budget_spend", "architecture_change") will still be persisted and must be normalized on read; add a normalization layer that maps legacy strings to the corresponding ActionType member (e.g., "budget_spend" -> ActionType.BUDGET_SPEND, "architecture_change" -> ActionType.ARCH_DECIDE) and use it wherever SecurityContext.action_type is loaded/validated (deserialize/constructor/from_dict/parsing functions) so old values are converted to the new enum and policy/risk branches continue to match. Ensure the mapper is centralized and referenced by SecurityContext and any code paths that accept raw action_type strings.src/ai_company/security/rules/protocol.py (1)
3-6:⚠️ Potential issue | 🟠 MajorImport protocol annotation types at runtime.
SecurityContextandSecurityVerdictonly exist underTYPE_CHECKING, so Python 3.14 runtime annotation evaluation can fail withNameErroron this public protocol.Proposed fix
-from typing import TYPE_CHECKING, Protocol, runtime_checkable - -if TYPE_CHECKING: - from ai_company.security.models import SecurityContext, SecurityVerdict +from typing import Protocol, runtime_checkable + +from ai_company.security.models import SecurityContext, SecurityVerdictAlso applies to: 23-26
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/security/rules/protocol.py` around lines 3 - 6, The Protocol uses SecurityContext and SecurityVerdict only under TYPE_CHECKING which causes runtime NameError under Python 3.14; move the imports for SecurityContext and SecurityVerdict out of the TYPE_CHECKING block so they are available at runtime (or alternatively enable postponed evaluation with from __future__ import annotations), ensuring the public protocol decorated with runtime_checkable can reference SecurityContext and SecurityVerdict without raising NameError..gitleaks.toml (1)
4-8:⚠️ Potential issue | 🟠 MajorNarrow this allowlist to specific test secrets.
Allowlisting entire test files means any real secret accidentally added later will also be skipped. Prefer inline
gitleaks:allow, rule-scoped allowlists, or.gitleaksignorefingerprints so only the intentional fixtures are exempted.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.gitleaks.toml around lines 4 - 8, The current .gitleaks.toml allowlist broadly exempts entire test files (entries referencing tests/unit/security/rules/test_credential_detector.py and tests/unit/security/test_output_scanner.py); replace these file-level exemptions with targeted allowlists: remove the path entries under [allowlist] and instead annotate the specific test fixtures or strings with inline markers (e.g., gitleaks:allow or gitleaks:allow=<rule> comments), add rule-scoped allowlist entries in .gitleaks.toml that reference exact secret fingerprints, or add .gitleaksignore entries that contain the precise fingerprints of the test secrets so only the intended test fixtures are exempted.tests/unit/security/rules/test_utils.py (1)
79-87:⚠️ Potential issue | 🟡 MinorTest does not validate depth-limit behavior.
The test only asserts
isinstance(result, list), which always passes. Per thewalk_string_valuesimplementation, when depth exceeds 20, it returns early and logsSECURITY_SCAN_DEPTH_EXCEEDED. The nested structure places"leaf"at depth 26, soresultshould be empty.💡 Suggested improvement to assert concrete behavior
def test_stops_at_max_depth(self) -> None: """Build a structure deeper than 20 levels — no crash.""" data: dict[str, object] = {"val": "leaf"} for _ in range(25): data = {"nested": data} - # Should not raise; values beyond max depth are silently skipped. - result = list(walk_string_values(data)) - assert isinstance(result, list) + # The leaf is at depth 26; max depth is 20, so it should not be yielded. + result = list(walk_string_values(data)) + assert result == [], f"Expected empty list at depth >20, got {result}"🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/rules/test_utils.py` around lines 79 - 87, The test currently only checks isinstance(result, list) which doesn't verify depth behavior; update test_stops_at_max_depth to assert that walk_string_values returns an empty list for the nested structure (since the leaf is beyond the 20-level limit) and additionally assert that the LOGGER (or the component that records messages when walk_string_values hits the limit) logged the SECURITY_SCAN_DEPTH_EXCEEDED message; locate the test and modify assertions to check result == [] and to assert the logger was called with SECURITY_SCAN_DEPTH_EXCEEDED when invoking walk_string_values.tests/unit/security/rules/test_credential_detector.py (1)
33-84:⚠️ Potential issue | 🟡 MinorUse provider-neutral credential fixtures and expectations.
These cases still bake branded names like
AWSandGitHubinto project-owned tests and expected output. That conflicts with the repo rule and keeps vendor-specific wording in failure messages. Please switch the labels/fixtures and the reason assertion to neutral equivalents.As per coding guidelines, "Never use real vendor names in test code; use generic test provider names (test-provider, test-small-001, etc.)."
Also applies to: 149-164
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/rules/test_credential_detector.py` around lines 33 - 84, Replace vendor-branded test labels and fixture values in the pytest.mark.parametrize block (the tuples like "AWS access key", "AWS secret key", "GitHub PAT", etc.) with provider-neutral names and example values (e.g., "cloud provider access key" -> "test-provider-access-key", "service token" -> "test-service-token", "personal access token" -> "test-provider-pat", SSH/private key cases can stay generic). Update any expectations/assertion strings that reference vendor names (the reason or message checks tied to these param cases) to use the same neutral wording. Do the same replacement for the second parametrized block referenced in the comment (the similar tuples around the later section) so all fixtures and expected messages in test_credential_detector.py are vendor-neutral and consistent with the repo guideline.tests/unit/security/rules/test_risk_classifier.py (1)
186-200:⚠️ Potential issue | 🟡 MinorThis completeness check still can’t catch a missing default mapping.
RiskClassifier.classify()falls back toApprovalRiskLevel.HIGHfor unmapped action types, so both assertions here still pass on the fallback path. Assert direct membership in the classifier’s backing map instead.🧪 Proposed fix
def test_all_action_types_are_mapped(self) -> None: """Every ActionType enum member resolves without falling back to HIGH.""" classifier = RiskClassifier() - for action_type in ActionType: - risk = classifier.classify(action_type) - # All built-in types should have an explicit mapping, - # so none should silently fall back to HIGH (the unknown default). - assert isinstance(risk, ApprovalRiskLevel), ( - f"{action_type} not mapped in default risk map" - ) - # Verify that the mapping is explicit (not the unknown fallback). - # We don't assert risk != HIGH since some types (DB_MUTATE etc.) - # are legitimately HIGH — just verify it's a valid ApprovalRiskLevel. - assert risk in ApprovalRiskLevel, ( - f"{action_type} returned invalid risk level" - ) + missing = { + action_type + for action_type in ActionType + if action_type not in classifier._risk_map + } + assert not missing, f"Missing default risk mappings: {sorted(missing)}"🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/rules/test_risk_classifier.py` around lines 186 - 200, The test test_all_action_types_are_mapped is ineffective because RiskClassifier.classify() returns ApprovalRiskLevel.HIGH for unmapped types; change the test to assert that each ActionType is present in the classifier's backing mapping rather than relying on classify(). Specifically, instantiate RiskClassifier() and check membership of each action_type in its internal map (e.g., classifier._risk_map or the actual private map attribute used by RiskClassifier), then assert that classifier._risk_map[action_type] is a member of ApprovalRiskLevel to ensure an explicit mapping exists.tests/unit/security/test_service.py (1)
331-343:⚠️ Potential issue | 🟡 MinorMake the scan-output fixtures vendor-neutral.
The mocked findings and sample output here still use branded
AWSwording and key shapes. Since these tests are exercising service flow rather than provider-specific parsing, neutral fixtures preserve the coverage without violating the repository rule.As per coding guidelines, "Never use real vendor names in test code; use generic test provider names (test-provider, test-small-001, etc.)."
Also applies to: 382-389
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/test_service.py` around lines 331 - 343, The test uses vendor-specific strings ("AWS access key" and example key shapes) in the mocked OutputScanResult and sample output; update the fixtures in the test cases that create OutputScanResult (e.g., the finding_result variable) and the sample input passed to service.scan_output to use vendor-neutral identifiers like "test-provider access key" or "test-provider credential" and generic sample output (e.g., "sample output"); do the same for the other occurrence mentioned (the second test around the other OutputScanResult at lines 382–389) so that assertions (result.findings, redacted_content) and the mocked scan call (service._test_output_scanner.scan.assert_called_once_with) all reference the neutral strings instead of "AWS".tests/unit/security/test_audit.py (1)
285-290: 🧹 Nitpick | 🔵 Trivial: Add a test for negative
limitvalues.The test covers
limit=0but not negative values. SinceAuditLog.queryraisesValueErrorforlimit < 1, consider adding a case for negative limits to fully exercise the boundary.✨ Suggested addition
def test_query_limit_zero_raises_value_error(self) -> None: log = AuditLog() self._populate(log) with pytest.raises(ValueError, match="limit must be >= 1"): log.query(limit=0) + + def test_query_limit_negative_raises_value_error(self) -> None: + log = AuditLog() + self._populate(log) + + with pytest.raises(ValueError, match="limit must be >= 1"): + log.query(limit=-5)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/security/test_audit.py` around lines 285 - 290, Add a test that asserts AuditLog.query raises ValueError for negative limits: extend tests/unit/security/test_audit.py by adding either a new test (e.g., test_query_negative_limit_raises_value_error) or parametrize the existing test_query_limit_zero_raises_value_error to include negative values (e.g., -1, -10), call AuditLog.query(limit=<negative>) and expect pytest.raises(ValueError, match="limit must be >= 1"); reference AuditLog.query and the existing test method to locate the right place.DESIGN_SPEC.md (1)
3037-3053: ⚠️ Potential issue | 🟠 Major

Sync this security tree with the implementation being merged.

This still omits `credential_detector.py` and `path_traversal_detector.py`, and `data_leak_detector.py` is still described as secret detection even though that responsibility now lives in `credential_detector.py`. The spec will keep drifting unless this tree mirrors the actual M7 module split.

📝 Suggested doc fix
```diff
 │   │   ├── service.py                  # SecOpsService — meta-agent coordinating security
 │   │   └── rules/                      # Rule engine and detectors
 │   │       ├── engine.py               # RuleEngine (soft-allow + hard-deny, fail-closed)
 │   │       ├── protocol.py             # SecurityRule protocol
 │   │       ├── policy_validator.py     # Policy list validation rule
 │   │       ├── risk_classifier.py      # RiskClassifier (ActionType → ApprovalRiskLevel)
+│   │       ├── credential_detector.py  # Credential/secret pattern detection
 │   │       ├── destructive_op_detector.py  # Destructive operation detection
-│   │       ├── data_leak_detector.py   # Credential/secret pattern detection
+│   │       ├── data_leak_detector.py   # Sensitive path / PII detection
+│   │       ├── path_traversal_detector.py  # Path traversal detection
 │   │       └── _utils.py               # walk_string_values utility (recursive argument scanning)
```

Based on learnings, "When approved deviations occur, update DESIGN_SPEC.md to reflect the new reality".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@DESIGN_SPEC.md` around lines 3037 - 3053, The DESIGN_SPEC security tree is out-of-sync with the merged implementation: add entries for credential_detector.py and path_traversal_detector.py under security/rules, update the description of data_leak_detector.py (or rename its role) to reflect credential detection now living in credential_detector.py, and ensure the top-level security/ listing (files like action_type_mapping.py, service.py) matches the actual module set; specifically, update the security/ and security/rules/ file list and the inline descriptions for data_leak_detector.py, credential_detector.py, and path_traversal_detector.py so the spec mirrors the current codebase.src/ai_company/tools/invoker.py (2)
335-345: ⚠️ Potential issue | 🟠 Major

Build `SecurityContext` inside the guarded fail-closed path.

`_build_security_context()` can still raise before `_check_security()` enters its fail-closed handler, so invalid `agent_id`, `task_id`, or `action_type` values currently bubble out of `invoke()` as uncaught exceptions instead of a blocked `ToolResult`. Move context creation into the same guarded path that evaluates the interceptor and reuse that instance for the later output scan.
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/tools/invoker.py` around lines 335 - 345, The SecurityContext is being built before entering the guarded fail-closed path so _build_security_context(tool_or_error, tool_call) can raise and escape invoke(); move the call to _build_security_context into the same guarded path where you evaluate self._security_interceptor (i.e., only build it when _security_interceptor is not None and inside the try/except/fail-closed handling used for _check_security), then pass that created instance into the subsequent await self._check_security(...) and later reuse it for the output scan; ensure you no longer construct security_context outside the guarded block so all construction errors produce a blocked ToolResult via the existing fail-closed logic.
256-294: ⚠️ Potential issue | 🔴 Critical

Scan error outputs too, and block when redaction is impossible.

The early `if result.is_error: return result` lets secrets in tool error text bypass SecOps entirely, and `has_sensitive_data=True` with `redacted_content=None` still falls through to the raw output. Both cases violate the fail-closed guarantee this interceptor is meant to provide.
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/tools/invoker.py` around lines 256 - 294, The code currently returns early on error results, allowing error text to skip security scanning and also lets outputs with has_sensitive_data True and redacted_content None pass through; change the flow so you always call self._security_interceptor.scan_output(context, result.content) regardless of result.is_error, keep the existing exception handling around scan_output, and after scanning treat any scan_result where has_sensitive_data is True but redacted_content is None as a fail-closed condition: log the incident (similar to SECURITY_OUTPUT_SCAN_ERROR/TOOL_OUTPUT_REDACTED) and return a ToolExecutionResult that is is_error=True with content like "Output scan failed (fail-closed). Tool output withheld." and metadata indicating output_scan_failed/output_redacted as appropriate; preserve propagation of MemoryError/RecursionError and reuse result.metadata when returning redacted content via scan_result.redacted_content.src/ai_company/security/service.py (1)
136-150: ⚠️ Potential issue | 🟠 Major

Re-raise non-recoverable failures from the rule engine.

This blanket `except Exception` converts `MemoryError` and `RecursionError` into normal DENY verdicts, which hides non-recoverable runtime failures and keeps the system running in a bad state. Re-raise them before the generic handler, the same way `ToolInvoker` does.

Based on learnings, "Handle errors explicitly; never silently swallow exceptions".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/security/service.py` around lines 136 - 150, The current blanket except around self._rule_engine.evaluate hides non-recoverable errors (e.g., MemoryError, RecursionError) by turning them into DENY SecurityVerdicts; change the except clause to first re-raise non-recoverable exceptions (MemoryError, RecursionError, and other fatal exceptions your code treats as fatal) and only then handle the remaining exceptions by logging and returning a fail-closed SecurityVerdict (constructed via SecurityVerdict and SecurityVerdictType.DENY with the same fields). Keep the logging call (SECURITY_INTERCEPTOR_ERROR, tool_name=context.tool_name, note=...) and the fallback verdict creation unchanged for recoverable exceptions, but do not swallow or convert fatal exceptions thrown by self._rule_engine.evaluate.src/ai_company/engine/agent_engine.py (2)
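The re-raise-then-fail-closed split can be sketched as below; the function and dict shape are illustrative stand-ins, not the actual `SecOpsService` API.

```python
# Fatal errors that must propagate rather than become DENY verdicts.
_FATAL_EXCEPTIONS = (MemoryError, RecursionError)


def evaluate_fail_closed(rule_engine_evaluate, context):
    """Evaluate rules, failing closed on recoverable errors only."""
    try:
        return rule_engine_evaluate(context)
    except _FATAL_EXCEPTIONS:
        raise  # non-recoverable: propagate, the same way ToolInvoker does
    except Exception as exc:
        # Recoverable failure: the real code logs with context here, then denies.
        return {"verdict": "DENY", "reason": f"rule engine error: {exc}"}
```

Ordering matters: the narrow fatal clause must precede the blanket `except Exception`, otherwise the generic handler swallows everything.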
703-709: ⚠️ Potential issue | 🟠 Major

Use a shared `AuditLog` instead of creating one per run.

`_make_tool_invoker()` rebuilds the interceptor on every execution, and this path allocates a fresh in-memory `AuditLog()` each time. That resets query history and `total_recorded`, so the audit trail is no longer comprehensive across runs.

♻️ Minimal direction
```diff
         self._security_config = security_config
         self._approval_store = approval_store
+        self._audit_log = AuditLog()
         self._recovery_strategy = recovery_strategy
```

```diff
         return SecOpsService(
             config=cfg,
             rule_engine=rule_engine,
-            audit_log=AuditLog(),
+            audit_log=self._audit_log,
             output_scanner=OutputScanner(),
             approval_store=self._approval_store,
         )
```
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/agent_engine.py` around lines 703 - 709, The SecOpsService is being instantiated with a new in-memory AuditLog() each run inside _make_tool_invoker(), which resets history and total_recorded; instead, create and reuse a single AuditLog instance (e.g., self._audit_log) on the engine object and pass that shared instance into SecOpsService(config=cfg, rule_engine=rule_engine, audit_log=self._audit_log, ...), ensuring _audit_log is initialized once (lazy-init in constructor or first call) so the audit trail is preserved across runs.
672-677: ⚠️ Potential issue | 🔴 Critical

Fail closed when `SecurityConfig` is missing.

Returning `None` here means tool execution proceeds with no security interceptor whenever a caller forgets to provide `security_config`. That makes SecOps opt-in and bypassable instead of a mandatory gatekeeper.
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/agent_engine.py` around lines 672 - 677, Currently the code returns None when self._security_config is None (logging SECURITY_DISABLED), which allows tool execution to bypass security; change this to fail closed by either raising a clear exception (e.g., RuntimeError or a custom SecurityConfigMissingError) or by instantiating a strict-deny SecurityInterceptor that blocks all tool execution. Update the logic in the method that checks self._security_config (the block that currently calls logger.warning with SECURITY_DISABLED) so it does not return None—instead raise the error or return the deny-all interceptor and ensure callers handle that contract accordingly.
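The "raise instead of returning `None`" option can be sketched as follows; `SecurityConfigMissingError` and the factory signature are hypothetical names, not the engine's real API.

```python
class SecurityConfigMissingError(RuntimeError):
    """Raised when a tool invoker is built without a SecurityConfig."""


def make_security_interceptor(security_config, build_interceptor):
    """Fail-closed factory: never return None for a missing config."""
    if security_config is None:
        raise SecurityConfigMissingError(
            "SecurityConfig is required; refusing to build a tool invoker "
            "without a security interceptor"
        )
    return build_interceptor(security_config)
```

The alternative in the prompt (a strict deny-all interceptor) trades a loud startup failure for a quiet runtime one; raising is usually preferable because misconfiguration surfaces immediately.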
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/engine/agent_engine.py`:
- Around line 683-697: The config's custom_policies are never converted into
SecurityRule instances and thus ignored; update the detector construction in the
agent_engine (where PolicyValidator is created and the detectors list is built)
to iterate over cfg.security.custom_policies (or SecurityConfig.custom_policies)
and instantiate/validate each into a SecurityRule (or wrap them in a
CallableRule/CustomPolicyRule) before appending to detectors, ensuring they are
type-checked and ordered (e.g., appended after PolicyValidator) and that any
invalid custom policy raises or logs an error; keep the existing re_cfg feature
flags but ensure custom policies are honored by adding them to the detectors
list used for enforcement.
In `@src/ai_company/security/action_types.py`:
- Around line 152-164: get_category currently raises ValueError for malformed
action_type without logging; add logging calls before each raise to record the
invalid action_type and context. Specifically, in the get_category function (the
block that checks if action_type.count(":") != 1 and the block that checks if
not category or not action), call the module logger at WARNING or ERROR (e.g.,
logger.warning(...)) and include the offending action_type and a short message
before raising the ValueError so the invalid value and reason are recorded in
logs. Ensure the log messages mirror the ValueError text and use the existing
module-level logger variable.
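The log-before-raise shape the prompt asks for can be sketched like this; the validation mirrors the quoted `get_category` checks, with a module-level logger added (an assumption about the real module's logger name).

```python
import logging

logger = logging.getLogger(__name__)


def get_category(action_type: str) -> str:
    """Return the category part of 'category:action', logging before raising."""
    if action_type.count(":") != 1:
        msg = (
            f"Action type {action_type!r} must use 'category:action' "
            "format (exactly one ':')"
        )
        logger.warning(msg)  # mirror the ValueError text in the log
        raise ValueError(msg)
    category, action = action_type.split(":")
    if not category or not action:
        msg = (
            f"Action type {action_type!r} must have non-empty "
            "category and action parts"
        )
        logger.warning(msg)
        raise ValueError(msg)
    return category
```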
In `@src/ai_company/security/audit.py`:
- Around line 62-69: The audit append currently logs the state change at DEBUG
(logger.debug with SECURITY_AUDIT_RECORDED) so the mutation of self._entries and
self._total_recorded is invisible at normal prod levels; change the call to
logger.info (preserving SECURITY_AUDIT_RECORDED and the structured fields
audit_id=entry.id, tool_name=entry.tool_name, verdict=entry.verdict) so the
append/state transition is emitted at INFO when appending to _entries and
incrementing _total_recorded.
In `@src/ai_company/security/config.py`:
- Around line 28-45: The action_types field and its validator are too
permissive; replace the raw tuple[str, ...] with the shared constrained type
tuple[NotBlankStr, ...] (import NotBlankStr from core.types if not already
present) and remove or tighten the manual _check_action_type_format to rely on
NotBlankStr (or validate each NotBlankStr still contains a single ":" if
needed). Update any other similar fields in the same file (the block referenced
at lines 89–112) to use NotBlankStr or tuple[NotBlankStr, ...] instead of raw
str so identifier/name fields follow the NotBlankStr convention.
In `@src/ai_company/security/rules/_utils.py`:
- Around line 26-32: The current behavior in the depth check (when _depth >=
_MAX_RECURSION_DEPTH) logs a warning via
logger.warning(SECURITY_SCAN_DEPTH_EXCEEDED, ...) and returns silently, allowing
unscanned deep payloads; change this to fail-closed by raising an exception
(e.g., a new SecurityScanDepthExceededError or a RuntimeError) instead of
returning, include the _depth and _MAX_RECURSION_DEPTH in the exception message,
keep or augment the existing logger.warning call for observability, and ensure
callers of the function (places that call the function that contains _walk_value
or the depth check) are updated to catch and handle this exception
appropriately.
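A fail-closed version of the depth guard can be sketched as follows; the exception name and depth cap follow the review comment, while the traversal shape is a simplified assumption about the real `walk_string_values`.

```python
_MAX_RECURSION_DEPTH = 20


class SecurityScanDepthExceededError(RuntimeError):
    """Raised when a payload is nested too deeply to be fully scanned."""


def walk_string_values(value, _depth=0):
    """Collect all string leaves, raising instead of silently skipping."""
    if _depth >= _MAX_RECURSION_DEPTH:
        raise SecurityScanDepthExceededError(
            f"scan depth {_depth} >= max {_MAX_RECURSION_DEPTH}; failing closed"
        )
    if isinstance(value, str):
        return [value]
    strings: list[str] = []
    if isinstance(value, dict):
        children = value.values()
    elif isinstance(value, (list, tuple)):
        children = value
    else:
        children = ()
    for child in children:
        strings.extend(walk_string_values(child, _depth + 1))
    return strings
```

Callers that previously relied on the silent return now have to catch the exception and treat it as a DENY, which is the point: an unscannable payload is no longer an allowed one.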
In `@src/ai_company/security/rules/data_leak_detector.py`:
- Around line 101-108: The return value currently hardcodes
evaluation_duration_ms=0.0; wrap the detection logic in a monotonic timer (e.g.,
time.monotonic()) inside the function in data_leak_detector.py, record start
before evaluation and end after building the verdict, compute (end - start) *
1000 to get milliseconds, and set SecurityVerdict(evaluation_duration_ms=...) to
that value while keeping the other fields (SecurityVerdictType.DENY, reason,
matched_rules=_RULE_NAME, evaluated_at=datetime.now(UTC)) unchanged.
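The monotonic-timer wrapping can be sketched as below; the dict stands in for the real `SecurityVerdict` model, and only the timing mechanics are the point.

```python
import time


def timed_deny_verdict(reason: str) -> dict:
    """Build a DENY verdict with a measured, not hardcoded, duration."""
    start = time.monotonic()
    # The real code runs the detection logic and builds SecurityVerdict here.
    verdict = {"type": "DENY", "reason": reason}
    verdict["evaluation_duration_ms"] = (time.monotonic() - start) * 1000.0
    return verdict
```

`time.monotonic()` is preferred over `time.time()` for durations because it cannot jump backwards on clock adjustments.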
In `@src/ai_company/security/rules/engine.py`:
- Around line 56-72: The RuleEngine __init__ stores RuleEngineConfig but the
evaluate() path never uses the configured max_argument_length, so long argument
strings get scanned by every rule; update the constructor to extract and store
the max_argument_length (from RuleEngineConfig) and enforce it centrally before
any rule scanning (either by truncating or splitting results from
walk_string_values(context.arguments) based on that cap), refactor evaluate() by
moving argument-walking and length-enforcement into a new helper (e.g.,
_iter_capped_argument_strings or _prepare_arguments_for_evaluation) to keep
evaluate() under 50 lines, and ensure all rule detectors consume the capped
iterator instead of raw walk_string_values so the cap is applied uniformly.
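The central cap the prompt proposes can be sketched as a tiny helper; the name follows the review comment and the truncation policy (rather than splitting) is an assumption.

```python
def iter_capped_argument_strings(strings, max_argument_length: int):
    """Yield argument strings truncated to the configured cap.

    Applying the cap once here means every detector sees the same
    bounded input instead of re-scanning arbitrarily long strings.
    """
    for s in strings:
        yield s[:max_argument_length]
```

In the real engine this would wrap `walk_string_values(context.arguments)` inside `evaluate()`, with `max_argument_length` read from `RuleEngineConfig` in the constructor.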
In `@src/ai_company/security/rules/policy_validator.py`:
- Around line 37-50: SecurityConfig currently accepts hard_deny_action_types and
auto_approve_action_types verbatim and that allows category shortcuts like
"code" to slip through; update SecurityConfig (in the constructor/validators for
the hard_deny_action_types and auto_approve_action_types inputs) to validate
each string is either a known ActionType enum member or matches the required
"category:action" pattern, rejecting invalid entries with a clear ValueError;
alternatively, if category shortcuts are supported, implement expansion logic
there (map "category" to all matching ActionType values) before storing into
PolicyValidator so PolicyValidator's fields (_hard_deny, _auto_approve) only
contain valid leaf action types.
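The up-front format check can be sketched as below; the identifier alphabet in the regex is an assumption about what valid categories and actions look like.

```python
import re

# Assumed identifier alphabet: lowercase words with underscores.
_ACTION_TYPE_RE = re.compile(r"[a-z_]+:[a-z_]+")


def validate_action_types(values) -> frozenset[str]:
    """Reject category shortcuts like 'code' before they reach PolicyValidator."""
    for value in values:
        if not _ACTION_TYPE_RE.fullmatch(value):
            msg = f"invalid action type {value!r}; expected 'category:action'"
            raise ValueError(msg)
    return frozenset(values)
```

If category shortcuts should instead be supported, this is also the natural place to expand `"code"` into every `code:*` member before storing the frozenset.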
In `@tests/unit/security/rules/test_engine.py`:
- Around line 291-316: The test name
test_exception_does_not_stop_subsequent_rules is misleading: the body creates
_ExplodingRule and second_rule, calls engine.evaluate(ctx), and asserts the
exploding rule produces DENY and prevents second_rule from being called; rename
the test to reflect that an exception short-circuits subsequent rules (e.g.,
test_exception_stops_subsequent_rules or
test_exception_short_circuits_subsequent_rules) so the name matches the
assertions involving _ExplodingRule, second_rule.called, engine.evaluate, and
SecurityVerdictType.DENY.
In `@tests/unit/security/test_action_type_mapping.py`:
- Around line 22-23: Rename the test function
test_mapping_has_exactly_12_entries to a name that doesn't hardcode a numeric
count (e.g., test_mapping_length_matches_toolcategory or
test_default_category_action_map_matches_toolcategory) and update its definition
accordingly; the body should remain the same (assert
len(DEFAULT_CATEGORY_ACTION_MAP) == len(ToolCategory)) so it clearly reflects
that DEFAULT_CATEGORY_ACTION_MAP and ToolCategory are being compared.
In `@tests/unit/security/test_output_scanner.py`:
- Around line 42-80: Update the tests to remove branded vendor/card-network
names and use neutral placeholders: in the parameterized
test_credential_detected change labels and sample texts like "AWS access key",
"GitHub PAT", "Bearer token", and any provider-specific strings to generic
equivalents (e.g., "cloud-provider access key", "git-token", "bearer token",
"generic-secret") and adjust assertions that check result.findings in
test_aws_key_in_findings to look for the neutral label (or assert based on
detection class via _scanner rather than exact branded text). Locate and update
the strings in test_credential_detected, test_aws_key_in_findings, and the other
similar tests referenced (the later parameterized block around the other test
cases) so all sample inputs, labels, and expected finding keys use neutral names
like test-provider, test-token, test-secret.
---
Outside diff comments:
In `@src/ai_company/tools/_git_base.py`:
- Around line 227-236: The logger currently emits the raw untrusted `value` when
rejecting git args (see the GIT_REF_INJECTION_BLOCKED log call in the branch
that checks value.startswith("-")), which can leak control characters; update
that log call to log a sanitized representation (e.g., use repr(value) or the
same sanitize_control_chars(value) used elsewhere) instead of the raw `value`,
and ensure the ToolExecutionResult return remains unchanged.
---
Duplicate comments:
In @.gitleaks.toml:
- Around line 4-8: The current .gitleaks.toml allowlist broadly exempts entire
test files (entries referencing
tests/unit/security/rules/test_credential_detector.py and
tests/unit/security/test_output_scanner.py); replace these file-level exemptions
with targeted allowlists: remove the path entries under [allowlist] and instead
annotate the specific test fixtures or strings with inline markers (e.g.,
gitleaks:allow or gitleaks:allow=<rule> comments), add rule-scoped allowlist
entries in .gitleaks.toml that reference exact secret fingerprints, or add
.gitleaksignore entries that contain the precise fingerprints of the test
secrets so only the intended test fixtures are exempted.
In `@DESIGN_SPEC.md`:
- Around line 3037-3053: The DESIGN_SPEC security tree is out-of-sync with the
merged implementation: add entries for credential_detector.py and
path_traversal_detector.py under security/rules, update the description of
data_leak_detector.py (or rename its role) to reflect credential detection now
living in credential_detector.py, and ensure the top-level security/ listing
(files like action_type_mapping.py, service.py) matches the actual module set;
specifically, update the security/ and security/rules/ file list and the inline
descriptions for data_leak_detector.py, credential_detector.py, and
path_traversal_detector.py so the spec mirrors the current codebase.
In `@src/ai_company/core/enums.py`:
- Around line 371-407: SecurityContext.action_type was changed to use the new
ActionType taxonomy (ActionType enum) but legacy plain-string values (e.g.,
"budget_spend", "architecture_change") will still be persisted and must be
normalized on read; add a normalization layer that maps legacy strings to the
corresponding ActionType member (e.g., "budget_spend" ->
ActionType.BUDGET_SPEND, "architecture_change" -> ActionType.ARCH_DECIDE) and
use it wherever SecurityContext.action_type is loaded/validated
(deserialize/constructor/from_dict/parsing functions) so old values are
converted to the new enum and policy/risk branches continue to match. Ensure the
mapper is centralized and referenced by SecurityContext and any code paths that
accept raw action_type strings.
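A centralized mapper for this normalization could look like the sketch below. The target strings are hypothetical placeholders; the real map must point at actual `ActionType` members.

```python
# Hypothetical legacy -> new-taxonomy map; real values must be ActionType members.
_LEGACY_ACTION_TYPES = {
    "budget_spend": "finance:budget_spend",
    "architecture_change": "code:arch_decide",
}


def normalize_action_type(raw: str) -> str:
    """Map legacy plain-string action types onto the 'category:action' taxonomy."""
    if ":" in raw:
        return raw  # already in the new taxonomy, pass through unchanged
    try:
        return _LEGACY_ACTION_TYPES[raw]
    except KeyError:
        raise ValueError(f"unknown legacy action type {raw!r}") from None
```

Every deserialization path that accepts a raw `action_type` string would call this before constructing `SecurityContext`, so persisted legacy values keep matching policy and risk branches.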
In `@src/ai_company/engine/agent_engine.py`:
- Around line 703-709: The SecOpsService is being instantiated with a new
in-memory AuditLog() each run inside _make_tool_invoker(), which resets history
and total_recorded; instead, create and reuse a single AuditLog instance (e.g.,
self._audit_log) on the engine object and pass that shared instance into
SecOpsService(config=cfg, rule_engine=rule_engine, audit_log=self._audit_log,
...), ensuring _audit_log is initialized once (lazy-init in constructor or first
call) so the audit trail is preserved across runs.
- Around line 672-677: Currently the code returns None when
self._security_config is None (logging SECURITY_DISABLED), which allows tool
execution to bypass security; change this to fail closed by either raising a
clear exception (e.g., RuntimeError or a custom SecurityConfigMissingError) or
by instantiating a strict-deny SecurityInterceptor that blocks all tool
execution. Update the logic in the method that checks self._security_config (the
block that currently calls logger.warning with SECURITY_DISABLED) so it does not
return None—instead raise the error or return the deny-all interceptor and
ensure callers handle that contract accordingly.
In `@src/ai_company/security/action_types.py`:
- Around line 54-63: The current integrity checks use assert (variables:
_extracted_categories, _enum_categories, _missing_in_enum, _missing_in_map)
which are removed under python -O; replace those assert statements with explicit
runtime checks that raise RuntimeError with the same descriptive messages when
_missing_in_enum or _missing_in_map are non-empty, referencing
ActionTypeCategory, ActionType and _CATEGORY_MAP in the error text so category
drift is always detected at import time.
In `@src/ai_company/security/audit.py`:
- Around line 105-107: In query(), log the invalid limit at WARNING level before
raising: call logging.getLogger(__name__).warning(...) (or the module's existing
logger) with a message like "invalid limit in query(): %s" and include the limit
value, then raise the same ValueError; this mirrors __init__'s behavior and
ensures the error path is logged with context.
In `@src/ai_company/security/rules/protocol.py`:
- Around line 3-6: The Protocol uses SecurityContext and SecurityVerdict only
under TYPE_CHECKING which causes runtime NameError under Python 3.14; move the
imports for SecurityContext and SecurityVerdict out of the TYPE_CHECKING block
so they are available at runtime (or alternatively enable postponed evaluation
with from __future__ import annotations), ensuring the public protocol decorated
with runtime_checkable can reference SecurityContext and SecurityVerdict without
raising NameError.
In `@src/ai_company/security/service.py`:
- Around line 136-150: The current blanket except around
self._rule_engine.evaluate hides non-recoverable errors (e.g., MemoryError,
RecursionError) by turning them into DENY SecurityVerdicts; change the except
clause to first re-raise non-recoverable exceptions (MemoryError,
RecursionError, and other fatal exceptions your code treats as fatal) and only
then handle the remaining exceptions by logging and returning a fail-closed
SecurityVerdict (constructed via SecurityVerdict and SecurityVerdictType.DENY
with the same fields). Keep the logging call (SECURITY_INTERCEPTOR_ERROR,
tool_name=context.tool_name, note=...) and the fallback verdict creation
unchanged for recoverable exceptions, but do not swallow or convert fatal
exceptions thrown by self._rule_engine.evaluate.
In `@src/ai_company/tools/invoker.py`:
- Around line 335-345: The SecurityContext is being built before entering the
guarded fail-closed path so _build_security_context(tool_or_error, tool_call)
can raise and escape invoke(); move the call to _build_security_context into the
same guarded path where you evaluate self._security_interceptor (i.e., only
build it when _security_interceptor is not None and inside the
try/except/fail-closed handling used for _check_security), then pass that
created instance into the subsequent await self._check_security(...) and later
reuse it for the output scan; ensure you no longer construct security_context
outside the guarded block so all construction errors produce a blocked
ToolResult via the existing fail-closed logic.
- Around line 256-294: The code currently returns early on error results,
allowing error text to skip security scanning and also lets outputs with
has_sensitive_data True and redacted_content None pass through; change the flow
so you always call self._security_interceptor.scan_output(context,
result.content) regardless of result.is_error, keep the existing exception
handling around scan_output, and after scanning treat any scan_result where
has_sensitive_data is True but redacted_content is None as a fail-closed
condition: log the incident (similar to
SECURITY_OUTPUT_SCAN_ERROR/TOOL_OUTPUT_REDACTED) and return a
ToolExecutionResult that is is_error=True with content like "Output scan failed
(fail-closed). Tool output withheld." and metadata indicating
output_scan_failed/output_redacted as appropriate; preserve propagation of
MemoryError/RecursionError and reuse result.metadata when returning redacted
content via scan_result.redacted_content.
In `@tests/unit/security/rules/test_credential_detector.py`:
- Around line 33-84: Replace vendor-branded test labels and fixture values in
the pytest.mark.parametrize block (the tuples like "AWS access key", "AWS secret
key", "GitHub PAT", etc.) with provider-neutral names and example values (e.g.,
"cloud provider access key" -> "test-provider-access-key", "service token" ->
"test-service-token", "personal access token" -> "test-provider-pat",
SSH/private key cases can stay generic). Update any expectations/assertion
strings that reference vendor names (the reason or message checks tied to these
param cases) to use the same neutral wording. Do the same replacement for the
second parametrized block referenced in the comment (the similar tuples around
the later section) so all fixtures and expected messages in
test_credential_detector.py are vendor-neutral and consistent with the repo
guideline.
In `@tests/unit/security/rules/test_risk_classifier.py`:
- Around line 186-200: The test test_all_action_types_are_mapped is ineffective
because RiskClassifier.classify() returns ApprovalRiskLevel.HIGH for unmapped
types; change the test to assert that each ActionType is present in the
classifier's backing mapping rather than relying on classify(). Specifically,
instantiate RiskClassifier() and check membership of each action_type in its
internal map (e.g., classifier._risk_map or the actual private map attribute
used by RiskClassifier), then assert that classifier._risk_map[action_type] is a
member of ApprovalRiskLevel to ensure an explicit mapping exists.
In `@tests/unit/security/rules/test_utils.py`:
- Around line 79-87: The test currently only checks isinstance(result, list)
which doesn't verify depth behavior; update test_stops_at_max_depth to assert
that walk_string_values returns an empty list for the nested structure (since
the leaf is beyond the 20-level limit) and additionally assert that the LOGGER
(or the component that records messages when walk_string_values hits the limit)
logged the SECURITY_SCAN_DEPTH_EXCEEDED message; locate the test and modify
assertions to check result == [] and to assert the logger was called with
SECURITY_SCAN_DEPTH_EXCEEDED when invoking walk_string_values.
In `@tests/unit/security/test_audit.py`:
- Around line 285-290: Add a test that asserts AuditLog.query raises ValueError
for negative limits: extend tests/unit/security/test_audit.py by adding either a
new test (e.g., test_query_negative_limit_raises_value_error) or parametrize the
existing test_query_limit_zero_raises_value_error to include negative values
(e.g., -1, -10), call AuditLog.query(limit=<negative>) and expect
pytest.raises(ValueError, match="limit must be >= 1"); reference AuditLog.query
and the existing test method to locate the right place.
In `@tests/unit/security/test_service.py`:
- Around line 331-343: The test uses vendor-specific strings ("AWS access key"
and example key shapes) in the mocked OutputScanResult and sample output; update
the fixtures in the test cases that create OutputScanResult (e.g., the
finding_result variable) and the sample input passed to service.scan_output to
use vendor-neutral identifiers like "test-provider access key" or "test-provider
credential" and generic sample output (e.g., "sample output"); do the same for
the other occurrence mentioned (the second test around the other
OutputScanResult at lines 382–389) so that assertions (result.findings,
redacted_content) and the mocked scan call
(service._test_output_scanner.scan.assert_called_once_with) all reference the
neutral strings instead of "AWS".
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d1ceb8b6-0f70-4b7d-a508-ffce62b93b92
📒 Files selected for processing (61)
.gitleaks.toml, CLAUDE.md, DESIGN_SPEC.md, README.md, src/ai_company/config/defaults.py, src/ai_company/config/schema.py, src/ai_company/core/enums.py, src/ai_company/engine/agent_engine.py, src/ai_company/observability/events/security.py, src/ai_company/observability/events/tool.py, src/ai_company/security/__init__.py, src/ai_company/security/action_type_mapping.py, src/ai_company/security/action_types.py, src/ai_company/security/audit.py, src/ai_company/security/config.py, src/ai_company/security/models.py, src/ai_company/security/output_scanner.py, src/ai_company/security/protocol.py, src/ai_company/security/rules/__init__.py, src/ai_company/security/rules/_utils.py, src/ai_company/security/rules/credential_detector.py, src/ai_company/security/rules/data_leak_detector.py, src/ai_company/security/rules/destructive_op_detector.py, src/ai_company/security/rules/engine.py, src/ai_company/security/rules/path_traversal_detector.py, src/ai_company/security/rules/policy_validator.py, src/ai_company/security/rules/protocol.py, src/ai_company/security/rules/risk_classifier.py, src/ai_company/security/service.py, src/ai_company/tools/_git_base.py, src/ai_company/tools/base.py, src/ai_company/tools/file_system/_base_fs_tool.py, src/ai_company/tools/file_system/delete_file.py, src/ai_company/tools/file_system/edit_file.py, src/ai_company/tools/file_system/list_directory.py, src/ai_company/tools/file_system/read_file.py, src/ai_company/tools/file_system/write_file.py, src/ai_company/tools/git_tools.py, src/ai_company/tools/invoker.py, tests/unit/config/conftest.py, tests/unit/core/test_enums.py, tests/unit/observability/test_events.py, tests/unit/security/__init__.py, tests/unit/security/rules/__init__.py, tests/unit/security/rules/test_credential_detector.py, tests/unit/security/rules/test_data_leak_detector.py, tests/unit/security/rules/test_destructive_op_detector.py, tests/unit/security/rules/test_engine.py, tests/unit/security/rules/test_path_traversal_detector.py, tests/unit/security/rules/test_policy_validator.py, tests/unit/security/rules/test_risk_classifier.py, tests/unit/security/rules/test_utils.py, tests/unit/security/test_action_type_mapping.py, tests/unit/security/test_action_types.py, tests/unit/security/test_audit.py, tests/unit/security/test_config.py, tests/unit/security/test_models.py, tests/unit/security/test_output_scanner.py, tests/unit/security/test_service.py, tests/unit/tools/test_base_action_type.py, tests/unit/tools/test_invoker_security.py
```python
        policy_validator = PolicyValidator(
            hard_deny_action_types=frozenset(cfg.hard_deny_action_types),
            auto_approve_action_types=frozenset(cfg.auto_approve_action_types),
        )
        # Build the detector list respecting config flags.
        detectors: list[SecurityRule] = [policy_validator]
        if re_cfg.credential_patterns_enabled:
            detectors.append(CredentialDetector())
        if re_cfg.path_traversal_detection_enabled:
            detectors.append(PathTraversalDetector())
        if re_cfg.destructive_op_detection_enabled:
            detectors.append(DestructiveOpDetector())
        if re_cfg.data_leak_detection_enabled:
            detectors.append(DataLeakDetector())
```
custom_policies are accepted in config but never enforced.
This factory only installs PolicyValidator plus the built-in detectors; nothing turns SecurityConfig.custom_policies into executable SecurityRules. As written, user-supplied policies are silently ignored at runtime, which misses one of the PR’s stated acceptance criteria.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/agent_engine.py` around lines 683 - 697, The config's
custom_policies are never converted into SecurityRule instances and thus
ignored; update the detector construction in the agent_engine (where
PolicyValidator is created and the detectors list is built) to iterate over
cfg.security.custom_policies (or SecurityConfig.custom_policies) and
instantiate/validate each into a SecurityRule (or wrap them in a
CallableRule/CustomPolicyRule) before appending to detectors, ensuring they are
type-checked and ordered (e.g., appended after PolicyValidator) and that any
invalid custom policy raises or logs an error; keep the existing re_cfg feature
flags but ensure custom policies are honored by adding them to the detectors
list used for enforcement.
```python
if action_type.count(":") != 1:
    msg = (
        f"Action type {action_type!r} must use 'category:action' "
        "format (exactly one ':')"
    )
    raise ValueError(msg)
category, action = action_type.split(":")
if not category or not action:
    msg = (
        f"Action type {action_type!r} must have non-empty "
        "category and action parts"
    )
    raise ValueError(msg)
```
🛠️ Refactor suggestion | 🟠 Major
Log invalid action types before raising.
get_category() validates caller-provided strings but raises silently on malformed values. This module already logs invalid action types elsewhere, and this path should do the same.
As per coding guidelines, "All error paths must log at WARNING or ERROR with context before raising."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/action_types.py` around lines 152 - 164, get_category
currently raises ValueError for malformed action_type without logging; add
logging calls before each raise to record the invalid action_type and context.
Specifically, in the get_category function (the block that checks if
action_type.count(":") != 1 and the block that checks if not category or not
action), call the module logger at WARNING or ERROR (e.g., logger.warning(...))
and include the offending action_type and a short message before raising the
ValueError so the invalid value and reason are recorded in logs. Ensure the log
messages mirror the ValueError text and use the existing module-level logger
variable.
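A minimal sketch of the suggested fix, assuming a module-level logger: each raise is preceded by a warning that records the offending value. The stdlib logger here stands in for whatever structured logger the module actually uses.

```python
import logging

logger = logging.getLogger("security.action_types")


def get_category(action_type: str) -> str:
    """Return the category part of a 'category:action' identifier."""
    if action_type.count(":") != 1:
        msg = (
            f"Action type {action_type!r} must use 'category:action' "
            "format (exactly one ':')"
        )
        logger.warning("invalid action type: %s", msg)  # log before raising
        raise ValueError(msg)
    category, action = action_type.split(":")
    if not category or not action:
        msg = (
            f"Action type {action_type!r} must have non-empty "
            "category and action parts"
        )
        logger.warning("invalid action type: %s", msg)  # log before raising
        raise ValueError(msg)
    return category
```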
```python
self._entries.append(entry)
self._total_recorded += 1
logger.debug(
    SECURITY_AUDIT_RECORDED,
    audit_id=entry.id,
    tool_name=entry.tool_name,
    verdict=entry.verdict,
)
```
🛠️ Refactor suggestion | 🟠 Major
Emit audit append events at INFO.
Recording an entry mutates audit-log state, but this path is only logged at DEBUG. That makes the primary state transition disappear at normal production log levels.
As per coding guidelines, "All state transitions must log at INFO level."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/audit.py` around lines 62 - 69, The audit append
currently logs the state change at DEBUG (logger.debug with
SECURITY_AUDIT_RECORDED) so the mutation of self._entries and
self._total_recorded is invisible at normal prod levels; change the call to
logger.info (preserving SECURITY_AUDIT_RECORDED and the structured fields
audit_id=entry.id, tool_name=entry.tool_name, verdict=entry.verdict) so the
append/state transition is emitted at INFO when appending to _entries and
incrementing _total_recorded.
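The proposed change can be sketched as follows; `AuditEntry` and `AuditLog` are simplified stand-ins for the real models, showing only that the append path emits at INFO rather than DEBUG.

```python
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("security.audit")


@dataclass(frozen=True)
class AuditEntry:
    id: str
    tool_name: str
    verdict: str


class AuditLog:
    def __init__(self) -> None:
        self._entries: list[AuditEntry] = []
        self._total_recorded = 0

    def record(self, entry: AuditEntry) -> None:
        self._entries.append(entry)
        self._total_recorded += 1
        # INFO, not DEBUG: recording mutates audit-log state, and state
        # transitions should be visible at production log levels.
        logger.info(
            "security audit recorded: id=%s tool=%s verdict=%s",
            entry.id, entry.tool_name, entry.verdict,
        )
```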
```python
name: NotBlankStr
description: str = ""
action_types: tuple[str, ...] = ()
verdict: SecurityVerdictType = SecurityVerdictType.DENY
risk_level: ApprovalRiskLevel = ApprovalRiskLevel.MEDIUM
enabled: bool = True

@model_validator(mode="after")
def _check_action_type_format(self) -> SecurityPolicyRule:
    """Validate that action_types entries use ``category:action`` format."""
    for at in self.action_types:
        if ":" not in at:
            msg = (
                f"action_type {at!r} in policy {self.name!r} must use "
                "'category:action' format"
            )
            raise ValueError(msg)
    return self
```
Strengthen action-type validation at the config boundary.
These fields still accept plain strings with either no validation or only a ":" check, so malformed identifiers like " deploy:production " or "code:" can be loaded and then silently miss policy matches at runtime. Use a shared constrained action-type type here instead of raw str tuples.
As per coding guidelines, "Use NotBlankStr from core.types for all identifier/name fields — including optional (NotBlankStr | None) and tuple (tuple[NotBlankStr, ...]) variants — instead of manual whitespace validators".
Also applies to: 89-112
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/config.py` around lines 28 - 45, The action_types
field and its validator are too permissive; replace the raw tuple[str, ...] with
the shared constrained type tuple[NotBlankStr, ...] (import NotBlankStr from
core.types if not already present) and remove or tighten the manual
_check_action_type_format to rely on NotBlankStr (or validate each NotBlankStr
still contains a single ":" if needed). Update any other similar fields in the
same file (the block referenced at lines 89–112) to use NotBlankStr or
tuple[NotBlankStr, ...] instead of raw str so identifier/name fields follow the
NotBlankStr convention.
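The shared constraint could look like the following. This is a stdlib-only stand-in: the real project would express it as an Annotated/constrained pydantic type built on NotBlankStr, and the allowed character set is an assumption.

```python
import re

# Assumed charset: lowercase identifiers on both sides of a single colon.
_ACTION_TYPE_RE = re.compile(r"^[a-z][a-z0-9_]*:[a-z][a-z0-9_]*$")


def validate_action_type(value: str) -> str:
    """Reject anything that is not a well-formed 'category:action' string."""
    if not _ACTION_TYPE_RE.fullmatch(value):
        msg = f"invalid action type {value!r}: expected 'category:action'"
        raise ValueError(msg)
    return value
```

Applied at the config boundary, this rejects values like `" deploy:production "` or `"code:"` at load time instead of letting them silently miss policy matches at runtime.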
```python
if _depth >= _MAX_RECURSION_DEPTH:
    logger.warning(
        SECURITY_SCAN_DEPTH_EXCEEDED,
        depth=_depth,
        max_depth=_MAX_RECURSION_DEPTH,
    )
    return
```
Consider fail-closed behavior when depth limit is exceeded.
When recursion depth hits 20, the function logs a warning and returns silently—meaning values beyond that depth are never scanned. A malicious payload could hide credentials or destructive commands at depth 21+ to bypass detection.
Since this is security-critical code, consider raising an exception instead of silently returning, which would fail-closed and force the caller to handle the edge case explicitly.
🔧 Potential fail-closed approach
```diff
 if _depth >= _MAX_RECURSION_DEPTH:
     logger.warning(
         SECURITY_SCAN_DEPTH_EXCEEDED,
         depth=_depth,
         max_depth=_MAX_RECURSION_DEPTH,
     )
-    return
+    msg = f"Recursion depth {_depth} exceeds max {_MAX_RECURSION_DEPTH}"
+    raise RecursionError(msg)
```

Note: The previous review comment about list recursion not working is inaccurate; the code at lines 43-45 correctly recurses into nested lists via _walk_value.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if _depth >= _MAX_RECURSION_DEPTH:
    logger.warning(
        SECURITY_SCAN_DEPTH_EXCEEDED,
        depth=_depth,
        max_depth=_MAX_RECURSION_DEPTH,
    )
    msg = f"Recursion depth {_depth} exceeds max {_MAX_RECURSION_DEPTH}"
    raise RecursionError(msg)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/rules/_utils.py` around lines 26 - 32, The current
behavior in the depth check (when _depth >= _MAX_RECURSION_DEPTH) logs a warning
via logger.warning(SECURITY_SCAN_DEPTH_EXCEEDED, ...) and returns silently,
allowing unscanned deep payloads; change this to fail-closed by raising an
exception (e.g., a new SecurityScanDepthExceededError or a RuntimeError) instead
of returning, include the _depth and _MAX_RECURSION_DEPTH in the exception
message, keep or augment the existing logger.warning call for observability, and
ensure callers of the function (places that call the function that contains
_walk_value or the depth check) are updated to catch and handle this exception
appropriately.
```python
def __init__(
    self,
    *,
    rules: tuple[SecurityRule, ...],
    risk_classifier: RiskClassifier,
    config: RuleEngineConfig,
) -> None:
    """Initialize the rule engine.

    Args:
        rules: Ordered tuple of rules to evaluate.
        risk_classifier: Fallback risk classifier.
        config: Rule engine configuration.
    """
    self._rules = rules
    self._risk_classifier = risk_classifier
    self._config = config
```
max_argument_length is dead config on the hot path.
Line 72 stores RuleEngineConfig, but evaluate() never consults it. The detectors added in this PR all scan walk_string_values(context.arguments) directly, so a single oversized string still gets regex-scanned unbounded by every rule before tool execution. Please apply the configured cap centrally here (or in the shared walker) and split this method while doing it; evaluate() is already well past the 50-line limit.
As per coding guidelines, "src/**/*.py: Limit functions to less than 50 lines and files to less than 800 lines" and "Validate at system boundaries (user input, external APIs, config files)".
Also applies to: 74-151
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/rules/engine.py` around lines 56 - 72, The RuleEngine
__init__ stores RuleEngineConfig but the evaluate() path never uses the
configured max_argument_length, so long argument strings get scanned by every
rule; update the constructor to extract and store the max_argument_length (from
RuleEngineConfig) and enforce it centrally before any rule scanning (either by
truncating or splitting results from walk_string_values(context.arguments) based
on that cap), refactor evaluate() by moving argument-walking and
length-enforcement into a new helper (e.g., _iter_capped_argument_strings or
_prepare_arguments_for_evaluation) to keep evaluate() under 50 lines, and ensure
all rule detectors consume the capped iterator instead of raw walk_string_values
so the cap is applied uniformly.
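The capped-iterator helper could be sketched as below. The `walk_string_values` here is a simplified stand-in for the project's depth-limited walker; only the truncation logic is the point.

```python
from collections.abc import Iterator
from typing import Any


def walk_string_values(value: Any) -> Iterator[str]:
    # Simplified stand-in for the shared, depth-limited walker.
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for v in value.values():
            yield from walk_string_values(v)
    elif isinstance(value, (list, tuple)):
        for v in value:
            yield from walk_string_values(v)


def iter_capped_argument_strings(
    arguments: dict[str, Any], max_length: int
) -> Iterator[str]:
    """Yield every string in `arguments`, truncated to `max_length`.

    Detectors consume this iterator instead of the raw walker, so the
    configured cap is enforced once, centrally, before any regex runs.
    """
    for value in walk_string_values(arguments):
        yield value[:max_length]
```

Extracting this into a helper also keeps `evaluate()` under the 50-line limit, since the argument-walking and length-enforcement move out of the hot loop.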
```python
def __init__(
    self,
    *,
    hard_deny_action_types: frozenset[str],
    auto_approve_action_types: frozenset[str],
) -> None:
    """Initialize with deny and approve lists.

    Args:
        hard_deny_action_types: Action types that are always denied.
        auto_approve_action_types: Action types that are always approved.
    """
    self._hard_deny = hard_deny_action_types
    self._auto_approve = auto_approve_action_types
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
echo "PolicyValidator construction sites:"
rg -n -C3 '\bPolicyValidator\s*\('
echo
echo "Definitions/usages of the action-type sets:"
rg -n -C3 '\b(auto_approve_action_types|hard_deny_action_types)\b'
echo
echo "Potential category-to-leaf expansion helpers:"
rg -n -C3 'ActionTypeCategory|expand.*action|all_action_types|category shortcut'
```

Repository: Aureliolo/ai-company
Length of output: 22075
Add format validation for action-type sets in SecurityConfig.
SecurityConfig accepts user-provided hard_deny_action_types and auto_approve_action_types but only validates that they don't overlap. If a user mistakenly passes category shortcuts (e.g., "code") instead of leaf values (e.g., "code:read"), those entries will silently flow through to PolicyValidator and never match any actions, breaking the intended policy enforcement.
Add a validator in SecurityConfig to reject invalid action type strings—either by checking membership in ActionType enum or by enforcing the category:action format. If category shortcuts are intended as a feature (mentioned in the design spec), implement the expansion logic here rather than deferring to PolicyValidator.
This applies to lines 37–50 and 57–98 (the constructor and usage sites).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/security/rules/policy_validator.py` around lines 37 - 50,
SecurityConfig currently accepts hard_deny_action_types and
auto_approve_action_types verbatim and that allows category shortcuts like
"code" to slip through; update SecurityConfig (in the constructor/validators for
the hard_deny_action_types and auto_approve_action_types inputs) to validate
each string is either a known ActionType enum member or matches the required
"category:action" pattern, rejecting invalid entries with a clear ValueError;
alternatively, if category shortcuts are supported, implement expansion logic
there (map "category" to all matching ActionType values) before storing into
PolicyValidator so PolicyValidator's fields (_hard_deny, _auto_approve) only
contain valid leaf action types.
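If category shortcuts are meant to be supported, the expansion could be sketched like this. `KNOWN_ACTION_TYPES` is an assumed registry for illustration; the real project would derive it from its ActionType definitions.

```python
# Assumed registry of leaf action types, for illustration only.
KNOWN_ACTION_TYPES = frozenset(
    {"code:read", "code:write", "infra:read", "infra:write"}
)


def expand_action_types(entries: frozenset[str]) -> frozenset[str]:
    """Expand bare categories to leaf types; reject unknown entries."""
    expanded: set[str] = set()
    for entry in entries:
        if ":" in entry:
            if entry not in KNOWN_ACTION_TYPES:
                raise ValueError(f"unknown action type {entry!r}")
            expanded.add(entry)
            continue
        # Bare category like "code": expand to every matching leaf.
        leaves = {a for a in KNOWN_ACTION_TYPES if a.startswith(entry + ":")}
        if not leaves:
            raise ValueError(f"unknown action-type category {entry!r}")
        expanded |= leaves
    return frozenset(expanded)
```

Running this at config-validation time means PolicyValidator's `_hard_deny` and `_auto_approve` sets only ever contain valid leaf values, and typos fail loudly at load time.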
```python
def test_exception_does_not_stop_subsequent_rules(self) -> None:
    """A failing rule produces DENY, short-circuiting remaining rules."""

    class _ExplodingRule:
        @property
        def name(self) -> str:
            return "exploding"

        def evaluate(
            self,
            context: SecurityContext,
        ) -> SecurityVerdict | None:
            msg = "kaboom"
            raise RuntimeError(msg)

    second_rule = _StubRule("second", verdict=None)
    engine = _make_engine(
        rules=(_ExplodingRule(), second_rule),  # type: ignore[arg-type]
    )
    ctx = _make_context()

    verdict = engine.evaluate(ctx)

    # The failing rule returns DENY which short-circuits.
    assert verdict.verdict == SecurityVerdictType.DENY
    assert not second_rule.called
```
🧹 Nitpick | 🔵 Trivial
Rename this fail-closed test to match what it asserts.
The body verifies that the exploding rule does stop later rules (second_rule.called stays false), so the current name sends future readers in the opposite direction.
📝 Suggested rename
```diff
- def test_exception_does_not_stop_subsequent_rules(self) -> None:
+ def test_exception_short_circuits_subsequent_rules(self) -> None:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/security/rules/test_engine.py` around lines 291 - 316, The test
name test_exception_does_not_stop_subsequent_rules is misleading: the body
creates _ExplodingRule and second_rule, calls engine.evaluate(ctx), and asserts
the exploding rule produces DENY and prevents second_rule from being called;
rename the test to reflect that an exception short-circuits subsequent rules
(e.g., test_exception_stops_subsequent_rules or
test_exception_short_circuits_subsequent_rules) so the name matches the
assertions involving _ExplodingRule, second_rule.called, engine.evaluate, and
SecurityVerdictType.DENY.
```python
def test_mapping_has_exactly_12_entries(self) -> None:
    assert len(DEFAULT_CATEGORY_ACTION_MAP) == len(ToolCategory)
```
🧹 Nitpick | 🔵 Trivial
Consider renaming to avoid hardcoded count in test name.
The test name test_mapping_has_exactly_12_entries embeds a specific count, but the assertion dynamically compares against len(ToolCategory). If ToolCategory grows or shrinks, the test passes but the name becomes misleading.
📝 Suggested rename
```diff
- def test_mapping_has_exactly_12_entries(self) -> None:
+ def test_mapping_has_entry_for_each_category(self) -> None:
      assert len(DEFAULT_CATEGORY_ACTION_MAP) == len(ToolCategory)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/security/test_action_type_mapping.py` around lines 22 - 23, Rename
the test function test_mapping_has_exactly_12_entries to a name that doesn't
hardcode a numeric count (e.g., test_mapping_length_matches_toolcategory or
test_default_category_action_map_matches_toolcategory) and update its definition
accordingly; the body should remain the same (assert
len(DEFAULT_CATEGORY_ACTION_MAP) == len(ToolCategory)) so it clearly reflects
that DEFAULT_CATEGORY_ACTION_MAP and ToolCategory are being compared.
```python
@pytest.mark.parametrize(
    ("label", "text"),
    [
        (
            "AWS access key",
            "Found key: AKIAIOSFODNN7EXAMPLE in config",
        ),
        (
            "SSH private key",
            "-----BEGIN RSA PRIVATE KEY-----\nMIIEow...",
        ),
        (
            "SSH private key (OPENSSH)",
            "-----BEGIN OPENSSH PRIVATE KEY-----\nb3Blbn...",
        ),
        (
            "Bearer token",
            "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.long",
        ),
        (
            "GitHub PAT",
            "Token: ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij",
        ),
        (
            "Generic secret value",
            "SECRET=my_super_secret_value_1234",
        ),
    ],
)
def test_credential_detected(self, label: str, text: str) -> None:
    result = _scanner().scan(text)

    assert result.has_sensitive_data is True
    assert len(result.findings) >= 1


def test_aws_key_in_findings(self) -> None:
    result = _scanner().scan("AKIAIOSFODNN7EXAMPLE is the key")

    assert "AWS access key" in result.findings
```
Replace branded scanner fixtures with neutral equivalents.
These tests currently encode branded provider/card-network names in labels, sample strings, and expected findings. That conflicts with the repo rule and unnecessarily couples the assertions to branded wording instead of detection/redaction behavior.
As per coding guidelines, "Never use real vendor names in test code; use generic test provider names (test-provider, test-small-001, etc.)."
Also applies to: 106-130
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/security/test_output_scanner.py` around lines 42 - 80, Update the
tests to remove branded vendor/card-network names and use neutral placeholders:
in the parameterized test_credential_detected change labels and sample texts
like "AWS access key", "GitHub PAT", "Bearer token", and any provider-specific
strings to generic equivalents (e.g., "cloud-provider access key", "git-token",
"bearer token", "generic-secret") and adjust assertions that check
result.findings in test_aws_key_in_findings to look for the neutral label (or
assert based on detection class via _scanner rather than exact branded text).
Locate and update the strings in test_credential_detected,
test_aws_key_in_findings, and the other similar tests referenced (the later
parameterized block around the other test cases) so all sample inputs, labels,
and expected finding keys use neutral names like test-provider, test-token,
test-secret.
- _walk_value: add depth guard for list-of-lists recursion
- service.py: use SECURITY_CONFIG_LOADED for init-time warning
- service.py: re-raise MemoryError/RecursionError in rule engine
- service.py: replace "output_scan" magic string with OUTPUT_SCAN_VERDICT
- path_traversal_detector: remove redundant pattern (covered by pattern 1)
- action_types.py: replace assert with RuntimeError (survives -O)
- audit.py: log warning before raising on invalid query limit
- destructive_op_detector: collect all matches per string (not first-hit)
- base.py: stricter action_type validation (exactly one colon, non-empty)
- base.py: fail-closed on unknown ToolCategory (no silent fallback)
- invoker.py: build SecurityContext inside fail-closed handler
- invoker.py: remove is_error skip in output scanning
- invoker.py: fail-closed when sensitive data found but no redaction
- agent_engine.py: share AuditLog across runs (not per-run)
- DESIGN_SPEC.md: update security tree with credential/path detectors
- tests: update for all runtime behavior changes
```python
if config.custom_policies:
    logger.warning(
        SECURITY_CONFIG_LOADED,
        note=(
            "custom_policies configured but not yet "
            "evaluated — enforcement is not implemented"
        ),
        policy_count=len(config.custom_policies),
    )
```
custom_policies configured but never evaluated — silent security gap
SecurityConfig.custom_policies accepts user-defined SecurityPolicyRule entries; the SecurityConfig model validator, the docstring, and the DESIGN_SPEC all suggest these rules will influence enforcement decisions. In practice, however, neither PolicyValidator nor any other rule in the pipeline receives or evaluates them:
- `_make_security_interceptor` in `agent_engine.py` (line 684) constructs `PolicyValidator` with only `hard_deny_action_types` and `auto_approve_action_types`; `cfg.custom_policies` is never passed anywhere.
- `SecOpsService.__init__` logs a `WARNING` at construction time but takes no further action.
The consequence is that a user who adds, for example:
```python
SecurityPolicyRule(
    name="block-prod-write",
    action_types=("infra:write",),
    verdict=SecurityVerdictType.DENY,
)
```

…will receive the warning log entry and believe the rule is active, while every matching tool call is silently allowed through all subsequent detectors. This is a false sense of security rather than a "not yet implemented" stub; it is a publicly visible API that accepts policy configuration and then ignores it at runtime.
Until custom_policies evaluation is implemented, the custom_policies field on SecurityConfig should either be removed from the public API (to avoid misleading callers) or the current gap should be surfaced more prominently than a single log warning — for example, raising a NotImplementedError or ValueError at construction time so callers cannot accidentally rely on it.
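The fail-loud option can be sketched as follows; `SecurityConfigStub` and `SecOpsServiceStub` are simplified stand-ins for the real classes, showing only the construction-time rejection.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SecurityConfigStub:  # stand-in for SecurityConfig
    custom_policies: tuple[str, ...] = ()


class SecOpsServiceStub:  # stand-in for SecOpsService
    def __init__(self, config: SecurityConfigStub) -> None:
        if config.custom_policies:
            # Refuse to start rather than warn: an unenforced policy is a
            # silent security gap, not a degraded mode.
            msg = (
                "custom_policies are accepted by SecurityConfig but not yet "
                "enforced; refusing to start with unenforced policies"
            )
            raise NotImplementedError(msg)
        self._config = config
```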
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/security/service.py
Line: 96-104
How can I resolve this? If you propose a fix, please make it concise.

```python
def scan(self, output: str) -> OutputScanResult:
    """Scan output text for sensitive patterns.

    Detection runs on the original output. Redaction builds
    a separate redacted copy by applying substitutions in order.

    Args:
        output: The tool's output string.

    Returns:
        An ``OutputScanResult`` with findings and optional
        redacted content.
    """
    logger.debug(
        SECURITY_OUTPUT_SCAN_START,
        output_length=len(output),
    )
    findings: list[str] = []
    redacted = output

    for pattern_name, pattern in _OUTPUT_PATTERNS:
        if pattern.search(output):
            findings.append(pattern_name)
            logger.warning(
                SECURITY_OUTPUT_SCAN_FINDING,
                finding=pattern_name,
            )
            redacted = pattern.sub(_REDACTED, redacted)
```
No maximum output length guard — ReDoS risk on large tool outputs
The scan method applies all 9 patterns in _OUTPUT_PATTERNS to the full raw output string with no size limit. Unlike the argument-scanning path (which at least has the max_argument_length field in RuleEngineConfig), the output scanner has no configurable cap at all.
For a tool that returns a multi-MB response (e.g., a read_file on a large log), each pattern runs against the entire string. Several of the borrowed credential patterns — e.g., the "Generic API key/token/secret" pattern ((?:api[_-]?key|...)\s*[=:]\s*['\"]?[A-Za-z0-9_\-/.+=]{16,}['\"]?) — are susceptible to catastrophic backtracking on crafted input.
Consider truncating the output to a configurable maximum length before scanning:
```python
_MAX_OUTPUT_SCAN_BYTES: Final[int] = 256_000  # 256 KB

def scan(self, output: str) -> OutputScanResult:
    scan_target = output[:_MAX_OUTPUT_SCAN_BYTES]
    ...
```

The truncated value should be used for pattern matching only; the full original output should still be returned (or redacted based on the match). Exposing the limit via RuleEngineConfig (or a dedicated OutputScannerConfig) would let operators tune it.
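A runnable sketch of that guard, under assumptions: the pattern list and the cap below are illustrative, and the result is returned as a plain tuple rather than the project's `OutputScanResult`. Matching runs against the truncated copy; redaction still runs over the full output.

```python
import re
from typing import Final

_MAX_OUTPUT_SCAN_CHARS: Final[int] = 256_000
_REDACTED: Final[str] = "[REDACTED]"
# Illustrative single pattern; the real scanner has nine.
_OUTPUT_PATTERNS: Final = (
    ("generic secret", re.compile(r"secret\s*=\s*\S{8,}", re.IGNORECASE)),
)


def scan(output: str) -> tuple[list[str], str]:
    """Return (findings, redacted) with regex work capped by length."""
    scan_target = output[:_MAX_OUTPUT_SCAN_CHARS]  # cap regex work
    findings: list[str] = []
    redacted = output
    for name, pattern in _OUTPUT_PATTERNS:
        if pattern.search(scan_target):
            findings.append(name)
            # Redact on the full original so matched material is removed
            # everywhere, not just within the truncated window.
            redacted = pattern.sub(_REDACTED, redacted)
    return findings, redacted
```

Note the residual trade-off: a secret that begins entirely past the cap would go undetected, so the limit should be generous relative to expected tool output sizes.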
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/security/output_scanner.py
Line: 34-61
How can I resolve this? If you propose a fix, please make it concise.

```python
findings = _check_sensitive_paths(context.arguments)
findings.extend(_check_pii(context.arguments))
```
Double traversal of arguments — single pass would be more efficient
evaluate calls _check_sensitive_paths(context.arguments) and then _check_pii(context.arguments) as two independent functions, each of which iterates the full result of walk_string_values(arguments). For a deeply nested argument tree (up to the 20-level depth limit), this means every string value is visited twice and every pattern group is scanned separately per visit.
Combining the two scans into a single traversal avoids the duplicate cost:
```python
findings: list[str] = []
for value in walk_string_values(context.arguments):
    for label, pattern in _SENSITIVE_PATHS:
        if pattern.search(value):
            findings.append(f"sensitive path detected ({label})")
            break
    for name, pattern in PII_PATTERNS:
        if pattern.search(value):
            findings.append(name)
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/security/rules/data_leak_detector.py
Line: 91-92
How can I resolve this? If you propose a fix, please make it concise.
Summary
Key design decisions
- `walk_string_values()` with depth-limited recursion (max 20) shared across all detectors
- `copy.deepcopy` for arguments in SecurityContext

Pre-PR review coverage
Closes #40
Test plan