Description
What version of Effect is running?
@effect/sql-pg: 0.50.3, @effect/sql: 0.49.0, effect: 3.19.19
What steps can reproduce the bug?
The issue occurs when using PostgreSQL LISTEN/NOTIFY in production with multiple channels. The current implementation uses an RcRef (Reference Counted
Resource) to manage LISTEN connections, which creates a single shared connection for all listen() operations.
Reproduction scenario:
import { PgClient } from "@effect/sql-pg"
import { Duration, Effect, Redacted, Stream } from "effect"
Effect.gen(function*() {
const client = yield* PgClient.PgClient
// Multiple LISTEN operations share the same connection
yield* client.listen("channel-1").pipe(
Stream.tap(msg => Effect.log(`Channel 1: ${msg}`)),
Stream.runDrain,
Effect.forkScoped
)
yield* client.listen("channel-2").pipe(
Stream.tap(msg => Effect.log(`Channel 2: ${msg}`)),
Stream.runDrain,
Effect.forkScoped
)
yield* client.listen("channel-3").pipe(
Stream.tap(msg => Effect.log(`Channel 3: ${msg}`)),
Stream.runDrain,
Effect.forkScoped
)
// All three subscriptions share ONE connection from the pool
// This connection is subject to:
// - idleTimeout (default: 10 minutes)
// - connectionTTL (default: 30 minutes)
// - Pool rotation
// When the connection times out or is returned to pool,
// ALL subscriptions are lost silently
}).pipe(
Effect.provide(PgClient.layer({
maxConnections: 20,
minConnections: 5,
connectionTTL: Duration.minutes(30),
idleTimeout: Duration.minutes(10),
url: Redacted.make("postgresql://...")
})),
Effect.runPromise
)
The problem:
- All listen() calls share the same RcRef connection (lines 314-338 in PgClient.ts)
- This connection is managed by the same pool as regular queries
- When pool management rotates/times out the connection, all LISTEN subscriptions are lost
- No error is thrown - subscriptions fail silently
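The failure mode in the list above can be illustrated with a dependency-free toy model. This is only a sketch: `FakeConnection` and `SharedListener` are hypothetical names invented here, and nothing below is the actual @effect/sql-pg code. It only shows why subscriptions registered on one connection vanish without an error when that connection is replaced.

```typescript
// Toy model only: hypothetical names, not the @effect/sql-pg implementation.
type Handler = (payload: string) => void

class FakeConnection {
  private handlers = new Map<string, Handler[]>()
  closed = false

  // Models `LISTEN <channel>`: the registration lives on this connection only.
  listen(channel: string, handler: Handler): void {
    const existing = this.handlers.get(channel) ?? []
    this.handlers.set(channel, [...existing, handler])
  }

  // Models a NOTIFY arriving from the server; returns how many handlers ran.
  deliver(channel: string, payload: string): number {
    if (this.closed) return 0
    const targets = this.handlers.get(channel) ?? []
    targets.forEach((handler) => handler(payload))
    return targets.length
  }

  // Models the pool retiring the connection (TTL or idle timeout).
  close(): void {
    this.closed = true
    this.handlers.clear()
  }
}

class SharedListener {
  private current = new FakeConnection()

  listen(channel: string, handler: Handler): void {
    this.current.listen(channel, handler)
  }

  // The pool swaps in a fresh connection; nothing re-issues LISTEN on it.
  rotate(): void {
    this.current.close()
    this.current = new FakeConnection()
  }

  notify(channel: string, payload: string): number {
    return this.current.deliver(channel, payload)
  }
}

const received: string[] = []
const shared = new SharedListener()
shared.listen("channel-1", (p) => received.push(`1:${p}`))
shared.listen("channel-2", (p) => received.push(`2:${p}`))

// Both handlers run while the original connection is alive.
const beforeRotation = shared.notify("channel-1", "a") + shared.notify("channel-2", "b")

// After rotation no handler runs, and no error is raised anywhere.
shared.rotate()
const afterRotation = shared.notify("channel-1", "c") + shared.notify("channel-2", "d")
```

In this model the caller has no signal that `rotate()` dropped its handlers, which mirrors the silent loss described in this report.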
What is the expected behavior?
PostgreSQL LISTEN operations should have a dedicated, stable connection that:
- Is not subject to regular pool management (TTL, idle timeout)
- Maintains subscriptions reliably for long-running applications
- Can be configured independently from the query pool
Expected API:
PgClient.layer({
maxConnections: 20,
minConnections: 5,
// Dedicated listen connection option
dedicatedListenConnection: true,
dedicatedListenConfig: {
connectionTTL: Duration.hours(24),
idleTimeout: Duration.hours(24),
applicationNameSuffix: '-listener'
}
})
This would create two connections:
- Query pool: Normal settings for queries/transactions
- Listen connection: Stable, long-lived connection for LISTEN operations
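One way the proposed options might resolve into settings for the listen connection is sketched below. Everything here is an assumption for illustration: `resolveListenSettings` and the interfaces are hypothetical, durations are plain milliseconds instead of `Duration` to keep the snippet self-contained, and the 24-hour and `-listener` fallbacks are simply borrowed from the example values above.

```typescript
// Hypothetical option shapes based on the proposal; none of this exists in @effect/sql-pg today.
interface PoolSettings {
  applicationName: string
  connectionTTLMillis: number
  idleTimeoutMillis: number
}

interface DedicatedListenConfig {
  connectionTTLMillis?: number
  idleTimeoutMillis?: number
  applicationNameSuffix?: string
}

interface ProposedOptions extends PoolSettings {
  dedicatedListenConnection?: boolean
  dedicatedListenConfig?: DedicatedListenConfig
}

const HOUR_MILLIS = 60 * 60 * 1000

// Returns undefined when the feature is off, so existing behaviour is unchanged.
// When it is on, every field falls back to an assumed long-lived default.
function resolveListenSettings(options: ProposedOptions): PoolSettings | undefined {
  if (options.dedicatedListenConnection !== true) return undefined
  const override = options.dedicatedListenConfig ?? {}
  return {
    applicationName: options.applicationName + (override.applicationNameSuffix ?? "-listener"),
    connectionTTLMillis: override.connectionTTLMillis ?? 24 * HOUR_MILLIS,
    idleTimeoutMillis: override.idleTimeoutMillis ?? 24 * HOUR_MILLIS
  }
}
```

Making the option opt-in keeps the default path identical to today's, which is what backward compatibility would require.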
What do you see instead?
Current behavior:
- Single shared connection for all LISTEN operations (via RcRef)
- Connection subject to pool timeout/rotation
- Silent subscription losses in production
- Event-driven architectures become unreliable
Current workaround (what users must do now):
import { PgClient } from "@effect/sql-pg"
import { Config, Duration, Effect, Layer, Redacted } from "effect"
// Main database service
const PgClientLive = (appId: string) =>
PgClient.layer({
applicationName: appId,
maxConnections: 20,
minConnections: 5,
connectionTTL: Duration.minutes(30),
idleTimeout: Duration.minutes(10),
url: Redacted.make(databaseUrl)
})
// Dedicated listen client (manual workaround)
const PgListenClientLive = (appId: string) =>
PgClient.layer({
applicationName: `${appId}-listener`,
maxConnections: 1,
minConnections: 1,
connectionTTL: Duration.hours(24),
idleTimeout: Duration.hours(24),
url: Redacted.make(databaseUrl)
})
// Users must manually combine layers
export const DatabaseLayer = (appId: string) =>
Layer.provideMerge(
DatabaseServiceLayer,
Layer.merge(
Layer.effect(DatabaseServiceListen, /* ... */).pipe(
Layer.provide(PgListenClientLive(appId))
),
PgClientLive(appId)
)
)
This requires:
- Manual layer management
- Duplicate configuration
- Deep understanding of RcRef internals
- Extra boilerplate in every project
Additional information
Source code location:
- File: packages/sql-pg/src/PgClient.ts
- Lines: 314-338 (listenClient creation and usage)
// Line 314-317
const listenClient = yield* RcRef.make({
acquire: reserveRaw
})
// Line 338 - All listen() calls use the same connection
const client = yield* RcRef.get(listenClient)
Why this matters:
- PostgreSQL LISTEN state is connection-specific
- When connection is lost, subscriptions are lost without notification
- Production event-driven architectures (multiple channels) are affected
- Common pattern in microservices (broker channels, event notifications, etc.)
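Because LISTEN registrations belong to a single connection, any recovery strategy has to remember which channels were subscribed and issue LISTEN again on the replacement connection. A minimal sketch of that bookkeeping follows; `ChannelRegistry` and `quoteIdentifier` are hypothetical helpers, not part of @effect/sql-pg. Channel names are double-quoted so their case is preserved, with embedded double quotes doubled as PostgreSQL requires for quoted identifiers.

```typescript
// Hypothetical helpers; a sketch of the bookkeeping, not library code.
class ChannelRegistry {
  private channels = new Set<string>()

  add(channel: string): void {
    this.channels.add(channel)
  }

  remove(channel: string): void {
    this.channels.delete(channel)
  }

  // Statements to run on a freshly opened connection so that every
  // tracked channel is subscribed again. Sorted for a stable order.
  resubscribeStatements(): string[] {
    return Array.from(this.channels)
      .sort()
      .map((channel) => `LISTEN ${quoteIdentifier(channel)}`)
  }
}

// Quote a channel name as a PostgreSQL identifier: wrap it in double
// quotes and double any embedded double quote.
function quoteIdentifier(name: string): string {
  return `"${name.replace(/"/g, '""')}"`
}

const registry = new ChannelRegistry()
registry.add("BROKER_USER_INSERT")
registry.add("BROKER_USER_ORDER_INSERT")
registry.add("BROKER_USER_SESSION_INSERT")
registry.remove("BROKER_USER_SESSION_INSERT")

// What a reconnect handler would send to the new connection.
const statements = registry.resubscribeStatements()
```

A quoted channel name is case-sensitive, so the NOTIFY side would have to use the same spelling for the two to match.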
Real-world usage:
Our application listens on 4+ channels:
- BROKER_USER_INSERT
- BROKER_USER_ORDER_INSERT
- BROKER_USER_ORDER_UPDATE
- BROKER_USER_SESSION_INSERT
All share one connection that times out after 10 minutes (default idle timeout).
Proposed solution:
See full PR proposal in this repository (I can submit as separate PR with implementation).
Options:
1. Documentation only: Add warnings and workaround examples
2. Configuration option (recommended): Add dedicatedListenConnection config
3. Separate API: Expose PgClient.listenLayer() for dedicated listen clients
I'm happy to submit a PR implementing Option 2 (configuration option) with full backward compatibility.