Skip to content

gradyzhuo/swift-kurrentdb

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

586 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

swift-kurrentdb

License Swift Package Index Swift-build-testing codecov

A modern, type-safe Swift client for Kurrent (formerly EventStoreDB)

Built for Server-Side Swift and Event Sourcing

Documentation | Getting Started | Discussions


Why swift-kurrentdb?

Event Sourcing is a powerful pattern for building scalable, auditable systems. swift-kurrentdb brings this capability to the Swift ecosystem with a modern, type-safe client.

  • Native Swift β€” Designed for Swift from the ground up, not a wrapper
  • Modern Concurrency β€” Full async/await with Swift 6 data-race safety
  • Compile-Time Safety β€” Swift 6 strict concurrency compliance with typed throws
  • Cluster-Ready β€” First-class support for multi-node TLS clusters
  • Well-Documented β€” Comprehensive guides on Swift Package Index
  • Typed Errors β€” All operations throw KurrentError for precise error handling

Quick Start

Installation

Add to your Package.swift:

// Stable release
dependencies: [
    .package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "1.12.1")
]

2.0.0 Beta available β€” A major new release is in active development. The 2.x API introduces a target-based design with improved type safety and composability.

.package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "2.0.0")

See the Migration Guide below for what's changed.


Connect to a Cluster

import KurrentDB

// Local development β€” single node
let settings = ClientSettings.localhost()
    .authenticated(.credentials(username: "admin", password: "changeit"))

// Local development β€” multi-node TLS cluster
let settings = ClientSettings.localhost(ports: 2111, 2112, 2113)
    .secure(true)
    .tlsVerifyCert(false)
    .authenticated(.credentials(username: "admin", password: "changeit"))
    .cerificate(path: "/path/to/ca.crt")

// Production β€” remote cluster (TLS enabled by default)
let settings = ClientSettings.remote(
    "node1.example.com:2113",
    "node2.example.com:2113",
    "node3.example.com:2113"
).authenticated(.credentials(username: "admin", password: "changeit"))

// Connection string
let settings: ClientSettings = "esdb://admin:changeit@node1:2113,node2:2113?tls=true"

let client = KurrentDBClient(settings: settings)

Append and Read Events

// Create an event
let event = EventData(
    eventType: "OrderPlaced",
    model: ["orderId": "order-123", "total": 99.99]
)

// Append to stream
try await client.appendToStream("orders", events: [event]) {
    $0.revision(expected: .any)
}

// Read events
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .start).limit(10)
}

for try await response in responses {
    if let event = try response.event {
        print("Event: \(event.record.eventType)")
    }
}

API Overview

Streams

// Append
try await client.appendToStream("orders", events: [event]) {
    $0.revision(expected: .streamExists)
}

// Read forward
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .start).limit(50)
}

// Read backward
let responses = try await client.readStream("orders") {
    $0.startFrom(revision: .end).limit(10).backward()
}

// Read $all
let allResponses = try await client.readAllStreams {
    $0.limit(100)
}

// Subscribe (catch-up)
let subscription = try await client.subscribeStream("orders")
for try await event in subscription.events { ... }

// Subscribe to $all
let subscription = try await client.subscribeAllStreams()

// Delete / tombstone
try await client.deleteStream("orders")
try await client.tombstoneStream("orders")

// Stream metadata
try await client.setStreamMetadata("orders", metadata: metadata)
let metadata = try await client.getStreamMetadata("orders")

Projections

// Create
try await client.createContinuousProjection(name: "order-count", query: js)
try await client.createOneTimeProjection(query: js)
try await client.createTransientProjection(name: "temp", query: js)

// Lifecycle
try await client.enableProjection(name: "order-count")
try await client.disableProjection(name: "order-count")
try await client.abortProjection(name: "order-count")
try await client.resetProjection(name: "order-count")
try await client.deleteProjection(name: "order-count")

// Query state / result
let state: CountResult? = try await client.getProjectionState(of: CountResult.self, name: "order-count")
let result: Int? = try await client.getProjectionResult(of: Int.self, name: "order-count")

// List
let continuous = try await client.listAllProjections(mode: .continuous)
let all = try await client.listAllProjections(mode: .any)

