Skip to content

feat: implement communication foundation — message bus, dispatcher, and messenger#157

Merged
Aureliolo merged 4 commits intomainfrom
feat/communication-foundation
Mar 7, 2026
Merged

feat: implement communication foundation — message bus, dispatcher, and messenger#157
Aureliolo merged 4 commits intomainfrom
feat/communication-foundation

Conversation

@Aureliolo
Copy link
Copy Markdown
Owner

Summary

  • Implement InMemoryMessageBus with pub/sub, direct messaging, broadcast, channel management, and message history (DESIGN_SPEC §5.4)
  • Implement MessageDispatcher with concurrent handler dispatch via asyncio.TaskGroup, type/priority filtering, and error isolation
  • Implement AgentMessenger per-agent facade that auto-fills sender, timestamp, and message ID
  • Add MessageBus protocol (@runtime_checkable) for swappable bus backends
  • Add MessageHandler protocol, FunctionHandler adapter, and HandlerRegistration model
  • Add Subscription, DeliveryEnvelope models and full communication error hierarchy
  • Add 29 structured logging event constants (COMM_*) in observability/events/communication.py
  • Update DESIGN_SPEC.md §15.3 with all new communication files

Closes #8, Closes #10

Key Design Decisions

  • Pull model: consumers call receive() rather than registering push callbacks
  • @computed_field for DispatchResult.handlers_matched (derived from succeeded + failed)
  • MappingProxyType for _PRIORITY_ORDER immutable dict
  • NotBlankStr for handler registration identifier fields
  • inspect.iscoroutinefunction validation in FunctionHandler (Python 3.14 compatible)
  • Deterministic direct channels: @{sorted(a,b)} naming for DM channels

Test Plan

  • 2605 tests pass (0 failures)
  • 96.47% coverage (80% minimum)
  • ruff lint + format clean
  • mypy strict — no issues
  • New tests: protocol conformance, combined type+priority filtering, send_direct on stopped bus, unsubscribe nonexistent channel, receive without timeout, direct channel history, FunctionHandler validation, deregister without dispatcher

Review Coverage

Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 29 findings addressed, 0 skipped.

🤖 Generated with Claude Code

Aureliolo and others added 2 commits March 7, 2026 16:43
…nd messenger (#8, #10)

Add the runtime communication layer for multi-agent orchestration:

- Error hierarchy (CommunicationError + 5 subclasses) with immutable context
- MessageBus protocol (pull model) and InMemoryMessageBus implementation
  (asyncio Lock+Queue, FIFO, fan-out, BROADCAST, direct messaging, retention)
- MessageHandler protocol, FunctionHandler adapter, HandlerRegistration model
- MessageDispatcher with concurrent TaskGroup dispatch and error isolation
- AgentMessenger per-agent facade (send, direct, broadcast, subscribe, handlers)
- Subscription and DeliveryEnvelope models
- MessageRetentionConfig added to MessageBusConfig
- 20 observability event constants for communication domain
- 114 unit tests (all passing), 96.46% coverage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pre-reviewed by 9 agents, 29 findings addressed:

- Use @computed_field for DispatchResult.handlers_matched (derived value)
- Add DESIGN_SPEC Section 5 references to module docstrings
- Add MappingProxyType wrapping for _PRIORITY_ORDER dict
- Use NotBlankStr for HandlerRegistration identifier fields
- Add FunctionHandler async validation via inspect.iscoroutinefunction
- Add agent_id/agent_name blank validation in AgentMessenger.__init__
- Improve logging: COMM_BUS_NOT_RUNNING, COMM_BUS_ALREADY_RUNNING events
- Log COMM_HANDLER_DEREGISTER_MISS and COMM_DISPATCH_NO_DISPATCHER
- Refactor send_direct into 3 focused methods
- Add unsubscribe running guard for consistency
- Fix docstrings: broadcast semantics, receive blocking, send_direct channel
- Update DESIGN_SPEC §15.3 with 8 new communication files + events module
- Add 7 new tests: protocol conformance, combined filters, edge cases
- Polish: dict.setdefault, tuple(pair), simplified validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 7, 2026 16:22
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Mar 7, 2026

Dependency Review

✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.

Scanned Files

None

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 7, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 9bc70286-4b75-4337-9daa-654167422752

📥 Commits

Reviewing files that changed from the base of the PR and between 5fcd681 and dc36a44.

📒 Files selected for processing (2)
  • src/ai_company/communication/bus_memory.py
  • src/ai_company/observability/events/communication.py

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Modular message bus with in-process backend, channel subscriptions, message history and retention.
    • Per-agent messenger for sending, broadcasting and deterministic direct chats.
    • Message dispatcher with handler registration, priority filtering, and delivery tracking (envelopes).
    • New observability event names and richer communication error types.
  • Refactor

    • Communication API reorganized from monolithic to protocol-driven, pluggable components.
  • Tests

    • Extensive unit tests covering bus, dispatcher, messenger, handlers, subscriptions and errors.

Walkthrough

Adds a protocol-driven communication subsystem: a MessageBus protocol with an InMemoryMessageBus backend, channel/subscription and delivery models, MessageDispatcher with concurrent handler dispatch and DispatchResult, AgentMessenger facade, errors, observability event constants, retention config, and comprehensive unit tests.

Changes

Cohort / File(s) Summary
Message Bus Core
src/ai_company/communication/bus_protocol.py, src/ai_company/communication/bus_memory.py
Adds MessageBus protocol and InMemoryMessageBus implementation: async lifecycle, publish/send_direct, channel create/get/list, subscribe/unsubscribe, receive with timeout/shutdown handling, per-channel FIFO history and retention, and lifecycle events.
Models & Config
src/ai_company/communication/config.py, src/ai_company/communication/subscription.py, src/ai_company/communication/message.py
Introduces MessageRetentionConfig on MessageBusConfig; adds immutable Subscription and DeliveryEnvelope models; relocates/refactors Message model.
Dispatch & Handlers
src/ai_company/communication/handler.py, src/ai_company/communication/dispatcher.py
Adds MessageHandler protocol, FunctionHandler, HandlerRegistration, priority utilities, MessageDispatcher and DispatchResult supporting concurrent, per-handler-isolated dispatch with type/priority filtering and observability events.
Per-agent Facade
src/ai_company/communication/messenger.py
Adds AgentMessenger facade: builds/sends messages, deterministic direct-channel naming, broadcast, subscribe/unsubscribe delegation, handler register/deregister, and dispatch passthrough (auto-creates dispatcher if needed).
Errors
src/ai_company/communication/errors.py
Adds CommunicationError with immutable context and subclasses: ChannelNotFoundError, ChannelAlreadyExistsError, NotSubscribedError, MessageBusNotRunningError, MessageBusAlreadyRunningError.
Observability
src/ai_company/observability/events/communication.py
Adds typed event-name constants for bus lifecycle, channels, publish/deliver, messenger operations, dispatcher/handler lifecycle, receive timeouts, and shutdown.
Package exports
src/ai_company/communication/__init__.py
Re-exports new public primitives (InMemoryMessageBus, MessageBus, MessageDispatcher, DispatchResult, AgentMessenger, handlers, errors, Subscription, DeliveryEnvelope, retention config, etc.).
Tests & Fixtures
tests/unit/communication/conftest.py, tests/unit/communication/test_*.py, tests/unit/observability/test_events.py
Comprehensive unit tests covering bus lifecycle, pub/sub and direct messaging, retention/history, receive semantics and timeouts, dispatcher routing/concurrency/error isolation, handler registration behavior, error classes, subscription/delivery models, and event constants.

Sequence Diagram(s)

sequenceDiagram
    participant AgentA as Agent A
    participant MessengerA as AgentMessenger(A)
    participant MessageBus as InMemoryMessageBus
    participant Queue as DeliveryQueue
    participant MessengerB as AgentMessenger(B)
    participant AgentB as Agent B

    AgentA->>MessengerA: send_message(to="AgentB", channel="#eng", content)
    MessengerA->>MessengerA: construct Message (id, ts, sender, channel, type, priority)
    MessengerA->>MessageBus: publish(message)
    MessageBus->>MessageBus: validate channel, append history
    MessageBus->>Queue: enqueue DeliveryEnvelope for subscribers/recipient
    MessageBus-->>MessengerA: ack
    AgentB->>MessengerB: receive(channel="#eng")
    Queue-->>MessengerB: deliver DeliveryEnvelope
    MessengerB->>AgentB: deliver envelope.message
Loading
sequenceDiagram
    participant Messenger as AgentMessenger
    participant Dispatcher as MessageDispatcher
    participant Handler1 as Handler 1
    participant Handler2 as Handler 2

    Messenger->>Dispatcher: register_handler(handler1, types, min_priority)
    Dispatcher-->>Messenger: handler_id_1
    Messenger->>Dispatcher: register_handler(handler2, ...)
    Dispatcher-->>Messenger: handler_id_2

    Messenger->>Dispatcher: dispatch_message(message)
    Dispatcher->>Dispatcher: find matching registrations
    par concurrent handlers
        Dispatcher->>Handler1: handle(message)
        Dispatcher->>Handler2: handle(message)
    end
    Handler1-->>Dispatcher: success / error
    Handler2-->>Dispatcher: success / error
    Dispatcher->>Dispatcher: aggregate results -> DispatchResult
    Dispatcher-->>Messenger: return DispatchResult
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 39.44% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: implementing the core communication infrastructure (message bus, dispatcher, and messenger) as described in the design spec.
Description check ✅ Passed The description comprehensively relates to the changeset, detailing implementation of all major components (InMemoryMessageBus, MessageDispatcher, AgentMessenger, protocols, models, error hierarchy, logging) with test results and references to linked issues.
Linked Issues check ✅ Passed The PR directly addresses all requirements from issues #8 and #10: abstract MessageBus protocol [#8], InMemoryMessageBus backend [#8], pub/sub channels [#8], direct messaging [#8,#10], message persistence/retention [#8], FIFO ordering [#8], AgentMessenger facade [#10], structured messages [#10], message types/priorities [#10], handler registration/dispatch [#10], and comprehensive tests [#8,#10].
Out of Scope Changes check ✅ Passed All changes are in-scope: communication foundation components (bus, dispatcher, messenger), supporting protocols/models/errors, observability constants, test fixtures, and documentation updates directly align with design spec sections 5.1, 5.3, 5.4 and linked issue requirements.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/communication-foundation
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch feat/communication-foundation

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request establishes the foundational communication layer for agents, enabling robust and flexible inter-agent messaging. It introduces core components like a message bus for various communication patterns, a dispatcher for handling incoming messages, and a messenger facade for agents to interact with the system. The design emphasizes a pull model for message consumption, concurrent handler execution, and clear error handling, providing a scalable and observable communication framework.

Highlights

  • Message Bus Implementation: Implemented InMemoryMessageBus with pub/sub, direct messaging, broadcast, channel management, and message history.
  • Message Dispatcher: Introduced MessageDispatcher for concurrent handler dispatch via asyncio.TaskGroup, including type/priority filtering and error isolation.
  • Agent Messenger Facade: Created AgentMessenger as a per-agent facade that automatically fills sender, timestamp, and message ID for simplified interaction.
  • Communication Protocols and Models: Defined MessageBus and MessageHandler protocols for swappable backends and flexible handler registration, along with Subscription, DeliveryEnvelope models, and a comprehensive communication error hierarchy.
  • Structured Logging: Incorporated 29 new structured logging event constants (COMM_*) for communication-related activities.
  • Documentation Update: Updated the DESIGN_SPEC.md to reflect the new communication architecture and file structure.
Changelog
  • DESIGN_SPEC.md
    • Updated the design specification to include new communication components and logging events.
  • src/ai_company/communication/init.py
    • Updated the communication package's __init__.py to export newly added modules and classes.
  • src/ai_company/communication/bus_memory.py
    • Implemented the InMemoryMessageBus for in-process message handling, supporting pub/sub, direct messages, and history.
  • src/ai_company/communication/bus_protocol.py
    • Defined the MessageBus protocol, outlining the interface for message bus implementations.
  • src/ai_company/communication/config.py
    • Added MessageRetentionConfig and updated MessageBusConfig to include message retention settings.
  • src/ai_company/communication/dispatcher.py
    • Introduced MessageDispatcher for routing messages to registered handlers with filtering capabilities and DispatchResult for summarizing dispatch outcomes.
  • src/ai_company/communication/errors.py
    • Established a hierarchy of custom exceptions for communication-related errors.
  • src/ai_company/communication/handler.py
    • Added the MessageHandler protocol, FunctionHandler adapter for async functions, and HandlerRegistration model.
  • src/ai_company/communication/messenger.py
    • Provided AgentMessenger as a high-level interface for agents to send, receive, and dispatch messages.
  • src/ai_company/communication/subscription.py
    • Defined Subscription and DeliveryEnvelope models for managing channel subscriptions and message delivery metadata.
  • src/ai_company/observability/events/communication.py
    • Added a comprehensive set of COMM_* constants for structured logging of communication events.
  • tests/unit/communication/conftest.py
    • Updated communication test fixtures and factories to support new models and configurations.
  • tests/unit/communication/test_bus_memory.py
    • Added extensive unit tests for the InMemoryMessageBus functionality, including lifecycle, channel management, subscriptions, publishing, direct messaging, retention, history, and concurrency.
  • tests/unit/communication/test_dispatcher.py
    • Added unit tests for MessageDispatcher, covering registration, deregistration, routing, priority filtering, error isolation, concurrent execution, and dispatch results.
  • tests/unit/communication/test_errors.py
    • Added unit tests to verify the behavior and structure of the communication error hierarchy.
  • tests/unit/communication/test_handler.py
    • Added unit tests for the MessageHandler protocol, FunctionHandler adapter, HandlerRegistration, and priority comparison logic.
  • tests/unit/communication/test_messenger.py
    • Added unit tests for AgentMessenger, covering message sending (general, direct, broadcast), subscriptions, and handler delegation.
  • tests/unit/communication/test_subscription.py
    • Added unit tests for the Subscription and DeliveryEnvelope models, including validation and immutability.
  • tests/unit/observability/test_events.py
    • Updated tests to ensure the new COMM_* event constants are correctly discovered and exist.
Activity
  • The pull request was pre-reviewed by 9 AI agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency).
  • 29 findings from these reviews were addressed, and 0 were skipped.
  • All 2605 existing tests passed, with 0 failures.
  • Code coverage is at 96.47%, exceeding the 80% minimum.
  • ruff lint and format checks passed cleanly.
  • mypy strict checks reported no issues.
  • New tests were added for protocol conformance, combined type+priority filtering, send_direct on stopped bus, unsubscribing from a nonexistent channel, receive without timeout, direct channel history, FunctionHandler validation, and deregistering without a dispatcher.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request lays a comprehensive and well-designed foundation for inter-agent communication, introducing a message bus, dispatcher, and messenger along with their protocols and error handling. However, critical security flaws were identified in the InMemoryMessageBus related to broken access control. Specifically, the bus lacks authorization checks for publishing to channels, subscribing agents to channels, and retrieving channel history. Additionally, the deterministic naming scheme for direct message channels is vulnerable to collisions if agent IDs contain colons. These issues could allow malicious or compromised agents to eavesdrop on private conversations, inject messages into other agents' channels, or disclose sensitive message history. Furthermore, there is a minor docstring inconsistency in the MessageBus protocol that should be addressed to ensure contract alignment.

