Workflow Engine

WorkflowExecutor lifecycle, BaseStep pattern, step factory, and dynamic values

15+ step types
9 lifecycle phases
async execution model
dict result accumulation

WorkflowExecutor Lifecycle

The WorkflowExecutor orchestrates the entire run, from YAML parsing to final cleanup. The phases run sequentially, and a failure in any phase triggers the cleanup path.

nuke (optional) -> pull image -> sandbox start -> deploy contracts -> nodes start -> register remote -> steps execute -> cleanup (always, including on failure)
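The "run phases in order, always clean up" behaviour can be sketched with try/finally. This is a minimal illustration under assumptions, not the actual WorkflowExecutor code: `run_lifecycle` and the phase callables are hypothetical names, and only the phase labels come from the diagram.

```python
import asyncio
from typing import Awaitable, Callable

# A phase is a (label, async callable) pair; both are illustrative.
Phase = tuple[str, Callable[[], Awaitable[None]]]

async def run_lifecycle(phases: list[Phase],
                        cleanup: Callable[[], Awaitable[None]]) -> bool:
    """Run phases sequentially; run cleanup whether or not a phase fails."""
    current = "<none>"
    try:
        for current, phase in phases:
            await phase()              # an exception aborts the remaining phases
        return True
    except Exception as exc:
        print(f"phase {current!r} failed: {exc}")
        return False
    finally:
        await cleanup()                # reached on success and on failure
```

Putting cleanup in `finally` means no phase has to remember to call it, which matches the "cleanup (always)" node in the diagram.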

BaseStep Pattern

All step types extend the abstract BaseStep class. The pattern enforces validation before execution and standardizes result collection.

from abc import ABC, abstractmethod

class BaseStep(ABC):
    step_type: str
    config: dict                  # raw step config from YAML
    executor: "WorkflowExecutor"  # back-reference for results/nodes

    @abstractmethod
    def validate(self) -> None:
        """Raise StepValidationError if config is invalid."""

    @abstractmethod
    async def execute(self) -> "StepResult":
        """Run the step, return ok() or fail()."""

    def resolve_placeholders(self, value: str) -> str:
        """Replace ${results.step_name.field} with actual values."""

Step Factory

The step factory maps step_type strings to their corresponding BaseStep subclass. When the WorkflowExecutor encounters a step in the YAML, it calls the factory to instantiate the correct handler.

STEP_REGISTRY = {
    "install_application": InstallApplicationStep,
    "create_context": CreateContextStep,
    "call": CallStep,
    "wait_for_sync": WaitForSyncStep,
    "repeat": RepeatStep,
    "parallel": ParallelStep,
    "script": ScriptStep,
    "assert": AssertStep,
    "fuzzy_test": FuzzyTestStep,
}
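A registry lookup of this kind might look like the following sketch. The `create_step` function, the `UnknownStepTypeError` class, and the use of a `type` key in the step config are assumptions made for illustration; `CallStep` here is a stand-in, not the real class.

```python
class UnknownStepTypeError(ValueError):
    """Hypothetical error for a step type missing from the registry."""

class CallStep:
    """Stand-in for a real BaseStep subclass."""
    def __init__(self, config: dict, executor: object) -> None:
        self.config = config
        self.executor = executor

STEP_REGISTRY: dict[str, type] = {"call": CallStep}

def create_step(config: dict, executor: object):
    """Look up the step type in the registry and instantiate its handler."""
    step_type = config.get("type")     # assumed key name in the YAML step
    try:
        step_cls = STEP_REGISTRY[step_type]
    except KeyError:
        known = ", ".join(sorted(STEP_REGISTRY))
        raise UnknownStepTypeError(
            f"unknown step type {step_type!r}; known types: {known}"
        ) from None
    return step_cls(config, executor)
```

Failing at the lookup with the list of known types keeps a typo in the YAML from surfacing later as an obscure attribute error.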

Placeholder Resolution

Steps can reference results from previous steps using the ${results.step_name.field} syntax. The executor resolves these at execution time from the workflow_results dictionary.

Environment Variables

Expanded at YAML load time:

# In YAML
contract_path: ${CONTRACT_DIR}/my_app.wasm

# After expansion
contract_path: /home/user/contracts/my_app.wasm
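Load-time expansion could be implemented along these lines. This is a sketch, not the engine's loader: `expand_env` is a hypothetical helper, and leaving unset variables untouched is an assumed policy. The pattern only matches plain identifiers, so dotted `${results...}` references pass through unchanged for later resolution.

```python
import os
import re

# Matches ${NAME} where NAME is a plain identifier (no dots).
_ENV_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

def expand_env(text: str, env=None) -> str:
    """Replace ${NAME} with env[NAME]; leave unknown names as written."""
    env = os.environ if env is None else env
    return _ENV_VAR.sub(lambda m: env.get(m.group(1), m.group(0)), text)
```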

Dynamic Values

Resolved at step execution time:

# In YAML
context_id: ${results.create_ctx.context_id}

# Resolves from workflow_results dict
context_id: abc123def456...
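Execution-time resolution can be sketched as a regex substitution over the results dictionary. The function takes the dictionary as an argument to stay self-contained; raising `KeyError` for a missing step or field is an assumption about the desired failure mode.

```python
import re

# Matches ${results.<step_name>.<field>}.
_PLACEHOLDER = re.compile(r"\$\{results\.([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)\}")

def resolve_placeholders(value: str, workflow_results: dict) -> str:
    """Substitute every ${results.step.field} reference found in value."""
    def lookup(match: re.Match) -> str:
        step_name, field = match.groups()
        try:
            return str(workflow_results[step_name][field])
        except KeyError:
            raise KeyError(f"no stored result for {match.group(0)}") from None
    return _PLACEHOLDER.sub(lookup, value)
```

Because lookup happens when the step runs, a placeholder can only refer to a step that has already executed and stored its result.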

workflow_results Accumulation

Each step that produces output stores it in the workflow_results dict, keyed by the step's name. Subsequent steps can reference any previously stored value.

workflow_results = {
    "deploy_app": { "application_id": "abc123" },
    "create_ctx": { "context_id": "def456" },
    "call_method": { "result": 42, "output": "success" },
}

Step Types

Each step type serves a specific purpose in the workflow execution pipeline.

install_application

Installs a WASM application on a target node. Resolves the application binary from a local path or URL.

Category: core

create_context

Creates a new context on a target node, binding an application to a group. Returns context_id for subsequent steps.

Category: core

call

Invokes a method on an application within a context via JSON-RPC. Supports args, expected results, and retries.

Category: core
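The retry behaviour of a call step can be illustrated with a generic wrapper. This does not use or describe the calimero-client-py API: `call_with_retries` and its `attempts` and `delay` parameters are hypothetical, and `invoke` stands for whatever coroutine performs the actual RPC.

```python
import asyncio

async def call_with_retries(invoke, attempts: int = 3, delay: float = 0.0):
    """Await invoke() up to `attempts` times; re-raise the last failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await invoke()
        except Exception:
            if attempt == attempts:
                raise                  # out of attempts: surface the error
            await asyncio.sleep(delay)
```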

wait_for_sync

Waits until all nodes in the workflow have synchronized state for a given context. Polls with configurable timeout.

Category: core
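Polling with a timeout generally follows the shape sketched here. The `wait_until` helper and its parameters are hypothetical, and `check` stands for an assumed coroutine that reports whether the nodes currently agree; how the real step detects synchronization is not shown in this document.

```python
import asyncio
import time

async def wait_until(check, timeout: float, interval: float = 0.5) -> bool:
    """Poll check() until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if await check():
            return True
        if time.monotonic() >= deadline:
            return False               # caller decides how to report the timeout
        await asyncio.sleep(interval)
```

Using `time.monotonic()` keeps the deadline stable even if the system clock is adjusted during the wait.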

repeat

Loops a sequence of sub-steps a specified number of times. Supports iteration variables accessible within child steps.

Category: control
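One way to picture iteration variables is to expand the loop into concrete child steps. In this sketch the `{{iteration}}` token, the 1-based counter, and the `expand_repeat` helper are all assumptions chosen for illustration; the engine's actual variable syntax is not specified here.

```python
def expand_repeat(count: int, steps: list[dict], var: str = "iteration") -> list[dict]:
    """Return count copies of steps with the loop variable substituted."""
    token = "{{" + var + "}}"
    expanded = []
    for i in range(1, count + 1):      # assumed to start at 1
        for step in steps:
            expanded.append({
                key: val.replace(token, str(i)) if isinstance(val, str) else val
                for key, val in step.items()
            })
    return expanded
```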

parallel

Executes multiple sub-steps concurrently using asyncio.gather. All sub-steps must succeed for the parallel step to succeed.

Category: control
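The all-must-succeed rule maps naturally onto `asyncio.gather`. In this sketch each sub-step is an assumed zero-argument coroutine function returning True on success; `run_parallel` is a hypothetical name.

```python
import asyncio

async def run_parallel(sub_steps) -> bool:
    """Run all sub-steps concurrently; succeed only if every one succeeds."""
    results = await asyncio.gather(
        *(step() for step in sub_steps),
        return_exceptions=True,        # collect failures instead of raising early
    )
    return all(result is True for result in results)
```

With `return_exceptions=True` every sub-step runs to completion and a raised exception simply counts as a failure, rather than propagating as soon as the first sub-step raises.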

script

Runs an arbitrary shell script or Python snippet. Captures stdout/stderr and exit code. Useful for custom validation.

Category: utility
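Capturing a script's output and exit code can be done with the standard library, as in this sketch. `run_script` and the shape of the returned dictionary are assumptions; the real step may run the process asynchronously instead.

```python
import subprocess
import sys

def run_script(argv: list[str], timeout: float = 30.0) -> dict:
    """Run a command and return its exit code, stdout, and stderr."""
    proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    return {
        "exit_code": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
    }

# Example: run a Python snippet with the current interpreter.
outcome = run_script([sys.executable, "-c", "print('checked')"])
```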

assert

Validates a condition against workflow results. Supports equality, contains, regex, and custom comparators.

Category: testing
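The comparator kinds listed above can be modelled as a table of functions. The operator names (`equals`, `contains`, `regex`) and the `check` helper are hypothetical; a custom comparator would be registered by adding an entry to the table.

```python
import re
from typing import Any, Callable

COMPARATORS: dict[str, Callable[[Any, Any], bool]] = {
    "equals": lambda actual, expected: actual == expected,
    "contains": lambda actual, expected: expected in actual,
    "regex": lambda actual, expected: re.search(expected, str(actual)) is not None,
}

def check(actual: Any, op: str, expected: Any) -> bool:
    """Apply the named comparator; reject unknown operator names."""
    try:
        compare = COMPARATORS[op]
    except KeyError:
        raise ValueError(f"unknown comparator {op!r}") from None
    return compare(actual, expected)
```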

fuzzy_test

Randomized testing: generates random inputs, executes calls across nodes, and verifies eventual consistency.

Category: advanced

Step Execution Detail

Each step follows a consistent execution pattern within the WorkflowExecutor.

1. Instantiate

The step factory creates a BaseStep subclass from the YAML config. The executor reference is injected for access to nodes and results.

2. Validate

The validate() method checks required fields, node references, and config sanity. Raises StepValidationError on failure.

3. Resolve

All string values in the config are passed through resolve_placeholders(), replacing ${results.*} and ${nodes.*} references.

4. Execute

The async execute() method runs the step logic. For RPC steps, this calls the node via calimero-client-py. Returns ok(data) or fail(error).

5. Store

If the step has a name field, its result data is stored in workflow_results[name] for use by subsequent steps.
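The five stages can be tied together in a compact sketch. Several simplifications are assumptions: `EchoStep` and `run_step` are hypothetical, the step receives the results dict directly instead of an executor back-reference, execute() returns a plain dict rather than a StepResult, and only `${results.*}` placeholders are resolved.

```python
import asyncio
import re
from abc import ABC, abstractmethod

class StepValidationError(Exception):
    """Raised by validate() when the step config is unusable."""

class BaseStep(ABC):
    def __init__(self, config: dict, workflow_results: dict) -> None:
        self.config = config
        self.workflow_results = workflow_results

    @abstractmethod
    def validate(self) -> None: ...

    @abstractmethod
    async def execute(self) -> dict: ...

    def resolve_placeholders(self, value: str) -> str:
        # Only ${results.step.field} is handled in this sketch.
        return re.sub(
            r"\$\{results\.(\w+)\.(\w+)\}",
            lambda m: str(self.workflow_results[m.group(1)][m.group(2)]),
            value,
        )

class EchoStep(BaseStep):  # hypothetical step type, for illustration only
    def validate(self) -> None:
        if "message" not in self.config:
            raise StepValidationError("echo step requires 'message'")

    async def execute(self) -> dict:
        return {"echoed": self.config["message"]}

async def run_step(step_cls, config: dict, workflow_results: dict) -> dict:
    step = step_cls(config, workflow_results)        # 1. instantiate
    step.validate()                                  # 2. validate
    step.config = {                                  # 3. resolve
        key: step.resolve_placeholders(val) if isinstance(val, str) else val
        for key, val in step.config.items()
    }
    data = await step.execute()                      # 4. execute
    if "name" in step.config:                        # 5. store
        workflow_results[step.config["name"]] = data
    return data
```

For example, running `EchoStep` with the config `{"name": "second", "message": "${results.first.echoed}!"}` against results that already contain `first` stores the resolved message under `workflow_results["second"]`.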