Note
Go to the end to download the full example code.
Memory¶
The memory module in AgentScope is responsible for
storing the messages and
managing them with specific marks
in different storage implementations.
The mark is a string label associated with each message in the memory, which can be used to categorize, filter, and retrieve messages based on their context or purpose.
It’s powerful for high-level memory management in agents. For example, In ReActAgent class, the hint messages are stored with the mark “hint”, and the memory compression functionality is also implemented based on marks.
Note
The memory module only provides storage and management functionalities. The algorithm logic such as compression is implemented in the agent level.
Currently, AgentScope provides the following memory storage implementations:
Memory Class |
Description |
|---|---|
|
A simple in-memory implementation of memory storage. |
|
An asynchronous SQLAlchemy-based implementation of memory storage, which supports various databases such as SQLite, PostgreSQL, MySQL, etc. |
|
A Redis-based implementation of memory storage. |
Tip
If you’re interested in contributing new memory storage implementations, please refer to the Contribution Guide.
All the above memory classes inherit from the base class MemoryBase, and
provide the following methods to manage the messages in the memory:
import asyncio
import json
import fakeredis
from sqlalchemy.ext.asyncio import create_async_engine
from agentscope.memory import (
InMemoryMemory,
AsyncSQLAlchemyMemory,
RedisMemory,
)
from agentscope.message import Msg
In-Memory Memory¶
The in-memory memory provides a simple way to store messages in memory. Together with the State/Session Management module, it can persist the memory content across different users and sessions.
async def in_memory_example():
"""An example of using InMemoryMemory to store messages in memory."""
memory = InMemoryMemory()
await memory.add(
Msg("Alice", "Generate a report about AgentScope", "user"),
)
# Add a hint message with the mark "hint"
await memory.add(
[
Msg(
"system",
"<system-hint>Create a plan first to collect information and "
"generate the report step by step.</system-hint>",
"system",
),
],
marks="hint",
)
msgs = await memory.get_memory(mark="hint")
print("The messages with mark 'hint':")
for msg in msgs:
print(f"- {msg}")
# All the stored messages can be exported and loaded via ``state_dict`` and ``load_state_dict`` methods.
state = memory.state_dict()
print("The state dict of the memory:")
print(json.dumps(state, indent=2))
# delete messages by mark
deleted_count = await memory.delete_by_mark("hint")
print(f"Deleted {deleted_count} messages with mark 'hint'.")
print("The state dict of the memory after deletion:")
state = memory.state_dict()
print(json.dumps(state, indent=2))
asyncio.run(in_memory_example())
The messages with mark 'hint':
- Msg(id='dHKhG2kAWXuJeJs33TAwZn', name='system', content='<system-hint>Create a plan first to collect information and generate the report step by step.</system-hint>', role='system', metadata=None, timestamp='2026-01-15 12:10:52.179', invocation_id='None')
The state dict of the memory:
{
"content": [
[
{
"id": "7DMBSaWh5U6cmVBVNJ2gX3",
"name": "Alice",
"role": "user",
"content": "Generate a report about AgentScope",
"metadata": null,
"timestamp": "2026-01-15 12:10:52.179"
},
[]
],
[
{
"id": "dHKhG2kAWXuJeJs33TAwZn",
"name": "system",
"role": "system",
"content": "<system-hint>Create a plan first to collect information and generate the report step by step.</system-hint>",
"metadata": null,
"timestamp": "2026-01-15 12:10:52.179"
},
[
"hint"
]
]
]
}
Deleted 1 messages with mark 'hint'.
The state dict of the memory after deletion:
{
"content": [
[
{
"id": "7DMBSaWh5U6cmVBVNJ2gX3",
"name": "Alice",
"role": "user",
"content": "Generate a report about AgentScope",
"metadata": null,
"timestamp": "2026-01-15 12:10:52.179"
},
[]
]
]
}
Relational Database Memory¶
AgentScope provides a unified interface to work with relational databases via SQLAlchemy, supporting
various databases such as SQLite, PostgreSQL, MySQL, etc.
user and session management, and
connection pooling in the production environment
Specifically, here we use a memory backed by SQLite as an example.
async def sqlalchemy_example() -> None:
"""An example of using AsyncSQLAlchemyMemory to store messages in a SQLite database."""
# Create an async SQLAlchemy engine first
engine = create_async_engine("sqlite+aiosqlite:///./test_memory.db")
# Then create the memory with the engine
memory = AsyncSQLAlchemyMemory(
engine_or_session=engine,
# Optionally specify user_id and session_id
user_id="user_1",
session_id="session_1",
)
await memory.add(
Msg("Alice", "Generate a report about AgentScope", "user"),
)
await memory.add(
[
Msg(
"system",
"<system-hint>Create a plan first to collect information and "
"generate the report step by step.</system-hint>",
"system",
),
],
marks="hint",
)
msgs = await memory.get_memory(mark="hint")
print("The messages with mark 'hint':")
for msg in msgs:
print(f"- {msg}")
# Close the engine when done
await memory.close()
asyncio.run(sqlalchemy_example())
The messages with mark 'hint':
- Msg(id='cqRdBpffpyzTcLgoCtxTDo', name='system', content='<system-hint>Create a plan first to collect information and generate the report step by step.</system-hint>', role='system', metadata=None, timestamp='2026-01-15 12:10:52.220', invocation_id='None')
Optionally, you can also use the AsyncSQLAlchemyMemory as an async context manager, and the session will be closed automatically when exiting the context.
async def sqlalchemy_context_example() -> None:
"""Example of using AsyncSQLAlchemyMemory as an async context manager."""
engine = create_async_engine("sqlite+aiosqlite:///./test_memory.db")
async with AsyncSQLAlchemyMemory(
engine_or_session=engine,
user_id="user_1",
session_id="session_1",
) as memory:
await memory.add(
Msg("Alice", "Generate a report about AgentScope", "user"),
)
msgs = await memory.get_memory()
print("All messages in the memory:")
for msg in msgs:
print(f"- {msg}")
asyncio.run(sqlalchemy_context_example())
All messages in the memory:
- Msg(id='guXWMRfxietG7EzpopCi3B', name='Alice', content='Generate a report about AgentScope', role='user', metadata=None, timestamp='2026-01-15 12:10:52.190', invocation_id='None')
- Msg(id='cqRdBpffpyzTcLgoCtxTDo', name='system', content='<system-hint>Create a plan first to collect information and generate the report step by step.</system-hint>', role='system', metadata=None, timestamp='2026-01-15 12:10:52.220', invocation_id='None')
- Msg(id='8hX2zFWaTfi5ftKFgRJpNg', name='Alice', content='Generate a report about AgentScope', role='user', metadata=None, timestamp='2026-01-15 12:10:52.227', invocation_id='None')
In production environment e.g. with FastAPI, the connection pooling can be enabled as follows:
from typing import AsyncGenerator
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from agentscope.agent import ReActAgent
from agentscope.pipeline import stream_printing_messages
app = FastAPI()
# Create an async SQLAlchemy engine with connection pooling
engine = create_async_engine(
"sqlite+aiosqlite:///./test_memory.db",
pool_size=10,
max_overflow=20,
pool_timeout=30,
# ... The other pool settings
)
# Create a session maker
async_session_marker = async_sessionmaker(
engine,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_marker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@app.post("/chat")
async def chat_endpoint(
user_id: str,
session_id: str,
input: str,
db_session: AsyncSession = Depends(get_db),
):
# Some setup for the agent
...
# Create the agent with the SQLAlchemy memory
agent = ReActAgent(
# ...
memory=AsyncSQLAlchemyMemory(
engine_or_session=db_session,
user_id=user_id,
session_id=session_id,
),
)
# Handle the chat with the agent
async for msg, _ in stream_printing_messages(
agents=[agent],
coroutine_task=agent(Msg("user", input, "user")),
):
# yield msg to the client
...
NoSQL Database Memory¶
AgentScope also provides memory implementations based on NoSQL databases such as Redis. It also supports user and session management, and connection pooling in the production environment.
First, we can initialize the Redis memory as follows:
async def redis_memory_example() -> None:
"""An example of using RedisMemory to store messages in Redis."""
# Use fakeredis for in-memory testing without a real Redis server
fake_redis = fakeredis.aioredis.FakeRedis(decode_responses=True)
# Create the Redis memory
memory = RedisMemory(
# Using fake redis for demonstration
connection_pool=fake_redis.connection_pool,
# You can also connect to a real Redis server by specifying host and port
# host="localhost",
# port=6379,
# Optionally specify user_id and session_id
user_id="user_1",
session_id="session_1",
)
# Add a message to the memory
await memory.add(
Msg(
"Alice",
"Generate a report about AgentScope",
"user",
),
)
# Add a hint message with the mark "hint"
await memory.add(
Msg(
"system",
"<system-hint>Create a plan first to collect information and "
"generate the report step by step.</system-hint>",
"system",
),
marks="hint",
)
# Retrieve messages with the mark "hint"
msgs = await memory.get_memory(mark="hint")
print("The messages with mark 'hint':")
for msg in msgs:
print(f"- {msg}")
asyncio.run(redis_memory_example())
The messages with mark 'hint':
- Msg(id='MpVjmgzeQrhtH2QHQp5cBP', name='system', content='<system-hint>Create a plan first to collect information and generate the report step by step.</system-hint>', role='system', metadata=None, timestamp='2026-01-15 12:10:52.245', invocation_id='None')
Similarly, the RedisMemory can also be used with connection pooling in the production environment, e.g., with FastAPI.
from fastapi import FastAPI, HTTPException
from redis.asyncio import ConnectionPool
from contextlib import asynccontextmanager
# Global Redis connection pool
redis_pool: ConnectionPool | None = None
# Use the lifespan event to manage the Redis connection pool
@asynccontextmanager
async def lifespan(app: FastAPI):
global redis_pool
redis_pool = ConnectionPool(
host="localhost",
port=6379,
db=0,
password=None,
decode_responses=True,
max_connections=10,
encoding="utf-8",
)
print("✅ Redis connection established")
yield
await redis_pool.disconnect()
print("✅ Redis connection closed")
app = FastAPI(lifespan=lifespan)
@app.post("/chat_endpoint")
async def chat_endpoint(
user_id: str, session_id: str, input: str
): # ✅ 直接使用BaseModel
"""A chat endpoint"""
global redis_pool
if redis_pool is None:
raise HTTPException(
status_code=500,
detail="Redis connection pool is not initialized.",
)
# Create the Redis memory
memory = RedisMemory(
connection_pool=redis_pool,
user_id=user_id,
session_id=session_id,
)
...
# Close the Redis client connection when done
client = memory.get_client()
await client.aclose()
Customizing Memory¶
To customize your own memory, just inherit from MemoryBase and implement the following methods:
Method |
Description |
|---|---|
|
Add |
|
Delete |
|
Delete |
|
The size of the memory |
|
Clear the memory content |
|
Get the memory content as a list of |
|
Update marks of messages in the memory |
|
Get the state dictionary of the memory |
|
Load the state dictionary of the memory |
Further Reading¶
Total running time of the script: (0 minutes 0.092 seconds)