asyncio.Queue(),
)

async def publish(self, message: Message) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The publish method does not verify if the sender of the message is an authorized participant or subscriber of the target channel. This allows any agent to inject messages into any channel, including private direct message channels between other agents, provided they know the channel name.

Remediation: Implement a check to verify that message.sender is a member of channel.subscribers before allowing the publication to TOPIC or DIRECT channels.

Comment on lines +286 to +335
self,
channel_name: str,
subscriber_id: str,
) -> Subscription:
"""Subscribe an agent to a channel.

Idempotent — returns a fresh subscription record if already
subscribed (the channel's subscriber list is not duplicated).

Args:
channel_name: Channel to subscribe to.
subscriber_id: Agent ID of the subscriber.

Returns:
The subscription record.

Raises:
MessageBusNotRunningError: If not running.
ChannelNotFoundError: If the channel does not exist.
"""
async with self._lock:
self._require_running()
if channel_name not in self._channels:
_raise_channel_not_found(channel_name)
self._known_agents.add(subscriber_id)
channel = self._channels[channel_name]
if subscriber_id in channel.subscribers:
return Subscription(
channel_name=channel_name,
subscriber_id=subscriber_id,
subscribed_at=datetime.now(UTC),
)
new_subs = (*channel.subscribers, subscriber_id)
self._channels[channel_name] = channel.model_copy(
update={"subscribers": new_subs},
)
self._ensure_queue(channel_name, subscriber_id)
now = datetime.now(UTC)
logger.info(
COMM_SUBSCRIPTION_CREATED,
channel=channel_name,
subscriber=subscriber_id,
)
return Subscription(
channel_name=channel_name,
subscriber_id=subscriber_id,
subscribed_at=now,
)

