- High Performance - Built on battle-tested Erlang/OTP and brod client
- Type-Safe - Full Gleam type safety for your Kafka interactions
- Single Import - Everything you need from one module: `import franz`
- Flexible Consumers - Both group-based and topic-based subscription models
- Async & Sync - Choose between synchronous and asynchronous message production
- Rich Configuration - Fine-tune every aspect of your Kafka client
- Multiple Partitioning Strategies - Random, hash-based, or custom partitioners
- Message Batching - Efficient batch processing support
- SASL Authentication - Support for PLAIN and SCRAM authentication
- OTP Supervision - Full integration with gleam_otp supervision trees
Add Franz to your Gleam project:
gleam add franz

Here's a simple example to get you started with Franz:
import franz
import gleam/erlang/process
import gleam/io
/// Quick-start entry point: starts a named Franz client pointed at
/// localhost:9092, synchronously produces one message to "greetings",
/// then prints a confirmation.
pub fn main() {
  // Step 1: build and start the client under a registered name.
  let client_name = process.new_name("my_kafka_client")
  let brokers = [franz.Endpoint("localhost", 9092)]
  let builder =
    franz.client()
    |> franz.endpoints(brokers)
    |> franz.option(franz.AutoStartProducers(True))
    |> franz.name(client_name)
  let assert Ok(_) = franz.start(builder)
  let client = franz.named(client_name)

  // Step 2: publish a single record to partition 0.
  let greeting = franz.Value(<<"Hello, Franz!">>, [])
  let assert Ok(_) =
    franz.produce_sync(
      client: client,
      topic: "greetings",
      partition: franz.SinglePartition(0),
      key: <<"user:123">>,
      value: greeting,
    )

  io.println("Message sent successfully!")
}

Franz uses a builder pattern for configuration:
import franz
import gleam/erlang/process
/// Builds a named client that is given two broker endpoints and has
/// `AutoStartProducers` switched on, then starts it. The result of
/// `franz.start` is returned unchanged.
pub fn connect_to_kafka() {
  let client_name = process.new_name("my_client")

  let builder =
    franz.client()
    |> franz.endpoints([
      franz.Endpoint("broker1.example.com", 9092),
      franz.Endpoint("broker2.example.com", 9092),
    ])
    |> franz.option(franz.AutoStartProducers(True))
    |> franz.name(client_name)

  franz.start(builder)
}
/// Builds a named client configured with SASL credentials (SCRAM-SHA-256)
/// and SSL enabled, then starts it. The result of `franz.start` is
/// returned unchanged.
pub fn connect_with_auth() {
  let client_name = process.new_name("secure_client")
  let credentials =
    franz.SaslCredentials(franz.ScramSha256, "username", "password")

  let builder =
    franz.client()
    |> franz.endpoints([franz.Endpoint("secure.broker.com", 9093)])
    |> franz.sasl(credentials)
    |> franz.ssl(franz.SslEnabled)
    |> franz.name(client_name)

  franz.start(builder)
}

Franz offers multiple ways to produce messages to Kafka:
import franz
import gleam/bit_array
/// Starts a producer for the "user-events" topic and then synchronously
/// produces `event` to it, using `user_id` as the message key.
/// Returns the result of `franz.produce_sync`.
pub fn send_user_event(client: franz.Client, user_id: String, event: String) {
  let topic = "user-events"

  // Producer configured with RequiredAcks(-1) and gzip compression.
  let producer =
    franz.producer(client, topic)
    |> franz.producer_option(franz.RequiredAcks(-1))
    |> franz.producer_option(franz.Compression(franz.Gzip))
  let assert Ok(_) = franz.producer_start(producer)

  // Message body plus a single header.
  let payload =
    franz.Value(bit_array.from_string(event), [
      #("event-type", "user-action"),
    ])

  franz.produce_sync(
    client: client,
    topic: topic,
    // Hash partitioner: the partition is chosen from the key.
    partition: franz.Partitioner(franz.Hash),
    key: bit_array.from_string(user_id),
    value: payload,
  )
}

import franz
import gleam/io
import gleam/int
/// Produces one message to "async-events" with `franz.produce_async`,
/// using the random partitioner. The supplied callback prints the
/// partition and offset it receives.
pub fn send_async_with_confirmation(client: franz.Client) {
  // Unwraps the partition and offset wrappers and prints both numbers.
  let on_delivery = fn(partition, offset) {
    let franz.Partition(partition_number) = partition
    let franz.Offset(offset_number) = offset
    io.println(
      "Message delivered to partition " <> int.to_string(partition_number),
    )
    io.println("At offset " <> int.to_string(offset_number))
  }

  franz.produce_async(
    client: client,
    topic: "async-events",
    partition: franz.Partitioner(franz.Random),
    key: <<"">>,
    value: franz.Value(<<"async data">>, []),
    callback: on_delivery,
  )
}

