feat: implement communication foundation — message bus, dispatcher, and messenger #157
Conversation
…nd messenger (#8, #10)

Add the runtime communication layer for multi-agent orchestration:

- Error hierarchy (CommunicationError + 5 subclasses) with immutable context
- MessageBus protocol (pull model) and InMemoryMessageBus implementation (asyncio Lock+Queue, FIFO, fan-out, BROADCAST, direct messaging, retention)
- MessageHandler protocol, FunctionHandler adapter, HandlerRegistration model
- MessageDispatcher with concurrent TaskGroup dispatch and error isolation
- AgentMessenger per-agent facade (send, direct, broadcast, subscribe, handlers)
- Subscription and DeliveryEnvelope models
- MessageRetentionConfig added to MessageBusConfig
- 20 observability event constants for communication domain
- 114 unit tests (all passing), 96.46% coverage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pre-reviewed by 9 agents, 29 findings addressed:

- Use @computed_field for DispatchResult.handlers_matched (derived value)
- Add DESIGN_SPEC Section 5 references to module docstrings
- Add MappingProxyType wrapping for _PRIORITY_ORDER dict
- Use NotBlankStr for HandlerRegistration identifier fields
- Add FunctionHandler async validation via inspect.iscoroutinefunction
- Add agent_id/agent_name blank validation in AgentMessenger.__init__
- Improve logging: COMM_BUS_NOT_RUNNING, COMM_BUS_ALREADY_RUNNING events
- Log COMM_HANDLER_DEREGISTER_MISS and COMM_DISPATCH_NO_DISPATCHER
- Refactor send_direct into 3 focused methods
- Add unsubscribe running guard for consistency
- Fix docstrings: broadcast semantics, receive blocking, send_direct channel
- Update DESIGN_SPEC §15.3 with 8 new communication files + events module
- Add 7 new tests: protocol conformance, combined filters, edge cases
- Polish: dict.setdefault, tuple(pair), simplified validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.

Caution: review failed. The pull request is closed.
📝 Walkthrough (Summary by CodeRabbit)

Adds a protocol-driven communication subsystem: a MessageBus protocol with an InMemoryMessageBus backend, channel/subscription and delivery models, MessageDispatcher with concurrent handler dispatch and DispatchResult, AgentMessenger facade, errors, observability event constants, retention config, and comprehensive unit tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant AgentA as Agent A
    participant MessengerA as AgentMessenger(A)
    participant MessageBus as InMemoryMessageBus
    participant Queue as DeliveryQueue
    participant MessengerB as AgentMessenger(B)
    participant AgentB as Agent B
    AgentA->>MessengerA: send_message(to="AgentB", channel="#eng", content)
    MessengerA->>MessengerA: construct Message (id, ts, sender, channel, type, priority)
    MessengerA->>MessageBus: publish(message)
    MessageBus->>MessageBus: validate channel, append history
    MessageBus->>Queue: enqueue DeliveryEnvelope for subscribers/recipient
    MessageBus-->>MessengerA: ack
    AgentB->>MessengerB: receive(channel="#eng")
    Queue-->>MessengerB: deliver DeliveryEnvelope
    MessengerB->>AgentB: deliver envelope.message
```

```mermaid
sequenceDiagram
    participant Messenger as AgentMessenger
    participant Dispatcher as MessageDispatcher
    participant Handler1 as Handler 1
    participant Handler2 as Handler 2
    Messenger->>Dispatcher: register_handler(handler1, types, min_priority)
    Dispatcher-->>Messenger: handler_id_1
    Messenger->>Dispatcher: register_handler(handler2, ...)
    Dispatcher-->>Messenger: handler_id_2
    Messenger->>Dispatcher: dispatch_message(message)
    Dispatcher->>Dispatcher: find matching registrations
    par concurrent handlers
        Dispatcher->>Handler1: handle(message)
        Dispatcher->>Handler2: handle(message)
    end
    Handler1-->>Dispatcher: success / error
    Handler2-->>Dispatcher: success / error
    Dispatcher->>Dispatcher: aggregate results -> DispatchResult
    Dispatcher-->>Messenger: return DispatchResult
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request establishes the foundational communication layer for agents, enabling robust and flexible inter-agent messaging. It introduces core components: a message bus supporting various communication patterns, a dispatcher for handling incoming messages, and a messenger facade through which agents interact with the system. The design emphasizes a pull model for message consumption, concurrent handler execution, and clear error handling, providing a scalable and observable communication framework.
Code Review
This pull request lays a comprehensive and well-designed foundation for inter-agent communication, introducing a message bus, dispatcher, and messenger along with their protocols and error handling. However, critical security flaws were identified in the InMemoryMessageBus related to broken access control. Specifically, the bus lacks authorization checks for publishing to channels, subscribing agents to channels, and retrieving channel history. Additionally, the deterministic naming scheme for direct message channels is vulnerable to collisions if agent IDs contain colons. These issues could allow malicious or compromised agents to eavesdrop on private conversations, inject messages into other agents' channels, or disclose sensitive message history. Furthermore, there is a minor docstring inconsistency in the MessageBus protocol that should be addressed to ensure contract alignment.
```python
    asyncio.Queue(),
)

async def publish(self, message: Message) -> None:
```
The publish method does not verify if the sender of the message is an authorized participant or subscriber of the target channel. This allows any agent to inject messages into any channel, including private direct message channels between other agents, provided they know the channel name.
Remediation: Implement a check to verify that message.sender is a member of channel.subscribers before allowing the publication to TOPIC or DIRECT channels.
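A minimal sketch of the suggested gate follows. The names (`Channel.subscribers`, `NotSubscribedError`) mirror models quoted elsewhere in this review, but this exact check is an assumed remediation, not the project's code:

```python
from dataclasses import dataclass


class NotSubscribedError(Exception):
    """Raised when a sender is not a member of the target channel."""


@dataclass(frozen=True)
class Channel:
    name: str
    subscribers: tuple[str, ...]


def check_publish_allowed(channel: Channel, sender: str) -> None:
    # Authorization gate: only channel members may publish to
    # TOPIC or DIRECT channels.
    if sender not in channel.subscribers:
        raise NotSubscribedError(
            f"agent {sender!r} may not publish to {channel.name!r}"
        )
```

Calling this at the top of `publish`, before any enqueue, would reject injections from non-members while leaving member traffic untouched.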
```python
    self,
    channel_name: str,
    subscriber_id: str,
) -> Subscription:
    """Subscribe an agent to a channel.

    Idempotent — returns a fresh subscription record if already
    subscribed (the channel's subscriber list is not duplicated).

    Args:
        channel_name: Channel to subscribe to.
        subscriber_id: Agent ID of the subscriber.

    Returns:
        The subscription record.

    Raises:
        MessageBusNotRunningError: If not running.
        ChannelNotFoundError: If the channel does not exist.
    """
    async with self._lock:
        self._require_running()
        if channel_name not in self._channels:
            _raise_channel_not_found(channel_name)
        self._known_agents.add(subscriber_id)
        channel = self._channels[channel_name]
        if subscriber_id in channel.subscribers:
            return Subscription(
                channel_name=channel_name,
                subscriber_id=subscriber_id,
                subscribed_at=datetime.now(UTC),
            )
        new_subs = (*channel.subscribers, subscriber_id)
        self._channels[channel_name] = channel.model_copy(
            update={"subscribers": new_subs},
        )
        self._ensure_queue(channel_name, subscriber_id)
        now = datetime.now(UTC)
        logger.info(
            COMM_SUBSCRIPTION_CREATED,
            channel=channel_name,
            subscriber=subscriber_id,
        )
        return Subscription(
            channel_name=channel_name,
            subscriber_id=subscriber_id,
            subscribed_at=now,
        )

async def unsubscribe(
```
The subscribe and unsubscribe methods allow any caller to modify the subscription status of any agent ID for any channel. This can be abused to force agents to receive messages from channels they shouldn't be part of (potentially leading to DoS or unauthorized data processing) or to eavesdrop on channels by subscribing a controlled agent ID.
Remediation: Implement authorization checks to ensure that the caller is permitted to modify the subscription for the specified subscriber_id (e.g., an agent should only be able to subscribe/unsubscribe itself).
```python
    return tuple(self._channels.values())

async def get_channel_history(
    self,
```
The get_channel_history method allows any caller to retrieve the full message history of any channel, including private direct message channels, without verifying if the caller is a participant or authorized subscriber of that channel. This is a significant information disclosure vulnerability.
Remediation: Implement an authorization check to verify that the caller (agent) is a participant in the channel before returning its history.
| """ | ||
| sender = message.sender | ||
| pair = sorted([sender, recipient]) | ||
| channel_name = f"@{pair[0]}:{pair[1]}" |
Direct message channel names are constructed by joining sorted agent IDs with a colon (@{a}:{b}). If agent IDs are allowed to contain colons, this leads to channel name collisions. For example, agents 'alice' and 'bob:charles' would share the same direct channel name (@alice:bob:charles) as agents 'alice:bob' and 'charles'. This could allow unintended agents to access or inject messages into private conversations.
Remediation: Sanitize agent IDs to disallow the separator character (colon) or use a more robust encoding/structure for channel names that prevents collisions.
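One collision-resistant variant is to percent-encode each ID before joining. The `@{a}:{b}` scheme is the PR's; the encoding below is an assumed remediation sketch, not the project's actual fix:

```python
from urllib.parse import quote


def direct_channel_name(a: str, b: str) -> str:
    # Percent-encode each ID so a ":" inside an ID can no longer
    # masquerade as the pair separator.
    first, second = sorted((a, b))
    return f"@{quote(first, safe='')}:{quote(second, safe='')}"
```

With this encoding, ('alice', 'bob:charles') and ('alice:bob', 'charles') produce distinct channel names, eliminating the collision described above.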
```python
    Idempotent — returns the existing subscription if already
    subscribed.
```
The docstring here states that an idempotent call "returns the existing subscription if already subscribed." This is inconsistent with the InMemoryMessageBus implementation, which returns a new Subscription object with a fresh timestamp on subsequent calls for an existing subscription. This is because the bus implementation does not store the original Subscription object.
To align the protocol contract with the provided implementation and avoid confusion for future implementers, I recommend updating this docstring. The phrasing used in InMemoryMessageBus's docstring, "returns a fresh subscription record if already subscribed," would be a good replacement.
```diff
-    Idempotent — returns the existing subscription if already
-    subscribed.
+    Idempotent — returns a fresh subscription record if already
+    subscribed.
```
Greptile Summary

This PR implements the communication foundation described in DESIGN_SPEC §5.4: an

Key findings:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Agent as Agent (consumer)
    participant AM as AgentMessenger
    participant Bus as InMemoryMessageBus
    participant Q as asyncio.Queue
    participant SE as _shutdown_event
    Note over Agent,SE: Send path
    Agent->>AM: send_direct(to, content, type)
    AM->>AM: _direct_channel_name(agent_id, to)
    AM->>AM: Message(channel=@a:b, sender=agent_id)
    AM->>Bus: send_direct(msg, recipient=to)
    Bus->>Bus: _require_running() [under lock]
    Bus->>Bus: _ensure_direct_channel(@a:b)
    Bus->>Q: put_nowait(envelope) × both agents
    Bus-->>AM: return
    AM-->>Agent: return Message
    Note over Agent,SE: Receive path
    Agent->>Bus: receive(@a:b, agent_id, timeout=5.0)
    Bus->>Bus: _require_running() + subscription check [under lock]
    Bus->>Bus: _ensure_queue(@a:b, agent_id)
    Bus->>Bus: _await_with_shutdown(queue, 5.0)
    Bus->>Q: ensure_future(queue.get())
    Bus->>SE: ensure_future(shutdown_event.wait())
    alt message arrives
        Q-->>Bus: DeliveryEnvelope
        Bus-->>Agent: DeliveryEnvelope
    else timeout expires
        Bus-->>Agent: None [logs COMM_RECEIVE_TIMEOUT]
    else bus stopped
        SE-->>Bus: event set
        Bus-->>Agent: None [also logs COMM_RECEIVE_TIMEOUT ⚠️]
    end
    Note over Agent,SE: Dispatch path
    Agent->>AM: dispatch_message(envelope.message)
    AM->>AM: MessageDispatcher.dispatch(message)
    AM->>AM: _matches() filter (type + priority)
    par asyncio.TaskGroup
        AM->>AM: _guarded_handle(reg1, message, errors, 0)
    and
        AM->>AM: _guarded_handle(reg2, message, errors, 1)
    end
    AM-->>Agent: DispatchResult(succeeded, failed, errors)
```

Last reviewed commit: dc36a44
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/communication/bus_memory.py`:
- Around line 368-398: Add the same running-state and subscription validation to
receive() as other methods: call self._require_running() at the start, and
instead of unconditionally creating an orphan queue with
self._ensure_queue(channel_name, subscriber_id), check whether a queue already
exists for the subscriber (use the internal non-creating accessor — e.g.
_get_queue(channel_name, subscriber_id) or check the channel's subscribers set)
and if the subscriber is not actually subscribed raise a clear error (or return
None per project convention) rather than creating a new queue; only call
_ensure_queue when you intend to create a subscription. Ensure you reference
receive, _require_running, _ensure_queue and the non-creating accessor (e.g.
_get_queue or channel subscriber set) when making the change.
- Around line 461-491: In get_channel_history, the current slice logic
mishandles limit <= 0 because messages[-0:] returns the full list and negative
limits produce incorrect slices; fix by explicitly handling limit values: after
reading messages from self._history[channel_name] (inside the existing async
with self._lock block), if limit is None keep all messages, if limit <= 0 set
messages to an empty list/tuple, otherwise set messages = messages[-limit:];
ensure you update the logged count passed to COMM_HISTORY_QUERIED to reflect the
final number of messages returned and preserve use of _channels, _history,
get_channel_history and COMM_HISTORY_QUERIED.
In `@src/ai_company/communication/bus_protocol.py`:
- Around line 61-66: Update the public bus protocol methods to use NotBlankStr
for identifier/name parameters instead of plain str: change the signature of
send_direct (and the other protocol methods referenced: the methods around the
other ranges that accept channel/agent identifiers) so parameters like
recipient, channel, agent_id, and similar use NotBlankStr from core.types; add
the necessary import for NotBlankStr at the top of the module and adjust any
type hints/annotations and docstrings accordingly so the protocol enforces
non-blank identifiers at the boundary.
In `@src/ai_company/communication/errors.py`:
- Around line 32-35: The exception constructor currently wraps the passed-in
context with MappingProxyType but only copies the top-level mapping, leaving
nested mutable structures shared; update the constructor where self.context is
assigned to perform a deep copy of the incoming context (e.g., use
copy.deepcopy(context) when context is truthy) before wrapping it in
MappingProxyType so nested dicts/lists become immutable, and ensure the import
for copy.deepcopy is added if missing and the assignment in the class
(self.context: MappingProxyType[...] = MappingProxyType(...)) uses the
deep-copied object.
In `@src/ai_company/communication/handler.py`:
- Around line 3-13: This module lacks a structured logger and fails to log
before raising on the "invalid-handler" TypeError branch; add "from
ai_company.observability import get_logger" and define "logger =
get_logger(__name__)" near the top of the file, then update the invalid-handler
branch (the path that currently raises TypeError when a handler is invalid) to
call logger.warning or logger.error with context (e.g., include the offending
handler identifier, message type/priority and function name like the handler
resolution function) immediately before raising the TypeError so the error path
is logged with appropriate context.
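A sketch of the log-before-raise pattern on the invalid-handler branch. The repo convention uses `get_logger` from `ai_company.observability` and a `COMM_*` event constant; stdlib `logging` is used below only as a stand-in so the sketch stays self-contained:

```python
import inspect
import logging

logger = logging.getLogger(__name__)


def wrap_handler(fn):
    """Reject anything that is not an async callable, logging first."""
    if not callable(fn) or not inspect.iscoroutinefunction(fn):
        # Error path: log at WARNING with context before raising.
        logger.warning(
            "comm.handler.invalid handler=%r",
            getattr(fn, "__name__", repr(fn)),
        )
        raise TypeError("handler must be an async callable")
    return fn
```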
In `@tests/unit/communication/test_bus_memory.py`:
- Around line 1-27: Add the module-level pytest timeout marker to this test file
by declaring pytestmark = pytest.mark.timeout(30) at module scope (e.g., near
the top after the imports) so it matches other tests in the communication
module; the file already imports pytest, so just add the module-level variable
to apply the 30s timeout for all tests in
tests/unit/communication/test_bus_memory.py (which covers tests for
InMemoryMessageBus / MessageBus behaviors).
In `@tests/unit/communication/test_messenger.py`:
- Around line 1-18: Add the missing module-level pytest timeout marker by
defining pytestmark = pytest.mark.timeout(30) near the top of the test module
(after the imports) in test_messenger.py so its timeout behavior matches other
files; ensure you import pytest if not already and place the pytestmark
assignment at module scope.
In `@tests/unit/communication/test_subscription.py`:
- Around line 1-14: Add a module-level pytest marker tuple to this test module:
define pytestmark = (pytest.mark.unit, pytest.mark.timeout(30)) at top of
tests/unit/communication/test_subscription.py so every test (e.g., those
exercising Subscription and DeliveryEnvelope) inherits the 30-second timeout and
unit marker; then remove redundant per-test `@pytest.mark.unit` decorators if
present.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7a22f372-d0f3-45ea-b20a-6354245a34f4
📒 Files selected for processing (19)
- DESIGN_SPEC.md
- src/ai_company/communication/__init__.py
- src/ai_company/communication/bus_memory.py
- src/ai_company/communication/bus_protocol.py
- src/ai_company/communication/config.py
- src/ai_company/communication/dispatcher.py
- src/ai_company/communication/errors.py
- src/ai_company/communication/handler.py
- src/ai_company/communication/messenger.py
- src/ai_company/communication/subscription.py
- src/ai_company/observability/events/communication.py
- tests/unit/communication/conftest.py
- tests/unit/communication/test_bus_memory.py
- tests/unit/communication/test_dispatcher.py
- tests/unit/communication/test_errors.py
- tests/unit/communication/test_handler.py
- tests/unit/communication/test_messenger.py
- tests/unit/communication/test_subscription.py
- tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Agent
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649
Use `except A, B:` syntax (no parentheses) for exception handling — ruff enforces this on Python 3.14
Include type hints on all public functions; enforce with mypy strict mode
Use Google-style docstrings on all public classes and functions — enforced by ruff D rules
Use immutability: create new objects, never mutate existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction + `MappingProxyType` wrapping. For `dict`/`list` fields in frozen Pydantic models, rely on `frozen=True` and `copy.deepcopy()` at system boundaries
Use frozen Pydantic models for config/identity; use separate mutable-via-copy models for runtime state that evolves
Use Pydantic v2 (`BaseModel`, `model_validator`, `computed_field`, `ConfigDict`). Use `@computed_field` for derived values; use `NotBlankStr` from `core.types` for all identifier/name fields
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code; prefer structured concurrency over bare `create_task`
Maintain line length of 88 characters — enforced by ruff
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate input at system boundaries: user input, external APIs, and config files
Files:
- tests/unit/communication/test_messenger.py
- src/ai_company/communication/subscription.py
- src/ai_company/communication/dispatcher.py
- src/ai_company/communication/bus_protocol.py
- src/ai_company/communication/messenger.py
- src/ai_company/observability/events/communication.py
- src/ai_company/communication/__init__.py
- tests/unit/communication/test_bus_memory.py
- tests/unit/observability/test_events.py
- tests/unit/communication/test_subscription.py
- src/ai_company/communication/config.py
- src/ai_company/communication/handler.py
- tests/unit/communication/test_dispatcher.py
- tests/unit/communication/test_errors.py
- tests/unit/communication/conftest.py
- tests/unit/communication/test_handler.py
- src/ai_company/communication/bus_memory.py
- src/ai_company/communication/errors.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Use pytest markers: `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`
Maintain 80% minimum code coverage — enforced in CI
Use `asyncio_mode = "auto"` in pytest configuration — no manual `@pytest.mark.asyncio` needed
Set 30-second timeout per test
Use `pytest-xdist` via `-n auto` for parallel test execution
Prefer `@pytest.mark.parametrize` for testing similar cases
Files:
- tests/unit/communication/test_messenger.py
- tests/unit/communication/test_bus_memory.py
- tests/unit/observability/test_events.py
- tests/unit/communication/test_subscription.py
- tests/unit/communication/test_dispatcher.py
- tests/unit/communication/test_errors.py
- tests/unit/communication/conftest.py
- tests/unit/communication/test_handler.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py: Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Never use `import logging` / `logging.getLogger()` / `print()` in application code — use `get_logger()` from the observability module
Use variable name `logger` (not `_logger`, not `log`) for logging instances
Always use event name constants from `ai_company.observability.events` domain-specific modules; import directly (e.g., `from ai_company.observability.events.<domain> import EVENT_CONSTANT`)
Use structured logging format: `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions
Pure data models, enums, and re-exports do NOT need logging
Files:
- src/ai_company/communication/subscription.py
- src/ai_company/communication/dispatcher.py
- src/ai_company/communication/bus_protocol.py
- src/ai_company/communication/messenger.py
- src/ai_company/observability/events/communication.py
- src/ai_company/communication/__init__.py
- src/ai_company/communication/config.py
- src/ai_company/communication/handler.py
- src/ai_company/communication/bus_memory.py
- src/ai_company/communication/errors.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like `example-provider`, `example-large-001`, `large`/`medium`/`small` as aliases, or `test-provider`, `test-small-001` in tests
Files:
- src/ai_company/communication/subscription.py
- src/ai_company/communication/dispatcher.py
- src/ai_company/communication/bus_protocol.py
- src/ai_company/communication/messenger.py
- src/ai_company/observability/events/communication.py
- src/ai_company/communication/__init__.py
- src/ai_company/communication/config.py
- src/ai_company/communication/handler.py
- src/ai_company/communication/bus_memory.py
- src/ai_company/communication/errors.py
🧠 Learnings (3)
📚 Learning: 2026-03-07T14:50:05.694Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T14:50:05.694Z
Learning: Applies to src/ai_company/**/*.py : Always use event name constants from `ai_company.observability.events` domain-specific modules; import directly (e.g., `from ai_company.observability.events.<domain> import EVENT_CONSTANT`)
Applied to files:
- src/ai_company/observability/events/communication.py
- tests/unit/observability/test_events.py
- DESIGN_SPEC.md
📚 Learning: 2026-03-07T14:50:05.694Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T14:50:05.694Z
Learning: Applies to src/ai_company/providers/**/*.py : RetryConfig and RateLimiterConfig are set per-provider in `ProviderConfig`
Applied to files:
tests/unit/communication/conftest.py
📚 Learning: 2026-03-07T14:50:05.694Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T14:50:05.694Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising
Applied to files:
src/ai_company/communication/errors.py
🧬 Code graph analysis (9)

src/ai_company/communication/subscription.py (1)
- src/ai_company/communication/message.py (1): Message (88-138)

src/ai_company/communication/dispatcher.py (3)
- src/ai_company/communication/enums.py (2): MessagePriority (22-32), MessageType (6-19)
- src/ai_company/communication/handler.py (6): FunctionHandler (33-60), HandlerRegistration (89-106), MessageHandler (17-26), priority_at_least (73-86), handle (20-26), handle (54-60)
- src/ai_company/communication/message.py (1): Message (88-138)

src/ai_company/communication/messenger.py (6)
- src/ai_company/communication/bus_protocol.py (5): MessageBus (19-200), publish (47-59), send_direct (61-80), subscribe (82-103), unsubscribe (105-120)
- src/ai_company/communication/dispatcher.py (5): DispatchResult (37-59), MessageDispatcher (62-244), register (75-113), deregister (115-138), dispatch (140-223)
- src/ai_company/communication/enums.py (2): MessagePriority (22-32), MessageType (6-19)
- src/ai_company/communication/handler.py (1): MessageHandler (17-26)
- src/ai_company/communication/message.py (1): Message (88-138)
- src/ai_company/communication/subscription.py (1): Subscription (9-22)

tests/unit/communication/test_bus_memory.py (6)
- src/ai_company/communication/bus_protocol.py (13): MessageBus (19-200), is_running (43-45), start (30-36), stop (38-40), publish (47-59), subscribe (82-103), create_channel (144-158), list_channels (174-180), send_direct (61-80), unsubscribe (105-120), get_channel (160-172), receive (122-142), get_channel_history (182-200)
- src/ai_company/communication/channel.py (1): Channel (14-39)
- src/ai_company/communication/config.py (2): MessageBusConfig (45-75), MessageRetentionConfig (29-42)
- src/ai_company/communication/enums.py (2): ChannelType (35-46), MessageType (6-19)
- src/ai_company/communication/errors.py (5): ChannelAlreadyExistsError (50-51), ChannelNotFoundError (46-47), MessageBusAlreadyRunningError (62-63), MessageBusNotRunningError (58-59), NotSubscribedError (54-55)
- src/ai_company/communication/message.py (1): Message (88-138)

tests/unit/communication/test_subscription.py (3)
- src/ai_company/communication/enums.py (2): MessagePriority (22-32), MessageType (6-19)
- src/ai_company/communication/message.py (1): Message (88-138)
- src/ai_company/communication/subscription.py (2): DeliveryEnvelope (25-41), Subscription (9-22)

src/ai_company/communication/handler.py (5)
- src/ai_company/communication/enums.py (2): MessagePriority (22-32), MessageType (6-19)
- src/ai_company/communication/message.py (1): Message (88-138)
- tests/unit/communication/test_dispatcher.py (2): handle (39-40), handle (49-50)
- tests/unit/communication/test_handler.py (1): handle (44-45)
- tests/unit/communication/test_messenger.py (4): handler (299-300), handler (320-321), handler (356-357), handler (412-413)

tests/unit/communication/test_errors.py (1)
- src/ai_company/communication/errors.py (6): ChannelAlreadyExistsError (50-51), ChannelNotFoundError (46-47), CommunicationError (11-43), MessageBusAlreadyRunningError (62-63), MessageBusNotRunningError (58-59), NotSubscribedError (54-55)

tests/unit/communication/conftest.py (2)
- src/ai_company/communication/config.py (2): MessageRetentionConfig (29-42), MessageBusConfig (45-75)
- src/ai_company/communication/subscription.py (2): DeliveryEnvelope (25-41), Subscription (9-22)

src/ai_company/communication/bus_memory.py (7)
- src/ai_company/communication/channel.py (1): Channel (14-39)
- src/ai_company/communication/config.py (1): MessageBusConfig (45-75)
- src/ai_company/communication/errors.py (5): ChannelAlreadyExistsError (50-51), ChannelNotFoundError (46-47), MessageBusAlreadyRunningError (62-63), MessageBusNotRunningError (58-59), NotSubscribedError (54-55)
- src/ai_company/communication/subscription.py (2): DeliveryEnvelope (25-41), Subscription (9-22)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)
- src/ai_company/communication/bus_protocol.py (12): is_running (43-45), start (30-36), stop (38-40), publish (47-59), send_direct (61-80), subscribe (82-103), unsubscribe (105-120), receive (122-142), create_channel (144-158), get_channel (160-172), list_channels (174-180), get_channel_history (182-200)
- src/ai_company/communication/messenger.py (3): send_direct (116-160), subscribe (207-226), unsubscribe (228-242)
🔇 Additional comments (23)
tests/unit/communication/test_errors.py (1)
1-84: LGTM! The error hierarchy tests are comprehensive and well-structured:
- Base `CommunicationError` behavior is thoroughly tested (message storage, context immutability, string formatting, context copying).
- Subclass tests use `@pytest.mark.parametrize` efficiently to validate inheritance and context propagation for all error types.

tests/unit/communication/test_handler.py (1)
1-175: LGTM! Comprehensive test coverage for the handler framework:
- Protocol conformance tests validate runtime_checkable behavior.
- `FunctionHandler` tests cover async wrapping, error cases for non-callable/sync inputs.
- `HandlerRegistration` tests verify defaults, customization, and immutability.
- `priority_at_least` tests use parametrize effectively for exhaustive priority combination coverage.

tests/unit/communication/test_dispatcher.py (1)
1-387: LGTM! Excellent dispatcher test coverage:
- Registration/deregistration lifecycle including bare function wrapping.
- Dispatch routing by message type with matching, non-matching, and all-types filters.
- Priority filtering with boundary conditions (equal, higher, lower).
- Combined type+priority filtering requiring both to match.
- Error isolation ensuring one failing handler doesn't block others.
- Concurrent execution verification that handlers run in parallel.
- `DispatchResult` accuracy and immutability validation.
- Edge cases with no handlers or no matching handlers.
tests/unit/communication/test_messenger.py (1)
36-444: LGTM! Comprehensive messenger test coverage:
- Auto-enrichment of sender, timestamp, and message ID.
- Priority propagation and bus publish invocation.
- Direct messaging with
@directchannel placeholder.- Broadcast behavior with default and custom channels.
- Subscribe/unsubscribe delegation to the bus.
- Handler registration/deregistration delegation to dispatcher.
- Auto-creation of dispatcher when not provided.
- Graceful handling when no dispatcher is configured.
tests/unit/communication/test_bus_memory.py (1)
28-589: LGTM! Excellent InMemoryMessageBus test coverage:
- Lifecycle states (start/stop/is_running) with error conditions for double-start and operations on a stopped bus.
- Protocol conformance via an isinstance check.
- Channel management (create/get/list) with duplicate and missing-channel errors.
- Subscription lifecycle, including idempotent subscribe and unsubscribe errors.
- Publish/receive flow with FIFO ordering, fan-out to multiple subscribers, and timeout handling.
- Blocking receive with `timeout=None`.
- Direct messaging with lazy channel creation and deterministic channel naming.
- Retention limits and history queries with limits.
- Concurrent publish/receive scenarios with TaskGroup.
tests/unit/communication/conftest.py (3)
58-64: LGTM! Clean factory additions:
- `MessageRetentionConfigFactory` for retention settings.
- `MessageBusConfigFactory` properly wires the retention field.

95-102: LGTM! Well-structured factories for subscription and delivery models:
- `SubscriptionFactory` for subscription records.
- `DeliveryEnvelopeFactory` with message composition via `MessageFactory`.

167-192: LGTM! Useful sample fixtures providing concrete test data:
- `sample_subscription` with channel and subscriber info.
- `sample_delivery_envelope` composing with `sample_message`.
- `sample_bus_config` with a test channel and retention settings.

src/ai_company/communication/dispatcher.py (4)
1-60: LGTM! Clean `DispatchResult` model:
- Frozen Pydantic model with appropriate field constraints (`ge=0`).
- `@computed_field` correctly derives `handlers_matched` from succeeded + failed.
- Clear docstrings following Google style.
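As a hedged illustration of the derived-field pattern described above — field names are taken from this review, not the actual source, and the real model carries more fields — a minimal Pydantic v2 sketch:

```python
from pydantic import BaseModel, Field, computed_field


class DispatchResult(BaseModel):
    """Illustrative stand-in for the real model; field names are assumed."""

    model_config = {"frozen": True}

    handlers_succeeded: int = Field(default=0, ge=0)
    handlers_failed: int = Field(default=0, ge=0)

    @computed_field  # derived from the two counters, never stored separately
    @property
    def handlers_matched(self) -> int:
        return self.handlers_succeeded + self.handlers_failed


result = DispatchResult(handlers_succeeded=3, handlers_failed=1)
print(result.handlers_matched)  # 4
```

Because the value is computed rather than stored, it can never drift out of sync with the two counters, yet it still appears in serialized output.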
62-113: LGTM! The `register` method is well-implemented:
- Auto-wraps bare async functions with `FunctionHandler`.
- Creates an immutable `HandlerRegistration` with filter criteria.
- Logs registration with a structured event constant.
140-223: LGTM! The `dispatch` method implementation is solid:
- Uses `asyncio.TaskGroup` for structured concurrent execution per coding guidelines.
- Error isolation via `_guarded_handle` catches `Exception` while allowing `BaseException` to propagate (documented behavior).
- Proper logging at DEBUG for start/no-handlers/matched, INFO for completion.
- Returns an accurate `DispatchResult` with error aggregation.
225-244: LGTM! The `_matches` filter logic is correct:
- An empty `message_types` filter means match all types.
- Priority comparison uses the `priority_at_least` helper for ordering.

src/ai_company/communication/messenger.py (4)
1-69: LGTM! `AgentMessenger` initialization is well-designed:
- Uses `__slots__` for memory efficiency.
- Validates `agent_id` and `agent_name` are non-blank at the boundary.
- Logs creation at DEBUG level.
- Clear docstring with exception documentation.
70-160: LGTM! The `send_message` and `send_direct` methods are correctly implemented:
- Auto-fill sender, timestamp, and ID via `Message` construction.
- `send_message` delegates to `bus.publish` for channel-based delivery.
- `send_direct` uses the `@direct` placeholder channel and `bus.send_direct` for point-to-point routing.
- Proper INFO-level logging with structured events.
162-243: LGTM! The `broadcast`, `subscribe`, and `unsubscribe` methods:
- Broadcast correctly sets the `to` field to the channel name for fan-out semantics.
- Subscribe/unsubscribe delegate to the bus with `agent_id`, logging state transitions at INFO.
- Docstrings document raised exceptions per the bus contract.
244-315: LGTM! Handler delegation methods are robust:
- `register_handler` auto-creates a dispatcher when needed, documented in the docstring.
- `deregister_handler` gracefully handles a missing dispatcher with a DEBUG log.
- `dispatch_message` returns an empty `DispatchResult` when no dispatcher is configured, avoiding errors.

DESIGN_SPEC.md (1)

2321-2332: LGTM! The project structure documentation accurately reflects the new communication module files added in this PR. The file organization follows existing conventions and aligns with the modular, protocol-driven architecture described in the PR objectives.
Also applies to: 2347-2347
src/ai_company/communication/__init__.py (1)
1-81: LGTM! The expanded public API exports are well-organized, with an alphabetically sorted `__all__` for maintainability. The imports cleanly expose the new communication layer components (bus, dispatcher, messenger, handlers, errors) from a single entry point.

src/ai_company/communication/bus_memory.py (5)
1-45: LGTM on module setup! Proper imports, logger setup using `get_logger(__name__)`, and event constants from the observability domain module per coding guidelines.
75-189: LGTM on class initialization and publish flow! The implementation correctly uses:
- `asyncio.Lock` for safe concurrent access from cooperating coroutines
- `deque(maxlen=...)` for bounded history per retention config
- `model_copy(update=...)` for immutable Channel updates
- Proper broadcast vs topic semantics (broadcast → all known agents, topic → subscribers only)
- Structured logging at appropriate levels (INFO for state changes, DEBUG for delivery details)

191-283: LGTM on direct messaging implementation! The deterministic channel naming pattern `@{sorted_a}:{sorted_b}` ensures consistent channel identification regardless of sender/recipient order. Lazy channel creation and subscriber registration are handled correctly under the lock.
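A minimal sketch of that deterministic naming (the helper name is illustrative, not from the source):

```python
def direct_channel_name(agent_a: str, agent_b: str) -> str:
    # Sorting the pair makes the name independent of who initiates,
    # so both sides resolve to the same direct channel.
    first, second = sorted((agent_a, agent_b))
    return f"@{first}:{second}"


# Both orderings map to the same channel.
assert direct_channel_name("ceo", "analyst") == direct_channel_name("analyst", "ceo")
print(direct_channel_name("ceo", "analyst"))  # @analyst:ceo
```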
285-366: LGTM on subscription management! The `subscribe` method is correctly idempotent — returning a fresh `Subscription` record without duplicating the subscriber list when already subscribed. The `unsubscribe` method appropriately raises `NotSubscribedError` for both missing channels and missing subscriptions, simplifying error handling for callers.

400-459: LGTM on channel management methods! Proper validation, error handling, and logging throughout. The `list_channels` method returns an immutable tuple per the codebase's immutability conventions.
Pull request overview
This PR introduces the foundational communication layer for the AI company framework: a swappable MessageBus abstraction with an in-memory backend, a concurrent MessageDispatcher for handler routing, and an AgentMessenger facade for per-agent messaging, plus observability event constants and supporting models/errors.
Changes:
- Added the `MessageBus` protocol and `InMemoryMessageBus` implementation supporting channels, pub/sub, direct messaging, broadcast, and history retention.
- Added `MessageDispatcher` (concurrent dispatch + filtering) and `AgentMessenger` (agent-friendly send/subscribe/dispatch API).
- Added communication models/errors plus `COMM_*` observability events and comprehensive unit tests.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Registers/discovers the new communication observability events module and validates key constants. |
| tests/unit/communication/test_subscription.py | Tests Subscription and DeliveryEnvelope validation + immutability. |
| tests/unit/communication/test_messenger.py | Tests AgentMessenger auto-fill behavior and delegation to bus/dispatcher. |
| tests/unit/communication/test_handler.py | Tests handler protocol, FunctionHandler, HandlerRegistration, and priority filtering helper. |
| tests/unit/communication/test_errors.py | Tests communication error hierarchy and immutable context behavior. |
| tests/unit/communication/test_dispatcher.py | Tests dispatch routing, priority/type filtering, concurrency, and error isolation. |
| tests/unit/communication/test_bus_memory.py | Tests in-memory bus lifecycle, channel mgmt, pub/sub flow, direct messaging, broadcast behavior, and retention/history. |
| tests/unit/communication/conftest.py | Adds factories/fixtures for new communication config + subscription/envelope models. |
| src/ai_company/observability/events/communication.py | Adds COMM_* structured logging event constants for communication subsystem. |
| src/ai_company/communication/subscription.py | Adds Subscription and DeliveryEnvelope frozen Pydantic models. |
| src/ai_company/communication/messenger.py | Implements AgentMessenger facade for sending/broadcasting/subscribing and dispatch delegation. |
| src/ai_company/communication/handler.py | Adds MessageHandler protocol, FunctionHandler adapter, HandlerRegistration, and priority ordering helper. |
| src/ai_company/communication/errors.py | Adds communication error hierarchy with immutable structured context. |
| src/ai_company/communication/dispatcher.py | Implements concurrent MessageDispatcher + DispatchResult model. |
| src/ai_company/communication/config.py | Adds MessageRetentionConfig and wires retention into MessageBusConfig. |
| src/ai_company/communication/bus_protocol.py | Defines the swappable MessageBus protocol (pull-based receive()). |
| src/ai_company/communication/bus_memory.py | Implements InMemoryMessageBus backend using asyncio queues + in-memory history. |
| src/ai_company/communication/\_\_init\_\_.py | Re-exports new communication APIs/types for public package access. |
| DESIGN_SPEC.md | Updates design spec file tree to include the new communication modules and events. |
```python
async with self._lock:
    queue = self._ensure_queue(channel_name, subscriber_id)
try:
    if timeout is not None:
        return await asyncio.wait_for(
```
InMemoryMessageBus.receive() currently creates/returns a per-(channel, subscriber) queue without verifying the bus is running, the channel exists, or that the subscriber is allowed to receive from that channel. This can cause callers to silently block on a stopped bus or typo’d/missing channel and can also recreate queues after unsubscribe. Consider calling _require_running(), validating channel_name exists (raise ChannelNotFoundError), and enforcing subscription (or explicit BROADCAST semantics) before creating/awaiting a queue; if you change behavior, document the raised errors in MessageBus.receive().
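Under those assumptions, the suggested guard clauses could look roughly like the following sketch (class, attribute, and error names are illustrative stand-ins for the real ones):

```python
class BusNotRunningError(Exception): ...
class ChannelNotFoundError(Exception): ...
class NotSubscribedError(Exception): ...


class GuardedReceiveMixin:
    """Sketch of the suggested receive() preconditions; attributes are assumed."""

    _running: bool
    _subscribers: dict[str, set[str]]

    def _check_receive(self, channel_name: str, subscriber_id: str) -> None:
        # Fail fast instead of silently blocking on a stopped bus,
        # a typo'd channel, or a subscription that no longer exists.
        if not self._running:
            raise BusNotRunningError("receive() called on a stopped bus")
        if channel_name not in self._subscribers:
            raise ChannelNotFoundError(channel_name)
        if subscriber_id not in self._subscribers[channel_name]:
            raise NotSubscribedError(
                f"{subscriber_id!r} is not subscribed to {channel_name!r}"
            )
```

Running these checks before creating or awaiting a queue also prevents queues from being silently recreated after `unsubscribe()`.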
```python
sender = message.sender
pair = sorted([sender, recipient])
channel_name = f"@{pair[0]}:{pair[1]}"
async with self._lock:
    self._require_running()
```
send_direct() derives routing from the separate recipient argument but does not validate that it matches message.to. This can produce internally inconsistent messages (e.g., message.to != recipient) that are hard to debug downstream. Consider asserting equality (or normalizing by creating a copied Message with to=recipient) so the message metadata always matches the actual delivery target.
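The normalizing variant could be sketched as follows, using a frozen dataclass as a stdlib stand-in for the real frozen Pydantic `Message` model (field names are illustrative):

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Message:
    """Stdlib stand-in for the real frozen Message model."""

    sender: str
    to: str
    body: str


def normalize_direct(message: Message, recipient: str) -> Message:
    # Keep metadata consistent with routing: the delivered message's
    # `to` field always equals the actual delivery target.
    if message.to != recipient:
        message = replace(message, to=recipient)
    return message


msg = Message(sender="ceo", to="@direct", body="status?")
print(normalize_direct(msg, "analyst").to)  # analyst
```

With the real model, `message.model_copy(update={"to": recipient})` would play the role of `replace` here.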
```python
self._history[channel_name].append(message)
now = datetime.now(UTC)
for agent_id in pair:
    envelope = DeliveryEnvelope(
        message=message,
        channel_name=channel_name,
```
Direct messages are stored/delivered under the deterministic channel_name, but _deliver_to_pair() appends/envelopes the original Message instance unchanged. If callers use the AgentMessenger placeholder channel ("@direct"), get_channel_history("@A:b") will return Message objects whose message.channel does not match the queried channel, which is confusing and can break invariants. Consider storing/delivering a copied Message with channel updated to the deterministic direct channel name (preserving id/timestamp).
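The suggested copy-on-store behavior can be sketched like this — again with a frozen dataclass standing in for the real model, whose copy would also preserve id and timestamp:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Message:
    """Stdlib stand-in; the real model also carries a timestamp."""

    id: str
    channel: str
    body: str


def rehome_for_direct(message: Message, direct_channel: str) -> Message:
    # Store/deliver a copy whose channel matches the deterministic direct
    # channel, so get_channel_history() returns internally consistent messages.
    return replace(message, channel=direct_channel)


original = Message(id="m1", channel="@direct", body="hi")
stored = rehome_for_direct(original, "@analyst:ceo")
print(stored.channel, stored.id)  # @analyst:ceo m1
```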
…abbit, Greptile, and Copilot

- Add shutdown signaling (asyncio.Event) to InMemoryMessageBus.receive()
- Add receive() validation: require running bus, existing channel, active subscription
- Add send_direct() validation: recipient vs message.to mismatch, colon in agent IDs
- Fix get_channel_history limit<=0 edge case (return empty list)
- Extract _guarded_handle from dispatcher closure to class method with pre-allocated error list
- Add logger + event constant to handler.py FunctionHandler validation
- Add logger + validation to AgentMessenger.__init__ (blank agent_id/name)
- Use copy.deepcopy() for nested error context immutability
- Compute deterministic direct channel name (@sorted_a:sorted_b) in messenger
- Add NotBlankStr validation on all bus_protocol boundary identifiers
- Fix docstrings: subscribe idempotency, receive() Raises, history limit
- Update module docstrings for accuracy (dispatcher §5.4, __init__ subsystem)
- Add 12 new unit tests covering all new validation paths
- Update DESIGN_SPEC.md §5 current-state callout and §15.5 convention entry
- Update CLAUDE.md communication/ package description

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…idation

Greptile post-push review caught that send_direct validation paths (recipient mismatch, separator-in-ID) were logging COMM_BUS_NOT_RUNNING instead of a validation-specific event, which would poison bus-lifecycle monitoring.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| ) | ||
| self._queues.pop((channel_name, subscriber_id), None) | ||
| logger.info( | ||
| COMM_SUBSCRIPTION_REMOVED, | ||
| channel=channel_name, | ||
| subscriber=subscriber_id, | ||
| ) |
**`unsubscribe()` leaves blocked `receive()` callers permanently stuck**
unsubscribe() pops the subscriber's queue from _queues (line 398), but any coroutine already blocked inside _await_with_shutdown on that queue object holds a local reference to it and will never receive a wakeup signal.
The _await_with_shutdown helper monitors two things: the queue and _shutdown_event. After unsubscribe, no new messages will be enqueued (the subscriber is removed from the channel's subscriber list), and _shutdown_event is not set (the bus is still running). A consumer calling receive(channel, subscriber, timeout=None) and then unsubscribe(channel, subscriber) from another task will hang until bus shutdown or an external cancellation — there is no per-subscriber cancellation path.
To fix this, unsubscribe() should put a sentinel value into any active queue so that _await_with_shutdown can detect it:
```python
# after removing the queue from _queues
queue = self._queues.pop((channel_name, subscriber_id), None)
if queue is not None:
    # wake up any blocked receive() for this subscriber
    queue.put_nowait(None)  # type: ignore[arg-type]
```

This would require `_await_with_shutdown` to distinguish a `None` sentinel from a real envelope (both map to "return None"), which it already handles correctly since a real `DeliveryEnvelope` is never `None`.
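The wakeup mechanics can be demonstrated end-to-end with a plain `asyncio.Queue` (a simplified stand-in for the bus internals):

```python
import asyncio


async def demo_sentinel_wakeup() -> object:
    # A consumer blocked on queue.get() holds its own reference to the queue,
    # so popping the queue from a registry dict cannot wake it. Enqueueing a
    # None sentinel (as the suggested unsubscribe() fix would) does.
    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.ensure_future(queue.get())
    await asyncio.sleep(0)       # let the consumer block on get()
    queue.put_nowait(None)       # the unsubscribe() wakeup
    return await asyncio.wait_for(consumer, timeout=1.0)


assert asyncio.run(demo_sentinel_wakeup()) is None
```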
```python
result = await self._await_with_shutdown(queue, timeout)
if result is None:
    logger.debug(
        COMM_RECEIVE_TIMEOUT,
        channel=channel_name,
        subscriber=subscriber_id,
        timeout=timeout,
    )
```
**`COMM_RECEIVE_TIMEOUT` event fires on graceful shutdown, not only timeout**
`result is None` is returned by `_await_with_shutdown` for two distinct reasons:
1. The `timeout` expired with no message — a genuine timeout.
2. `_shutdown_event` fired before a message arrived — a clean shutdown wakeup.

Both paths log `COMM_RECEIVE_TIMEOUT` with `timeout=timeout`. A monitoring system watching `communication.receive.timeout` will fire on clean shutdowns too, making it impossible to distinguish slow consumers from orderly process termination.
Consider differentiating the two cases, for example by returning a sentinel or a small enum from _await_with_shutdown:
from enum import Enum, auto
class _ReceiveResult(Enum):
TIMEOUT = auto()
SHUTDOWN = auto()Then log COMM_RECEIVE_TIMEOUT only on TIMEOUT, and a new COMM_BUS_RECEIVE_SHUTDOWN constant on SHUTDOWN.
```python
class AgentMessenger:
    """Per-agent facade for sending, receiving, and dispatching messages.

    Wraps a :class:`MessageBus` and optional :class:`MessageDispatcher`
    to provide a high-level API that auto-fills sender, timestamp, and
    message ID.

    Args:
        agent_id: Identifier of the owning agent.
        agent_name: Human-readable name of the agent.
        bus: The underlying message bus.
        dispatcher: Optional message dispatcher for handler routing.

    Raises:
        ValueError: If *agent_id* or *agent_name* is blank.
    """

    __slots__ = ("_agent_id", "_agent_name", "_bus", "_dispatcher")

    def __init__(
        self,
        agent_id: str,
        agent_name: str,
        bus: MessageBus,
        dispatcher: MessageDispatcher | None = None,
    ) -> None:
        if not agent_id.strip():
            logger.warning(
                COMM_MESSENGER_INVALID_AGENT,
                field="agent_id",
                value=repr(agent_id),
            )
            msg = "agent_id must not be blank"
            raise ValueError(msg)
        if not agent_name.strip():
            logger.warning(
                COMM_MESSENGER_INVALID_AGENT,
                field="agent_name",
                value=repr(agent_name),
            )
            msg = "agent_name must not be blank"
            raise ValueError(msg)
        self._agent_id = agent_id
        self._agent_name = agent_name
        self._bus = bus
        self._dispatcher = dispatcher
        logger.debug(
            COMM_MESSENGER_CREATED,
            agent_id=agent_id,
            agent_name=agent_name,
        )
```
**`AgentMessenger` facade is missing a `receive()` method**
The class auto-fills `agent_id` for every sending operation (`send_message`, `send_direct`, `broadcast`) and for subscription management (`subscribe`, `unsubscribe`), but there is no corresponding `receive(channel_name, *, timeout)` wrapper that auto-fills `self._agent_id` as the subscriber.
As a result, consumers who use `AgentMessenger` still need to hold a direct reference to the underlying `MessageBus` and pass `agent_id` explicitly:
```python
# current — breaks the opaque facade
envelope = await bus.receive(channel_name, agent_id, timeout=5.0)
```

A convenience wrapper would complete the facade contract:

```python
async def receive(
    self,
    channel_name: str,
    *,
    timeout: float | None = None,
) -> DeliveryEnvelope | None:
    return await self._bus.receive(channel_name, self._agent_id, timeout=timeout)
```
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
```python
if not isinstance(handler, MessageHandler):
    handler = FunctionHandler(handler)
```
MessageHandler is @runtime_checkable, so isinstance(handler, MessageHandler) only checks for a handle attribute—not that it’s async. A handler with a synchronous handle() will pass registration but later cause await registration.handler.handle(...) to raise a TypeError. Consider validating that handler.handle is a coroutine function (or that calling it returns an awaitable) and raising TypeError + logging COMM_HANDLER_INVALID when it’s not.
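A sketch of that registration-time check, using `inspect.iscoroutinefunction` (the validator name and handler classes here are illustrative):

```python
import inspect


def validate_async_handler(handler: object) -> None:
    """Reject handlers whose handle() is not a coroutine function.

    isinstance() against a runtime_checkable Protocol only confirms the
    attribute exists; it cannot see whether handle() is async.
    """
    handle = getattr(handler, "handle", None)
    if not callable(handle):
        raise TypeError("handler must expose a callable handle()")
    if not inspect.iscoroutinefunction(handle):
        raise TypeError("handle() must be an async function")


class GoodHandler:
    async def handle(self, message) -> None: ...


class BadHandler:
    def handle(self, message) -> None: ...  # sync: would fail at await time


validate_async_handler(GoodHandler())  # passes silently
try:
    validate_async_handler(BadHandler())
except TypeError as exc:
    print(exc)  # handle() must be an async function
```

Failing at registration turns a deferred `await`-time `TypeError` into an immediate, attributable error.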
```python
if not get_task.done():
    get_task.cancel()
if not shutdown_task.done():
    shutdown_task.cancel()
if get_task in done and not get_task.cancelled():
```
In _await_with_shutdown, cancelled tasks (get_task / shutdown_task) are not awaited after .cancel(). This can leave pending cancellation and may emit "Task was destroyed but it is pending" warnings in some event-loop implementations. Consider using asyncio.create_task() and after cancelling, await the tasks with contextlib.suppress(asyncio.CancelledError) to ensure clean cancellation.
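The cancel-then-reap pattern suggested here can be sketched as a small helper (the helper name is illustrative):

```python
import asyncio
import contextlib


async def cancel_and_reap(*tasks: asyncio.Task) -> None:
    # .cancel() only *requests* cancellation; awaiting each task afterwards
    # lets the cancellation actually complete and avoids "Task was destroyed
    # but it is pending" warnings on some event loops.
    for task in tasks:
        if not task.done():
            task.cancel()
    for task in tasks:
        with contextlib.suppress(asyncio.CancelledError):
            await task


async def demo() -> bool:
    sleeper = asyncio.ensure_future(asyncio.sleep(60))
    await asyncio.sleep(0)  # let the sleeper start
    await cancel_and_reap(sleeper)
    return sleeper.cancelled()


assert asyncio.run(demo())
```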
```python
_raise_not_subscribed(channel_name, subscriber_id)
queue = self._ensure_queue(channel_name, subscriber_id)
result = await self._await_with_shutdown(queue, timeout)
if result is None:
```
COMM_RECEIVE_TIMEOUT is logged whenever receive() returns None, but None can also mean shutdown (per docstring and _await_with_shutdown). This makes the event misleading for observability. Consider distinguishing timeout vs shutdown (e.g., check self._shutdown_event.is_set() and log a different event / add a reason field, or only log timeout when timeout is not None).
```diff
- if result is None:
+ if result is None and timeout is not None and not self._shutdown_event.is_set():
```
Address 13 findings from post-merge bot reviews on PRs #157-#162:

Communication layer (PR #157):
- Wake blocked receive() callers on unsubscribe via None sentinel
- Distinguish shutdown vs timeout in receive() logging
- Add AgentMessenger.receive() facade method
- Validate MessageHandler.handle() is async at registration
- Await cancelled tasks in _await_with_shutdown to prevent warnings

Observability (PR #158):
- Add log-before-raise to all validators missing it (company.py, schema.py) — 14 raise sites across 11 validators

Parallel execution (PR #161):
- Log suppressed ExceptionGroup instead of silent pass
- Add PARALLEL_AGENT_CANCELLED structured event for cancellations
- Fix progress.in_progress semantics (increment after semaphore)
- Use PARALLEL_LOCK_RELEASE_ERROR for lock release failures
- Remove duplicate plan_parsing.py from DESIGN_SPEC file tree

Template inheritance (PR #162):
- Update DESIGN_SPEC merge key docs to include merge_id
- Preserve merge_id in _expand_single_agent (confirmed bug fix)
- Defer deepcopy in _apply_child_agent past _remove early-return

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🤖 I have created a release *beep* *boop*

---

## [0.1.1](ai-company-v0.1.0...ai-company-v0.1.1) (2026-03-10)

### Features

* add autonomy levels and approval timeout policies ([#42](#42), [#126](#126)) ([#197](#197)) ([eecc25a](eecc25a))
* add CFO cost optimization service with anomaly detection, reports, and approval decisions ([#186](#186)) ([a7fa00b](a7fa00b))
* add code quality toolchain (ruff, mypy, pre-commit, dependabot) ([#63](#63)) ([36681a8](36681a8))
* add configurable cost tiers and subscription/quota-aware tracking ([#67](#67)) ([#185](#185)) ([9baedfa](9baedfa))
* add container packaging, Docker Compose, and CI pipeline ([#269](#269)) ([435bdfe](435bdfe)), closes [#267](#267)
* add coordination error taxonomy classification pipeline ([#146](#146)) ([#181](#181)) ([70c7480](70c7480))
* add cost-optimized, hierarchical, and auction assignment strategies ([#175](#175)) ([ce924fa](ce924fa)), closes [#173](#173)
* add design specification, license, and project setup ([8669a09](8669a09))
* add env var substitution and config file auto-discovery ([#77](#77)) ([7f53832](7f53832))
* add FastestStrategy routing + vendor-agnostic cleanup ([#140](#140)) ([09619cb](09619cb)), closes [#139](#139)
* add HR engine and performance tracking ([#45](#45), [#47](#47)) ([#193](#193)) ([2d091ea](2d091ea))
* add issue auto-search and resolution verification to PR review skill ([#119](#119)) ([deecc39](deecc39))
* add memory retrieval, ranking, and context injection pipeline ([#41](#41)) ([873b0aa](873b0aa))
* add pluggable MemoryBackend protocol with models, config, and events ([#180](#180)) ([46cfdd4](46cfdd4))
* add pluggable MemoryBackend protocol with models, config, and events ([#32](#32)) ([46cfdd4](46cfdd4))
* add pluggable PersistenceBackend protocol with SQLite implementation ([#36](#36)) ([f753779](f753779))
* add progressive trust and promotion/demotion subsystems ([#43](#43), [#49](#49)) ([3a87c08](3a87c08))
* add retry handler, rate limiter, and provider resilience ([#100](#100)) ([b890545](b890545))
* add SecOps security agent with rule engine, audit log, and ToolInvoker integration ([#40](#40)) ([83b7b6c](83b7b6c))
* add shared org memory and memory consolidation/archival ([#125](#125), [#48](#48)) ([4a0832b](4a0832b))
* design unified provider interface ([#86](#86)) ([3e23d64](3e23d64))
* expand template presets, rosters, and add inheritance ([#80](#80), [#81](#81), [#84](#84)) ([15a9134](15a9134))
* implement agent runtime state vs immutable config split ([#115](#115)) ([4cb1ca5](4cb1ca5))
* implement AgentEngine core orchestrator ([#11](#11)) ([#143](#143)) ([f2eb73a](f2eb73a))
* implement basic tool system (registry, invocation, results) ([#15](#15)) ([c51068b](c51068b))
* implement built-in file system tools ([#18](#18)) ([325ef98](325ef98))
* implement communication foundation — message bus, dispatcher, and messenger ([#157](#157)) ([8e71bfd](8e71bfd))
* implement company template system with 7 built-in presets ([#85](#85)) ([cbf1496](cbf1496))
* implement conflict resolution protocol ([#122](#122)) ([#166](#166)) ([e03f9f2](e03f9f2))
* implement core entity and role system models ([#69](#69)) ([acf9801](acf9801))
* implement crash recovery with fail-and-reassign strategy ([#149](#149)) ([e6e91ed](e6e91ed))
* implement engine extensions — Plan-and-Execute loop and call categorization ([#134](#134), [#135](#135)) ([#159](#159)) ([9b2699f](9b2699f))
* implement enterprise logging system with structlog ([#73](#73)) ([2f787e5](2f787e5))
* implement graceful shutdown with cooperative timeout strategy ([#130](#130)) ([6592515](6592515))
* implement hierarchical delegation and loop prevention ([#12](#12), [#17](#17)) ([6be60b6](6be60b6))
* implement LiteLLM driver and provider registry ([#88](#88)) ([ae3f18b](ae3f18b)), closes [#4](#4)
* implement LLM decomposition strategy and workspace isolation ([#174](#174)) ([aa0eefe](aa0eefe))
* implement meeting protocol system ([#123](#123)) ([ee7caca](ee7caca))
* implement message and communication domain models ([#74](#74)) ([560a5d2](560a5d2))
* implement model routing engine ([#99](#99)) ([d3c250b](d3c250b))
* implement parallel agent execution ([#22](#22)) ([#161](#161)) ([65940b3](65940b3))
* implement per-call cost tracking service ([#7](#7)) ([#102](#102)) ([c4f1f1c](c4f1f1c))
* implement personality injection and system prompt construction ([#105](#105)) ([934dd85](934dd85))
* implement single-task execution lifecycle ([#21](#21)) ([#144](#144)) ([c7e64e4](c7e64e4))
* implement subprocess sandbox for tool execution isolation ([#131](#131)) ([#153](#153)) ([3c8394e](3c8394e))
* implement task assignment subsystem with pluggable strategies ([#172](#172)) ([c7f1b26](c7f1b26)), closes [#26](#26) [#30](#30)
* implement task decomposition and routing engine ([#14](#14)) ([9c7fb52](9c7fb52))
* implement Task, Project, Artifact, Budget, and Cost domain models ([#71](#71)) ([81eabf1](81eabf1))
* implement tool permission checking ([#16](#16)) ([833c190](833c190))
* implement YAML config loader with Pydantic validation ([#59](#59)) ([ff3a2ba](ff3a2ba))
* implement YAML config loader with Pydantic validation ([#75](#75)) ([ff3a2ba](ff3a2ba))
* initialize project with uv, hatchling, and src layout ([39005f9](39005f9))
* initialize project with uv, hatchling, and src layout ([#62](#62)) ([39005f9](39005f9))
* Litestar REST API, WebSocket feed, and approval queue (M6) ([#189](#189)) ([29fcd08](29fcd08))
* make TokenUsage.total_tokens a computed field ([#118](#118)) ([c0bab18](c0bab18)), closes [#109](#109)
* parallel tool execution in ToolInvoker.invoke_all ([#137](#137)) ([58517ee](58517ee))
* testing framework, CI pipeline, and M0 gap fixes ([#64](#64)) ([f581749](f581749))
* wire all modules into observability system ([#97](#97)) ([f7a0617](f7a0617))

### Bug Fixes

* address Greptile post-merge review findings from PRs [#170](https://github.com/Aureliolo/ai-company/issues/170)-[#175](https://github.com/Aureliolo/ai-company/issues/175) ([#176](#176)) ([c5ca929](c5ca929))
* address post-merge review feedback from PRs [#164](https://github.com/Aureliolo/ai-company/issues/164)-[#167](https://github.com/Aureliolo/ai-company/issues/167) ([#170](#170)) ([3bf897a](3bf897a)), closes [#169](#169)
* enforce strict mypy on test files ([#89](#89)) ([aeeff8c](aeeff8c))
* harden Docker sandbox, MCP bridge, and code runner ([#50](#50), [#53](#53)) ([d5e1b6e](d5e1b6e))
* harden git tools security + code quality improvements ([#150](#150)) ([000a325](000a325))
* harden subprocess cleanup, env filtering, and shutdown resilience ([#155](#155)) ([d1fe1fb](d1fe1fb))
* incorporate post-merge feedback + pre-PR review fixes ([#164](#164)) ([c02832a](c02832a))
* pre-PR review fixes for post-merge findings ([#183](#183)) ([26b3108](26b3108))
* strengthen immutability for BaseTool schema and ToolInvoker boundaries ([#117](#117)) ([7e5e861](7e5e861))

### Performance

* harden non-inferable principle implementation ([#195](#195)) ([02b5f4e](02b5f4e)), closes [#188](#188)

### Refactoring

* adopt NotBlankStr across all models ([#108](#108)) ([#120](#120)) ([ef89b90](ef89b90))
* extract _SpendingTotals base class from spending summary models ([#111](#111)) ([2f39c1b](2f39c1b))
* harden BudgetEnforcer with error handling, validation extraction, and review fixes ([#182](#182)) ([c107bf9](c107bf9))
* harden personality profiles, department validation, and template rendering ([#158](#158)) ([10b2299](10b2299))
* pre-PR review improvements for ExecutionLoop + ReAct loop ([#124](#124)) ([8dfb3c0](8dfb3c0))
* split events.py into per-domain event modules ([#136](#136)) ([e9cba89](e9cba89))

### Documentation

* add ADR-001 memory layer evaluation and selection ([#178](#178)) ([db3026f](db3026f)), closes [#39](#39)
* add agent scaling research findings to DESIGN_SPEC ([#145](#145)) ([57e487b](57e487b))
* add CLAUDE.md, contributing guide, and dev documentation ([#65](#65)) ([55c1025](55c1025)), closes [#54](#54)
* add crash recovery, sandboxing, analytics, and testing decisions ([#127](#127)) ([5c11595](5c11595))
* address external review feedback with MVP scope and new protocols ([#128](#128)) ([3b30b9a](3b30b9a))
* expand design spec with pluggable strategy protocols ([#121](#121)) ([6832db6](6832db6))
* finalize 23 design decisions (ADR-002) ([#190](#190)) ([8c39742](8c39742))
* update project docs for M2.5 conventions and add docs-consistency review agent ([#114](#114)) ([99766ee](99766ee))

### Tests

* add e2e single agent integration tests ([#24](#24)) ([#156](#156)) ([f566fb4](f566fb4))
* add provider adapter integration tests ([#90](#90)) ([40a61f4](40a61f4))

### CI/CD

* add Release Please for automated versioning and GitHub Releases ([#278](#278)) ([a488758](a488758))
* bump actions/checkout from 4 to 6 ([#95](#95)) ([1897247](1897247))
* bump actions/upload-artifact from 4 to 7 ([#94](#94)) ([27b1517](27b1517))
* harden CI/CD pipeline ([#92](#92)) ([ce4693c](ce4693c))
* split vulnerability scans into critical-fail and high-warn tiers ([#277](#277)) ([aba48af](aba48af))

### Maintenance

* add /worktree skill for parallel worktree management ([#171](#171)) ([951e337](951e337))
* add design spec context loading to research-link skill ([8ef9685](8ef9685))
* add post-merge-cleanup skill ([#70](#70)) ([f913705](f913705))
* add pre-pr-review skill and update CLAUDE.md ([#103](#103)) ([92e9023](92e9023))
* add research-link skill and rename skill files to SKILL.md ([#101](#101)) ([651c577](651c577))
* bump aiosqlite from 0.21.0 to 0.22.1 ([#191](#191)) ([3274a86](3274a86))
* bump pyyaml from 6.0.2 to 6.0.3 in the minor-and-patch group ([#96](#96)) ([0338d0c](0338d0c))
* bump ruff from 0.15.4 to 0.15.5 ([a49ee46](a49ee46))
* fix M0 audit items ([#66](#66)) ([c7724b5](c7724b5))
* pin setup-uv action to full SHA ([#281](#281)) ([4448002](4448002))
* post-audit cleanup — PEP 758, loggers, bug fixes, refactoring, tests, hookify rules ([#148](#148)) ([c57a6a9](c57a6a9))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
🤖 I have created a release *beep* *boop*

---

## [0.1.0](v0.0.0...v0.1.0) (2026-03-11)

### Features

* add autonomy levels and approval timeout policies ([#42](#42), [#126](#126)) ([#197](#197)) ([eecc25a](eecc25a))
* add CFO cost optimization service with anomaly detection, reports, and approval decisions ([#186](#186)) ([a7fa00b](a7fa00b))
* add code quality toolchain (ruff, mypy, pre-commit, dependabot) ([#63](#63)) ([36681a8](36681a8))
* add configurable cost tiers and subscription/quota-aware tracking ([#67](#67)) ([#185](#185)) ([9baedfa](9baedfa))
* add container packaging, Docker Compose, and CI pipeline ([#269](#269)) ([435bdfe](435bdfe)), closes [#267](#267)
* add coordination error taxonomy classification pipeline ([#146](#146)) ([#181](#181)) ([70c7480](70c7480))
* add cost-optimized, hierarchical, and auction assignment strategies ([#175](#175)) ([ce924fa](ce924fa)), closes [#173](#173)
* add design specification, license, and project setup ([8669a09](8669a09))
* add env var substitution and config file auto-discovery ([#77](#77)) ([7f53832](7f53832))
* add FastestStrategy routing + vendor-agnostic cleanup ([#140](#140)) ([09619cb](09619cb)), closes [#139](#139)
* add HR engine and performance tracking ([#45](#45), [#47](#47)) ([#193](#193)) ([2d091ea](2d091ea))
* add issue auto-search and resolution verification to PR review skill ([#119](#119)) ([deecc39](deecc39))
* add mandatory JWT + API key authentication ([#256](#256)) ([c279cfe](c279cfe))
* add memory retrieval, ranking, and context injection pipeline ([#41](#41)) ([873b0aa](873b0aa))
* add pluggable MemoryBackend protocol with models, config, and events ([#180](#180)) ([46cfdd4](46cfdd4))
* add pluggable MemoryBackend protocol with models, config, and events ([#32](#32)) ([46cfdd4](46cfdd4))
* add pluggable output scan response policies ([#263](#263)) ([b9907e8](b9907e8))
* add pluggable PersistenceBackend protocol with SQLite implementation ([#36](#36)) ([f753779](f753779))
* add progressive trust and promotion/demotion subsystems ([#43](#43), [#49](#49)) ([3a87c08](3a87c08))
* add retry handler, rate limiter, and provider resilience ([#100](#100)) ([b890545](b890545))
* add SecOps security agent with rule engine, audit log, and ToolInvoker integration ([#40](#40)) ([83b7b6c](83b7b6c))
* add shared org memory and memory consolidation/archival ([#125](#125), [#48](#48)) ([4a0832b](4a0832b))
* design unified provider interface ([#86](#86)) ([3e23d64](3e23d64))
* expand template presets, rosters, and add inheritance ([#80](#80), [#81](#81), [#84](#84)) ([15a9134](15a9134))
* implement agent runtime state vs immutable config split ([#115](#115)) ([4cb1ca5](4cb1ca5))
* implement AgentEngine core orchestrator ([#11](#11)) ([#143](#143)) ([f2eb73a](f2eb73a))
* implement AuditRepository for security audit log persistence ([#279](#279)) ([94bc29f](94bc29f))
* implement basic tool system (registry, invocation, results) ([#15](#15)) ([c51068b](c51068b))
* implement built-in file system tools ([#18](#18)) ([325ef98](325ef98))
* implement communication foundation — message bus, dispatcher, and messenger ([#157](#157)) ([8e71bfd](8e71bfd))
* implement company template system with 7 built-in presets ([#85](#85)) ([cbf1496](cbf1496))
* implement conflict resolution protocol ([#122](#122)) ([#166](#166)) ([e03f9f2](e03f9f2))
* implement core entity and role system models ([#69](#69)) ([acf9801](acf9801))
* implement crash recovery with fail-and-reassign strategy ([#149](#149)) ([e6e91ed](e6e91ed))
* implement engine extensions — Plan-and-Execute loop and call categorization ([#134](#134), [#135](#135)) ([#159](#159)) ([9b2699f](9b2699f))
* implement enterprise logging system with structlog ([#73](#73)) ([2f787e5](2f787e5))
* implement graceful shutdown with cooperative timeout strategy ([#130](#130)) ([6592515](6592515))
* implement hierarchical delegation and loop prevention ([#12](#12), [#17](#17)) ([6be60b6](6be60b6))
* implement LiteLLM driver and provider registry ([#88](#88)) ([ae3f18b](ae3f18b)), closes [#4](#4)
* implement LLM decomposition strategy and workspace isolation ([#174](#174)) ([aa0eefe](aa0eefe))
* implement meeting protocol system ([#123](#123)) ([ee7caca](ee7caca))
* implement message and communication domain models ([#74](#74)) ([560a5d2](560a5d2))
* implement model routing engine ([#99](#99)) ([d3c250b](d3c250b))
* implement parallel agent execution ([#22](#22)) ([#161](#161)) ([65940b3](65940b3))
* implement per-call cost tracking service ([#7](#7)) ([#102](#102)) ([c4f1f1c](c4f1f1c))
* implement personality injection and system prompt construction ([#105](#105)) ([934dd85](934dd85))
* implement single-task execution lifecycle ([#21](#21)) ([#144](#144)) ([c7e64e4](c7e64e4))
* implement subprocess sandbox for tool execution isolation ([#131](#131)) ([#153](#153)) ([3c8394e](3c8394e))
* implement task assignment subsystem with pluggable strategies ([#172](#172)) ([c7f1b26](c7f1b26)), closes [#26](#26) [#30](#30)
* implement task decomposition and routing engine ([#14](#14)) ([9c7fb52](9c7fb52))
* implement Task, Project, Artifact, Budget, and Cost domain models ([#71](#71)) ([81eabf1](81eabf1))
* implement tool permission checking ([#16](#16)) ([833c190](833c190))
* implement YAML config loader with Pydantic validation ([#59](#59)) ([ff3a2ba](ff3a2ba))
* implement YAML config loader with Pydantic validation ([#75](#75)) ([ff3a2ba](ff3a2ba))
* initialize project with uv, hatchling, and src layout ([39005f9](39005f9))
* initialize project with uv, hatchling, and src layout ([#62](#62)) ([39005f9](39005f9))
* Litestar REST API, WebSocket feed, and approval queue (M6) ([#189](#189)) ([29fcd08](29fcd08))
* make TokenUsage.total_tokens a computed field ([#118](#118)) ([c0bab18](c0bab18)), closes [#109](#109)
* parallel tool execution in ToolInvoker.invoke_all ([#137](#137)) ([58517ee](58517ee))
* testing framework, CI pipeline, and M0 gap fixes ([#64](#64)) ([f581749](f581749))
* wire all modules into observability system ([#97](#97)) ([f7a0617](f7a0617))

### Bug Fixes

* address Greptile post-merge review findings from PRs [#170](https://github.com/Aureliolo/ai-company/issues/170)-[#175](https://github.com/Aureliolo/ai-company/issues/175) ([#176](#176)) ([c5ca929](c5ca929))
* address post-merge review feedback from PRs [#164](https://github.com/Aureliolo/ai-company/issues/164)-[#167](https://github.com/Aureliolo/ai-company/issues/167) ([#170](#170)) ([3bf897a](3bf897a)), closes [#169](#169)
* enforce strict mypy on test files ([#89](#89)) ([aeeff8c](aeeff8c))
* harden Docker sandbox, MCP bridge, and code runner ([#50](#50), [#53](#53)) ([d5e1b6e](d5e1b6e))
* harden git tools security + code quality improvements ([#150](#150)) ([000a325](000a325))
* harden subprocess cleanup, env filtering, and shutdown resilience ([#155](#155)) ([d1fe1fb](d1fe1fb))
* incorporate post-merge feedback + pre-PR review fixes ([#164](#164)) ([c02832a](c02832a))
* pre-PR review fixes for post-merge findings ([#183](#183)) ([26b3108](26b3108))
* resolve circular imports, bump litellm, fix release tag format ([#286](#286)) ([a6659b5](a6659b5))
* strengthen immutability for BaseTool schema and ToolInvoker boundaries ([#117](#117)) ([7e5e861](7e5e861))

### Performance

* harden non-inferable principle implementation ([#195](#195)) ([02b5f4e](02b5f4e)), closes [#188](#188)

### Refactoring

* adopt NotBlankStr across all models ([#108](#108)) ([#120](#120)) ([ef89b90](ef89b90))
* extract _SpendingTotals base class from spending summary models ([#111](#111)) ([2f39c1b](2f39c1b))
* harden BudgetEnforcer with error handling, validation extraction, and review fixes ([#182](#182)) ([c107bf9](c107bf9))
* harden personality profiles, department validation, and template rendering ([#158](#158)) ([10b2299](10b2299))
* pre-PR review improvements for ExecutionLoop + ReAct loop ([#124](#124)) ([8dfb3c0](8dfb3c0))
* split events.py into per-domain event modules ([#136](#136)) ([e9cba89](e9cba89))

### Documentation

* add ADR-001 memory layer evaluation and selection ([#178](#178)) ([db3026f](db3026f)), closes [#39](#39)
* add agent scaling research findings to DESIGN_SPEC ([#145](#145)) ([57e487b](57e487b))
* add CLAUDE.md, contributing guide, and dev documentation ([#65](#65)) ([55c1025](55c1025)), closes [#54](#54)
* add crash recovery, sandboxing, analytics, and testing decisions ([#127](#127)) ([5c11595](5c11595))
* address external review feedback with MVP scope and new protocols ([#128](#128)) ([3b30b9a](3b30b9a))
* expand design spec with pluggable strategy protocols ([#121](#121)) ([6832db6](6832db6))
* finalize 23 design decisions (ADR-002) ([#190](#190)) ([8c39742](8c39742))
* update project docs for M2.5 conventions and add docs-consistency review agent ([#114](#114)) ([99766ee](99766ee))

### Tests

* add e2e single agent integration tests ([#24](#24)) ([#156](#156)) ([f566fb4](f566fb4))
* add provider adapter integration tests ([#90](#90)) ([40a61f4](40a61f4))

### CI/CD

* add Release Please for automated versioning and GitHub Releases ([#278](#278)) ([a488758](a488758))
* bump actions/checkout from 4 to 6 ([#95](#95)) ([1897247](1897247))
* bump actions/upload-artifact from 4 to 7 ([#94](#94)) ([27b1517](27b1517))
* bump anchore/scan-action from 6.5.1 to 7.3.2 ([#271](#271)) ([80a1c15](80a1c15))
* bump docker/build-push-action from 6.19.2 to 7.0.0 ([#273](#273)) ([dd0219e](dd0219e))
* bump docker/login-action from 3.7.0 to 4.0.0 ([#272](#272)) ([33d6238](33d6238))
* bump docker/metadata-action from 5.10.0 to 6.0.0 ([#270](#270)) ([baee04e](baee04e))
* bump docker/setup-buildx-action from 3.12.0 to 4.0.0 ([#274](#274)) ([5fc06f7](5fc06f7))
* bump sigstore/cosign-installer from 3.9.1 to 4.1.0 ([#275](#275)) ([29dd16c](29dd16c))
* harden CI/CD pipeline ([#92](#92)) ([ce4693c](ce4693c))
* split vulnerability scans into critical-fail and high-warn tiers ([#277](#277)) ([aba48af](aba48af))

### Maintenance

* add /worktree skill for parallel worktree management ([#171](#171)) ([951e337](951e337))
* add design spec context loading to research-link skill ([8ef9685](8ef9685))
* add post-merge-cleanup skill ([#70](#70)) ([f913705](f913705))
* add pre-pr-review skill and update CLAUDE.md ([#103](#103)) ([92e9023](92e9023))
* add research-link skill and rename skill files to SKILL.md ([#101](#101)) ([651c577](651c577))
* bump aiosqlite from 0.21.0 to 0.22.1 ([#191](#191)) ([3274a86](3274a86))
* bump pyyaml from 6.0.2 to 6.0.3 in the minor-and-patch group ([#96](#96)) ([0338d0c](0338d0c))
* bump ruff from 0.15.4 to 0.15.5 ([a49ee46](a49ee46))
* fix M0 audit items ([#66](#66)) ([c7724b5](c7724b5))
* **main:** release ai-company 0.1.1 ([#282](#282)) ([2f4703d](2f4703d))
* pin setup-uv action to full SHA ([#281](#281)) ([4448002](4448002))
* post-audit cleanup — PEP 758, loggers, bug fixes, refactoring, tests, hookify rules ([#148](#148)) ([c57a6a9](c57a6a9))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

---------

Signed-off-by: Aurelio <19254254+Aureliolo@users.noreply.github.com>
Summary
* `InMemoryMessageBus` with pub/sub, direct messaging, broadcast, channel management, and message history (DESIGN_SPEC §5.4)
* `MessageDispatcher` with concurrent handler dispatch via `asyncio.TaskGroup`, type/priority filtering, and error isolation
* `AgentMessenger` per-agent facade that auto-fills sender, timestamp, and message ID
* `MessageBus` protocol (`@runtime_checkable`) for swappable bus backends
* `MessageHandler` protocol, `FunctionHandler` adapter, and `HandlerRegistration` model
* `Subscription` and `DeliveryEnvelope` models, plus the full communication error hierarchy
* 20 observability event constants (`COMM_*`) in `observability/events/communication.py`
* Updated `DESIGN_SPEC.md` §15.3 with all new communication files

Closes #8, Closes #10
Key Design Decisions
* Pull-model delivery: consumers call `receive()` rather than registering push callbacks
* `@computed_field` for `DispatchResult.handlers_matched` (derived from succeeded + failed)
* `MappingProxyType` for the immutable `_PRIORITY_ORDER` dict
* `NotBlankStr` for handler registration identifier fields
* `inspect.iscoroutinefunction` validation in `FunctionHandler` (Python 3.14 compatible)
* `@{sorted(a,b)}` naming for DM channels

Test Plan
Review Coverage
Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 29 findings addressed, 0 skipped.
🤖 Generated with Claude Code