Persistent Subscriptions

// Create a subscription group
try await client.createPersistentSubscription(
    stream: "orders",
    groupName: "order-workers"
) {
    $0.startFrom(revision: .start)
      .maxRetryCount(5)
}

// Subscribe and process events
let subscription = try await client.subscribePersistentSubscription(
    stream: "orders",
    groupName: "order-workers"
)

for try await result in subscription.events {
    do {
        // handle event
        try await subscription.ack(readEvents: result.event)
    } catch {
        try await subscription.nack(readEvents: result.event, action: .park, reason: "\(error)")
    }
}

// $all persistent subscription
try await client.createPersistentSubscriptionToAllStream(groupName: "all-workers")
let allSub = try await client.subscribePersistentSubscriptionToAllStreams(groupName: "all-workers")

// Update / delete
try await client.updatePersistentSubscription(stream: "orders", groupName: "order-workers") { $0 }
try await client.deletePersistentSubscription(stream: "orders", groupName: "order-workers")

User Management

// Create a user
try await client.createUser(
    loginName: "jane",
    password: "secure_password",
    fullName: "Jane Doe",
    groups: ["ops"]
)

// Manage user
try await client.enableUser(loginName: "jane")
try await client.disableUser(loginName: "jane")
try await client.changeUserPassword(loginName: "jane", currentPassword: "old", newPassword: "new")
try await client.resetUserPassword(loginName: "jane", newPassword: "reset")

Server Operations

// Scavenge
let response = try await client.startScavenge(threadCount: 2, startFromChunk: 0)
try await client.stopScavenge(scavengeId: response.scavengeId)

// System
try await client.mergeIndexes()
try await client.restartPersistentSubscriptions()

// Node
try await client.resignNode()
try await client.setNodePriority(priority: 10)

Cluster Gossip

let members = try await client.readCluster()

for member in members {
    print("\(member.httpEndPoint.host):\(member.httpEndPoint.port) β€” \(member.state)")
}

if let leader = members.first(where: { $0.state == .leader && $0.isAlive }) {
    print("Leader: \(leader.httpEndPoint)")
}

Monitoring

let stats = try await client.stats(refreshTimePeriodInMs: 5000)

for try await snapshot in stats {
    print("Metrics: \(snapshot.stats.count) entries")
}

Migration Guide

Version 2.0.0 introduces a breaking redesign of the API. The flat methods on KurrentDBClient are replaced by a target-based, hierarchical style:

// 1.x
try await client.appendToStream("orders", events: [event]) { ... }

// 2.x
try await client.streams(of: .specified("orders")).append(events: [event]) { ... }

The 1.x API moves to KurrentDB_V1

In 2.x the old flat-method API is no longer part of the KurrentDB module. It has been moved to a separate KurrentDB_V1 library that ships in the same package. If you are not ready to migrate immediately, switch your dependency target and import:

// Package.swift
.product(name: "KurrentDB_V1", package: "swift-kurrentdb")
// Replace your existing import
import KurrentDB_V1   // was: import KurrentDB

KurrentDB_V1 gives you access to all 1.x methods (marked @deprecated) while you migrate to the new API at your own pace.

πŸ‘‰ Full Migration Guide β€” 1.x to 2.x


Features

Category Operations
Streams Append, read, delete, subscribe (catch-up), $all stream
Persistent Subscriptions Create, subscribe, update, delete, ACK/NAK, $all support
Projections Create (continuous/one-time/transient), enable, disable, state, result
Users Create, enable, disable, update, change/reset password
Operations Scavenge (start/stop), merge indexes, shutdown, node priority
Gossip Cluster discovery, node health, leader detection
Monitoring Real-time server statistics
Connection TLS/SSL, cluster gossip discovery, auto-reconnection, keep-alive

Test Coverage

75% line coverage across the KurrentDB module, measured by running all 174 tests against a live 3-node TLS KurrentDB cluster.

Coverage by Subsystem

Subsystem Line Coverage Lines
Monitoring 88.4% 95
ServerFeatures 90.2% 61
Users 87.6% 403
Streams 86.1% 1,560
Operations 83.0% 235
Projections 77.3% 865
PersistentSubscriptions 71.9% 1,555
Gossip 66.9% 142
Core 66.8% 2,650
KurrentDB (total) 75.0% 7,581

