Workflow Engine
WorkflowExecutor lifecycle, BaseStep pattern, step factory, and dynamic values
WorkflowExecutor Lifecycle
The WorkflowExecutor orchestrates the entire run from YAML parse to final cleanup. Each phase is executed sequentially and any failure triggers the cleanup path.
BaseStep Pattern
All step types extend the abstract BaseStep class. The pattern enforces validation before execution and standardizes result collection.
step_type: str
config: dict # raw step config from YAML
executor: WorkflowExecutor # back-reference for results/nodes
@abstractmethod
def validate(self) -> None:
# Raise StepValidationError if config is invalid
@abstractmethod
async def execute(self) -> StepResult:
# Run the step, return ok() or fail()
def resolve_placeholders(self, value: str) -> str:
# Replace ${results.step_name.field} with actual values
Step Factory
The step factory maps step_type strings to their corresponding BaseStep subclass. When the WorkflowExecutor encounters a step in the YAML, it calls the factory to instantiate the correct handler.
"install_application": InstallApplicationStep,
"create_context": CreateContextStep,
"call": CallStep,
"wait_for_sync": WaitForSyncStep,
"repeat": RepeatStep,
"parallel": ParallelStep,
"script": ScriptStep,
"assert": AssertStep,
"fuzzy_test": FuzzyTestStep,
}
Placeholder Resolution
Steps can reference results from previous steps using the ${results.step_name.field} syntax. The executor resolves these at execution time from the workflow_results dictionary.
Environment Variables
Expanded at YAML load time:
contract_path: ${CONTRACT_DIR}/my_app.wasm
# After expansion
contract_path: /home/user/contracts/my_app.wasm
Dynamic Values
Resolved at step execution time:
context_id: ${results.create_ctx.context_id}
# Resolves from workflow_results dict
context_id: abc123def456...
workflow_results Accumulation
Each step that produces output stores it under its name key in the workflow_results dict. Subsequent steps can reference any previously stored value.
"deploy_app": { "application_id": "abc123" },
"create_ctx": { "context_id": "def456" },
"call_method": { "result": 42, "output": "success" },
}
Step Types
Each step type serves a specific purpose in the workflow execution pipeline.
install_application
Installs a WASM application on a target node. Resolves the application binary from a local path or URL.
core
create_context
Creates a new context on a target node, binding an application to a group. Returns context_id for subsequent steps.
core
call
Invokes a method on an application within a context via JSON-RPC. Supports args, expected results, and retries.
core
wait_for_sync
Waits until all nodes in the workflow have synchronized state for a given context. Polls with configurable timeout.
core
repeat
Loops a sequence of sub-steps a specified number of times. Supports iteration variables accessible within child steps.
control
parallel
Executes multiple sub-steps concurrently using asyncio.gather. All sub-steps must succeed for the parallel step to succeed.
control
script
Runs an arbitrary shell script or Python snippet. Captures stdout/stderr and exit code. Useful for custom validation.
utility
assert
Validates a condition against workflow results. Supports equality, contains, regex, and custom comparators.
testing
fuzzy_test
Randomized testing: generates random inputs, executes calls across nodes, and verifies eventual consistency.
advanced
Step Execution Detail
Each step follows a consistent execution pattern within the WorkflowExecutor.
Instantiate
The step factory creates a BaseStep subclass from the YAML config. The executor reference is injected for access to nodes and results.
Validate
The validate() method checks required fields, node references, and config sanity. Raises StepValidationError on failure.
Resolve
All string values in the config are passed through resolve_placeholders(), replacing ${results.*} and ${nodes.*} references.
Execute
The async execute() method runs the step logic. For RPC steps, this calls the node via calimero-client-py. Returns ok(data) or fail(error).
Store
If the step has a name field, its result data is stored in workflow_results[name] for use by subsequent steps.