Zero-dependency pub-sub message hub for multi-agent AI systems.
Inspired by AgentScope's MsgHub — but framework-agnostic, lightweight, and production-grade.
agent-pubsub lets multiple AI agents subscribe to each other in named rooms or channels. When any agent publishes a message, it is automatically broadcast to all other participants — enabling natural multi-agent conversations without manual routing.
alice ──publish──► Room "debate" ──broadcast──► bob
──broadcast──► charlie
──broadcast──► dave
pip install agent-pubsubOr from source:
git clone https://github.com/darshjme/agent-pubsub
cd agent-pubsub
pip install -e ".[dev]"from agent_pubsub import MsgHub, Message
class MyAgent:
def __init__(self, name):
self.name = name
self.inbox = []
def receive(self, message):
self.inbox.append(message)
print(f"[{self.name}] got: {message.content!r}")
alice = MyAgent("alice")
bob = MyAgent("bob")
charlie = MyAgent("charlie")
hub = MsgHub()
with hub.room("debate", participants=[alice, bob, charlie]) as room:
# System message to everyone
room.broadcast(Message("system", "You are debating AI safety", role="system"))
# alice publishes — bob and charlie receive it automatically
room.publish(Message("alice", "AI safety is the most important problem of our time"))
# Dynamic membership
dave = MyAgent("dave")
room.add(dave)
room.publish(Message("bob", "I agree — alignment is existential"))
# dave receives this one, alice does not (she's the sender's peer, not herself)
room.remove(charlie)
# charlie won't receive future messagesfrom agent_pubsub import Channel, Message
channel = Channel("research")
# agent1 only gets messages mentioning "python"
channel.subscribe(agent1, filter=lambda m: "python" in m.content.lower())
# agent2 gets everything
channel.subscribe(agent2)
channel.publish(Message("user", "What is Python?"))
# → agent1 receives (python in content)
# → agent2 receives
channel.publish(Message("user", "What is Java?"))
# → agent1 does NOT receive
# → agent2 receivesfrom agent_pubsub import MsgHub, Message
hub = MsgHub()
agents = [agent1, agent2, agent3, agent4, agent5]
msg = Message("orchestrator", "Execute your sub-task now")
# Sequential
results = hub.fanout(agents, msg)
# Parallel (threaded)
results = hub.fanout(agents, msg, parallel=True)
# With timeout per agent
results = hub.fanout(agents, msg, parallel=True, timeout=5.0)with hub.room("chat", participants=[alice, bob]) as room:
room.on_message(lambda msg, r: print(f"[{r.name}] {msg.sender}: {msg.content}"))
room.on_join(lambda agent, r: print(f"{agent.name} joined {r.name}"))
room.on_leave(lambda agent, r: print(f"{agent.name} left {r.name}"))
room.add(charlie) # triggers on_join
room.remove(bob) # triggers on_leave
room.publish(Message("alice", "hello")) # triggers on_messageimport asyncio
from agent_pubsub.fanout import fanout_async
async def main():
results = await fanout_async(agents, msg, timeout=10.0)
return results
asyncio.run(main())Message(
sender: str,
content: str,
role: str = "assistant", # "user" | "assistant" | "system"
id: str = auto_uuid,
timestamp: float = time.time(),
metadata: dict = {},
)| Method | Description |
|---|---|
reply(content, sender, role) |
Create a reply message referencing this one |
to_dict() |
Serialize to dict |
Message.from_dict(d) |
Deserialize from dict |
| Method | Description |
|---|---|
hub.room(name, participants, echo, max_history) |
Context manager — open a room |
hub.create_room(name, ...) |
Create without context manager |
hub.get_room(name) |
Retrieve existing room |
hub.delete_room(name) |
Remove room |
hub.fanout(agents, msg, parallel, timeout) |
Broadcast to a list of agents |
| Method | Description |
|---|---|
room.publish(msg) |
Broadcast to all except sender |
room.broadcast(msg) |
Broadcast to ALL including sender |
room.add(agent) |
Add participant (dynamic) |
room.remove(agent) |
Remove participant |
room.on_message(cb) |
Register post-publish callback |
room.on_join(cb) |
Register on-join callback |
room.on_leave(cb) |
Register on-leave callback |
room.history |
List of all messages |
room.clear_history() |
Clear message history |
| Method | Description |
|---|---|
channel.subscribe(agent, filter=None) |
Subscribe with optional filter |
channel.unsubscribe(agent) |
Remove subscription |
channel.publish(msg) |
Deliver to matching subscribers |
channel.is_subscribed(agent) |
Check if agent is subscribed |
channel.update_filter(agent, filter) |
Replace filter on existing subscription |
channel.on_message(cb) |
Callback after each publish |
channel.history |
Message history |
# Sync
results = fanout_sync(agents, msg, parallel=True, timeout=5.0)
# Async
results = await fanout_async(agents, msg, timeout=5.0)
# Auto-dispatch (sync outside loop, async inside loop)
results = fanout(agents, msg, parallel=True)- Zero dependencies — pure Python stdlib only
- Thread-safe — all mutations are protected by
threading.RLock - Agent-agnostic — any object with
name: strandreceive(Message)works - Non-breaking — individual agent failures never crash the room/channel
- Composable — rooms, channels, and fanout work independently
Any object satisfying this protocol is a valid agent:
class MyAgent:
name: str # required
def receive(self, message: Message) -> Any:
... # called on each delivered message
async def areceive(self, message: Message) -> Any:
... # optional — used by fanout_async if presentpip install pytest pytest-asyncio
pytest tests/ -vMIT — see LICENSE
Built for darshjme/arsenal