MadBomber/simple_flow

SimpleFlow

A lightweight, modular Ruby framework for building composable data processing pipelines with middleware support, flow control, and parallel execution.

📚 Full Documentation | 🚀 Getting Started | 📖 API Reference

Overview

SimpleFlow provides a clean architecture for orchestrating multi-step workflows with:

  • Immutable Results - Thread-safe value objects
  • Composable Steps - Mix and match processing units
  • Flow Control - Built-in halt/continue mechanisms
  • Middleware Support - Cross-cutting concerns via decorator pattern
  • Parallel Execution - Automatic and explicit concurrency
  • Visualization - Export pipelines to Graphviz, Mermaid, HTML

Installation

Add to your Gemfile:

gem 'simple_flow'

# Optional: for fiber-based concurrency (recommended for I/O-bound tasks)
gem 'async', '~> 2.0'

Then run:

bundle install

Note on Parallel Execution:

  • Without async gem: Uses Ruby threads for parallel execution
  • With async gem: Uses fiber-based concurrency (more efficient for I/O-bound operations)

Quick Start

Basic Pipeline

require 'simple_flow'

pipeline = SimpleFlow::Pipeline.new do
  step ->(result) { result.continue(result.value.strip) }
  step ->(result) { result.continue(result.value.upcase) }
  step ->(result) { result.continue("Hello, #{result.value}!") }
end

result = pipeline.call(SimpleFlow::Result.new("  world  "))
puts result.value  # => "Hello, WORLD!"

Error Handling

Sequential steps automatically depend on the previous step's success. When a step halts, the pipeline stops immediately and subsequent steps are not executed.

pipeline = SimpleFlow::Pipeline.new do
  step ->(result) {
    puts "Step 1: Validating..."
    if result.value < 18
      return result
        .with_error(:validation, 'Must be 18+')
        .halt  # Pipeline stops here
    end
    result.continue(result.value)
  }

  step ->(result) {
    puts "Step 2: Processing..."  # This never executes if validation fails
    result.continue("Age #{result.value} is valid")
  }
end

result = pipeline.call(SimpleFlow::Result.new(15))
# Output: "Step 1: Validating..."
# (Step 2 is skipped because Step 1 halted)

puts result.continue?  # => false
puts result.errors     # => {:validation=>["Must be 18+"]}

Architecture

graph TB
    subgraph Pipeline
        MW[Middleware Stack]
        S1[Step 1]
        S2[Step 2]
        S3[Step 3]
    end

    Input[Input Result] --> MW
    MW --> S1
    S1 -->|continue?| S2
    S2 -->|continue?| S3
    S3 --> Output[Output Result]

    S1 -.->|halt| Output
    S2 -.->|halt| Output

    style MW fill:#e1f5ff
    style Output fill:#d4edda

Execution Modes

SimpleFlow supports two execution modes:

Sequential Steps (Default)

Unnamed steps execute in order, with each step automatically depending on the previous step's success.

pipeline = SimpleFlow::Pipeline.new do
  step ->(result) { result.continue(result.value.strip) }
  step ->(result) { result.continue(result.value.upcase) }
  step ->(result) { result.continue("Hello, #{result.value}!") }
end

result = pipeline.call(SimpleFlow::Result.new("  world  "))

Key behavior:

  • Steps run one at a time in definition order
  • Each step receives the result from the previous step
  • If any step halts, the pipeline stops immediately
  • No explicit dependencies needed
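
The halting behavior listed above can be modeled in a few lines. This is a minimal sketch, not SimpleFlow's implementation: `ToyResult` and `run_sequential` are hypothetical names invented for the illustration, and only the `continue`, `halt`, and `continue?` method names are borrowed from the examples in this README.

```ruby
# Toy stand-in for a result object (assumption: NOT SimpleFlow::Result).
ToyResult = Struct.new(:value, :halted) do
  def continue(new_value)
    ToyResult.new(new_value, false)
  end

  def halt
    ToyResult.new(value, true)
  end

  def continue?
    !halted
  end
end

# Run steps in definition order and stop at the first halted result.
def run_sequential(steps, initial)
  steps.reduce(initial) do |result, step|
    break result unless result.continue?
    step.call(result)
  end
end

executed = []
steps = [
  ->(r) { executed << :strip;  r.continue(r.value.strip) },
  ->(r) { executed << :check;  r.value.empty? ? r.halt : r.continue(r.value) },
  ->(r) { executed << :upcase; r.continue(r.value.upcase) }
]

ok      = run_sequential(steps, ToyResult.new("  world  ", false))
stopped = run_sequential(steps, ToyResult.new("     ", false))

puts ok.value           # => "WORLD"
puts stopped.continue?  # => false (the :upcase step never ran for this input)
```

Each step receives the previous step's result, and a halted result is returned unchanged without calling any later step.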

Parallel Steps

Named steps with dependencies run concurrently based on a dependency graph.

pipeline = SimpleFlow::Pipeline.new do
  step :validate, validator, depends_on: :none  # Or use []
  step :fetch_a, fetcher_a, depends_on: [:validate]  # Parallel
  step :fetch_b, fetcher_b, depends_on: [:validate]  # Parallel
  step :merge, merger, depends_on: [:fetch_a, :fetch_b]
end

result = pipeline.call_parallel(SimpleFlow::Result.new(data))

Key behavior:

  • Steps run based on dependency graph, not definition order
  • Steps with satisfied dependencies run in parallel
  • Must explicitly specify all dependencies
  • Use call_parallel to execute
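
To make "run based on dependency graph" concrete, the sketch below groups steps into waves, where every step in a wave has all of its dependencies completed by earlier waves. It is a hypothetical stand-alone illustration: `execution_waves` is an invented helper, and the scheduling SimpleFlow performs internally may differ.

```ruby
# Group steps into waves; all steps in one wave could run concurrently.
def execution_waves(dependencies)
  remaining = dependencies.transform_values(&:dup)
  done = []
  waves = []

  until remaining.empty?
    # A step is ready when every dependency has already completed.
    ready = remaining.select { |_, deps| (deps - done).empty? }.keys
    raise ArgumentError, 'circular or unknown dependency' if ready.empty?

    waves << ready
    done.concat(ready)
    ready.each { |name| remaining.delete(name) }
  end
  waves
end

graph = {
  validate: [],
  fetch_a:  [:validate],
  fetch_b:  [:validate],
  merge:    [:fetch_a, :fetch_b]
}

waves = execution_waves(graph)
waves.each_with_index { |wave, i| puts "wave #{i + 1}: #{wave.join(', ')}" }
# wave 1: validate
# wave 2: fetch_a, fetch_b
# wave 3: merge
```

Definition order plays no role here: reordering the keys of `graph` changes only the order of names within a wave, not which wave a step lands in.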

Core Concepts

Result Object

Immutable value object that carries data, context, and errors through the pipeline:

result = SimpleFlow::Result.new({ user: 'alice' })
  .with_context(:timestamp, Time.now)
  .with_error(:validation, 'Email required')
  .continue({ user: 'alice', processed: true })
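
The chain above works because every call returns a new object instead of mutating the receiver. A minimal sketch of that pattern follows, assuming a frozen value object; `ToyResult` is a hypothetical class written for this illustration and is not the gem's `SimpleFlow::Result`.

```ruby
# Toy immutable value object (assumption: illustration only).
class ToyResult
  attr_reader :value, :context, :errors

  def initialize(value, context: {}, errors: {})
    @value = value
    @context = context.freeze
    @errors = errors.freeze
    freeze
  end

  # Each "with_" method builds a fresh object and leaves self untouched.
  def with_context(key, val)
    ToyResult.new(value, context: context.merge(key => val), errors: errors)
  end

  def with_error(key, message)
    updated = errors.merge(key => [*errors[key], message])
    ToyResult.new(value, context: context, errors: updated)
  end

  def continue(new_value)
    ToyResult.new(new_value, context: context, errors: errors)
  end
end

original = ToyResult.new({ user: 'alice' })
updated  = original.with_context(:source, 'signup')
                   .with_error(:validation, 'Email required')

p original.errors  # => {}
p updated.errors
```

Because no object changes after construction, a result can be handed to several concurrent steps without locking.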

Pipeline

Orchestrates step execution with short-circuit evaluation:

pipeline = SimpleFlow::Pipeline.new do
  use_middleware SimpleFlow::MiddleWare::Logging
  use_middleware SimpleFlow::MiddleWare::Instrumentation, api_key: 'app'

  step ->(result) { validate(result) }
  step ->(result) { process(result) }
  step ->(result) { save(result) }
end
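
One way to picture how middleware relates to a step is the decorator pattern: each middleware wraps a callable and is itself callable. The sketch below is a hypothetical model under that assumption; `TraceMiddleware` and `wrap` are invented names, and the wrapping order used by SimpleFlow itself is not specified in this README.

```ruby
# Hypothetical middleware: records each call around the wrapped callable.
class TraceMiddleware
  def initialize(callable, log:)
    @callable = callable
    @log = log
  end

  def call(input)
    @log << "before #{input.inspect}"
    output = @callable.call(input)
    @log << "after #{output.inspect}"
    output
  end
end

# Wrap a step in each middleware. In this sketch the last middleware
# in the list ends up outermost.
def wrap(step, middlewares)
  middlewares.reduce(step) do |callable, (klass, options)|
    klass.new(callable, **options)
  end
end

log = []
step = ->(value) { value.upcase }
wrapped = wrap(step, [[TraceMiddleware, { log: log }]])
output = wrapped.call('hello')

puts output  # => HELLO
puts log
```

The step itself stays unaware of the tracing, which is the point of keeping cross-cutting concerns in middleware.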

Middleware

Add cross-cutting concerns without modifying steps:

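class CachingMiddleware
  def initialize(callable, cache:)
    @callable = callable
    @cache = cache
  end

  def call(result)
    # Compute the key once, from the incoming result, so the lookup
    # and the write use the same key.
    key = cache_key(result)
    cached = @cache.get(key)
    return result.continue(cached) if cached

    output = @callable.call(result)
    @cache.set(key, output.value)
    output
  end

  private

  # Illustrative keying scheme (an assumption); adapt it to your data.
  def cache_key(result)
    "simple_flow:#{result.value.inspect}"
  end
end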

pipeline = SimpleFlow::Pipeline.new do
  use_middleware CachingMiddleware, cache: Redis.new
  step ->(result) { expensive_operation(result) }
end

Parallel Execution

Automatic Parallelization

SimpleFlow automatically detects which steps can run in parallel based on dependencies:

pipeline = SimpleFlow::Pipeline.new do
  step :validate, ->(r) { validate(r) }, depends_on: :none

  # These run in parallel (both depend only on :validate)
  step :fetch_orders, ->(r) { fetch_orders(r) }, depends_on: [:validate]
  step :fetch_products, ->(r) { fetch_products(r) }, depends_on: [:validate]

  # Waits for both parallel steps
  step :calculate, ->(r) { calculate(r) }, depends_on: [:fetch_orders, :fetch_products]
end

result = pipeline.call_parallel(SimpleFlow::Result.new(data))

Note: For steps with no dependencies, you can use either depends_on: :none (more readable) or depends_on: [].

Optional Steps (Dynamic DAG)

Optional steps are declared with depends_on: :optional and only execute when explicitly activated at runtime:

pipeline = SimpleFlow::Pipeline.new do
  step :router, ->(r) {
    case r.value[:type]
    when :pdf then r.continue(r.value).activate(:process_pdf)
    when :image then r.continue(r.value).activate(:process_image)
    else r.continue(r.value).activate(:process_default)
    end
  }, depends_on: :none

  step :process_pdf, ->(r) { process_pdf(r) }, depends_on: :optional
  step :process_image, ->(r) { process_image(r) }, depends_on: :optional
  step :process_default, ->(r) { process_default(r) }, depends_on: :optional
end

result = pipeline.call_parallel(SimpleFlow::Result.new({ type: :pdf, data: "..." }))
# Only :router and :process_pdf execute

Key features:

  • Declare optional steps with depends_on: :optional
  • Activate steps dynamically with result.activate(:step_name)
  • Activate multiple steps at once: result.activate(:step_a, :step_b)
  • Optional steps can activate other optional steps (chaining)
  • Great for routing, feature flags, and soft error handling
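
The activation rule can be modeled without the gem. In this toy sketch each step returns the names of the steps it activates, which is a simplification of calling `result.activate(...)`; `run_with_optional` and the step table layout are hypothetical and exist only for this illustration.

```ruby
require 'set'

# Toy runner: optional steps run only after some step activates them.
def run_with_optional(steps, input)
  activated = Set.new
  executed = []
  pending = steps.keys

  until pending.empty?
    name = pending.find { |n| !steps[n][:optional] || activated.include?(n) }
    break unless name # only un-activated optional steps remain

    pending.delete(name)
    executed << name
    # Each callable returns the step names it wants to activate.
    activated.merge(steps[name][:call].call(input))
  end
  executed
end

steps = {
  router: {
    optional: false,
    call: ->(doc) { doc[:type] == :pdf ? [:process_pdf] : [:process_default] }
  },
  process_pdf:     { optional: true, call: ->(_doc) { [] } },
  process_image:   { optional: true, call: ->(_doc) { [] } },
  process_default: { optional: true, call: ->(_doc) { [] } }
}

executed = run_with_optional(steps, { type: :pdf })
p executed  # => [:router, :process_pdf]
```

Chaining falls out of the same loop: an optional step that returns further names makes those steps eligible on the next pass.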

Execution flow for the Automatic Parallelization example (validate, fetch_orders, fetch_products, calculate):

graph TD
    V[validate] --> O[fetch_orders]
    V --> P[fetch_products]
    O --> C[calculate]
    P --> C

    style V fill:#e1f5ff
    style O fill:#fff3cd
    style P fill:#fff3cd
    style C fill:#d4edda

    classDef parallel fill:#fff3cd,stroke:#ffc107
    class O,P parallel

Explicit Parallel Blocks

pipeline = SimpleFlow::Pipeline.new do
  step ->(r) { validate(r) }

  parallel do
    step ->(r) { r.with_context(:api, fetch_api).continue(r.value) }
    step ->(r) { r.with_context(:db, fetch_db).continue(r.value) }
    step ->(r) { r.with_context(:cache, fetch_cache).continue(r.value) }
  end

  step ->(r) { merge_results(r) }
end
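
When the async gem is absent, a parallel block like the one above amounts to a fan-out over plain Ruby threads followed by a single merge. The sketch below shows that shape using only the standard library; the three fetchers are hypothetical stand-ins for `fetch_api`, `fetch_db`, and `fetch_cache`, and this is not the gem's own code.

```ruby
# Hypothetical fetchers; the sleep simulates I/O latency.
fetchers = {
  api:   -> { sleep 0.05; 'api data' },
  db:    -> { sleep 0.05; 'db rows' },
  cache: -> { sleep 0.05; 'cached blob' }
}

# Fan out: start one thread per fetcher.
threads = fetchers.map { |key, fetcher| [key, Thread.new { fetcher.call }] }

# Fan in: Thread#value joins the thread and returns its block's result.
context = threads.to_h { |key, thread| [key, thread.value] }

p context
```

Each thread returns its own value and the merge happens in one place, so no thread writes to shared state.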

Concurrency Control

Choose the concurrency model per pipeline:

# Auto-detect (default): uses async if available, otherwise threads
pipeline = SimpleFlow::Pipeline.new do
  # steps...
end

# Force threads (even if async gem is installed)
user_pipeline = SimpleFlow::Pipeline.new(concurrency: :threads) do
  step :fetch_profile, profile_fetcher, depends_on: []
  step :fetch_settings, settings_fetcher, depends_on: []
end

# Require async (raises error if async gem not available)
batch_pipeline = SimpleFlow::Pipeline.new(concurrency: :async) do
  step :load_batch, batch_loader, depends_on: []
  step :process_batch, batch_processor, depends_on: [:load_batch]
end

# Mix concurrency models in the same application!
user_result = user_pipeline.call_parallel(user_data)    # Uses threads
batch_result = batch_pipeline.call_parallel(batch_data) # Uses async

Concurrency options:

  • :auto (default) - Auto-detects best option (async if available, otherwise threads)
  • :threads - Always uses Ruby threads (simpler, works with any gems)
  • :async - Requires async gem (efficient for high-concurrency workloads)
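
The three options reduce to a small decision rule. The sketch below is one hypothetical way to express it: `async_available?` and `resolve_concurrency` are invented names, and raising `ArgumentError` is an assumption, since this README does not say which error the gem raises.

```ruby
# Returns true when the async gem can be loaded (hypothetical helper).
def async_available?
  require 'async'
  true
rescue LoadError
  false
end

# Map a requested mode to the backend that would actually be used.
def resolve_concurrency(mode, async_available: async_available?)
  case mode
  when :threads then :threads
  when :async
    # Assumed error class; the gem may raise something else.
    raise ArgumentError, 'concurrency: :async requires the async gem' unless async_available
    :async
  when :auto then async_available ? :async : :threads
  else raise ArgumentError, "unknown concurrency mode: #{mode.inspect}"
  end
end

puts resolve_concurrency(:auto, async_available: false)    # => threads
puts resolve_concurrency(:auto, async_available: true)     # => async
puts resolve_concurrency(:threads, async_available: true)  # => threads
```

Passing `async_available:` explicitly keeps the rule testable on machines with or without the gem installed.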

Visualization

Visualize your pipelines to understand execution flow:

pipeline = SimpleFlow::Pipeline.new do
  step :load, loader, depends_on: []
  step :transform, transformer, depends_on: [:load]
  step :validate, validator, depends_on: [:transform]
  step :save, saver, depends_on: [:validate]
end

# ASCII visualization
puts pipeline.visualize_ascii

# Export to Graphviz
File.write('pipeline.dot', pipeline.visualize_dot)

# Export to Mermaid
File.write('pipeline.mmd', pipeline.visualize_mermaid)

# View execution plan
puts pipeline.execution_plan

Generated Mermaid diagram:

graph TB
    load --> transform
    transform --> validate
    validate --> save

    style load fill:#e1f5ff
    style transform fill:#fff3cd
    style validate fill:#fce4ec
    style save fill:#d4edda
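
The edge list in a diagram like this follows directly from the declared dependencies. As a rough sketch of that mapping, assuming a plain hash of step names to dependencies, the hypothetical `to_mermaid` helper below emits the edges only; it is not the gem's `visualize_mermaid` and omits the style lines.

```ruby
# Build Mermaid "graph TB" text from a step => dependencies hash.
def to_mermaid(dependencies)
  edges = dependencies.flat_map do |step, deps|
    deps.map { |dep| "    #{dep} --> #{step}" }
  end
  (['graph TB'] + edges).join("\n")
end

deps = {
  load:      [],
  transform: [:load],
  validate:  [:transform],
  save:      [:validate]
}

mermaid = to_mermaid(deps)
puts mermaid
# graph TB
#     load --> transform
#     transform --> validate
#     validate --> save
```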

Real-World Example

E-commerce order processing pipeline:

pipeline = SimpleFlow::Pipeline.new do
  use_middleware SimpleFlow::MiddleWare::Logging
  use_middleware SimpleFlow::MiddleWare::Instrumentation, api_key: 'orders'

  step :validate_order, ->(r) {
    # Validation logic
    r.continue(r.value)
  }, depends_on: []

  # Run in parallel
  step :check_inventory, ->(r) {
    inventory = InventoryService.check(r.value[:items])
    r.with_context(:inventory, inventory).continue(r.value)
  }, depends_on: [:validate_order]

  step :calculate_shipping, ->(r) {
    shipping = ShippingService.calculate(r.value[:address])
    r.with_context(:shipping, shipping).continue(r.value)
  }, depends_on: [:validate_order]

  # Wait for parallel steps
  step :process_payment, ->(r) {
    payment = PaymentService.charge(r.value, r.context)
    r.with_context(:payment, payment).continue(r.value)
  }, depends_on: [:check_inventory, :calculate_shipping]

  step :send_confirmation, ->(r) {
    EmailService.send_confirmation(r.value, r.context)
    r.continue(r.value)
  }, depends_on: [:process_payment]
end

Execution flow:

graph TB
    V[validate_order] --> I[check_inventory]
    V --> S[calculate_shipping]
    I --> P[process_payment]
    S --> P
    P --> C[send_confirmation]

    style V fill:#e1f5ff
    style I fill:#fff3cd
    style S fill:#fff3cd
    style P fill:#fce4ec
    style C fill:#d4edda

    classDef parallel fill:#fff3cd,stroke:#ffc107,stroke-width:3px
    class I,S parallel

Testing

Run the test suite with:

bundle exec rake test

Test Results:

  • ✅ 134 tests passing
  • ✅ 480 assertions
  • ✅ 95.57% line coverage

Documentation

📚 Comprehensive documentation available at madbomber.github.io/simple_flow

Examples

Check out the examples/ directory for comprehensive examples:

  1. 01_basic_pipeline.rb - Basic sequential processing
  2. 02_error_handling.rb - Error handling patterns
  3. 03_middleware.rb - Middleware usage
  4. 04_parallel_automatic.rb - Automatic parallel discovery
  5. 05_parallel_explicit.rb - Explicit parallel blocks
  6. 06_real_world_ecommerce.rb - Complete e-commerce workflow
  7. 07_real_world_etl.rb - ETL pipeline example
  8. 08_graph_visualization.rb - Manual visualization
  9. 09_pipeline_visualization.rb - Direct pipeline visualization
  10. 10_concurrency_control.rb - Per-pipeline concurrency control
  11. 11_sequential_dependencies.rb - Sequential step dependencies and halting
  12. 12_none_constant.rb - Using reserved dependency symbols :none and :nothing
  13. 13_optional_steps_in_dynamic_dag.rb - Optional steps with dynamic activation

Requirements

  • Ruby 3.2 or higher
  • Optional: async gem (~> 2.0) for fiber-based concurrency (parallel execution falls back to Ruby threads without it)

License

MIT License - See LICENSE file for details

Contributing

Contributions welcome! See CONTRIBUTING.md for guidelines.

Made with ❤️ by Dewayne VanHoozer
