---
description: Token-level async streaming with astream(), tool call support, and provider fallbacks
tags:
---
- **Import:** `from selectools import Agent`
- **Stability:** stable
```python
import asyncio

from selectools import Agent, AgentConfig, Message, Role, tool
from selectools.providers.stubs import LocalProvider


@tool(description="Search the web")
def search(query: str) -> str:
    return f"Results for '{query}': Python is a popular programming language."


provider = LocalProvider()
agent = Agent(
    tools=[search],
    provider=provider,
    config=AgentConfig(max_iterations=2),
)


async def main():
    async for item in agent.astream(
        [Message(role=Role.USER, content="Search for Python tutorials")]
    ):
        # item is either StreamChunk (text) or AgentResult (final)
        print(type(item).__name__, getattr(item, "content", "")[:80])


asyncio.run(main())
```

!!! tip "See Also"
    - Agent - Agent lifecycle, hooks, and configuration
    - Providers - Provider implementations and streaming support
- **Directory:** `src/selectools/agent/`
- **Key Types:** `StreamChunk`, `AgentResult` (from `selectools.types`)
- Overview
- Quick Start
- E2E Streaming (v0.11.0)
- Parallel Tool Execution (v0.11.0)
- Native Function Calling (v0.10.0)
- Routing Mode (v0.10.0)
- Context Propagation (v0.10.0)
- AgentResult (v0.9.0)
- Custom System Prompt (v0.9.0)
- Agent.reset() (v0.9.0)
- Performance Comparison
- Practical Examples
- Best Practices
- Troubleshooting
- Multimodal Messages
- Further Reading
## Overview

The selectools library provides a rich set of streaming and performance features that enable real-time token delivery, concurrent tool execution, and programmatic inspection of agent behavior. These capabilities range from token-level streaming (`astream`) and routing without execution (`routing_only`) to native function calling and context-preserving tool execution.
| Feature | Version | Purpose |
|---|---|---|
| E2E Streaming | v0.11.0 | Token-by-token output with native tool call support |
| Parallel Tool Execution | v0.11.0 | Run multiple tools concurrently in a single iteration |
| Native Function Calling | v0.10.0 | Provider-native tool APIs, no regex parsing |
| Routing Mode | v0.10.0 | Select a tool without executing it (classification, intent routing) |
| Context Propagation | v0.10.0 | Preserve tracing and auth when running sync tools in executors |
| AgentResult | v0.9.0 | Structured return with message, tool metadata, iterations |
| Custom System Prompt | v0.9.0 | Inject domain instructions via AgentConfig |
| Agent.reset() | v0.9.0 | Clear state for clean reuse across requests |
## Quick Start

```python
from selectools import Agent, AgentConfig, Message, Role
from selectools.types import StreamChunk, AgentResult
```

```python
import asyncio

from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import StreamChunk, AgentResult

agent = Agent(
    tools=[search_tool],
    provider=OpenAIProvider(),
    config=AgentConfig(max_iterations=3),
)


async def main():
    async for item in agent.astream([Message(role=Role.USER, content="Search for Python tutorials")]):
        if isinstance(item, StreamChunk):
            print(item.content, end="", flush=True)
        elif isinstance(item, AgentResult):
            print(f"\n\nDone in {item.iterations} iterations")
            if item.tool_calls:
                print(f"Tools used: {[tc.tool_name for tc in item.tool_calls]}")


asyncio.run(main())
```

## E2E Streaming (v0.11.0)

`Agent.astream(messages)` returns an `AsyncGenerator` yielding `Union[StreamChunk, AgentResult]`:
- `StreamChunk` — Intermediate content chunks (text and/or tool calls)
- `AgentResult` — Final result, yielded once when the agent completes
```python
@dataclass
class StreamChunk:
    content: str = ""                            # Text delta
    role: Role = Role.ASSISTANT
    tool_calls: Optional[List[ToolCall]] = None  # Present when chunk contains tool invocations
```

- `content` — The text portion of this chunk
- `tool_calls` — Optional list of `ToolCall` objects when the LLM emits tool invocations during streaming
The last item yielded by `astream()` is always an `AgentResult`. It carries:

- `message` — Final assistant response
- `tool_name` — Last tool called (or `None`)
- `tool_args` — Args for the last tool
- `iterations` — Number of loop iterations
- `tool_calls` — All `ToolCall` objects from the run
Providers implement `astream()` yielding `Union[str, ToolCall]`:

- **Text deltas** — Raw `str` chunks (token-by-token)
- **Tool calls** — Complete `ToolCall` objects when ready (native function calling)
```mermaid
graph LR
    P["Provider.astream()"] --> A["yield 'Hello' (str)"]
    P --> B["yield ' ' (str)"]
    P --> C["yield 'world' (str)"]
    P --> D["yield ToolCall(...) (tool invocation)"]
    P --> E["yield '!' (str)"]
```
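To make the contract concrete, here is a hedged sketch of a custom provider that yields both kinds of items. `EchoProvider` is hypothetical, and the `ToolCall(tool_name=..., parameters=...)` constructor is an assumption inferred from the attributes used later on this page; only the yield contract itself comes from the protocol above.

```python
from typing import AsyncGenerator, List, Union

from selectools.types import Message, ToolCall


class EchoProvider:
    """Hypothetical provider sketch illustrating the astream() yield contract."""

    async def astream(
        self, messages: List[Message], **kwargs
    ) -> AsyncGenerator[Union[str, ToolCall], None]:
        # Text deltas are yielded as raw strings, token by token.
        for token in ["Hello", " ", "world"]:
            yield token
        # A complete ToolCall is yielded once its arguments are ready.
        # (Constructor arguments are an assumption; see lead-in above.)
        yield ToolCall(tool_name="search", parameters={"query": "Python"})
        yield "!"
```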
When a provider does not support streaming:

```mermaid
flowchart TD
    A["astream() requested"] --> B{"Provider has astream()?"}
    B -- Yes --> C["Use it"]
    B -- No --> D{"Provider has acomplete()?"}
    D -- Yes --> E["Call it, yield full response\nas single StreamChunk"]
    D -- No --> F["Run complete() in\nThreadPoolExecutor"]
```
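The fallback chain in the flowchart can be sketched in plain Python. This is an illustrative reconstruction, not the library's actual code; the `acomplete()`/`complete()` signatures and the `response[0]` indexing are assumptions based on snippets elsewhere on this page.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from selectools.types import StreamChunk


async def stream_with_fallback(provider, messages):
    """Illustrative sketch of the streaming fallback chain."""
    if hasattr(provider, "astream"):
        # Preferred path: the provider streams natively.
        async for item in provider.astream(messages):
            yield item
    elif hasattr(provider, "acomplete"):
        # Async completion exists: yield the full response as one chunk.
        response = await provider.acomplete(messages)
        yield StreamChunk(content=response[0].content)
    else:
        # Last resort: run the sync complete() off the event loop.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=1) as pool:
            response = await loop.run_in_executor(pool, provider.complete, messages)
        yield StreamChunk(content=response[0].content)
```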
Tool calls during streaming are handled in four steps:

- **Accumulation:** Tool calls are accumulated as they stream in from the provider.
- **Execution:** When all tool calls in a response are ready, they are executed (in parallel if `parallel_tool_execution=True`).
- **Continue:** Results are appended to history; streaming continues with the next LLM call.
- **Final result:** When the LLM produces a final text response with no tool calls, an `AgentResult` is yielded.
```mermaid
graph TD
    subgraph Iteration1["Iteration 1"]
        A1["StreamChunk('Searching...')"] --> A2["StreamChunk(tool_calls=[...])"]
        A2 --> A3["Tools executed"]
    end
    subgraph Iteration2["Iteration 2"]
        B1["StreamChunk('Here are the results:')"] --> B2["StreamChunk('- Result 1')"]
        B2 --> B3["..."]
        B3 --> B4["AgentResult(iterations=2, tool_calls=[...])"]
    end
    Iteration1 --> Iteration2
```
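From the consumer side, here is a minimal sketch that classifies each streamed item according to the diagram above, using only the documented `StreamChunk` and `AgentResult` fields (`agent` and `messages` are assumed to be defined):

```python
from selectools.types import AgentResult, StreamChunk


async def consume(agent, messages):
    """Sketch: classify each streamed item per the diagram above."""
    async for item in agent.astream(messages):
        if isinstance(item, AgentResult):
            # Final item: the run is complete.
            print(f"\nfinished after {item.iterations} iteration(s)")
        elif item.tool_calls:
            # A chunk carrying tool invocations (end of an iteration).
            print(f"\n[calling {[tc.tool_name for tc in item.tool_calls]}]")
        else:
            # Plain text delta.
            print(item.content, end="", flush=True)
```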
## Parallel Tool Execution (v0.11.0)

When the LLM requests multiple tool calls in a single response (common with native function calling), the agent executes them concurrently instead of sequentially.
```python
config = AgentConfig(
    parallel_tool_execution=True  # Default: enabled
)
```

Set to `False` for strictly sequential execution.
Async tool calls use `asyncio.gather()` for concurrent runs:

```python
results = await asyncio.gather(*[run_tool(tc) for tc in tool_calls])
```

Sync tool calls use a `ThreadPoolExecutor` with one worker per tool:

```python
with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
    futures = [pool.submit(run_tool, tc) for tc in tool_calls]
    results = [f.result() for f in futures]
```

| Guarantee | Description |
|---|---|
| Result ordering | Results appended to history in original request order |
| Error isolation | One tool failure does not block others |
| Hook invocation | `on_tool_start`, `on_tool_end`, `on_tool_error` fire for every tool |
| Single-tool optimization | Only one tool called → sequential path, no executor overhead |
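The ordering and isolation guarantees can be illustrated with plain `asyncio.gather`. Whether selectools passes `return_exceptions=True` internally is an assumption; the sketch only demonstrates the observable behavior the table describes.

```python
import asyncio


async def run_tool(name: str, delay: float):
    await asyncio.sleep(delay)
    if name == "flaky":
        raise RuntimeError("tool failed")
    return f"{name}: ok"


async def main():
    # return_exceptions=True isolates failures: one error does not cancel
    # the others, and results come back in the original request order
    # regardless of which finishes first.
    results = await asyncio.gather(
        run_tool("search", 0.2),
        run_tool("flaky", 0.05),
        run_tool("calculator", 0.1),
        return_exceptions=True,
    )
    print(results)  # ['search: ok', RuntimeError('tool failed'), 'calculator: ok']


asyncio.run(main())
```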
## Native Function Calling (v0.10.0)

selectools uses provider-native tool APIs instead of regex parsing:

- **OpenAI** — `functions`/`tool_use` in chat completions
- **Anthropic** — `tool_use` blocks
- **Gemini** — `function_calling` in responses
Responses carry structured `ToolCall` objects on `Message.tool_calls`:

```python
response = provider.complete(...)
msg = response[0]
if msg.tool_calls:
    for tc in msg.tool_calls:
        print(f"Tool: {tc.tool_name}, Args: {tc.parameters}")
```

- Providers return `ToolCall` objects directly.
- No text-based patterns such as `TOOL_CALL {...}`.
When a provider returns plain text only (no native tool format), the agent falls back to `ToolCallParser` regex parsing.
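For illustration, here is a hypothetical reconstruction of what such a text-based fallback might parse; the real `ToolCallParser` pattern and API are not shown on this page and may differ.

```python
import json
import re

# Hypothetical pattern based on the TOOL_CALL {...} form mentioned above;
# the actual ToolCallParser may use a different format entirely.
TOOL_CALL_RE = re.compile(r"TOOL_CALL\s*(\{.*\})", re.DOTALL)


def parse_text_tool_call(text: str):
    match = TOOL_CALL_RE.search(text)
    if match is None:
        return None  # Plain text response, no tool invocation.
    return json.loads(match.group(1))


print(parse_text_tool_call('TOOL_CALL {"tool": "search", "args": {"query": "Python"}}'))
# {'tool': 'search', 'args': {'query': 'Python'}}
```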
## Routing Mode (v0.10.0)

`AgentConfig(routing_only=True)` makes the agent choose a tool but not run it. Useful for classification, intent routing, and tool selection.

```python
config = AgentConfig(routing_only=True)
agent = Agent(tools=[...], provider=provider, config=config)
```

Returns an `AgentResult` with:

- `tool_name` — Selected tool
- `tool_args` — Parsed arguments
- `message` — Assistant message containing the selection
- `trace` — Execution trace (LLM call + tool selection steps)
No tools are executed; only one LLM call is made. Observer events `on_iteration_start` and `on_iteration_end` both fire for the single iteration, along with `on_run_start`/`on_run_end`.
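For example, an observer could log the routing decision. The hook names below are the documented events; how observers attach to an `Agent` is not shown on this page, so the attachment line is left as a hypothetical comment.

```python
class RoutingLogger:
    """Sketch of an observer that logs routing-mode events.

    Only the hook names come from the documentation above; the
    attachment mechanism below is an assumption.
    """

    def on_run_start(self, messages):
        print(f"routing {len(messages)} message(s)")

    def on_iteration_start(self, iteration):
        print(f"iteration {iteration} started")

    def on_iteration_end(self, iteration):
        print(f"iteration {iteration} ended")

    def on_run_end(self, result):
        print(f"selected tool: {result.tool_name}")


# Hypothetical attachment point (not confirmed by this page):
# agent = Agent(tools=[...], provider=provider, config=config, observers=[RoutingLogger()])
```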
| Use Case | Example |
|---|---|
| Classification | Route to sales vs support vs billing |
| Intent detection | Choose between search, calculator, or Q&A |
| Tool preselection | Decide which tools to enable before full execution |
```python
from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import AgentResult

config = AgentConfig(routing_only=True)
agent = Agent(
    tools=[search_tool, calculator_tool, support_tool],
    provider=OpenAIProvider(),
    config=config,
)

result = agent.run([Message(role=Role.USER, content="I need help with my bill")])

# Inspect routing decision without executing
assert result.tool_name == "support_tool"
assert "billing" in str(result.tool_args).lower() or "bill" in str(result.tool_args).lower()
```

## Context Propagation (v0.10.0)

When sync tools run inside a `ThreadPoolExecutor` (e.g. an async agent calling sync tools), `contextvars.copy_context()` is used so request-scoped state (tracing, auth, etc.) is preserved.
```python
# In tools/base.py - sync tool execution from async context
context = contextvars.copy_context()
func_with_args = functools.partial(self.function, **call_args)
result = await loop.run_in_executor(executor, context.run, func_with_args)
```

This preserves:

- OpenTelemetry tracing spans
- Auth tokens
- Request IDs
- Other `contextvars` values
Async tools run in the same event loop as the agent; no executor is involved, so context is already intact.
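A small self-contained check of this behavior using only the standard library: a value set in the async context is visible inside a sync function even though it runs in an executor, because `copy_context().run` carries it across. The `request_id` variable and `whoami` function are illustrative, not part of selectools.

```python
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id")


def whoami() -> str:
    # Runs in a worker thread, but still sees the caller's context.
    return f"handling request {request_id.get()}"


async def main():
    request_id.set("req-42")
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()  # Same mechanism the agent uses
    with ThreadPoolExecutor(max_workers=1) as pool:
        result = await loop.run_in_executor(pool, ctx.run, whoami)
    print(result)  # handling request req-42


asyncio.run(main())
```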
## AgentResult (v0.9.0)

`agent.run()` and `agent.arun()` return an `AgentResult` instead of a `Message`, enabling programmatic inspection of tool usage and iterations.
| Field | Type | Description |
|---|---|---|
| `message` | `Message` | Final assistant response |
| `tool_name` | `Optional[str]` | Last tool called, or `None` |
| `tool_args` | `Dict[str, Any]` | Args for last tool call |
| `iterations` | `int` | Number of agent loop iterations |
| `tool_calls` | `List[ToolCall]` | All tool calls in order |
Convenience accessors delegate to the final message:

- `result.content` → `result.message.content`
- `result.role` → `result.message.role`
result = agent.run([Message(role=Role.USER, content="What's the weather in Tokyo?")])
print(result.content) # Final text
print(result.tool_name) # e.g. "get_weather"
print(result.tool_args) # e.g. {"location": "Tokyo"}
print(result.iterations) # e.g. 2
print(len(result.tool_calls)) # Number of tools invokedAgentConfig(system_prompt="...") injects domain instructions before tool schemas. They persist across iterations.
```python
config = AgentConfig(
    system_prompt="You are a medical assistant. Only provide information you are confident about."
)
agent = Agent(tools=[...], provider=provider, config=config)
```

Typical uses:

- Domain constraints (medical, legal, etc.)
- Tone and persona
- Guardrails and safety
- Language or formatting rules
```python
config = AgentConfig(
    system_prompt="""You are a financial advisor.
- Never guarantee returns.
- Always recommend consulting a licensed professional.
- Use clear, non-technical language."""
)
agent = Agent(tools=[lookup_stock, get_news], provider=provider, config=config)
```

## Agent.reset() (v0.9.0)

`Agent.reset()` clears history, usage stats, analytics, and memory so the same agent instance can be reused across requests.
It clears:

- `_history` — Message history
- `usage` — Token/cost stats
- `analytics` — If enabled
- `memory` — If a `ConversationMemory` is set, calls `memory.clear()`
```python
agent = Agent(tools=[...], provider=provider, memory=ConversationMemory())

# Create once, reset between requests
for user_request in requests:
    agent.reset()
    result = agent.run([Message(role=Role.USER, content=user_request)])
```

## Performance Comparison

| Scenario | Sequential | Parallel | Speedup |
|---|---|---|---|
| 3 tools × 0.15s each | ~0.45s | ~0.15s | ~3× |
| 5 tools × 0.2s each | ~1.0s | ~0.2s | ~5× |
| 1 tool | 0.15s | 0.15s | 1× (no overhead) |
```python
import time

from selectools import Agent, AgentConfig, Message, Role, tool


@tool(description="Simulate slow API")
def slow_api(delay: float) -> str:
    time.sleep(delay)
    return f"Done after {delay}s"


agent_parallel = Agent(
    tools=[slow_api],
    provider=provider,
    config=AgentConfig(parallel_tool_execution=True, max_iterations=2),
)

agent_sequential = Agent(
    tools=[slow_api],
    provider=provider,
    config=AgentConfig(parallel_tool_execution=False, max_iterations=2),
)

# With a prompt that triggers 3 tool calls:
# parallel:   ~0.15s
# sequential: ~0.45s
```

## Practical Examples

Route incoming messages to the right team without executing any tools:

```python
config = AgentConfig(routing_only=True)
agent = Agent(
    tools=[sales_tool, support_tool, billing_tool],
    provider=provider,
    config=config,
)

intent = agent.run([Message(role=Role.USER, content=user_message)])

if intent.tool_name == "sales_tool":
    route_to_sales_team(intent.tool_args)
elif intent.tool_name == "support_tool":
    create_support_ticket(intent.tool_args)
else:
    forward_to_billing(intent.tool_args)
```

Audit tool usage and conversation complexity from `AgentResult`:

```python
result = agent.run(messages)
if result.tool_calls:
    for tc in result.tool_calls:
        log_tool_usage(tc.tool_name, tc.parameters)

if result.iterations > 3:
    alert_complex_conversation()
```

Build a domain-specific agent with a custom system prompt:

```python
config = AgentConfig(
system_prompt="You are a Python expert. Prefer type hints and modern syntax. Suggest tests when relevant.",
max_iterations=5,
)
agent = Agent(tools=[search_docs, run_code], provider=provider, config=config)async for item in agent.astream(messages):
    if isinstance(item, StreamChunk):
        await websocket.send_json({"type": "chunk", "content": item.content})
    elif isinstance(item, AgentResult):
        await websocket.send_json({"type": "done", "iterations": item.iterations})
```

## Best Practices

**Keep `parallel_tool_execution` enabled.** The default is `True`; disable it only when tool ordering or side effects require sequential execution.
**Use routing mode for cheap classification** instead of a full agent run.

**Reuse agents across requests** by calling `reset()` between runs:

```python
agent = Agent(...)
for req in queue:
    agent.reset()
    result = agent.run(req)
```

**Inspect results for observability.** Use `result.tool_calls` and `result.iterations` for logging and monitoring.
## Troubleshooting

**Streaming yields only a single chunk**

Cause: The provider lacks `astream()`; the agent falls back to `acomplete()` and yields the full response as one chunk.

Fix: Use a provider that implements `astream()` (e.g. OpenAI, Anthropic, Gemini).
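A quick way to see which path a provider will take is to probe for the methods named in the fallback chain; a minimal sketch, assuming providers expose these as ordinary methods:

```python
def describe_streaming_support(provider) -> str:
    # Mirrors the fallback chain: astream > acomplete > complete-in-executor.
    if hasattr(provider, "astream"):
        return "native token streaming"
    if hasattr(provider, "acomplete"):
        return "single-chunk fallback via acomplete()"
    return "single-chunk fallback via complete() in a thread pool"
```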
**Tools run sequentially instead of in parallel**

Cause: `parallel_tool_execution=False`, or only one tool call per response.

Fix: Set `AgentConfig(parallel_tool_execution=True)` and use prompts that trigger multiple tools.

**Request context (tracing, auth) is lost inside sync tools**

Cause: Older selectools versions, or custom executor usage without context propagation.

Fix: Upgrade to v0.10.0+; sync tools called from an async agent then receive proper context propagation.

**Routing mode executes the tool anyway**

Cause: Misconfiguration or a different code path.

Fix: Ensure `AgentConfig(routing_only=True)` is passed to the `Agent`, not just `AgentConfig()`.
## Multimodal Messages

- **Stability:** beta

Selectools supports multimodal messages through the `ContentPart`, `image_message()`, and `text_content()` helpers. These let you send images alongside text to vision-capable models.
```python
from selectools.types import ContentPart, Message, Role

# Build a message with multiple content parts
parts = [
    ContentPart(type="text", text="What's in these images?"),
    ContentPart(type="image_url", image_url="https://example.com/photo.jpg"),
    ContentPart(type="image_base64", image_base64="...", media_type="image/png"),
]
msg = Message(role=Role.USER, content="", content_parts=parts)
```

```python
from selectools.types import image_message

# From a URL
msg = image_message("https://example.com/photo.jpg", prompt="Describe this image.")

# From a local file path
msg = image_message("/path/to/photo.png", prompt="What do you see?")
```

Extract text from a message regardless of format:
```python
from selectools.types import text_content

# Works with both plain content and content_parts messages
text = text_content(message)
```

When streaming with `astream()`, providers serialize `content_parts` into their native multimodal format (OpenAI content arrays, Anthropic content blocks, Gemini parts). The streaming pipeline handles content parts transparently: text deltas and tool calls are yielded as usual.
## Further Reading

- Agent Module - Agent lifecycle, hooks, configuration
- Tools Module - Tool definition and validation
- Providers Module - Provider implementations and streaming
- Memory Module - Conversation memory and `reset()`
**Next Steps:** Enable streaming with `agent.astream()` and optimize tool-heavy workflows with `parallel_tool_execution=True`.
Example scripts:

| # | Script | Description |
|---|---|---|
| 07 | `07_streaming_tools.py` | Token-level async streaming with tool call support |
| 08 | `08_streaming_parallel.py` | Streaming with parallel tool execution |