运行时流与旁路输出
可视化边界:runtime stream 图用于表达旁路观察链路,不代表 flow config 导出能力。
runtime_stream 是旁路输出通道,不等于业务状态,也不等于最终结果。
1. 运行时流链路
如何阅读这张图
runtime_stream与state、result并列存在,不是它们的别名。- 它的价值在于中间态可观察,而不是承担可恢复业务状态。
sub_flow里的 stream 事件现在也可以桥接回父 execution。
2. 公开接口
data.put_into_stream(...)data.async_put_into_stream(...)data.stop_stream()data.async_stop_stream()flow.get_runtime_stream(...)flow.get_async_runtime_stream(...)execution.get_runtime_stream(...)execution.get_async_runtime_stream(...)
3. 基础示例
python
from agently import TriggerFlow, TriggerFlowRuntimeData
flow = TriggerFlow()
@flow.chunk("stream_steps")
async def stream_steps(data: TriggerFlowRuntimeData):
data.put_into_stream({"step": 1})
data.put_into_stream({"step": 2})
data.stop_stream()
return "done"
flow.to(stream_steps)
for event in flow.get_runtime_stream("start", timeout=None):
print(event)4. sub_flow 默认会桥接 child stream
v4.0.8.3 起,如果父流程调用:
python
flow.to_sub_flow(child_flow)那么 child execution 内部的:
put_into_stream(...)async_put_into_stream(...)
事件会默认汇总到父 execution 的 runtime stream。
这意味着 UI 或日志消费者可以只监听父 execution,就拿到整条执行链的流式输出。
5. interrupt 也会写入 stream
当 execution 进入:
pause_for()continue_with()
runtime stream 会写入结构化 interrupt 事件。
这意味着 UI 可以直接消费 stream,而不是轮询内部状态。
6. timeout 语义
如果 stream 长时间没有新事件,get_runtime_stream(timeout=...) 会停止并发出 warning。
这不会自动终止 execution 本身,只是 stream 消费结束。
7. 推荐用法
- 中间态展示走 stream
- 可恢复状态走
state - 最终业务结果走
result - 父子流程之间的最终数据交换走
write_back
8. 不要混淆的边界
- stream 不是
write_back - stream 不是
state - stream 的桥接只解决观察链路,不解决父子状态同步