The main service for managing workflow lifecycle.
from hermes_plugin.services.workflow_service import WorkflowService
ws = WorkflowService(store)Create a new workflow with steps and DAG edges.
Parameters:
name(str): Workflow namedescription(str): Workflow descriptionsteps(list[dict]): List of step definitionsedges(list[tuple]): List of (from_id, to_id) dependency edgesconfig(dict, optional): Additional configuration
Returns: dict - Created workflow with id, state, steps, edges
Example:
workflow = await ws.create_workflow(
name="research",
description="Research workflow",
steps=[
{"id": "search", "name": "Search", "discipline": "quick", "prompt": "Search for info"},
{"id": "analyze", "name": "Analyze", "discipline": "deep", "prompt": "Analyze results"},
],
edges=[("search", "analyze")],
)Start executing a workflow. Root steps (no dependencies) become ready.
Parameters:
workflow_id(str): UUID of the workflow to start
Returns: dict - Updated workflow with state='running'
Report the result of a step execution.
Parameters:
workflow_id(str): Workflow UUIDstep_id(str): Step ID to advanceresult(str, optional): Step result on successerror(str, optional): Error message on failure
Returns: dict - Updated workflow status
Get detailed workflow status including all steps and progress.
Parameters:
workflow_id(str): Workflow UUID
Returns: dict - Workflow status with steps, progress, state
List workflows with optional state filter.
Parameters:
limit(int): Maximum results (default 50)state_filter(str, optional): Filter by state (draft/active/running/completed/failed/cancelled)
Returns: list[dict] - List of workflows
Cancel a running or active workflow.
Parameters:
workflow_id(str): Workflow UUID
Returns: dict - Cancelled workflow
Async scheduler for parallel step execution.
from hermes_plugin.services.scheduler import Scheduler
scheduler = Scheduler(store, ws, config={"max_parallelism": 4})Execute a workflow to completion with automatic parallel scheduling.
Parameters:
workflow_id(str): Workflow UUID
Cancel a specific running step.
Parameters:
workflow_id(str): Workflow UUIDstep_id(str): Step ID to cancel
Returns: bool - True if cancelled
Cancel all running steps for a workflow.
Parameters:
workflow_id(str): Workflow UUID
Returns: int - Number of cancelled tasks
MCP server for exposing workflow operations as tools.
from mcp.server import SoloFlowMCPServer
server = SoloFlowMCPServer(store_path=Path("soloflow.db"))
await server.start()| Tool | Description | Parameters |
|---|---|---|
soloflow_create |
Create a new workflow | name, description, steps, edges |
soloflow_run |
Execute a workflow | workflow_id, executor |
soloflow_status |
Get workflow status | workflow_id |
soloflow_list |
List workflows | limit, state |
soloflow_cancel |
Cancel a workflow | workflow_id |
Collects and stores execution traces.
from trace.collector import TraceCollector
collector = TraceCollector(db_path=Path("traces.db"))Start a new trace span.
Returns: Span - Created span
Finish a span.
Returns: Span - Finished span
Get all spans for a trace.
Returns: list[dict] - List of spans
Get statistics for a trace.
Returns: dict - Statistics (total_spans, success_count, total_tokens, etc.)
Exports traces to various formats.
from trace.exporter import TraceExporter
exporter = TraceExporter(collector)Export trace to JSON.
Format trace as a tree string.
Ebbinghaus forgetting curve implementation.
from memory.forgetting.curve import ForgettingCurve
curve = ForgettingCurve()Calculate retention at a given time.
Formula: R(t) = base × e^(-t / stability)
Consolidate a memory (increase stability).
Calculate time until retention drops to target.
Automatic memory consolidation system.
from memory.forgetting.consolidation import MemoryConsolidator
consolidator = MemoryConsolidator(db_path=Path("memory.db"))Add a new memory.
Get a memory and record access.
Search memories by content.
Run consolidation cycle on all memories.
Classify tasks by complexity.
from routing.classifier import TaskClassifier
classifier = TaskClassifier()Classify a task and determine discipline.
Returns: ClassificationResult with discipline, confidence, reasoning
Extract features from task description.
Returns: dict - Feature flags
Route tasks to appropriate executors.
from routing.router import DisciplineRouter, Executor
router = DisciplineRouter(classifier=classifier)Register an executor for a discipline.
Route a task to an executor.
Returns: RoutingResult with classification, executor, task
Route and execute a task.
Returns: Any - Execution result
Detect repeated workflow patterns.
from evolution.pattern_detector import PatternDetector
detector = PatternDetector(db_path=Path("patterns.db"))Record a workflow execution.
Detect repeated patterns.
Returns: list[Pattern] - Detected patterns
Package patterns into versioned skills.
from evolution.skill_packager import SkillPackager
packager = SkillPackager(db_path=Path("skills.db"))Package a pattern into a skill.
Returns: Skill - Created skill with MCP tool definition
Evaluate skill quality.
from evolution.quality_scorer import QualityScorer
scorer = QualityScorer()Score a skill across 4 dimensions.
Returns: QualityScore with overall, reliability, efficiency, maturity, reusability scores
Rank skills by quality.
Returns: list[tuple[Skill, QualityScore]] - Ranked skills
class StepState(str, Enum):
PENDING = "pending"
READY = "ready"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"class WorkflowState(str, Enum):
DRAFT = "draft"
ACTIVE = "active"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"class Discipline(str, Enum):
QUICK = "quick" # ~2s
DEEP = "deep" # ~30s
VISUAL = "visual" # ~30s
ULTRABRAIN = "ultrabrain" # ~120s