import franz
/// Sends one message to "metrics" through `franz.produce`.
/// No acknowledgment is awaited: this is the fastest option, but
/// messages may be lost.
pub fn fire_and_forget(client: franz.Client) {
  let payload = franz.Value(<<"metric data">>, [])
  let any_partition = franz.Partitioner(franz.Random)

  franz.produce(
    client: client,
    topic: "metrics",
    partition: any_partition,
    key: <<"">>,
    value: payload,
  )
}

Franz provides two main consumer types: Group Subscribers and Topic Subscribers.
Perfect for scalable, fault-tolerant consumption with automatic partition assignment:
import franz
import gleam/erlang/process
import gleam/int
import gleam/io

/// Configures and starts a group subscriber named "analytics_consumer" in
/// the "analytics-processors" group, subscribed to "user-events" and
/// "system-events". The callback logs each message (or batch) and commits.
/// Returns the result of `franz.start_group_subscriber`.
pub fn start_consumer_group(client: franz.Client) {
  let name = process.new_name("analytics_consumer")

  franz.default_group_subscriber_config(
    name,
    client: client,
    group_id: "analytics-processors",
    topics: ["user-events", "system-events"],
    callback: fn(message, state) {
      case message {
        // Fields this example does not read are discarded with `_` names
        // so the snippet compiles without unused-variable warnings.
        franz.KafkaMessage(offset, _key, _value, _, _, _) -> {
          io.println("Processing message at offset " <> int.to_string(offset))
          // Process the message...
          // Commit the offset after processing
          franz.GroupCommit(state)
        }
        franz.KafkaMessageSet(topic, partition, _, _messages) -> {
          io.println(
            "Batch from " <> topic <> " partition " <> int.to_string(partition),
          )
          // Process batch...
          franz.GroupCommit(state)
        }
      }
    },
    init_state: Nil,
  )
  |> franz.start_group_subscriber()
}

For fine-grained control over partition assignment:
import franz
import gleam/io
import gleam/int
import gleam/erlang/process
/// Configures and starts a topic subscriber named
/// "notification_subscriber" on the "notifications" topic. The callback
/// logs the partition and offset of each single message and acknowledges.
/// Returns the result of `franz.start_topic_subscriber`.
pub fn subscribe_to_notifications(client: franz.Client) {
  let name = process.new_name("notification_subscriber")

  franz.default_topic_subscriber(
    name,
    client: client,
    topic: "notifications",
    callback: fn(partition, message, state) {
      // The message type has two constructors, so it is matched with
      // `case`; a plain `let` only accepts patterns that cover every
      // constructor.
      case message {
        franz.KafkaMessage(offset, _key, _value, _, _, _) -> {
          io.println(
            "Partition "
            <> int.to_string(partition)
            <> " offset "
            <> int.to_string(offset),
          )
          // Process notification...
          franz.TopicAck(state)
        }
        // Batches are acknowledged without further processing here.
        franz.KafkaMessageSet(_, _, _, _) -> franz.TopicAck(state)
      }
    },
    initial_state: Nil,
  )
  |> franz.start_topic_subscriber()
}

Franz components integrate with gleam_otp supervision trees:
import franz
import gleam/erlang/process
import gleam/otp/static_supervisor as supervisor

/// Starts a supervisor with two children: a named Franz client and a
/// group subscriber that uses that client. Returns the result of
/// `supervisor.start`.
pub fn start_supervised() {
  let client_name = process.new_name("supervised_client")
  let consumer_name = process.new_name("supervised_consumer")

  // The client is only described here; the supervisor starts it.
  let client_builder =
    franz.client()
    |> franz.endpoints([franz.Endpoint("localhost", 9092)])
    |> franz.name(client_name)

  // Handle that refers to the client by its registered name.
  let client = franz.named(client_name)

  let consumer_builder =
    franz.default_group_subscriber_config(
      consumer_name,
      client: client,
      group_id: "my-group",
      topics: ["events"],
      callback: fn(_msg, state) { franz.GroupCommit(state) },
      init_state: Nil,
    )

  // Children are added in order: the client first, then the subscriber
  // that depends on it.
  supervisor.new(supervisor.OneForOne)
  |> supervisor.add(franz.supervised(client_builder))
  |> supervisor.add(franz.group_subscriber_supervised(consumer_builder))
  |> supervisor.start()
}

