A modern, type-safe Swift client for Kurrent (formerly EventStoreDB)
Built for Server-Side Swift and Event Sourcing
Event Sourcing is a powerful pattern for building scalable, auditable systems. swift-kurrentdb brings this capability to the Swift ecosystem with a modern, type-safe client.
- Native Swift β Designed for Swift from the ground up, not a wrapper
- Modern Concurrency β Full async/await with Swift 6 data-race safety
- Compile-Time Safety β Swift 6 strict concurrency compliance with typed throws
- Cluster-Ready β First-class support for multi-node TLS clusters
- Well-Documented β Comprehensive guides on Swift Package Index
- Typed Errors β All operations throw
KurrentErrorfor precise error handling
Add to your Package.swift:
// Stable release
dependencies: [
.package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "1.12.1")
]2.0.0 Beta available β A major new release is in active development. The 2.x API introduces a target-based design with improved type safety and composability.
.package(url: "https://github.com/gradyzhuo/swift-kurrentdb.git", from: "2.0.0")See the Migration Guide below for what's changed.
import KurrentDB
// Local development β single node
let settings = ClientSettings.localhost()
.authenticated(.credentials(username: "admin", password: "changeit"))
// Local development β multi-node TLS cluster
let settings = ClientSettings.localhost(ports: 2111, 2112, 2113)
.secure(true)
.tlsVerifyCert(false)
.authenticated(.credentials(username: "admin", password: "changeit"))
.cerificate(path: "/path/to/ca.crt")
// Production β remote cluster (TLS enabled by default)
let settings = ClientSettings.remote(
"node1.example.com:2113",
"node2.example.com:2113",
"node3.example.com:2113"
).authenticated(.credentials(username: "admin", password: "changeit"))
// Connection string
let settings: ClientSettings = "esdb://admin:changeit@node1:2113,node2:2113?tls=true"
let client = KurrentDBClient(settings: settings)// Create an event
let event = EventData(
eventType: "OrderPlaced",
model: ["orderId": "order-123", "total": 99.99]
)
// Append to stream
try await client.appendToStream("orders", events: [event]) {
$0.revision(expected: .any)
}
// Read events
let responses = try await client.readStream("orders") {
$0.startFrom(revision: .start).limit(10)
}
for try await response in responses {
if let event = try response.event {
print("Event: \(event.record.eventType)")
}
}// Append
try await client.appendToStream("orders", events: [event]) {
$0.revision(expected: .streamExists)
}
// Read forward
let responses = try await client.readStream("orders") {
$0.startFrom(revision: .start).limit(50)
}
// Read backward
let responses = try await client.readStream("orders") {
$0.startFrom(revision: .end).limit(10).backward()
}
// Read $all
let allResponses = try await client.readAllStreams {
$0.limit(100)
}
// Subscribe (catch-up)
let subscription = try await client.subscribeStream("orders")
for try await event in subscription.events { ... }
// Subscribe to $all
let subscription = try await client.subscribeAllStreams()
// Delete / tombstone
try await client.deleteStream("orders")
try await client.tombstoneStream("orders")
// Stream metadata
try await client.setStreamMetadata("orders", metadata: metadata)
let metadata = try await client.getStreamMetadata("orders")// Create
try await client.createContinuousProjection(name: "order-count", query: js)
try await client.createOneTimeProjection(query: js)
try await client.createTransientProjection(name: "temp", query: js)
// Lifecycle
try await client.enableProjection(name: "order-count")
try await client.disableProjection(name: "order-count")
try await client.abortProjection(name: "order-count")
try await client.resetProjection(name: "order-count")
try await client.deleteProjection(name: "order-count")
// Query state / result
let state: CountResult? = try await client.getProjectionState(of: CountResult.self, name: "order-count")
let result: Int? = try await client.getProjectionResult(of: Int.self, name: "order-count")
// List
let continuous = try await client.listAllProjections(mode: .continuous)
let all = try await client.listAllProjections(mode: .any)// Create a subscription group
try await client.createPersistentSubscription(
stream: "orders",
groupName: "order-workers"
) {
$0.startFrom(revision: .start)
.maxRetryCount(5)
}
// Subscribe and process events
let subscription = try await client.subscribePersistentSubscription(
stream: "orders",
groupName: "order-workers"
)
for try await result in subscription.events {
do {
// handle event
try await subscription.ack(readEvents: result.event)
} catch {
try await subscription.nack(readEvents: result.event, action: .park, reason: "\(error)")
}
}
// $all persistent subscription
try await client.createPersistentSubscriptionToAllStream(groupName: "all-workers")
let allSub = try await client.subscribePersistentSubscriptionToAllStreams(groupName: "all-workers")
// Update / delete
try await client.updatePersistentSubscription(stream: "orders", groupName: "order-workers") { $0 }
try await client.deletePersistentSubscription(stream: "orders", groupName: "order-workers")// Create a user
try await client.createUser(
loginName: "jane",
password: "secure_password",
fullName: "Jane Doe",
groups: ["ops"]
)
// Manage user
try await client.enableUser(loginName: "jane")
try await client.disableUser(loginName: "jane")
try await client.changeUserPassword(loginName: "jane", currentPassword: "old", newPassword: "new")
try await client.resetUserPassword(loginName: "jane", newPassword: "reset")// Scavenge
let response = try await client.startScavenge(threadCount: 2, startFromChunk: 0)
try await client.stopScavenge(scavengeId: response.scavengeId)
// System
try await client.mergeIndexes()
try await client.restartPersistentSubscriptions()
// Node
try await client.resignNode()
try await client.setNodePriority(priority: 10)let members = try await client.readCluster()
for member in members {
print("\(member.httpEndPoint.host):\(member.httpEndPoint.port) β \(member.state)")
}
if let leader = members.first(where: { $0.state == .leader && $0.isAlive }) {
print("Leader: \(leader.httpEndPoint)")
}let stats = try await client.stats(refreshTimePeriodInMs: 5000)
for try await snapshot in stats {
print("Metrics: \(snapshot.stats.count) entries")
}Version 2.0.0 introduces a breaking redesign of the API.
The flat methods on KurrentDBClient are replaced by a target-based, hierarchical style:
// 1.x
try await client.appendToStream("orders", events: [event]) { ... }
// 2.x
try await client.streams(of: .specified("orders")).append(events: [event]) { ... }In 2.x the old flat-method API is no longer part of the KurrentDB module.
It has been moved to a separate KurrentDB_V1 library that ships in the same package.
If you are not ready to migrate immediately, switch your dependency target and import:
// Package.swift
.product(name: "KurrentDB_V1", package: "swift-kurrentdb")// Replace your existing import
import KurrentDB_V1 // was: import KurrentDBKurrentDB_V1 gives you access to all 1.x methods (marked @deprecated) while you migrate to the new API at your own pace.
π Full Migration Guide β 1.x to 2.x
| Category | Operations |
|---|---|
| Streams | Append, read, delete, subscribe (catch-up), $all stream |
| Persistent Subscriptions | Create, subscribe, update, delete, ACK/NAK, $all support |
| Projections | Create (continuous/one-time/transient), enable, disable, state, result |
| Users | Create, enable, disable, update, change/reset password |
| Operations | Scavenge (start/stop), merge indexes, shutdown, node priority |
| Gossip | Cluster discovery, node health, leader detection |
| Monitoring | Real-time server statistics |
| Connection | TLS/SSL, cluster gossip discovery, auto-reconnection, keep-alive |
75% line coverage across the KurrentDB module, measured by running all 174 tests against a live 3-node TLS KurrentDB cluster.
| Subsystem | Line Coverage | Lines |
|---|---|---|
| Monitoring | 88.4% | 95 |
| ServerFeatures | 90.2% | 61 |
| Users | 87.6% | 403 |
| Streams | 86.1% | 1,560 |
| Operations | 83.0% | 235 |
| Projections | 77.3% | 865 |
| PersistentSubscriptions | 71.9% | 1,555 |
| Gossip | 66.9% | 142 |
| Core | 66.8% | 2,650 |
| KurrentDB (total) | 75.0% | 7,581 |
174 tests across 9 integration suites and 2 unit/mock suites. All integration tests run against a live 3-node TLS KurrentDB cluster.
| Suite | Tests | Type | Key Scenarios |
|---|---|---|---|
| StreamsTests | 35 | Integration | Append, read (forward/backward/limit/revision), subscribe, metadata, optimistic concurrency, delete, tombstone |
| ProjectionsTests | 16 | Integration | Create (continuous/one-time/transient), enable/disable, abort, reset, state/result query, list |
| PersistentSubscriptionsTests | 11 | Integration | Create, subscribe, ACK, NACK (park/retry), getInfo, update settings, list, delete, replay parked |
| UsersTests | 7 | Integration | Create, enable/disable, update, change/reset password |
| OperationsTests | 6 | Integration | Scavenge (start/stop), merge indexes, restart persistent subscriptions, node priority, resign |
| GossipTests | 3 | Integration | Read cluster members, node state, custom timeout |
| MonitoringTests | 3 | Integration | Server stats, refresh interval, metadata flag |
| KurrentCoreTests | 67 | Unit | Connection string parsing, EventData, projection status, stream identifiers, metadata, subscription filters |
| MockClientTests | 26 | Mock/DI | KurrentDBClientProtocol conformance, all factory call patterns, 5 domain service scenarios |
| Total | 174 | 0 commented-out tests |
Streams write-side error paths are explicitly covered:
| Scenario | Expected Error |
|---|---|
Append at stale revision (.at(99), stream at 0) |
wrongExpectedVersion |
Append with .noStream to an existing stream |
wrongExpectedVersion |
| Two concurrent writers at the same revision | One succeeds, one wrongExpectedVersion |
| Scenario | Verified |
|---|---|
| Create β subscribe β append β ACK | β |
| NACK with park (dead-letter queue) | β |
| NACK with retry (re-delivery, deliveries == 2) | β |
| getInfo (groupName, eventSource, $all) | β |
| Update settings β getInfo confirms change | β |
| park β replayParked β re-delivered β ACK | β |
- Swift 6.0+
- macOS 15+ / iOS 18+ / tvOS 18+ / watchOS 11+ / visionOS 2+ / Linux
| Server Version | Status | Notes |
|---|---|---|
| KurrentDB 26.0 | β Supported | Full feature support |
| KurrentDB 25.1 | β Supported | Full feature support |
| EventStoreDB 24.x | β Supported | Core features supported; KurrentDB v2 batch append not available |
Start a 3-node TLS cluster:
cd server
docker compose up -dThis generates TLS certificates automatically and starts nodes on ports 2111, 2112, and 2113.
Or a single insecure node for quick testing:
docker run --rm -d -p 2113:2113 \
-e KURRENTDB_CLUSTER_SIZE=1 \
-e KURRENTDB_RUN_PROJECTIONS=All \
-e KURRENTDB_START_STANDARD_PROJECTIONS=true \
-e KURRENTDB_INSECURE=true \
-e KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP=true \
docker.kurrent.io/kurrent-latest/kurrentdb:25.1| Guide | Description |
|---|---|
| Migration Guide (1.x β 2.x) | What changed in 2.0 and how to update your code |
| Getting Started | Connection settings, first event, basic usage |
| Appending Events | EventData, concurrency control, idempotency |
| Reading Events | Forward/backward reading, $all stream, filters |
| Projections | Create, manage, and query projection state |
| Persistent Subscriptions | Competing consumers, ACK/NAK, subscription groups |
| User Management | Create, enable, disable, password management |
| Cluster Gossip | Cluster discovery, node health monitoring |
| Monitoring | Real-time server statistics |
| Server Operations | Scavenge, index merge, shutdown, node management |
| Full API Reference | Complete API documentation |
Contributions are welcome! Whether it's bug reports, feature requests, documentation improvements, or code contributions.
- GitHub Discussions β Ask questions, share ideas
- Issues β Report bugs
- Contributing Guide β Get started contributing
MIT License β see LICENSE for details.
Built with:
- grpc-swift β Swift gRPC implementation
- swift-nio β Non-blocking I/O
Inspired by official Kurrent/EventStoreDB clients.
Made by Grady Zhuo