Test Suites

174 tests across 9 integration suites and 2 unit/mock suites. All integration tests run against a live 3-node TLS KurrentDB cluster.

Suite Tests Type Key Scenarios
StreamsTests 35 Integration Append, read (forward/backward/limit/revision), subscribe, metadata, optimistic concurrency, delete, tombstone
ProjectionsTests 16 Integration Create (continuous/one-time/transient), enable/disable, abort, reset, state/result query, list
PersistentSubscriptionsTests 11 Integration Create, subscribe, ACK, NACK (park/retry), getInfo, update settings, list, delete, replay parked
UsersTests 7 Integration Create, enable/disable, update, change/reset password
OperationsTests 6 Integration Scavenge (start/stop), merge indexes, restart persistent subscriptions, node priority, resign
GossipTests 3 Integration Read cluster members, node state, custom timeout
MonitoringTests 3 Integration Server stats, refresh interval, metadata flag
KurrentCoreTests 67 Unit Connection string parsing, EventData, projection status, stream identifiers, metadata, subscription filters
MockClientTests 26 Mock/DI KurrentDBClientProtocol conformance, all factory call patterns, 5 domain service scenarios
Total 174 0 commented-out tests

Optimistic Concurrency

Streams write-side error paths are explicitly covered:

Scenario Expected Error
Append at stale revision (.at(99), stream at 0) wrongExpectedVersion
Append with .noStream to an existing stream wrongExpectedVersion
Two concurrent writers at the same revision One succeeds, one wrongExpectedVersion

Persistent Subscription Lifecycle

Scenario Verified
Create β†’ subscribe β†’ append β†’ ACK βœ“
NACK with park (dead-letter queue) βœ“
NACK with retry (re-delivery, deliveries == 2) βœ“
getInfo (groupName, eventSource, $all) βœ“
Update settings β†’ getInfo confirms change βœ“
park β†’ replayParked β†’ re-delivered β†’ ACK βœ“

Requirements

  • Swift 6.0+
  • macOS 15+ / iOS 18+ / tvOS 18+ / watchOS 11+ / visionOS 2+ / Linux

KurrentDB Server Compatibility

Server Version Status Notes
KurrentDB 26.0 βœ… Supported Full feature support
KurrentDB 25.1 βœ… Supported Full feature support
EventStoreDB 24.x βœ… Supported Core features supported; KurrentDB v2 batch append not available

Local Development with Docker

Start a 3-node TLS cluster:

cd server
docker compose up -d

This generates TLS certificates automatically and starts nodes on ports 2111, 2112, and 2113.

Or a single insecure node for quick testing:

docker run --rm -d -p 2113:2113 \
  -e KURRENTDB_CLUSTER_SIZE=1 \
  -e KURRENTDB_RUN_PROJECTIONS=All \
  -e KURRENTDB_START_STANDARD_PROJECTIONS=true \
  -e KURRENTDB_INSECURE=true \
  -e KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP=true \
  docker.kurrent.io/kurrent-latest/kurrentdb:25.1

Documentation

Guide Description
Migration Guide (1.x β†’ 2.x) What changed in 2.0 and how to update your code
Getting Started Connection settings, first event, basic usage
Appending Events EventData, concurrency control, idempotency
Reading Events Forward/backward reading, $all stream, filters
Projections Create, manage, and query projection state
Persistent Subscriptions Competing consumers, ACK/NAK, subscription groups
User Management Create, enable, disable, password management
Cluster Gossip Cluster discovery, node health monitoring
Monitoring Real-time server statistics
Server Operations Scavenge, index merge, shutdown, node management
Full API Reference Complete API documentation

Contributing

Contributions are welcome! Whether it's bug reports, feature requests, documentation improvements, or code contributions.

License

MIT License β€” see LICENSE for details.

Acknowledgments

Built with:

Inspired by official Kurrent/EventStoreDB clients.


Made by Grady Zhuo

About

Kurrent Databse gRPC Client SDK in Swift.

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Packages

 
 
 

Languages