async def unsubscribe(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The subscribe and unsubscribe methods allow any caller to modify the subscription status of any agent ID for any channel. This can be abused to force agents to receive messages from channels they shouldn't be part of (potentially leading to DoS or unauthorized data processing) or to eavesdrop on channels by subscribing a controlled agent ID.

Remediation: Implement authorization checks to ensure that the caller is permitted to modify the subscription for the specified subscriber_id (e.g., an agent should only be able to subscribe/unsubscribe itself).

return tuple(self._channels.values())

async def get_channel_history(
self,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The get_channel_history method allows any caller to retrieve the full message history of any channel, including private direct message channels, without verifying if the caller is a participant or authorized subscriber of that channel. This is a significant information disclosure vulnerability.

Remediation: Implement an authorization check to verify that the caller (agent) is a participant in the channel before returning its history.

"""
sender = message.sender
pair = sorted([sender, recipient])
channel_name = f"@{pair[0]}:{pair[1]}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

Direct message channel names are constructed by joining sorted agent IDs with a colon (@{a}:{b}). If agent IDs are allowed to contain colons, this leads to channel name collisions. For example, agents 'alice' and 'bob:charles' would share the same direct channel name (@alice:bob:charles) as agents 'alice:bob' and 'charles'. This could allow unintended agents to access or inject messages into private conversations.

Remediation: Sanitize agent IDs to disallow the separator character (colon) or use a more robust encoding/structure for channel names that prevents collisions.

Comment on lines +89 to +90
Idempotent — returns the existing subscription if already
subscribed.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The docstring here states that an idempotent call "returns the existing subscription if already subscribed." This is inconsistent with the InMemoryMessageBus implementation, which returns a new Subscription object with a fresh timestamp on subsequent calls for an existing subscription. This is because the bus implementation does not store the original Subscription object.

To align the protocol contract with the provided implementation and avoid confusion for future implementers, I recommend updating this docstring. The phrasing used in InMemoryMessageBus's docstring, "returns a fresh subscription record if already subscribed," would be a good replacement.

Suggested change
Idempotentreturns the existing subscription if already
subscribed.
Idempotentreturns a fresh subscription record if already
subscribed.

@greptile-apps
Copy link
Copy Markdown

greptile-apps bot commented Mar 7, 2026

Greptile Summary

This PR implements the communication foundation described in DESIGN_SPEC §5.4: an InMemoryMessageBus (pub/sub, direct messaging, broadcast, history), a MessageDispatcher with concurrent handler execution via asyncio.TaskGroup, and an AgentMessenger per-agent facade. The overall architecture is well-designed — proper asyncio locking, a shutdown event for graceful receive wakeup, index-based error slots to avoid shared-state issues in concurrent dispatch, and immutable config/result models throughout.

Key findings:

  • Logic issue — unsubscribe() leaves blocked receive() callers stuck: When a subscriber calls unsubscribe() while another coroutine is blocked inside receive(timeout=None) on the same channel, the active waiter holds a local reference to the queue that was just popped from _queues. Since no new messages will arrive (the subscriber is removed) and _shutdown_event is not set (the bus is still running), the waiter hangs indefinitely. A sentinel put_nowait into the evicted queue on unsubscription would fix this.
  • Style — COMM_RECEIVE_TIMEOUT event fires on both timeout and clean shutdown: _await_with_shutdown returns None for two distinct reasons (timeout elapsed vs. _shutdown_event set), but receive() logs COMM_RECEIVE_TIMEOUT for both. This makes it impossible for monitoring to distinguish slow consumers from orderly shutdown.
  • Style — AgentMessenger is missing a receive() wrapper: The facade auto-fills agent_id for every send and subscription method, but there is no receive(channel_name, *, timeout) counterpart. Consumers still need to hold a direct reference to the bus and supply agent_id themselves, breaking the opaque-facade contract.

Confidence Score: 3/5

  • PR is mostly safe to merge but contains one logic issue in unsubscribe() that can cause silent hangs in production receive loops.
  • The dispatcher and handler layers are solid and the previously-raised issues (wrong log constants, message.channel placeholder, receive validation, errors list races) are all addressed. The unsubscribe/receive hang is a real correctness issue that can silently block agents in a production receive loop — not a crash, but a liveness bug. The two style issues are low risk but degrade observability and API ergonomics.
  • Pay close attention to src/ai_company/communication/bus_memory.py (unsubscribe wakeup logic) and src/ai_company/communication/messenger.py (missing receive wrapper).

Important Files Changed

Filename Overview
src/ai_company/communication/bus_memory.py Core in-memory bus implementation — well-structured with proper locking, shutdown signaling via asyncio.Event, and input validation. Two issues found: unsubscribe() doesn't wake blocked receive() callers, and COMM_RECEIVE_TIMEOUT is logged for both timeout and shutdown cases.
src/ai_company/communication/dispatcher.py MessageDispatcher with concurrent handler dispatch via asyncio.TaskGroup; uses index-based error list to avoid shared-state races; DispatchResult uses @computed_field correctly. No issues found.
src/ai_company/communication/messenger.py Per-agent facade correctly auto-fills sender/timestamp/channel for all send paths; send_direct computes the deterministic channel name before constructing the Message. Missing receive() wrapper means consumers must hold a direct bus reference to receive messages.
src/ai_company/communication/handler.py MessageHandler protocol, FunctionHandler adapter with iscoroutinefunction guard, and immutable _PRIORITY_ORDER via MappingProxyType. Clean and correct.
src/ai_company/communication/bus_protocol.py @runtime_checkable MessageBus Protocol with full lifecycle, pub/sub, direct messaging, and history methods. Well-documented with accurate docstrings.

Sequence Diagram

sequenceDiagram
    participant Agent as Agent (consumer)
    participant AM as AgentMessenger
    participant Bus as InMemoryMessageBus
    participant Q as asyncio.Queue
    participant SE as _shutdown_event

    Note over Agent,SE: Send path
    Agent->>AM: send_direct(to, content, type)
    AM->>AM: _direct_channel_name(agent_id, to)
    AM->>AM: Message(channel=@a:b, sender=agent_id)
    AM->>Bus: send_direct(msg, recipient=to)
    Bus->>Bus: _require_running() [under lock]
    Bus->>Bus: _ensure_direct_channel(@a:b)
    Bus->>Q: put_nowait(envelope) × both agents
    Bus-->>AM: return
    AM-->>Agent: return Message

    Note over Agent,SE: Receive path
    Agent->>Bus: receive(@a:b, agent_id, timeout=5.0)
    Bus->>Bus: _require_running() + subscription check [under lock]
    Bus->>Bus: _ensure_queue(@a:b, agent_id)
    Bus->>Bus: _await_with_shutdown(queue, 5.0)
    Bus->>Q: ensure_future(queue.get())
    Bus->>SE: ensure_future(shutdown_event.wait())
    alt message arrives
        Q-->>Bus: DeliveryEnvelope
        Bus-->>Agent: DeliveryEnvelope
    else timeout expires
        Bus-->>Agent: None  [logs COMM_RECEIVE_TIMEOUT]
    else bus stopped
        SE-->>Bus: event set
        Bus-->>Agent: None  [also logs COMM_RECEIVE_TIMEOUT ⚠️]
    end

    Note over Agent,SE: Dispatch path
    Agent->>AM: dispatch_message(envelope.message)
    AM->>AM: MessageDispatcher.dispatch(message)
    AM->>AM: _matches() filter (type + priority)
    par asyncio.TaskGroup
        AM->>AM: _guarded_handle(reg1, message, errors, 0)
    and
        AM->>AM: _guarded_handle(reg2, message, errors, 1)
    end
    AM-->>Agent: DispatchResult(succeeded, failed, errors)
Loading

Last reviewed commit: dc36a44

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/communication/bus_memory.py`:
- Around line 368-398: Add the same running-state and subscription validation to
receive() as other methods: call self._require_running() at the start, and
instead of unconditionally creating an orphan queue with
self._ensure_queue(channel_name, subscriber_id), check whether a queue already
exists for the subscriber (use the internal non-creating accessor — e.g.
_get_queue(channel_name, subscriber_id) or check the channel's subscribers set)
and if the subscriber is not actually subscribed raise a clear error (or return
None per project convention) rather than creating a new queue; only call
_ensure_queue when you intend to create a subscription. Ensure you reference
receive, _require_running, _ensure_queue and the non-creating accessor (e.g.
_get_queue or channel subscriber set) when making the change.
- Around line 461-491: In get_channel_history, the current slice logic
mishandles limit <= 0 because messages[-0:] returns the full list and negative
limits produce incorrect slices; fix by explicitly handling limit values: after
reading messages from self._history[channel_name] (inside the existing async
with self._lock block), if limit is None keep all messages, if limit <= 0 set
messages to an empty list/tuple, otherwise set messages = messages[-limit:];
ensure you update the logged count passed to COMM_HISTORY_QUERIED to reflect the
final number of messages returned and preserve use of _channels, _history,
get_channel_history and COMM_HISTORY_QUERIED.

In `@src/ai_company/communication/bus_protocol.py`:
- Around line 61-66: Update the public bus protocol methods to use NotBlankStr
for identifier/name parameters instead of plain str: change the signature of
send_direct (and the other protocol methods referenced: the methods around the
other ranges that accept channel/agent identifiers) so parameters like
recipient, channel, agent_id, and similar use NotBlankStr from core.types; add
the necessary import for NotBlankStr at the top of the module and adjust any
type hints/annotations and docstrings accordingly so the protocol enforces
non-blank identifiers at the boundary.

In `@src/ai_company/communication/errors.py`:
- Around line 32-35: The exception constructor currently wraps the passed-in
context with MappingProxyType but only copies the top-level mapping, leaving
nested mutable structures shared; update the constructor where self.context is
assigned to perform a deep copy of the incoming context (e.g., use
copy.deepcopy(context) when context is truthy) before wrapping it in
MappingProxyType so nested dicts/lists become immutable, and ensure the import
for copy.deepcopy is added if missing and the assignment in the class
(self.context: MappingProxyType[...] = MappingProxyType(...)) uses the
deep-copied object.

In `@src/ai_company/communication/handler.py`:
- Around line 3-13: This module lacks a structured logger and fails to log
before raising on the "invalid-handler" TypeError branch; add "from
ai_company.observability import get_logger" and define "logger =
get_logger(__name__)" near the top of the file, then update the invalid-handler
branch (the path that currently raises TypeError when a handler is invalid) to
call logger.warning or logger.error with context (e.g., include the offending
handler identifier, message type/priority and function name like the handler
resolution function) immediately before raising the TypeError so the error path
is logged with appropriate context.

In `@tests/unit/communication/test_bus_memory.py`:
- Around line 1-27: Add the module-level pytest timeout marker to this test file
by declaring pytestmark = pytest.mark.timeout(30) at module scope (e.g., near
the top after the imports) so it matches other tests in the communication
module; the file already imports pytest, so just add the module-level variable
to apply the 30s timeout for all tests in
tests/unit/communication/test_bus_memory.py (which covers tests for
InMemoryMessageBus / MessageBus behaviors).

In `@tests/unit/communication/test_messenger.py`:
- Around line 1-18: Add the missing module-level pytest timeout marker by
defining pytestmark = pytest.mark.timeout(30) near the top of the test module
(after the imports) in test_messenger.py so its timeout behavior matches other
files; ensure you import pytest if not already and place the pytestmark
assignment at module scope.

In `@tests/unit/communication/test_subscription.py`:
- Around line 1-14: Add a module-level pytest marker tuple to this test module:
define pytestmark = (pytest.mark.unit, pytest.mark.timeout(30)) at top of
tests/unit/communication/test_subscription.py so every test (e.g., those
exercising Subscription and DeliveryEnvelope) inherits the 30-second timeout and
unit marker; then remove redundant per-test `@pytest.mark.unit` decorators if
present.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7a22f372-d0f3-45ea-b20a-6354245a34f4

📥 Commits

Reviewing files that changed from the base of the PR and between f566fb4 and c0d38bb.

📒 Files selected for processing (19)
  • DESIGN_SPEC.md
  • src/ai_company/communication/__init__.py
  • src/ai_company/communication/bus_memory.py
  • src/ai_company/communication/bus_protocol.py
  • src/ai_company/communication/config.py
  • src/ai_company/communication/dispatcher.py
  • src/ai_company/communication/errors.py
  • src/ai_company/communication/handler.py
  • src/ai_company/communication/messenger.py
  • src/ai_company/communication/subscription.py
  • src/ai_company/observability/events/communication.py
  • tests/unit/communication/conftest.py
  • tests/unit/communication/test_bus_memory.py
  • tests/unit/communication/test_dispatcher.py
  • tests/unit/communication/test_errors.py
  • tests/unit/communication/test_handler.py
  • tests/unit/communication/test_messenger.py
  • tests/unit/communication/test_subscription.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: syntax (no parentheses) for exception handling — ruff enforces this on Python 3.14
Include type hints on all public functions; enforce with mypy strict mode
Use Google-style docstrings on all public classes and functions — enforced by ruff D rules
Use immutability: create new objects, never mutate existing ones. For non-Pydantic internal collections, use copy.deepcopy() at construction + MappingProxyType wrapping. For dict/list fields in frozen Pydantic models, rely on frozen=True and copy.deepcopy() at system boundaries
Use frozen Pydantic models for config/identity; use separate mutable-via-copy models for runtime state that evolves
Use Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict). Use @computed_field for derived values; use NotBlankStr from core.types for all identifier/name fields
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code; prefer structured concurrency over bare create_task
Maintain line length of 88 characters — enforced by ruff
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate input at system boundaries: user input, external APIs, and config files

Files:

  • tests/unit/communication/test_messenger.py
  • src/ai_company/communication/subscription.py
  • src/ai_company/communication/dispatcher.py
  • src/ai_company/communication/bus_protocol.py
  • src/ai_company/communication/messenger.py
  • src/ai_company/observability/events/communication.py
  • src/ai_company/communication/__init__.py
  • tests/unit/communication/test_bus_memory.py
  • tests/unit/observability/test_events.py
  • tests/unit/communication/test_subscription.py
  • src/ai_company/communication/config.py
  • src/ai_company/communication/handler.py
  • tests/unit/communication/test_dispatcher.py
  • tests/unit/communication/test_errors.py
  • tests/unit/communication/conftest.py
  • tests/unit/communication/test_handler.py
  • src/ai_company/communication/bus_memory.py
  • src/ai_company/communication/errors.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use pytest markers: @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, @pytest.mark.slow
Maintain 80% minimum code coverage — enforced in CI
Use asyncio_mode = "auto" in pytest configuration — no manual @pytest.mark.asyncio needed
Set 30-second timeout per test
Use pytest-xdist via -n auto for parallel test execution
Prefer @pytest.mark.parametrize for testing similar cases

Files:

  • tests/unit/communication/test_messenger.py
  • tests/unit/communication/test_bus_memory.py
  • tests/unit/observability/test_events.py
  • tests/unit/communication/test_subscription.py
  • tests/unit/communication/test_dispatcher.py
  • tests/unit/communication/test_errors.py
  • tests/unit/communication/conftest.py
  • tests/unit/communication/test_handler.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code — use get_logger() from observability module
Use variable name logger (not _logger, not log) for logging instances
Always use event name constants from ai_company.observability.events domain-specific modules; import directly (e.g., from ai_company.observability.events.<domain> import EVENT_CONSTANT)
Use structured logging format: logger.info(EVENT, key=value) — never logger.info("msg %s", val)
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions
Pure data models, enums, and re-exports do NOT need logging

Files:

  • src/ai_company/communication/subscription.py
  • src/ai_company/communication/dispatcher.py
  • src/ai_company/communication/bus_protocol.py
  • src/ai_company/communication/messenger.py
  • src/ai_company/observability/events/communication.py
  • src/ai_company/communication/__init__.py
  • src/ai_company/communication/config.py
  • src/ai_company/communication/handler.py
  • src/ai_company/communication/bus_memory.py
  • src/ai_company/communication/errors.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like example-provider, example-large-001, large/medium/small as aliases, or test-provider, test-small-001 in tests

Files:

  • src/ai_company/communication/subscription.py
  • src/ai_company/communication/dispatcher.py
  • src/ai_company/communication/bus_protocol.py
  • src/ai_company/communication/messenger.py
  • src/ai_company/observability/events/communication.py
  • src/ai_company/communication/__init__.py
  • src/ai_company/communication/config.py
  • src/ai_company/communication/handler.py
  • src/ai_company/communication/bus_memory.py
  • src/ai_company/communication/errors.py
🧠 Learnings (3)
📚 Learning: 2026-03-07T14:50:05.694Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T14:50:05.694Z
Learning: Applies to src/ai_company/**/*.py : Always use event name constants from `ai_company.observability.events` domain-specific modules; import directly (e.g., `from ai_company.observability.events.<domain> import EVENT_CONSTANT`)

Applied to files:

  • src/ai_company/observability/events/communication.py
  • tests/unit/observability/test_events.py
  • DESIGN_SPEC.md
📚 Learning: 2026-03-07T14:50:05.694Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T14:50:05.694Z
Learning: Applies to src/ai_company/providers/**/*.py : RetryConfig and RateLimiterConfig are set per-provider in `ProviderConfig`

Applied to files:

  • tests/unit/communication/conftest.py
📚 Learning: 2026-03-07T14:50:05.694Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T14:50:05.694Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising

Applied to files:

  • src/ai_company/communication/errors.py
🧬 Code graph analysis (9)
src/ai_company/communication/subscription.py (1)
src/ai_company/communication/message.py (1)
  • Message (88-138)
src/ai_company/communication/dispatcher.py (3)
src/ai_company/communication/enums.py (2)
  • MessagePriority (22-32)
  • MessageType (6-19)
src/ai_company/communication/handler.py (6)
  • FunctionHandler (33-60)
  • HandlerRegistration (89-106)
  • MessageHandler (17-26)
  • priority_at_least (73-86)
  • handle (20-26)
  • handle (54-60)
src/ai_company/communication/message.py (1)
  • Message (88-138)
src/ai_company/communication/messenger.py (6)
src/ai_company/communication/bus_protocol.py (5)
  • MessageBus (19-200)
  • publish (47-59)
  • send_direct (61-80)
  • subscribe (82-103)
  • unsubscribe (105-120)
src/ai_company/communication/dispatcher.py (5)
  • DispatchResult (37-59)
  • MessageDispatcher (62-244)
  • register (75-113)
  • deregister (115-138)
  • dispatch (140-223)
src/ai_company/communication/enums.py (2)
  • MessagePriority (22-32)
  • MessageType (6-19)
src/ai_company/communication/handler.py (1)
  • MessageHandler (17-26)
src/ai_company/communication/message.py (1)
  • Message (88-138)
src/ai_company/communication/subscription.py (1)
  • Subscription (9-22)
tests/unit/communication/test_bus_memory.py (6)
src/ai_company/communication/bus_protocol.py (13)
  • MessageBus (19-200)
  • is_running (43-45)
  • start (30-36)
  • stop (38-40)
  • publish (47-59)
  • subscribe (82-103)
  • create_channel (144-158)
  • list_channels (174-180)
  • send_direct (61-80)
  • unsubscribe (105-120)
  • get_channel (160-172)
  • receive (122-142)
  • get_channel_history (182-200)
src/ai_company/communication/channel.py (1)
  • Channel (14-39)
src/ai_company/communication/config.py (2)
  • MessageBusConfig (45-75)
  • MessageRetentionConfig (29-42)
src/ai_company/communication/enums.py (2)
  • ChannelType (35-46)
  • MessageType (6-19)
src/ai_company/communication/errors.py (5)
  • ChannelAlreadyExistsError (50-51)
  • ChannelNotFoundError (46-47)
  • MessageBusAlreadyRunningError (62-63)
  • MessageBusNotRunningError (58-59)
  • NotSubscribedError (54-55)
src/ai_company/communication/message.py (1)
  • Message (88-138)
tests/unit/communication/test_subscription.py (3)
src/ai_company/communication/enums.py (2)
  • MessagePriority (22-32)
  • MessageType (6-19)
src/ai_company/communication/message.py (1)
  • Message (88-138)
src/ai_company/communication/subscription.py (2)
  • DeliveryEnvelope (25-41)
  • Subscription (9-22)
src/ai_company/communication/handler.py (5)
src/ai_company/communication/enums.py (2)
  • MessagePriority (22-32)
  • MessageType (6-19)
src/ai_company/communication/message.py (1)
  • Message (88-138)
tests/unit/communication/test_dispatcher.py (2)
  • handle (39-40)
  • handle (49-50)
tests/unit/communication/test_handler.py (1)
  • handle (44-45)
tests/unit/communication/test_messenger.py (4)
  • handler (299-300)
  • handler (320-321)
  • handler (356-357)
  • handler (412-413)
tests/unit/communication/test_errors.py (1)
src/ai_company/communication/errors.py (6)
  • ChannelAlreadyExistsError (50-51)
  • ChannelNotFoundError (46-47)
  • CommunicationError (11-43)
  • MessageBusAlreadyRunningError (62-63)
  • MessageBusNotRunningError (58-59)
  • NotSubscribedError (54-55)
tests/unit/communication/conftest.py (2)
src/ai_company/communication/config.py (2)
  • MessageRetentionConfig (29-42)
  • MessageBusConfig (45-75)
src/ai_company/communication/subscription.py (2)
  • DeliveryEnvelope (25-41)
  • Subscription (9-22)
src/ai_company/communication/bus_memory.py (7)
src/ai_company/communication/channel.py (1)
  • Channel (14-39)
src/ai_company/communication/config.py (1)
  • MessageBusConfig (45-75)
src/ai_company/communication/errors.py (5)
  • ChannelAlreadyExistsError (50-51)
  • ChannelNotFoundError (46-47)
  • MessageBusAlreadyRunningError (62-63)
  • MessageBusNotRunningError (58-59)
  • NotSubscribedError (54-55)
src/ai_company/communication/subscription.py (2)
  • DeliveryEnvelope (25-41)
  • Subscription (9-22)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/communication/bus_protocol.py (12)
  • is_running (43-45)
  • start (30-36)
  • stop (38-40)
  • publish (47-59)
  • send_direct (61-80)
  • subscribe (82-103)
  • unsubscribe (105-120)
  • receive (122-142)
  • create_channel (144-158)
  • get_channel (160-172)
  • list_channels (174-180)
  • get_channel_history (182-200)
src/ai_company/communication/messenger.py (3)
  • send_direct (116-160)
  • subscribe (207-226)
  • unsubscribe (228-242)
🔇 Additional comments (23)
tests/unit/communication/test_errors.py (1)

1-84: LGTM!

The error hierarchy tests are comprehensive and well-structured:

  • Base CommunicationError behavior is thoroughly tested (message storage, context immutability, string formatting, context copying).
  • Subclass tests use @pytest.mark.parametrize efficiently to validate inheritance and context propagation for all error types.
tests/unit/communication/test_handler.py (1)

1-175: LGTM!

Comprehensive test coverage for the handler framework:

  • Protocol conformance tests validate runtime_checkable behavior.
  • FunctionHandler tests cover async wrapping, error cases for non-callable/sync inputs.
  • HandlerRegistration tests verify defaults, customization, and immutability.
  • priority_at_least tests use parametrize effectively for exhaustive priority combination coverage.
tests/unit/communication/test_dispatcher.py (1)

1-387: LGTM!

Excellent dispatcher test coverage:

  • Registration/deregistration lifecycle including bare function wrapping.
  • Dispatch routing by message type with matching, non-matching, and all-types filters.
  • Priority filtering with boundary conditions (equal, higher, lower).
  • Combined type+priority filtering requiring both to match.
  • Error isolation ensuring one failing handler doesn't block others.
  • Concurrent execution verification that handlers run in parallel.
  • DispatchResult accuracy and immutability validation.
  • Edge cases with no handlers or no matching handlers.
tests/unit/communication/test_messenger.py (1)

36-444: LGTM!

Comprehensive messenger test coverage:

  • Auto-enrichment of sender, timestamp, and message ID.
  • Priority propagation and bus publish invocation.
  • Direct messaging with @direct channel placeholder.
  • Broadcast behavior with default and custom channels.
  • Subscribe/unsubscribe delegation to the bus.
  • Handler registration/deregistration delegation to dispatcher.
  • Auto-creation of dispatcher when not provided.
  • Graceful handling when no dispatcher is configured.
tests/unit/communication/test_bus_memory.py (1)

28-589: LGTM!

Excellent InMemoryMessageBus test coverage:

  • Lifecycle states (start/stop/is_running) with error conditions for double-start and operations on stopped bus.
  • Protocol conformance via isinstance check.
  • Channel management (create/get/list) with duplicate and missing channel errors.
  • Subscription lifecycle including idempotent subscribe and unsubscribe errors.
  • Publish/receive flow with FIFO ordering, fan-out to multiple subscribers, and timeout handling.
  • Blocking receive with timeout=None.
  • Direct messaging with lazy channel creation and deterministic channel naming.
  • Retention limits and history queries with limits.
  • Concurrent publish/receive scenarios with TaskGroup.
tests/unit/communication/conftest.py (3)

58-64: LGTM!

Clean factory additions:

  • MessageRetentionConfigFactory for retention settings.
  • MessageBusConfigFactory properly wires the retention field.

95-102: LGTM!

Well-structured factories for subscription and delivery models:

  • SubscriptionFactory for subscription records.
  • DeliveryEnvelopeFactory with message composition via MessageFactory.

167-192: LGTM!

Useful sample fixtures providing concrete test data:

  • sample_subscription with channel and subscriber info.
  • sample_delivery_envelope composing with sample_message.
  • sample_bus_config with test channel and retention settings.
src/ai_company/communication/dispatcher.py (4)

1-60: LGTM!

Clean DispatchResult model:

  • Frozen Pydantic model with appropriate field constraints (ge=0).
  • @computed_field correctly derives handlers_matched from succeeded + failed.
  • Clear docstrings following Google style.

62-113: LGTM!

register method is well-implemented:

  • Auto-wraps bare async functions with FunctionHandler.
  • Creates immutable HandlerRegistration with filter criteria.
  • Logs registration with structured event constant.

140-223: LGTM!

dispatch method implementation is solid:

  • Uses asyncio.TaskGroup for structured concurrent execution per coding guidelines.
  • Error isolation via _guarded_handle catches Exception while allowing BaseException to propagate (documented behavior).
  • Proper logging at DEBUG for start/no-handlers/matched, INFO for completion.
  • Returns accurate DispatchResult with error aggregation.

225-244: LGTM!

_matches filter logic is correct:

  • Empty message_types means match all types.
  • Priority comparison uses priority_at_least helper for ordering.
src/ai_company/communication/messenger.py (4)

1-69: LGTM!

AgentMessenger initialization is well-designed:

  • Uses __slots__ for memory efficiency.
  • Validates agent_id and agent_name are non-blank at the boundary.
  • Logs creation at DEBUG level.
  • Clear docstring with exception documentation.

70-160: LGTM!

send_message and send_direct methods are correctly implemented:

  • Auto-fill sender, timestamp, and ID via Message construction.
  • send_message delegates to bus.publish for channel-based delivery.
  • send_direct uses @direct placeholder channel and bus.send_direct for point-to-point routing.
  • Proper INFO-level logging with structured events.

162-243: LGTM!

broadcast, subscribe, and unsubscribe methods:

  • Broadcast correctly sets to field to channel name for fan-out semantics.
  • Subscribe/unsubscribe delegate to bus with agent_id, logging state transitions at INFO.
  • Docstrings document raised exceptions per bus contract.

244-315: LGTM!

Handler delegation methods are robust:

  • register_handler auto-creates dispatcher when needed, documented in docstring.
  • deregister_handler gracefully handles missing dispatcher with DEBUG log.
  • dispatch_message returns empty DispatchResult when no dispatcher is configured, avoiding errors.
DESIGN_SPEC.md (1)

2321-2332: LGTM!

The project structure documentation accurately reflects the new communication module files added in this PR. The file organization follows the existing conventions and aligns with the modular, protocol-driven architecture described in the PR objectives.

Also applies to: 2347-2347

src/ai_company/communication/__init__.py (1)

1-81: LGTM!

The expanded public API exports are well-organized with alphabetically sorted __all__ for maintainability. The imports cleanly expose the new communication layer components (bus, dispatcher, messenger, handlers, errors) from a single entry point.

src/ai_company/communication/bus_memory.py (5)

1-45: LGTM on module setup!

Proper imports, logger setup using get_logger(__name__), and event constants from the observability domain module per coding guidelines.


75-189: LGTM on class initialization and publish flow!

The implementation correctly uses:

  • asyncio.Lock for thread-safe async access
  • deque(maxlen=...) for bounded history per retention config
  • model_copy(update=...) for immutable Channel updates
  • Proper broadcast vs topic semantics (broadcast → all known agents, topic → subscribers only)
  • Structured logging at appropriate levels (INFO for state changes, DEBUG for delivery details)

191-283: LGTM on direct messaging implementation!

The deterministic channel naming pattern @{sorted_a}:{sorted_b} ensures consistent channel identification regardless of sender/recipient order. Lazy channel creation and subscriber registration are handled correctly under the lock.


285-366: LGTM on subscription management!

The subscribe method is correctly idempotent — returning a fresh Subscription record without duplicating the subscriber list when already subscribed. The unsubscribe method appropriately raises NotSubscribedError for both missing channels and missing subscriptions, simplifying error handling for callers.


400-459: LGTM on channel management methods!

Proper validation, error handling, and logging throughout. The list_channels method returns an immutable tuple per the codebase's immutability conventions.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces the foundational communication layer for the AI company framework: a swappable MessageBus abstraction with an in-memory backend, a concurrent MessageDispatcher for handler routing, and an AgentMessenger facade for per-agent messaging, plus observability event constants and supporting models/errors.

Changes:

  • Added MessageBus protocol and InMemoryMessageBus implementation supporting channels, pub/sub, direct messaging, broadcast, and history retention.
  • Added MessageDispatcher (concurrent dispatch + filtering) and AgentMessenger (agent-friendly send/subscribe/dispatch API).
  • Added communication models/errors + COMM_* observability events and comprehensive unit tests.

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/unit/observability/test_events.py Registers/discovers the new communication observability events module and validates key constants.
tests/unit/communication/test_subscription.py Tests Subscription and DeliveryEnvelope validation + immutability.
tests/unit/communication/test_messenger.py Tests AgentMessenger auto-fill behavior and delegation to bus/dispatcher.
tests/unit/communication/test_handler.py Tests handler protocol, FunctionHandler, HandlerRegistration, and priority filtering helper.
tests/unit/communication/test_errors.py Tests communication error hierarchy and immutable context behavior.
tests/unit/communication/test_dispatcher.py Tests dispatch routing, priority/type filtering, concurrency, and error isolation.
tests/unit/communication/test_bus_memory.py Tests in-memory bus lifecycle, channel mgmt, pub/sub flow, direct messaging, broadcast behavior, and retention/history.
tests/unit/communication/conftest.py Adds factories/fixtures for new communication config + subscription/envelope models.
src/ai_company/observability/events/communication.py Adds COMM_* structured logging event constants for communication subsystem.
src/ai_company/communication/subscription.py Adds Subscription and DeliveryEnvelope frozen Pydantic models.
src/ai_company/communication/messenger.py Implements AgentMessenger facade for sending/broadcasting/subscribing and dispatch delegation.
src/ai_company/communication/handler.py Adds MessageHandler protocol, FunctionHandler adapter, HandlerRegistration, and priority ordering helper.
src/ai_company/communication/errors.py Adds communication error hierarchy with immutable structured context.
src/ai_company/communication/dispatcher.py Implements concurrent MessageDispatcher + DispatchResult model.
src/ai_company/communication/config.py Adds MessageRetentionConfig and wires retention into MessageBusConfig.
src/ai_company/communication/bus_protocol.py Defines the swappable MessageBus protocol (pull-based receive()).
src/ai_company/communication/bus_memory.py Implements InMemoryMessageBus backend using asyncio queues + in-memory history.
src/ai_company/communication/init.py Re-exports new communication APIs/types for public package access.
DESIGN_SPEC.md Updates design spec file tree to include the new communication modules and events.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +388 to +392
async with self._lock:
queue = self._ensure_queue(channel_name, subscriber_id)
try:
if timeout is not None:
return await asyncio.wait_for(
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InMemoryMessageBus.receive() currently creates/returns a per-(channel, subscriber) queue without verifying the bus is running, the channel exists, or that the subscriber is allowed to receive from that channel. This can cause callers to silently block on a stopped bus or typo’d/missing channel and can also recreate queues after unsubscribe. Consider calling _require_running(), validating channel_name exists (raise ChannelNotFoundError), and enforcing subscription (or explicit BROADCAST semantics) before creating/awaiting a queue; if you change behavior, document the raised errors in MessageBus.receive().

Copilot uses AI. Check for mistakes.
Comment on lines +209 to +213
sender = message.sender
pair = sorted([sender, recipient])
channel_name = f"@{pair[0]}:{pair[1]}"
async with self._lock:
self._require_running()
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send_direct() derives routing from the separate recipient argument but does not validate that it matches message.to. This can produce internally inconsistent messages (e.g., message.to != recipient) that are hard to debug downstream. Consider asserting equality (or normalizing by creating a copied Message with to=recipient) so the message metadata always matches the actual delivery target.

Copilot uses AI. Check for mistakes.
Comment on lines +269 to +274
self._history[channel_name].append(message)
now = datetime.now(UTC)
for agent_id in pair:
envelope = DeliveryEnvelope(
message=message,
channel_name=channel_name,
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct messages are stored/delivered under the deterministic channel_name, but _deliver_to_pair() appends/envelopes the original Message instance unchanged. If callers use the AgentMessenger placeholder channel ("@direct"), get_channel_history("@A:b") will return Message objects whose message.channel does not match the queried channel, which is confusing and can break invariants. Consider storing/delivering a copied Message with channel updated to the deterministic direct channel name (preserving id/timestamp).

Copilot uses AI. Check for mistakes.
…abbit, Greptile, and Copilot

- Add shutdown signaling (asyncio.Event) to InMemoryMessageBus.receive()
- Add receive() validation: require running bus, existing channel, active subscription
- Add send_direct() validation: recipient vs message.to mismatch, colon in agent IDs
- Fix get_channel_history limit<=0 edge case (return empty list)
- Extract _guarded_handle from dispatcher closure to class method with pre-allocated error list
- Add logger + event constant to handler.py FunctionHandler validation
- Add logger + validation to AgentMessenger.__init__ (blank agent_id/name)
- Use copy.deepcopy() for nested error context immutability
- Compute deterministic direct channel name (@sorted_a:sorted_b) in messenger
- Add NotBlankStr validation on all bus_protocol boundary identifiers
- Fix docstrings: subscribe idempotency, receive() Raises, history limit
- Update module docstrings for accuracy (dispatcher §5.4, __init__ subsystem)
- Add 12 new unit tests covering all new validation paths
- Update DESIGN_SPEC.md §5 current-state callout and §15.5 convention entry
- Update CLAUDE.md communication/ package description

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…idation

Greptile post-push review caught that send_direct validation paths
(recipient mismatch, separator-in-ID) were logging COMM_BUS_NOT_RUNNING
instead of a validation-specific event, which would poison bus-lifecycle
monitoring.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 7, 2026 17:00
@Aureliolo Aureliolo merged commit 8e71bfd into main Mar 7, 2026
10 checks passed
@Aureliolo Aureliolo deleted the feat/communication-foundation branch March 7, 2026 17:01
Comment on lines +397 to +403
)
self._queues.pop((channel_name, subscriber_id), None)
logger.info(
COMM_SUBSCRIPTION_REMOVED,
channel=channel_name,
subscriber=subscriber_id,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsubscribe() leaves blocked receive() callers permanently stuck

unsubscribe() pops the subscriber's queue from _queues (line 398), but any coroutine already blocked inside _await_with_shutdown on that queue object holds a local reference to it and will never receive a wakeup signal.

The _await_with_shutdown helper monitors two things: the queue and _shutdown_event. After unsubscribe, no new messages will be enqueued (the subscriber is removed from the channel's subscriber list), and _shutdown_event is not set (the bus is still running). A consumer calling receive(channel, subscriber, timeout=None) and then unsubscribe(channel, subscriber) from another task will hang until bus shutdown or an external cancellation — there is no per-subscriber cancellation path.

To fix this, unsubscribe() should put a sentinel value into any active queue so that _await_with_shutdown can detect it:

# after removing the queue from _queues
queue = self._queues.pop((channel_name, subscriber_id), None)
if queue is not None:
    # wake up any blocked receive() for this subscriber
    queue.put_nowait(None)  # type: ignore[arg-type]

This would require _await_with_shutdown to distinguish a None sentinel from a real envelope (both map to "return None"), which it already handles correctly since a real DeliveryEnvelope is never None.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/bus_memory.py
Line: 397-403

Comment:
**`unsubscribe()` leaves blocked `receive()` callers permanently stuck**

`unsubscribe()` pops the subscriber's queue from `_queues` (line 398), but any coroutine already blocked inside `_await_with_shutdown` on that queue object holds a local reference to it and will never receive a wakeup signal.

The `_await_with_shutdown` helper monitors two things: the queue and `_shutdown_event`. After `unsubscribe`, no new messages will be enqueued (the subscriber is removed from the channel's subscriber list), and `_shutdown_event` is not set (the bus is still running). A consumer calling `receive(channel, subscriber, timeout=None)` and then `unsubscribe(channel, subscriber)` from another task will hang until bus shutdown or an external cancellation — there is no per-subscriber cancellation path.

To fix this, `unsubscribe()` should put a sentinel value into any active queue so that `_await_with_shutdown` can detect it:

```python
# after removing the queue from _queues
queue = self._queues.pop((channel_name, subscriber_id), None)
if queue is not None:
    # wake up any blocked receive() for this subscriber
    queue.put_nowait(None)  # type: ignore[arg-type]
```

This would require `_await_with_shutdown` to distinguish a `None` sentinel from a real envelope (both map to "return None"), which it already handles correctly since a real `DeliveryEnvelope` is never `None`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +443 to +450
result = await self._await_with_shutdown(queue, timeout)
if result is None:
logger.debug(
COMM_RECEIVE_TIMEOUT,
channel=channel_name,
subscriber=subscriber_id,
timeout=timeout,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

COMM_RECEIVE_TIMEOUT event fires on graceful shutdown, not only timeout

result is None is returned by _await_with_shutdown for two distinct reasons:

  1. The timeout expired with no message — a genuine timeout.
  2. _shutdown_event fired before a message arrived — a clean shutdown wakeup.

Both paths log COMM_RECEIVE_TIMEOUT with timeout=timeout. A monitoring system watching communication.receive.timeout will fire on clean shutdowns too, making it impossible to distinguish slow consumers from orderly process termination.

Consider differentiating the two cases, for example by returning a sentinel or a small enum from _await_with_shutdown:

from enum import Enum, auto

class _ReceiveResult(Enum):
    TIMEOUT = auto()
    SHUTDOWN = auto()

Then log COMM_RECEIVE_TIMEOUT only on TIMEOUT, and a new COMM_BUS_RECEIVE_SHUTDOWN constant on SHUTDOWN.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/bus_memory.py
Line: 443-450

Comment:
**`COMM_RECEIVE_TIMEOUT` event fires on graceful shutdown, not only timeout**

`result is None` is returned by `_await_with_shutdown` for two distinct reasons:
1. The `timeout` expired with no message — a genuine timeout.
2. `_shutdown_event` fired before a message arrived — a clean shutdown wakeup.

Both paths log `COMM_RECEIVE_TIMEOUT` with `timeout=timeout`. A monitoring system watching `communication.receive.timeout` will fire on clean shutdowns too, making it impossible to distinguish slow consumers from orderly process termination.

Consider differentiating the two cases, for example by returning a sentinel or a small enum from `_await_with_shutdown`:

```python
from enum import Enum, auto

class _ReceiveResult(Enum):
    TIMEOUT = auto()
    SHUTDOWN = auto()
```

Then log `COMM_RECEIVE_TIMEOUT` only on `TIMEOUT`, and a new `COMM_BUS_RECEIVE_SHUTDOWN` constant on `SHUTDOWN`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +46 to +96
class AgentMessenger:
"""Per-agent facade for sending, receiving, and dispatching messages.

Wraps a :class:`MessageBus` and optional :class:`MessageDispatcher`
to provide a high-level API that auto-fills sender, timestamp, and
message ID.

Args:
agent_id: Identifier of the owning agent.
agent_name: Human-readable name of the agent.
bus: The underlying message bus.
dispatcher: Optional message dispatcher for handler routing.

Raises:
ValueError: If *agent_id* or *agent_name* is blank.
"""

__slots__ = ("_agent_id", "_agent_name", "_bus", "_dispatcher")

def __init__(
self,
agent_id: str,
agent_name: str,
bus: MessageBus,
dispatcher: MessageDispatcher | None = None,
) -> None:
if not agent_id.strip():
logger.warning(
COMM_MESSENGER_INVALID_AGENT,
field="agent_id",
value=repr(agent_id),
)
msg = "agent_id must not be blank"
raise ValueError(msg)
if not agent_name.strip():
logger.warning(
COMM_MESSENGER_INVALID_AGENT,
field="agent_name",
value=repr(agent_name),
)
msg = "agent_name must not be blank"
raise ValueError(msg)
self._agent_id = agent_id
self._agent_name = agent_name
self._bus = bus
self._dispatcher = dispatcher
logger.debug(
COMM_MESSENGER_CREATED,
agent_id=agent_id,
agent_name=agent_name,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AgentMessenger facade is missing a receive() method

The class auto-fills agent_id for every sending operation (send_message, send_direct, broadcast) and for subscription management (subscribe, unsubscribe), but there is no corresponding receive(channel_name, *, timeout) wrapper that auto-fills self._agent_id as the subscriber.

As a result, consumers who use AgentMessenger still need to hold a direct reference to the underlying MessageBus and pass agent_id explicitly:

# current — breaks the opaque facade
envelope = await bus.receive(channel_name, agent_id, timeout=5.0)

A convenience wrapper would complete the facade contract:

async def receive(
    self,
    channel_name: str,
    *,
    timeout: float | None = None,
) -> DeliveryEnvelope | None:
    return await self._bus.receive(channel_name, self._agent_id, timeout=timeout)
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/messenger.py
Line: 46-96

Comment:
**`AgentMessenger` facade is missing a `receive()` method**

The class auto-fills `agent_id` for every sending operation (`send_message`, `send_direct`, `broadcast`) and for subscription management (`subscribe`, `unsubscribe`), but there is no corresponding `receive(channel_name, *, timeout)` wrapper that auto-fills `self._agent_id` as the subscriber.

As a result, consumers who use `AgentMessenger` still need to hold a direct reference to the underlying `MessageBus` and pass `agent_id` explicitly:

```python
# current — breaks the opaque facade
envelope = await bus.receive(channel_name, agent_id, timeout=5.0)
```

A convenience wrapper would complete the facade contract:

```python
async def receive(
    self,
    channel_name: str,
    *,
    timeout: float | None = None,
) -> DeliveryEnvelope | None:
    return await self._bus.receive(channel_name, self._agent_id, timeout=timeout)
```

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +97 to +99
if not isinstance(handler, MessageHandler):
handler = FunctionHandler(handler)

Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageHandler is @runtime_checkable, so isinstance(handler, MessageHandler) only checks for a handle attribute—not that it’s async. A handler with a synchronous handle() will pass registration but later cause await registration.handler.handle(...) to raise a TypeError. Consider validating that handler.handle is a coroutine function (or that calling it returns an awaitable) and raising TypeError + logging COMM_HANDLER_INVALID when it’s not.

Copilot uses AI. Check for mistakes.
Comment on lines +481 to +485
if not get_task.done():
get_task.cancel()
if not shutdown_task.done():
shutdown_task.cancel()
if get_task in done and not get_task.cancelled():
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _await_with_shutdown, cancelled tasks (get_task / shutdown_task) are not awaited after .cancel(). This can leave pending cancellation and may emit "Task was destroyed but it is pending" warnings in some event-loop implementations. Consider using asyncio.create_task() and after cancelling, await the tasks with contextlib.suppress(asyncio.CancelledError) to ensure clean cancellation.

Copilot uses AI. Check for mistakes.
_raise_not_subscribed(channel_name, subscriber_id)
queue = self._ensure_queue(channel_name, subscriber_id)
result = await self._await_with_shutdown(queue, timeout)
if result is None:
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

COMM_RECEIVE_TIMEOUT is logged whenever receive() returns None, but None can also mean shutdown (per docstring and _await_with_shutdown). This makes the event misleading for observability. Consider distinguishing timeout vs shutdown (e.g., check self._shutdown_event.is_set() and log a different event / add a reason field, or only log timeout when timeout is not None).

Suggested change
if result is None:
if result is None and timeout is not None and not self._shutdown_event.is_set():

Copilot uses AI. Check for mistakes.
Aureliolo added a commit that referenced this pull request Mar 7, 2026
Address 13 findings from post-merge bot reviews on PRs #157-#162:

Communication layer (PR #157):
- Wake blocked receive() callers on unsubscribe via None sentinel
- Distinguish shutdown vs timeout in receive() logging
- Add AgentMessenger.receive() facade method
- Validate MessageHandler.handle() is async at registration
- Await cancelled tasks in _await_with_shutdown to prevent warnings

Observability (PR #158):
- Add log-before-raise to all validators missing it (company.py,
  schema.py) — 14 raise sites across 11 validators

Parallel execution (PR #161):
- Log suppressed ExceptionGroup instead of silent pass
- Add PARALLEL_AGENT_CANCELLED structured event for cancellations
- Fix progress.in_progress semantics (increment after semaphore)
- Use PARALLEL_LOCK_RELEASE_ERROR for lock release failures
- Remove duplicate plan_parsing.py from DESIGN_SPEC file tree

Template inheritance (PR #162):
- Update DESIGN_SPEC merge key docs to include merge_id
- Preserve merge_id in _expand_single_agent (confirmed bug fix)
- Defer deepcopy in _apply_child_agent past _remove early-return

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Aureliolo added a commit that referenced this pull request Mar 10, 2026
🤖 I have created a release *beep* *boop*
---


##
[0.1.1](ai-company-v0.1.0...ai-company-v0.1.1)
(2026-03-10)


### Features

* add autonomy levels and approval timeout policies
([#42](#42),
[#126](#126))
([#197](#197))
([eecc25a](eecc25a))
* add CFO cost optimization service with anomaly detection, reports, and
approval decisions
([#186](#186))
([a7fa00b](a7fa00b))
* add code quality toolchain (ruff, mypy, pre-commit, dependabot)
([#63](#63))
([36681a8](36681a8))
* add configurable cost tiers and subscription/quota-aware tracking
([#67](#67))
([#185](#185))
([9baedfa](9baedfa))
* add container packaging, Docker Compose, and CI pipeline
([#269](#269))
([435bdfe](435bdfe)),
closes [#267](#267)
* add coordination error taxonomy classification pipeline
([#146](#146))
([#181](#181))
([70c7480](70c7480))
* add cost-optimized, hierarchical, and auction assignment strategies
([#175](#175))
([ce924fa](ce924fa)),
closes [#173](#173)
* add design specification, license, and project setup
([8669a09](8669a09))
* add env var substitution and config file auto-discovery
([#77](#77))
([7f53832](7f53832))
* add FastestStrategy routing + vendor-agnostic cleanup
([#140](#140))
([09619cb](09619cb)),
closes [#139](#139)
* add HR engine and performance tracking
([#45](#45),
[#47](#47))
([#193](#193))
([2d091ea](2d091ea))
* add issue auto-search and resolution verification to PR review skill
([#119](#119))
([deecc39](deecc39))
* add memory retrieval, ranking, and context injection pipeline
([#41](#41))
([873b0aa](873b0aa))
* add pluggable MemoryBackend protocol with models, config, and events
([#180](#180))
([46cfdd4](46cfdd4))
* add pluggable MemoryBackend protocol with models, config, and events
([#32](#32))
([46cfdd4](46cfdd4))
* add pluggable PersistenceBackend protocol with SQLite implementation
([#36](#36))
([f753779](f753779))
* add progressive trust and promotion/demotion subsystems
([#43](#43),
[#49](#49))
([3a87c08](3a87c08))
* add retry handler, rate limiter, and provider resilience
([#100](#100))
([b890545](b890545))
* add SecOps security agent with rule engine, audit log, and ToolInvoker
integration ([#40](#40))
([83b7b6c](83b7b6c))
* add shared org memory and memory consolidation/archival
([#125](#125),
[#48](#48))
([4a0832b](4a0832b))
* design unified provider interface
([#86](#86))
([3e23d64](3e23d64))
* expand template presets, rosters, and add inheritance
([#80](#80),
[#81](#81),
[#84](#84))
([15a9134](15a9134))
* implement agent runtime state vs immutable config split
([#115](#115))
([4cb1ca5](4cb1ca5))
* implement AgentEngine core orchestrator
([#11](#11))
([#143](#143))
([f2eb73a](f2eb73a))
* implement basic tool system (registry, invocation, results)
([#15](#15))
([c51068b](c51068b))
* implement built-in file system tools
([#18](#18))
([325ef98](325ef98))
* implement communication foundation — message bus, dispatcher, and
messenger ([#157](#157))
([8e71bfd](8e71bfd))
* implement company template system with 7 built-in presets
([#85](#85))
([cbf1496](cbf1496))
* implement conflict resolution protocol
([#122](#122))
([#166](#166))
([e03f9f2](e03f9f2))
* implement core entity and role system models
([#69](#69))
([acf9801](acf9801))
* implement crash recovery with fail-and-reassign strategy
([#149](#149))
([e6e91ed](e6e91ed))
* implement engine extensions — Plan-and-Execute loop and call
categorization
([#134](#134),
[#135](#135))
([#159](#159))
([9b2699f](9b2699f))
* implement enterprise logging system with structlog
([#73](#73))
([2f787e5](2f787e5))
* implement graceful shutdown with cooperative timeout strategy
([#130](#130))
([6592515](6592515))
* implement hierarchical delegation and loop prevention
([#12](#12),
[#17](#17))
([6be60b6](6be60b6))
* implement LiteLLM driver and provider registry
([#88](#88))
([ae3f18b](ae3f18b)),
closes [#4](#4)
* implement LLM decomposition strategy and workspace isolation
([#174](#174))
([aa0eefe](aa0eefe))
* implement meeting protocol system
([#123](#123))
([ee7caca](ee7caca))
* implement message and communication domain models
([#74](#74))
([560a5d2](560a5d2))
* implement model routing engine
([#99](#99))
([d3c250b](d3c250b))
* implement parallel agent execution
([#22](#22))
([#161](#161))
([65940b3](65940b3))
* implement per-call cost tracking service
([#7](#7))
([#102](#102))
([c4f1f1c](c4f1f1c))
* implement personality injection and system prompt construction
([#105](#105))
([934dd85](934dd85))
* implement single-task execution lifecycle
([#21](#21))
([#144](#144))
([c7e64e4](c7e64e4))
* implement subprocess sandbox for tool execution isolation
([#131](#131))
([#153](#153))
([3c8394e](3c8394e))
* implement task assignment subsystem with pluggable strategies
([#172](#172))
([c7f1b26](c7f1b26)),
closes [#26](#26)
[#30](#30)
* implement task decomposition and routing engine
([#14](#14))
([9c7fb52](9c7fb52))
* implement Task, Project, Artifact, Budget, and Cost domain models
([#71](#71))
([81eabf1](81eabf1))
* implement tool permission checking
([#16](#16))
([833c190](833c190))
* implement YAML config loader with Pydantic validation
([#59](#59))
([ff3a2ba](ff3a2ba))
* implement YAML config loader with Pydantic validation
([#75](#75))
([ff3a2ba](ff3a2ba))
* initialize project with uv, hatchling, and src layout
([39005f9](39005f9))
* initialize project with uv, hatchling, and src layout
([#62](#62))
([39005f9](39005f9))
* Litestar REST API, WebSocket feed, and approval queue (M6)
([#189](#189))
([29fcd08](29fcd08))
* make TokenUsage.total_tokens a computed field
([#118](#118))
([c0bab18](c0bab18)),
closes [#109](#109)
* parallel tool execution in ToolInvoker.invoke_all
([#137](#137))
([58517ee](58517ee))
* testing framework, CI pipeline, and M0 gap fixes
([#64](#64))
([f581749](f581749))
* wire all modules into observability system
([#97](#97))
([f7a0617](f7a0617))


### Bug Fixes

* address Greptile post-merge review findings from PRs
[#170](https://github.com/Aureliolo/ai-company/issues/170)-[#175](https://github.com/Aureliolo/ai-company/issues/175)
([#176](#176))
([c5ca929](c5ca929))
* address post-merge review feedback from PRs
[#164](https://github.com/Aureliolo/ai-company/issues/164)-[#167](https://github.com/Aureliolo/ai-company/issues/167)
([#170](#170))
([3bf897a](3bf897a)),
closes [#169](#169)
* enforce strict mypy on test files
([#89](#89))
([aeeff8c](aeeff8c))
* harden Docker sandbox, MCP bridge, and code runner
([#50](#50),
[#53](#53))
([d5e1b6e](d5e1b6e))
* harden git tools security + code quality improvements
([#150](#150))
([000a325](000a325))
* harden subprocess cleanup, env filtering, and shutdown resilience
([#155](#155))
([d1fe1fb](d1fe1fb))
* incorporate post-merge feedback + pre-PR review fixes
([#164](#164))
([c02832a](c02832a))
* pre-PR review fixes for post-merge findings
([#183](#183))
([26b3108](26b3108))
* strengthen immutability for BaseTool schema and ToolInvoker boundaries
([#117](#117))
([7e5e861](7e5e861))


### Performance

* harden non-inferable principle implementation
([#195](#195))
([02b5f4e](02b5f4e)),
closes [#188](#188)


### Refactoring

* adopt NotBlankStr across all models
([#108](#108))
([#120](#120))
([ef89b90](ef89b90))
* extract _SpendingTotals base class from spending summary models
([#111](#111))
([2f39c1b](2f39c1b))
* harden BudgetEnforcer with error handling, validation extraction, and
review fixes
([#182](#182))
([c107bf9](c107bf9))
* harden personality profiles, department validation, and template
rendering ([#158](#158))
([10b2299](10b2299))
* pre-PR review improvements for ExecutionLoop + ReAct loop
([#124](#124))
([8dfb3c0](8dfb3c0))
* split events.py into per-domain event modules
([#136](#136))
([e9cba89](e9cba89))


### Documentation

* add ADR-001 memory layer evaluation and selection
([#178](#178))
([db3026f](db3026f)),
closes [#39](#39)
* add agent scaling research findings to DESIGN_SPEC
([#145](#145))
([57e487b](57e487b))
* add CLAUDE.md, contributing guide, and dev documentation
([#65](#65))
([55c1025](55c1025)),
closes [#54](#54)
* add crash recovery, sandboxing, analytics, and testing decisions
([#127](#127))
([5c11595](5c11595))
* address external review feedback with MVP scope and new protocols
([#128](#128))
([3b30b9a](3b30b9a))
* expand design spec with pluggable strategy protocols
([#121](#121))
([6832db6](6832db6))
* finalize 23 design decisions (ADR-002)
([#190](#190))
([8c39742](8c39742))
* update project docs for M2.5 conventions and add docs-consistency
review agent
([#114](#114))
([99766ee](99766ee))


### Tests

* add e2e single agent integration tests
([#24](#24))
([#156](#156))
([f566fb4](f566fb4))
* add provider adapter integration tests
([#90](#90))
([40a61f4](40a61f4))


### CI/CD

* add Release Please for automated versioning and GitHub Releases
([#278](#278))
([a488758](a488758))
* bump actions/checkout from 4 to 6
([#95](#95))
([1897247](1897247))
* bump actions/upload-artifact from 4 to 7
([#94](#94))
([27b1517](27b1517))
* harden CI/CD pipeline
([#92](#92))
([ce4693c](ce4693c))
* split vulnerability scans into critical-fail and high-warn tiers
([#277](#277))
([aba48af](aba48af))


### Maintenance

* add /worktree skill for parallel worktree management
([#171](#171))
([951e337](951e337))
* add design spec context loading to research-link skill
([8ef9685](8ef9685))
* add post-merge-cleanup skill
([#70](#70))
([f913705](f913705))
* add pre-pr-review skill and update CLAUDE.md
([#103](#103))
([92e9023](92e9023))
* add research-link skill and rename skill files to SKILL.md
([#101](#101))
([651c577](651c577))
* bump aiosqlite from 0.21.0 to 0.22.1
([#191](#191))
([3274a86](3274a86))
* bump pyyaml from 6.0.2 to 6.0.3 in the minor-and-patch group
([#96](#96))
([0338d0c](0338d0c))
* bump ruff from 0.15.4 to 0.15.5
([a49ee46](a49ee46))
* fix M0 audit items
([#66](#66))
([c7724b5](c7724b5))
* pin setup-uv action to full SHA
([#281](#281))
([4448002](4448002))
* post-audit cleanup — PEP 758, loggers, bug fixes, refactoring, tests,
hookify rules
([#148](#148))
([c57a6a9](c57a6a9))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).
Aureliolo added a commit that referenced this pull request Mar 11, 2026
🤖 I have created a release *beep* *boop*
---


##
[0.1.0](v0.0.0...v0.1.0)
(2026-03-11)


### Features

* add autonomy levels and approval timeout policies
([#42](#42),
[#126](#126))
([#197](#197))
([eecc25a](eecc25a))
* add CFO cost optimization service with anomaly detection, reports, and
approval decisions
([#186](#186))
([a7fa00b](a7fa00b))
* add code quality toolchain (ruff, mypy, pre-commit, dependabot)
([#63](#63))
([36681a8](36681a8))
* add configurable cost tiers and subscription/quota-aware tracking
([#67](#67))
([#185](#185))
([9baedfa](9baedfa))
* add container packaging, Docker Compose, and CI pipeline
([#269](#269))
([435bdfe](435bdfe)),
closes [#267](#267)
* add coordination error taxonomy classification pipeline
([#146](#146))
([#181](#181))
([70c7480](70c7480))
* add cost-optimized, hierarchical, and auction assignment strategies
([#175](#175))
([ce924fa](ce924fa)),
closes [#173](#173)
* add design specification, license, and project setup
([8669a09](8669a09))
* add env var substitution and config file auto-discovery
([#77](#77))
([7f53832](7f53832))
* add FastestStrategy routing + vendor-agnostic cleanup
([#140](#140))
([09619cb](09619cb)),
closes [#139](#139)
* add HR engine and performance tracking
([#45](#45),
[#47](#47))
([#193](#193))
([2d091ea](2d091ea))
* add issue auto-search and resolution verification to PR review skill
([#119](#119))
([deecc39](deecc39))
* add mandatory JWT + API key authentication
([#256](#256))
([c279cfe](c279cfe))
* add memory retrieval, ranking, and context injection pipeline
([#41](#41))
([873b0aa](873b0aa))
* add pluggable MemoryBackend protocol with models, config, and events
([#180](#180))
([46cfdd4](46cfdd4))
* add pluggable MemoryBackend protocol with models, config, and events
([#32](#32))
([46cfdd4](46cfdd4))
* add pluggable output scan response policies
([#263](#263))
([b9907e8](b9907e8))
* add pluggable PersistenceBackend protocol with SQLite implementation
([#36](#36))
([f753779](f753779))
* add progressive trust and promotion/demotion subsystems
([#43](#43),
[#49](#49))
([3a87c08](3a87c08))
* add retry handler, rate limiter, and provider resilience
([#100](#100))
([b890545](b890545))
* add SecOps security agent with rule engine, audit log, and ToolInvoker
integration ([#40](#40))
([83b7b6c](83b7b6c))
* add shared org memory and memory consolidation/archival
([#125](#125),
[#48](#48))
([4a0832b](4a0832b))
* design unified provider interface
([#86](#86))
([3e23d64](3e23d64))
* expand template presets, rosters, and add inheritance
([#80](#80),
[#81](#81),
[#84](#84))
([15a9134](15a9134))
* implement agent runtime state vs immutable config split
([#115](#115))
([4cb1ca5](4cb1ca5))
* implement AgentEngine core orchestrator
([#11](#11))
([#143](#143))
([f2eb73a](f2eb73a))
* implement AuditRepository for security audit log persistence
([#279](#279))
([94bc29f](94bc29f))
* implement basic tool system (registry, invocation, results)
([#15](#15))
([c51068b](c51068b))
* implement built-in file system tools
([#18](#18))
([325ef98](325ef98))
* implement communication foundation — message bus, dispatcher, and
messenger ([#157](#157))
([8e71bfd](8e71bfd))
* implement company template system with 7 built-in presets
([#85](#85))
([cbf1496](cbf1496))
* implement conflict resolution protocol
([#122](#122))
([#166](#166))
([e03f9f2](e03f9f2))
* implement core entity and role system models
([#69](#69))
([acf9801](acf9801))
* implement crash recovery with fail-and-reassign strategy
([#149](#149))
([e6e91ed](e6e91ed))
* implement engine extensions — Plan-and-Execute loop and call
categorization
([#134](#134),
[#135](#135))
([#159](#159))
([9b2699f](9b2699f))
* implement enterprise logging system with structlog
([#73](#73))
([2f787e5](2f787e5))
* implement graceful shutdown with cooperative timeout strategy
([#130](#130))
([6592515](6592515))
* implement hierarchical delegation and loop prevention
([#12](#12),
[#17](#17))
([6be60b6](6be60b6))
* implement LiteLLM driver and provider registry
([#88](#88))
([ae3f18b](ae3f18b)),
closes [#4](#4)
* implement LLM decomposition strategy and workspace isolation
([#174](#174))
([aa0eefe](aa0eefe))
* implement meeting protocol system
([#123](#123))
([ee7caca](ee7caca))
* implement message and communication domain models
([#74](#74))
([560a5d2](560a5d2))
* implement model routing engine
([#99](#99))
([d3c250b](d3c250b))
* implement parallel agent execution
([#22](#22))
([#161](#161))
([65940b3](65940b3))
* implement per-call cost tracking service
([#7](#7))
([#102](#102))
([c4f1f1c](c4f1f1c))
* implement personality injection and system prompt construction
([#105](#105))
([934dd85](934dd85))
* implement single-task execution lifecycle
([#21](#21))
([#144](#144))
([c7e64e4](c7e64e4))
* implement subprocess sandbox for tool execution isolation
([#131](#131))
([#153](#153))
([3c8394e](3c8394e))
* implement task assignment subsystem with pluggable strategies
([#172](#172))
([c7f1b26](c7f1b26)),
closes [#26](#26)
[#30](#30)
* implement task decomposition and routing engine
([#14](#14))
([9c7fb52](9c7fb52))
* implement Task, Project, Artifact, Budget, and Cost domain models
([#71](#71))
([81eabf1](81eabf1))
* implement tool permission checking
([#16](#16))
([833c190](833c190))
* implement YAML config loader with Pydantic validation
([#59](#59))
([ff3a2ba](ff3a2ba))
* implement YAML config loader with Pydantic validation
([#75](#75))
([ff3a2ba](ff3a2ba))
* initialize project with uv, hatchling, and src layout
([39005f9](39005f9))
* initialize project with uv, hatchling, and src layout
([#62](#62))
([39005f9](39005f9))
* Litestar REST API, WebSocket feed, and approval queue (M6)
([#189](#189))
([29fcd08](29fcd08))
* make TokenUsage.total_tokens a computed field
([#118](#118))
([c0bab18](c0bab18)),
closes [#109](#109)
* parallel tool execution in ToolInvoker.invoke_all
([#137](#137))
([58517ee](58517ee))
* testing framework, CI pipeline, and M0 gap fixes
([#64](#64))
([f581749](f581749))
* wire all modules into observability system
([#97](#97))
([f7a0617](f7a0617))


### Bug Fixes

* address Greptile post-merge review findings from PRs
[#170](https://github.com/Aureliolo/ai-company/issues/170)-[#175](https://github.com/Aureliolo/ai-company/issues/175)
([#176](#176))
([c5ca929](c5ca929))
* address post-merge review feedback from PRs
[#164](https://github.com/Aureliolo/ai-company/issues/164)-[#167](https://github.com/Aureliolo/ai-company/issues/167)
([#170](#170))
([3bf897a](3bf897a)),
closes [#169](#169)
* enforce strict mypy on test files
([#89](#89))
([aeeff8c](aeeff8c))
* harden Docker sandbox, MCP bridge, and code runner
([#50](#50),
[#53](#53))
([d5e1b6e](d5e1b6e))
* harden git tools security + code quality improvements
([#150](#150))
([000a325](000a325))
* harden subprocess cleanup, env filtering, and shutdown resilience
([#155](#155))
([d1fe1fb](d1fe1fb))
* incorporate post-merge feedback + pre-PR review fixes
([#164](#164))
([c02832a](c02832a))
* pre-PR review fixes for post-merge findings
([#183](#183))
([26b3108](26b3108))
* resolve circular imports, bump litellm, fix release tag format
([#286](#286))
([a6659b5](a6659b5))
* strengthen immutability for BaseTool schema and ToolInvoker boundaries
([#117](#117))
([7e5e861](7e5e861))


### Performance

* harden non-inferable principle implementation
([#195](#195))
([02b5f4e](02b5f4e)),
closes [#188](#188)


### Refactoring

* adopt NotBlankStr across all models
([#108](#108))
([#120](#120))
([ef89b90](ef89b90))
* extract _SpendingTotals base class from spending summary models
([#111](#111))
([2f39c1b](2f39c1b))
* harden BudgetEnforcer with error handling, validation extraction, and
review fixes
([#182](#182))
([c107bf9](c107bf9))
* harden personality profiles, department validation, and template
rendering ([#158](#158))
([10b2299](10b2299))
* pre-PR review improvements for ExecutionLoop + ReAct loop
([#124](#124))
([8dfb3c0](8dfb3c0))
* split events.py into per-domain event modules
([#136](#136))
([e9cba89](e9cba89))


### Documentation

* add ADR-001 memory layer evaluation and selection
([#178](#178))
([db3026f](db3026f)),
closes [#39](#39)
* add agent scaling research findings to DESIGN_SPEC
([#145](#145))
([57e487b](57e487b))
* add CLAUDE.md, contributing guide, and dev documentation
([#65](#65))
([55c1025](55c1025)),
closes [#54](#54)
* add crash recovery, sandboxing, analytics, and testing decisions
([#127](#127))
([5c11595](5c11595))
* address external review feedback with MVP scope and new protocols
([#128](#128))
([3b30b9a](3b30b9a))
* expand design spec with pluggable strategy protocols
([#121](#121))
([6832db6](6832db6))
* finalize 23 design decisions (ADR-002)
([#190](#190))
([8c39742](8c39742))
* update project docs for M2.5 conventions and add docs-consistency
review agent
([#114](#114))
([99766ee](99766ee))


### Tests

* add e2e single agent integration tests
([#24](#24))
([#156](#156))
([f566fb4](f566fb4))
* add provider adapter integration tests
([#90](#90))
([40a61f4](40a61f4))


### CI/CD

* add Release Please for automated versioning and GitHub Releases
([#278](#278))
([a488758](a488758))
* bump actions/checkout from 4 to 6
([#95](#95))
([1897247](1897247))
* bump actions/upload-artifact from 4 to 7
([#94](#94))
([27b1517](27b1517))
* bump anchore/scan-action from 6.5.1 to 7.3.2
([#271](#271))
([80a1c15](80a1c15))
* bump docker/build-push-action from 6.19.2 to 7.0.0
([#273](#273))
([dd0219e](dd0219e))
* bump docker/login-action from 3.7.0 to 4.0.0
([#272](#272))
([33d6238](33d6238))
* bump docker/metadata-action from 5.10.0 to 6.0.0
([#270](#270))
([baee04e](baee04e))
* bump docker/setup-buildx-action from 3.12.0 to 4.0.0
([#274](#274))
([5fc06f7](5fc06f7))
* bump sigstore/cosign-installer from 3.9.1 to 4.1.0
([#275](#275))
([29dd16c](29dd16c))
* harden CI/CD pipeline
([#92](#92))
([ce4693c](ce4693c))
* split vulnerability scans into critical-fail and high-warn tiers
([#277](#277))
([aba48af](aba48af))


### Maintenance

* add /worktree skill for parallel worktree management
([#171](#171))
([951e337](951e337))
* add design spec context loading to research-link skill
([8ef9685](8ef9685))
* add post-merge-cleanup skill
([#70](#70))
([f913705](f913705))
* add pre-pr-review skill and update CLAUDE.md
([#103](#103))
([92e9023](92e9023))
* add research-link skill and rename skill files to SKILL.md
([#101](#101))
([651c577](651c577))
* bump aiosqlite from 0.21.0 to 0.22.1
([#191](#191))
([3274a86](3274a86))
* bump pyyaml from 6.0.2 to 6.0.3 in the minor-and-patch group
([#96](#96))
([0338d0c](0338d0c))
* bump ruff from 0.15.4 to 0.15.5
([a49ee46](a49ee46))
* fix M0 audit items
([#66](#66))
([c7724b5](c7724b5))
* **main:** release ai-company 0.1.1
([#282](#282))
([2f4703d](2f4703d))
* pin setup-uv action to full SHA
([#281](#281))
([4448002](4448002))
* post-audit cleanup — PEP 758, loggers, bug fixes, refactoring, tests,
hookify rules
([#148](#148))
([c57a6a9](c57a6a9))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

---------

Signed-off-by: Aurelio <19254254+Aureliolo@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement agent-to-agent messaging with channels and topics Evaluate and implement message bus (asyncio queues vs external broker)

2 participants