Skip to content

运行时流与旁路输出

可视化边界:runtime stream 图用于表达旁路观察链路,不代表 flow config 导出能力。

runtime_stream 是旁路输出通道,不等于业务状态,也不等于最终结果。

1. 运行时流链路

如何阅读这张图

  • runtime_streamstateresult 并列存在,不是它们的别名。
  • 它的价值在于中间态可观察,而不是承担可恢复业务状态。
  • sub_flow 里的 stream 事件现在也可以桥接回父 execution。

2. 公开接口

  • data.put_into_stream(...)
  • data.async_put_into_stream(...)
  • data.stop_stream()
  • data.async_stop_stream()
  • flow.get_runtime_stream(...)
  • flow.get_async_runtime_stream(...)
  • execution.get_runtime_stream(...)
  • execution.get_async_runtime_stream(...)

3. 基础示例

python
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow()

@flow.chunk("stream_steps")
async def stream_steps(data: TriggerFlowRuntimeData):
    data.put_into_stream({"step": 1})
    data.put_into_stream({"step": 2})
    data.stop_stream()
    return "done"

flow.to(stream_steps)

for event in flow.get_runtime_stream("start", timeout=None):
    print(event)

4. sub_flow 默认会桥接 child stream

v4.0.8.3 起,如果父流程调用:

python
flow.to_sub_flow(child_flow)

那么 child execution 内部的:

  • put_into_stream(...)
  • async_put_into_stream(...)

事件会默认汇总到父 execution 的 runtime stream。

这意味着 UI 或日志消费者可以只监听父 execution,就拿到整条执行链的流式输出。

5. interrupt 也会写入 stream

当 execution 进入:

  • pause_for()
  • continue_with()

runtime stream 会写入结构化 interrupt 事件。

这意味着 UI 可以直接消费 stream,而不是轮询内部状态。

6. timeout 语义

如果 stream 长时间没有新事件,get_runtime_stream(timeout=...) 会停止并发出 warning。

这不会自动终止 execution 本身,只是 stream 消费结束。

7. 推荐用法

  • 中间态展示走 stream
  • 可恢复状态走 state
  • 最终业务结果走 result
  • 父子流程之间的最终数据交换走 write_back

8. 不要混淆的边界

  • stream 不是 write_back
  • stream 不是 state
  • stream 的桥接只解决观察链路,不解决父子状态同步