All time-related options use gleam/time/duration.Duration for type safety.
| Option | Description | Default |
|---|---|---|
| `AutoStartProducers(Bool)` | Automatically start producers for topics | False |
| `ReconnectCoolDown(Duration)` | Cooldown between reconnection attempts | 1 second |
| `RestartDelay(Duration)` | Delay before restarting crashed client | 10 seconds |
| `AllowTopicAutoCreation(Bool)` | Allow automatic topic creation | True |
| `ConnectTimeout(Duration)` | TCP connection timeout | 5 seconds |
| `RequestTimeout(Duration)` | Request timeout | 30 seconds |
| `UnknownTopicCacheTtl(Duration)` | Cache TTL for unknown topic errors | 2 minutes |
| Option | Description | Default |
|---|---|---|
| `RequiredAcks(Int)` | Acknowledgments required (0, 1, or -1 for all) | -1 |
| `AckTimeout(Duration)` | Ack timeout | 10 seconds |
| `Compression(Compression)` | Compression algorithm | NoCompression |
| `MaxBatchSize(Int)` | Max batch size in bytes | 1048576 |
| `MaxRetries(Int)` | Max retry attempts | 3 |
| `RetryBackoff(Duration)` | Delay between retries | 500 ms |
| `MaxLinger(Duration)` | Max time to wait for batching | 0 (immediate) |
| Option | Description | Default |
|---|---|---|
| `BeginOffset(StartingOffset)` | Starting offset (Latest, Earliest, AtOffset, AtTimestamp) | Latest |
| `MinBytes(Int)` | Minimum bytes to fetch | 0 |
| `MaxBytes(Int)` | Maximum bytes to fetch | 1048576 |
| `MaxWaitTime(Duration)` | Max time to wait for data | 10 seconds |
| `SleepTimeout(Duration)` | Sleep when no data available | 1 second |
| `PrefetchCount(Int)` | Messages to prefetch | 10 |
| `ConsumerIsolationLevel(IsolationLevel)` | Transaction isolation | ReadCommitted |
| Option | Description | Default |
|---|---|---|
| `SessionTimeout(Duration)` | Session timeout | Kafka default |
| `HeartbeatRate(Duration)` | Heartbeat interval | Kafka default |
| `RebalanceTimeout(Duration)` | Rebalance timeout | Kafka default |
| `RejoinDelay(Duration)` | Delay before rejoin attempt | Kafka default |
| `OffsetCommitInterval(Duration)` | Auto-commit interval | Kafka default |
| `MaxRejoinAttempts(Int)` | Max rejoin attempts | Kafka default |
Franz supports multiple SASL mechanisms:
import franz
import gleam/erlang/process
// Name passed to every client builder below.
let name = process.new_name("secure_client")

// Each pipeline below only builds a client configuration with a different
// SASL mechanism; none of them calls franz.start().

// PLAIN authentication (use with SSL!)
franz.default_client(name)
|> franz.endpoints([franz.Endpoint("kafka.example.com", 9093)])
|> franz.sasl(franz.SaslCredentials(franz.Plain, "username", "password"))

// SCRAM-SHA-256
franz.default_client(name)
|> franz.endpoints([franz.Endpoint("kafka.example.com", 9093)])
|> franz.sasl(franz.SaslCredentials(franz.ScramSha256, "username", "password"))

// SCRAM-SHA-512
franz.default_client(name)
|> franz.endpoints([franz.Endpoint("kafka.example.com", 9093)])
|> franz.sasl(franz.SaslCredentials(franz.ScramSha512, "username", "password"))

import franz
import gleam/erlang/process
import gleam/option.{Some}
// Name passed to both client builders below.
let name = process.new_name("ssl_client")

// Both pipelines only build a client configuration; neither calls
// franz.start().

// Simple SSL with system CA
franz.default_client(name)
|> franz.endpoints([franz.Endpoint("kafka.example.com", 9093)])
|> franz.ssl(franz.SslEnabled)

// Custom certificates (mTLS)
// Supplies a CA file, a client certificate file and a client key file,
// with the verify option set to VerifyPeer.
franz.default_client(name)
|> franz.endpoints([franz.Endpoint("kafka.example.com", 9093)])
|> franz.ssl(franz.SslWithOptions(
  cacertfile: Some("/path/to/ca.crt"),
  certfile: Some("/path/to/client.crt"),
  keyfile: Some("/path/to/client.key"),
  verify: franz.VerifyPeer,
))

Franz provides specific error types for each operation category:
import franz
import gleam/io

/// Dispatches on the outcome of a produce call: prints "Success!" on `Ok`,
/// and routes each error variant to a handler.
///
/// `restart_producer`, `split_message`, `retry_later` and `log_error` are
/// placeholders that are not defined in this snippet; supply your own.
pub fn handle_produce_result(result: Result(Nil, franz.ProduceError)) {
  case result {
    Ok(_) -> io.println("Success!")
    Error(franz.ProducerDown) -> restart_producer()
    Error(franz.ProducerMessageTooLarge) -> split_message()
    Error(franz.ProducerNotEnoughReplicas) -> retry_later()
    // Any other produce error falls through to generic logging.
    Error(error) -> log_error(error)
  }
}

Error types:
- `ClientError` - Connection and authentication failures
- `TopicError` - Topic administration errors
- `ProduceError` - Producer operation errors
- `FetchError` - Consumer/fetch operation errors
- `GroupError` - Consumer group errors
Franz is built on top of the battle-tested brod Erlang client, providing:
- Connection pooling for efficient resource usage
- Automatic reconnection with configurable backoff
- Supervised processes for fault tolerance
- Backpressure handling for flow control
We welcome contributions! Please see our Contributing Guide for details.
# Run tests
gleam test
# Format code
gleam format
# Type check
gleam check

Franz is released under the MIT License. See the LICENSE file for details.
- Built on brod - the robust Erlang Kafka client
- Inspired by the Gleam community's commitment to type safety and developer experience
- Special thanks to all contributors and users of Franz
Made with love by the Gleam community