Skip to content

Latest commit

 

History

History
435 lines (259 loc) · 8.35 KB

File metadata and controls

435 lines (259 loc) · 8.35 KB

SoloFlow API Reference

Core Modules

WorkflowService

The main service for managing workflow lifecycle.

from hermes_plugin.services.workflow_service import WorkflowService

ws = WorkflowService(store)

Methods

create_workflow(name, description, steps, edges, config=None)

Create a new workflow with steps and DAG edges.

Parameters:

  • name (str): Workflow name
  • description (str): Workflow description
  • steps (list[dict]): List of step definitions
  • edges (list[tuple]): List of (from_id, to_id) dependency edges
  • config (dict, optional): Additional configuration

Returns: dict - Created workflow with id, state, steps, edges

Example:

workflow = await ws.create_workflow(
    name="research",
    description="Research workflow",
    steps=[
        {"id": "search", "name": "Search", "discipline": "quick", "prompt": "Search for info"},
        {"id": "analyze", "name": "Analyze", "discipline": "deep", "prompt": "Analyze results"},
    ],
    edges=[("search", "analyze")],
)
start_workflow(workflow_id)

Start executing a workflow. Root steps (no dependencies) become ready.

Parameters:

  • workflow_id (str): UUID of the workflow to start

Returns: dict - Updated workflow with state='running'

advance_step(workflow_id, step_id, result=None, error=None)

Report the result of a step execution.

Parameters:

  • workflow_id (str): Workflow UUID
  • step_id (str): Step ID to advance
  • result (str, optional): Step result on success
  • error (str, optional): Error message on failure

Returns: dict - Updated workflow status

get_status(workflow_id)

Get detailed workflow status including all steps and progress.

Parameters:

  • workflow_id (str): Workflow UUID

Returns: dict - Workflow status with steps, progress, state

list_workflows(limit=50, state_filter=None)

List workflows with optional state filter.

Parameters:

  • limit (int): Maximum results (default 50)
  • state_filter (str, optional): Filter by state (draft/active/running/completed/failed/cancelled)

Returns: list[dict] - List of workflows

cancel_workflow(workflow_id)

Cancel a running or active workflow.

Parameters:

  • workflow_id (str): Workflow UUID

Returns: dict - Cancelled workflow


Scheduler

Async scheduler for parallel step execution.

from hermes_plugin.services.scheduler import Scheduler

scheduler = Scheduler(store, ws, config={"max_parallelism": 4})

Methods

run_workflow(workflow_id)

Execute a workflow to completion with automatic parallel scheduling.

Parameters:

  • workflow_id (str): Workflow UUID
cancel_step(workflow_id, step_id)

Cancel a specific running step.

Parameters:

  • workflow_id (str): Workflow UUID
  • step_id (str): Step ID to cancel

Returns: bool - True if cancelled

cancel_all(workflow_id)

Cancel all running steps for a workflow.

Parameters:

  • workflow_id (str): Workflow UUID

Returns: int - Number of cancelled tasks


MCP Tools

SoloFlowMCPServer

MCP server for exposing workflow operations as tools.

from mcp.server import SoloFlowMCPServer

server = SoloFlowMCPServer(store_path=Path("soloflow.db"))
await server.start()

Tools

Tool Description Parameters
soloflow_create Create a new workflow name, description, steps, edges
soloflow_run Execute a workflow workflow_id, executor
soloflow_status Get workflow status workflow_id
soloflow_list List workflows limit, state
soloflow_cancel Cancel a workflow workflow_id

Trace System

TraceCollector

Collects and stores execution traces.

from trace.collector import TraceCollector

collector = TraceCollector(db_path=Path("traces.db"))

Methods

start_span(operation, node_name, parent_id, trace_id, input_data, metadata)

Start a new trace span.

Returns: Span - Created span

finish_span(span_id, status, output_data, error_message, token_usage)

Finish a span.

Returns: Span - Finished span

get_trace(trace_id)

Get all spans for a trace.

Returns: list[dict] - List of spans

get_span_stats(trace_id)

Get statistics for a trace.

Returns: dict - Statistics (total_spans, success_count, total_tokens, etc.)

TraceExporter

Exports traces to various formats.

from trace.exporter import TraceExporter

exporter = TraceExporter(collector)

Methods

export_json(trace_id, output_path)

Export trace to JSON.

format_trace_tree(trace_id)

Format trace as a tree string.


Memory System

ForgettingCurve

Ebbinghaus forgetting curve implementation.

from memory.forgetting.curve import ForgettingCurve

curve = ForgettingCurve()

Methods

retention(time_elapsed, stability, base_retention)

Calculate retention at a given time.

Formula: R(t) = base × e^(-t / stability)

consolidate(entry)

Consolidate a memory (increase stability).

time_until_forget(stability, base_retention, target_retention)

Calculate time until retention drops to target.

MemoryConsolidator

Automatic memory consolidation system.

from memory.forgetting.consolidation import MemoryConsolidator

consolidator = MemoryConsolidator(db_path=Path("memory.db"))

Methods

add_memory(key, content, tier, stability)

Add a new memory.

get_memory(key)

Get a memory and record access.

search_memories(query, tier, limit)

Search memories by content.

consolidate_all()

Run consolidation cycle on all memories.


Routing System

TaskClassifier

Classify tasks by complexity.

from routing.classifier import TaskClassifier

classifier = TaskClassifier()

Methods

classify(task_description)

Classify a task and determine discipline.

Returns: ClassificationResult with discipline, confidence, reasoning

extract_features(task_description)

Extract features from task description.

Returns: dict - Feature flags

DisciplineRouter

Route tasks to appropriate executors.

from routing.router import DisciplineRouter, Executor

router = DisciplineRouter(classifier=classifier)

Methods

register_executor(executor)

Register an executor for a discipline.

route(task)

Route a task to an executor.

Returns: RoutingResult with classification, executor, task

route_and_execute(task)

Route and execute a task.

Returns: Any - Execution result


Evolution System

PatternDetector

Detect repeated workflow patterns.

from evolution.pattern_detector import PatternDetector

detector = PatternDetector(db_path=Path("patterns.db"))

Methods

record_execution(workflow, success, duration_ms)

Record a workflow execution.

detect_patterns(min_occurrences, min_success_rate)

Detect repeated patterns.

Returns: list[Pattern] - Detected patterns

SkillPackager

Package patterns into versioned skills.

from evolution.skill_packager import SkillPackager

packager = SkillPackager(db_path=Path("skills.db"))

Methods

package_pattern(pattern)

Package a pattern into a skill.

Returns: Skill - Created skill with MCP tool definition

QualityScorer

Evaluate skill quality.

from evolution.quality_scorer import QualityScorer

scorer = QualityScorer()

Methods

score_skill(skill, pattern)

Score a skill across 4 dimensions.

Returns: QualityScore with overall, reliability, efficiency, maturity, reusability scores

rank_skills(skills, patterns)

Rank skills by quality.

Returns: list[tuple[Skill, QualityScore]] - Ranked skills


Data Models

StepState

class StepState(str, Enum):
    PENDING = "pending"
    READY = "ready"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

WorkflowState

class WorkflowState(str, Enum):
    DRAFT = "draft"
    ACTIVE = "active"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

Discipline

class Discipline(str, Enum):
    QUICK = "quick"        # ~2s
    DEEP = "deep"          # ~30s
    VISUAL = "visual"      # ~30s
    ULTRABRAIN = "ultrabrain"  # ~120s