Skip to content

darshjme/agent-pubsub

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

agent-pubsub

Zero-dependency pub-sub message hub for multi-agent AI systems.

Inspired by AgentScope's MsgHub — but framework-agnostic, lightweight, and production-grade.

Python 3.9+ License: MIT Zero Dependencies


What is it?

agent-pubsub lets multiple AI agents subscribe to each other in named rooms or channels. When any agent publishes a message, it is automatically broadcast to all other participants — enabling natural multi-agent conversations without manual routing.

alice ──publish──► Room "debate" ──broadcast──► bob
                                              ──broadcast──► charlie
                                              ──broadcast──► dave

Installation

pip install agent-pubsub

Or from source:

git clone https://github.com/darshjme/agent-pubsub
cd agent-pubsub
pip install -e ".[dev]"

Quick Start

1. Room-based broadcasting (MsgHub)

from agent_pubsub import MsgHub, Message

class MyAgent:
    def __init__(self, name):
        self.name = name
        self.inbox = []

    def receive(self, message):
        self.inbox.append(message)
        print(f"[{self.name}] got: {message.content!r}")

alice   = MyAgent("alice")
bob     = MyAgent("bob")
charlie = MyAgent("charlie")

hub = MsgHub()

with hub.room("debate", participants=[alice, bob, charlie]) as room:
    # System message to everyone
    room.broadcast(Message("system", "You are debating AI safety", role="system"))

    # alice publishes — bob and charlie receive it automatically
    room.publish(Message("alice", "AI safety is the most important problem of our time"))

    # Dynamic membership
    dave = MyAgent("dave")
    room.add(dave)

    room.publish(Message("bob", "I agree — alignment is existential"))
    # dave receives this one, alice does not (she's the sender's peer, not herself)

    room.remove(charlie)
    # charlie won't receive future messages

2. Channel subscriptions with filters

from agent_pubsub import Channel, Message

channel = Channel("research")

# agent1 only gets messages mentioning "python"
channel.subscribe(agent1, filter=lambda m: "python" in m.content.lower())

# agent2 gets everything
channel.subscribe(agent2)

channel.publish(Message("user", "What is Python?"))
# → agent1 receives (python in content)
# → agent2 receives

channel.publish(Message("user", "What is Java?"))
# → agent1 does NOT receive
# → agent2 receives

3. Fanout (parallel broadcast)

from agent_pubsub import MsgHub, Message

hub = MsgHub()
agents = [agent1, agent2, agent3, agent4, agent5]
msg = Message("orchestrator", "Execute your sub-task now")

# Sequential
results = hub.fanout(agents, msg)

# Parallel (threaded)
results = hub.fanout(agents, msg, parallel=True)

# With timeout per agent
results = hub.fanout(agents, msg, parallel=True, timeout=5.0)

4. Event callbacks

with hub.room("chat", participants=[alice, bob]) as room:
    room.on_message(lambda msg, r: print(f"[{r.name}] {msg.sender}: {msg.content}"))
    room.on_join(lambda agent, r: print(f"{agent.name} joined {r.name}"))
    room.on_leave(lambda agent, r: print(f"{agent.name} left {r.name}"))

    room.add(charlie)   # triggers on_join
    room.remove(bob)    # triggers on_leave
    room.publish(Message("alice", "hello"))  # triggers on_message

5. Async fanout

import asyncio
from agent_pubsub.fanout import fanout_async

async def main():
    results = await fanout_async(agents, msg, timeout=10.0)
    return results

asyncio.run(main())

API Reference

Message

Message(
    sender: str,
    content: str,
    role: str = "assistant",   # "user" | "assistant" | "system"
    id: str = auto_uuid,
    timestamp: float = time.time(),
    metadata: dict = {},
)
Method Description
reply(content, sender, role) Create a reply message referencing this one
to_dict() Serialize to dict
Message.from_dict(d) Deserialize from dict

MsgHub

Method Description
hub.room(name, participants, echo, max_history) Context manager — open a room
hub.create_room(name, ...) Create without context manager
hub.get_room(name) Retrieve existing room
hub.delete_room(name) Remove room
hub.fanout(agents, msg, parallel, timeout) Broadcast to a list of agents

Room

Method Description
room.publish(msg) Broadcast to all except sender
room.broadcast(msg) Broadcast to ALL including sender
room.add(agent) Add participant (dynamic)
room.remove(agent) Remove participant
room.on_message(cb) Register post-publish callback
room.on_join(cb) Register on-join callback
room.on_leave(cb) Register on-leave callback
room.history List of all messages
room.clear_history() Clear message history

Channel

Method Description
channel.subscribe(agent, filter=None) Subscribe with optional filter
channel.unsubscribe(agent) Remove subscription
channel.publish(msg) Deliver to matching subscribers
channel.is_subscribed(agent) Check if agent is subscribed
channel.update_filter(agent, filter) Replace filter on existing subscription
channel.on_message(cb) Callback after each publish
channel.history Message history

fanout_sync / fanout_async

# Sync
results = fanout_sync(agents, msg, parallel=True, timeout=5.0)

# Async
results = await fanout_async(agents, msg, timeout=5.0)

# Auto-dispatch (sync outside loop, async inside loop)
results = fanout(agents, msg, parallel=True)

Design Principles

  • Zero dependencies — pure Python stdlib only
  • Thread-safe — all mutations are protected by threading.RLock
  • Agent-agnostic — any object with name: str and receive(Message) works
  • Non-breaking — individual agent failures never crash the room/channel
  • Composable — rooms, channels, and fanout work independently

Agent Protocol

Any object satisfying this protocol is a valid agent:

class MyAgent:
    name: str  # required

    def receive(self, message: Message) -> Any:
        ...  # called on each delivered message

    async def areceive(self, message: Message) -> Any:
        ...  # optional — used by fanout_async if present

Running Tests

pip install pytest pytest-asyncio
pytest tests/ -v

License

MIT — see LICENSE


Built for darshjme/arsenal

About

Zero-dep pub-sub message hub for multi-agent AI systems — inspired by AgentScope MsgHub

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages