Skip to content

batch 并发分支

batch 适合“多个互相独立的任务并发执行,再汇合结果”。

推荐示例

python
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow()

@flow.chunk("facts")
async def facts(data: TriggerFlowRuntimeData):
    await asyncio.sleep(0.01)
    return f"facts:{data.value}"

@flow.chunk("risks")
async def risks(data: TriggerFlowRuntimeData):
    await asyncio.sleep(0.01)
    return f"risks:{data.value}"

flow.batch(facts, risks).end()
print(flow.start("AI chips"))

适用场景

  • 并行调用多个工具 / 服务
  • 并行抽取多个维度信息
  • 一个输入拆成多个独立分析任务

当前最佳实践

  • batch 里的任务彼此应尽量独立
  • 分支建议使用具名 chunk,方便结果键名稳定、Mermaid 清晰、配置导出友好
  • 如果外部依赖有限流要求,再配合 concurrency

不再推荐

  • 把存在强前后依赖的任务硬塞进 batch
  • 在 batch 分支里共享可变外部状态却不做隔离