lupodevelop/distribute

distribute

Distribute brings the full power of Erlang's distributed computing to Gleam, with a type-safe and gleam_otp-integrated API.

While Gleam runs on the BEAM, accessing distributed primitives (like connecting nodes, global registration, or RPC) traditionally required dealing with untyped atoms and unsafe Erlang terms. Distribute v2.0 solves this by providing:

  • Type-safe messaging: binary codecs (Encoder(a), Decoder(a)) used with gleam/erlang/process.Subject(BitArray)
  • Full gleam_otp integration: works with actors and selectors
  • Explicit error handling: all operations return typed Result values
  • Composable codecs: built-in support for primitives, Option, Result, tuples, and custom types
  • Production-ready: comprehensive error handling, with legacy untyped APIs marked as deprecated

Note: Use the _typed variants of all functions (e.g., send_global_typed, broadcast_typed) for full type safety. Legacy untyped functions are deprecated and will be removed in v3.0.

Features

Core Distributed Primitives

  • Node Management — Start distributed nodes, connect to peers, and list connected nodes with proper error handling.
  • Global Registry — Register and lookup processes across the cluster using a type-safe wrapper around :global.
  • Cross-node Messaging — Send messages to remote processes or globally registered names transparently.
  • Process Groups — Join/leave groups and broadcast messages to all members (wrapper around :pg).
  • Remote Monitoring — Monitor processes and nodes for failure detection across the network.
  • RPC — Perform Remote Procedure Calls to any Erlang/Gleam module with timeout control.

Type-Safe API (v2.0)

  • Binary Codec System: Encoder/Decoder types for compile-time safe serialization
  • Envelope Protocol: Tag + version validation for protocol mismatch detection (see distribute/codec.wrap_envelope and distribute/codec.unwrap_envelope)
  • Typed Messaging: send_typed, call_typed, broadcast_typed with explicit errors
  • Receiver Helpers: Convenient receive_typed integration with gleam/erlang/process
  • gleam_otp Compatible: Use standard Subject(BitArray) from gleam/erlang/process
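
To make the codec and envelope ideas in the list above more concrete, here is a minimal sketch in plain Gleam. It does not use distribute's own API. `encode_string`, `decode_string`, `wrap`, `unwrap`, and `DecodeError` are hypothetical names, and the wire layout (32-bit length prefix, 8-bit version) is an assumption chosen for illustration, not the library's actual format.

```gleam
import gleam/bit_array
import gleam/result

/// Hypothetical error type for this sketch (not distribute's own).
pub type DecodeError {
  Truncated
  InvalidUtf8
  TagMismatch(expected: String, got: String)
  VersionMismatch(expected: Int, got: Int)
}

/// Encode a string as a 32-bit byte length followed by its UTF-8 bytes.
pub fn encode_string(value: String) -> BitArray {
  let payload = bit_array.from_string(value)
  let size = bit_array.byte_size(payload)
  <<size:32, payload:bits>>
}

/// Decode a length-prefixed string, returning the value and the unread rest.
pub fn decode_string(
  data: BitArray,
) -> Result(#(String, BitArray), DecodeError) {
  case data {
    <<size:32, payload:bytes-size(size), rest:bits>> ->
      bit_array.to_string(payload)
      |> result.replace_error(InvalidUtf8)
      |> result.map(fn(value) { #(value, rest) })
    _ -> Error(Truncated)
  }
}

/// Prefix a payload with a tag and a one-byte version (assumed layout).
pub fn wrap(tag: String, version: Int, payload: BitArray) -> BitArray {
  let header = encode_string(tag)
  <<header:bits, version:8, payload:bits>>
}

/// Check tag and version before handing the payload to a decoder.
pub fn unwrap(
  data: BitArray,
  expected_tag: String,
  expected_version: Int,
) -> Result(BitArray, DecodeError) {
  use #(tag, rest) <- result.try(decode_string(data))
  case tag == expected_tag, rest {
    False, _ -> Error(TagMismatch(expected: expected_tag, got: tag))
    True, <<version:8, payload:bits>> if version == expected_version ->
      Ok(payload)
    True, <<version:8, _:bits>> ->
      Error(VersionMismatch(expected: expected_version, got: version))
    True, _ -> Error(Truncated)
  }
}
```

With a scheme like this, a receiver that expects tag "task" at version 1 rejects a message wrapped as version 2 with a typed error instead of trying to decode bytes it does not understand.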

Advanced Features

  • SWIM-like Membership — A built-in background service for failure detection, gossip, and cluster membership.
  • Raft-lite Election — A lightweight leader election implementation for simple coordination needs.

Installation

# gleam.toml
[dependencies]
gleam_stdlib = ">= 0.43.0"
gleam_erlang = ">= 0.5.0"
gleam_otp = ">= 0.1.0"
distribute = "~> 2.0"

Alternatively, let the Gleam CLI add the dependency:

gleam add distribute

Documentation

Quick Start (Type-Safe API)

1. Start a Distributed Node

import distribute/node_builder
import gleam/io

pub fn main() {
  // Start this node with builder pattern
  let assert Ok(_) = node_builder.new()
    |> node_builder.with_name("app@127.0.0.1")
    |> node_builder.with_cookie("secret")
    |> node_builder.start()
  
  io.println("Node started!")
}

2. Connect to Another Node

import distribute/cluster
import gleam/io
import gleam/string

pub fn connect_peer() {
  case cluster.connect("other@127.0.0.1") {
    Ok(_) -> io.println("Connected to peer!")
    Error(err) -> io.println("Connection failed: " <> string.inspect(err))
  }
}

3. Register and Lookup Processes Globally (Type-Safe)

import distribute/codec
import distribute/global
import distribute/messaging
import distribute/receiver
import distribute/registry
import gleam/erlang/process

pub fn register_and_lookup() {
  // Create a type-safe global subject with encoder/decoder.
  // The binding is named `service` so it does not shadow the `global` module.
  let encoder = codec.string_encoder()
  let decoder = codec.string_decoder()
  let service = global.new(encoder, decoder)
  
  // Register it globally
  let _ = registry.register_typed("my_service", global.subject(service))

  // From another node/process: look up the service type-safely
  let assert Ok(remote_service) = registry.whereis_global(
    "my_service",
    encoder,
    decoder
  )
  
  // Send a typed message through GlobalSubject
  let _ = global.send(remote_service, "Hello, world!")
  
  // Or use messaging API directly
  let _ = messaging.send_global_typed(
    "my_service", 
    "Hello, world!",
    encoder
  )
  
  // Receive typed messages
  let assert Ok(msg) = receiver.receive_typed(
    global.subject(service),
    decoder,
    1000
  )
}

4. Process Groups (Type-Safe)

import distribute/codec
import distribute/codec/builder
import distribute/global
import distribute/groups

pub type Task {
  Task(name: String, id: Int)
}

pub fn use_groups() {
  // Build a codec for Task
  let #(task_encoder, task_decoder) = builder.custom2(
    codec.string_encoder(),
    codec.int_encoder(),
    codec.string_decoder(),
    codec.int_decoder(),
    Task,
    fn(t) { #(t.name, t.id) },
  )
  
  // Create a global subject for group communication.
  // The binding is named `worker` so it does not shadow the `global` module.
  let worker = global.new(task_encoder, task_decoder)

  // Join a group named "workers"
  let _ = groups.join_typed("workers", global.subject(worker))

  // Broadcast a typed message to all members
  let _ = global.send(worker, Task("process_data", 42))

  // Get members list
  let members = groups.members_typed("workers")
}

5. SWIM Membership Service

import distribute/cluster/membership

pub fn use_membership() {
  // Start the background membership service (probe every 500ms)
  membership.start_service(500)

  // Get a list of alive nodes
  let alive = membership.alive()

  // Get nodes with full status details
  let nodes = membership.members_with_status()
  // Returns: [#("node@host", Alive, 0), ...]

  // Get current leader (lexicographically largest alive node)
  let leader = membership.current_leader()

  // Stop the service when done
  membership.stop_service()
}
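
The comment in the example above describes the leader as the lexicographically largest alive node. As a rough illustration of that rule only (not the membership module's actual implementation), a hypothetical `pick_leader` helper could be sketched like this:

```gleam
import gleam/list
import gleam/order
import gleam/string

/// Hypothetical helper: return the largest node name by string comparison,
/// or Error(Nil) when the list of alive nodes is empty.
pub fn pick_leader(alive: List(String)) -> Result(String, Nil) {
  list.reduce(alive, fn(best, node) {
    case string.compare(node, best) {
      order.Gt -> node
      _ -> best
    }
  })
}
```

A rule of this kind is deterministic, so nodes that see the same set of alive members pick the same leader without exchanging extra messages. Nodes with different views may temporarily disagree.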

6. Leader Election (Raft-lite)

import distribute/election/raft_lite
import gleam/io

pub fn elect_leader() {
  case raft_lite.elect() {
    raft_lite.Leader(name) -> io.println("Current Leader: " <> name)
    raft_lite.NoLeader -> io.println("No leader available yet")
  }
}

7. Remote Procedure Calls

import distribute/remote_call
import gleam/io
import gleam/string

pub fn call_remote() {
  // Call erlang:node() on a remote node
  case remote_call.call("other@host", "erlang", "node", [], []) {
    Ok(result) -> io.println("Remote node name: " <> string.inspect(result))
    Error(remote_call.RpcBadRpc(reason)) -> io.println("RPC failed: " <> reason)
    Error(_) -> io.println("RPC error")
  }
}

Module Reference

Module                         Description
distribute/cluster             Node management: start, connect, ping, list nodes
distribute/registry            Global process registration with type-safe subjects
distribute/messaging           Type-safe cross-node messaging with codecs
distribute/groups              Process groups with type-safe broadcast
distribute/monitor             Monitor processes and nodes
distribute/remote_call         Type-safe RPC to remote nodes
distribute/codec               Binary encoding/decoding for primitives and composite types
distribute/codec/builder       Helpers for building custom type codecs
distribute/codec/tagged        Tag and version validation for protocol safety
distribute/global              Type-safe global subjects with integrated codecs
distribute/receiver            Type-safe message receiving with codecs
distribute/sugar               Convenience helpers for common patterns
distribute/actor               Simple actor wrappers
distribute/cluster/membership  SWIM-like membership with gossip and failure detection
distribute/cluster/gossip      Gossip protocol for membership state propagation
distribute/cluster/health      Health checks for nodes and cluster
distribute/election/raft_lite  Lightweight leader election with term-based voting

Note: Legacy untyped functions (send_global, broadcast, call) are deprecated and will be removed in v3.0. Use the _typed variants with codecs for full type safety.

Running Tests

gleam test

Integration Tests

You can run a multi-node SWIM integration test using the provided script:

./examples/two_nodes/swim_integration.sh

This script starts 3 local Erlang nodes, runs the membership service, and verifies gossip convergence.

Note: The legacy wrapper examples/two_nodes/run_swim_integration.sh has been removed — run examples/two_nodes/swim_integration.sh directly.

Note: Scripts under examples/ are manual integration demos. They rely on local node naming (-sname + hostname) and are not intended to be a CI hard requirement.

Architecture

src/
├── distribute.gleam        # Top-level module
├── distribute/
│   ├── cluster.gleam       # Node management
│   ├── registry.gleam      # Global registry
│   ├── messaging.gleam     # Cross-node messaging
│   ├── groups.gleam        # Process groups
│   ├── monitor.gleam       # Process/node monitoring
│   ├── remote_call.gleam   # RPC
│   ├── cluster/
│   │   ├── membership.gleam # SWIM-like membership
│   │   ├── gossip.gleam     # Gossip protocol
│   │   └── health.gleam     # Health checks
│   └── election/
│       └── raft_lite.gleam  # Leader election
└── *_ffi.erl               # Erlang FFI files (in src root)

Design Philosophy

  1. No magic — Everything is explicit.
  2. Zero custom runtime — Only wraps standard BEAM features.
  3. Small but complete APIs — Just what you need for clustering.
  4. Type-safe — Gleam's type system prevents common errors.
  5. Compatible with gleam/otp — Works alongside standard OTP patterns.

Safety

  • Atom creation: Some APIs (through the Erlang FFI) accept strings that are converted to Erlang atoms. Creating atoms dynamically from arbitrary input can exhaust the BEAM atom table, which is not garbage-collected. Be careful when passing untrusted input to functions that convert strings to atoms. Where possible, use existing atoms or map inputs to a constrained set of known atoms.

    The library makes a best-effort attempt to avoid creating new atoms where possible (attempting binary_to_existing_atom first), but code paths that require atoms (e.g., global:register_name, pg:join) may still create atoms in some cases. Prefer using stable, pre-defined names or registering via a controlled mapping to avoid growing the atom table.
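
One way to follow the "constrained set of known atoms" advice above is to validate names against an allowlist before they reach any function that may convert them to atoms. The sketch below is plain Gleam and makes no assumption about distribute's API. `known_names` and `checked_name` are hypothetical names introduced for illustration.

```gleam
import gleam/list

/// Hypothetical allowlist of names this application is willing to register.
const known_names = ["my_service", "workers"]

/// Return the name only if it is on the allowlist, so untrusted input
/// never becomes a new atom further down the call chain.
pub fn checked_name(input: String) -> Result(String, String) {
  case list.contains(known_names, input) {
    True -> Ok(input)
    False -> Error("unknown name: " <> input)
  }
}
```

Callers would pass only the `Ok` value on to registration or group functions and reject everything else at the boundary.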

Settings

This library exposes a small settings API which controls behaviour that impacts safety and logging.

  • settings.set_allow_atom_creation(allow: Bool) — default: False.

    • If False, FFI calls will attempt binary_to_existing_atom and will not create a new atom if it doesn't exist, returning an error instead. This prevents uncontrolled atom creation from untrusted inputs.
    • If True, the library may create atoms when necessary for compatibility (e.g., RPC and monitor fallback).
  • settings.set_use_crypto_ids(use_crypto: Bool) — default: False.

    • If True, log.generate_correlation_id() will prefer crypto-derived IDs for better randomness/uniqueness. If False, a monotonic/time-based fallback is used.

Example:

import distribute/settings

pub fn main() {
  // Secure default — don't allow uncontrolled atom creation
  settings.set_allow_atom_creation(False)

  // Prefer crypto-based correlation ids
  settings.set_use_crypto_ids(True)
}

Logging & Correlation IDs

This library provides a structured logging helper, log, that attaches metadata and correlation IDs to log entries.

  • Set logging backend:

    • log.set_backend("console") (default)
    • log.set_backend("erlang_logger")
  • Generate correlation ids:

    • let id = log.generate_correlation_id()
    • Use log.info_with_correlation, log.error_with_correlation, etc. to attach id to logs and metadata.

Example:

import distribute/log
import distribute/settings

pub fn main() {
  settings.set_use_crypto_ids(True)
  log.set_backend("erlang_logger")
  let id = log.generate_correlation_id()
  log.info_with_correlation("Starting operation", [#("user", "alice")], id)
}

RPC Timeouts & Error Handling

Remote calls use a default timeout of 5000ms. Use remote_call.call_with_timeout to customise the timeout for long-running calls.

Example with a custom timeout:

import distribute/remote_call
import gleam/io
import gleam/string

pub fn call_remote_with_timeout() {
  // Call erlang:node() on a remote node with a 10s timeout
  case remote_call.call_with_timeout("other@host", "erlang", "node", [], [], 10_000) {
    Ok(result) -> io.println("Remote node name: " <> string.inspect(result))
    Error(remote_call.RpcBadRpc(reason)) -> io.println("RPC failed: " <> reason)
    Error(_) -> io.println("RPC error")
  }
}

Documentation

Examples

  • Multi-node shell scripts live in examples/two_nodes/.
  • A tiny example project wrapper lives in examples/two_nodes_app/.

Contributing

Contributions welcome! Please open an issue or PR on GitHub.

License

This project is licensed under the MIT License - see the LICENSE file for details.
