@effect/sql-pg: Add dedicated connection option for PostgreSQL LISTEN/NOTIFY operations #6136

@smhmayboudi

What version of Effect is running?

@effect/sql-pg: 0.50.3, @effect/sql: 0.49.0, effect: 3.19.19

What steps can reproduce the bug?

The issue occurs when using PostgreSQL LISTEN/NOTIFY in production with multiple channels. The current implementation uses an RcRef (a reference-counted resource) to manage the LISTEN connection, which means every listen() operation shares a single connection.

Reproduction scenario:

import { PgClient } from "@effect/sql-pg"
import { Duration, Effect, Redacted, Stream } from "effect"

Effect.gen(function*() {
  const client = yield* PgClient.PgClient
  
  // Multiple LISTEN operations share the same connection
  yield* client.listen("channel-1").pipe(
    Stream.tap(msg => Effect.log(`Channel 1: ${msg}`)),
    Stream.runDrain,
    Effect.forkScoped
  )
  
  yield* client.listen("channel-2").pipe(
    Stream.tap(msg => Effect.log(`Channel 2: ${msg}`)),
    Stream.runDrain,
    Effect.forkScoped
  )
  
  yield* client.listen("channel-3").pipe(
    Stream.tap(msg => Effect.log(`Channel 3: ${msg}`)),
    Stream.runDrain,
    Effect.forkScoped
  )
  
  // All three subscriptions share ONE connection from the pool
  // This connection is subject to:
  // - idleTimeout (default: 10 minutes)
  // - connectionTTL (default: 30 minutes)
  // - Pool rotation
  
  // When the connection times out or is returned to pool,
  // ALL subscriptions are lost silently

  // Keep the scope open so the forked listeners stay alive
  yield* Effect.never
}).pipe(
  // Provides the Scope required by Effect.forkScoped
  Effect.scoped,
  Effect.provide(PgClient.layer({
    maxConnections: 20,
    minConnections: 5,
    connectionTTL: Duration.minutes(30),
    idleTimeout: Duration.minutes(10),
    url: Redacted.make("postgresql://...")
  })),
  Effect.runPromise
)

The problem:

  1. All listen() calls share the same RcRef connection (lines 314-338 in PgClient.ts)
  2. This connection is managed by the same pool as regular queries
  3. When pool management rotates/times out the connection, all LISTEN subscriptions are lost
  4. No error is thrown - subscriptions fail silently
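
The failure mode in the list above can be illustrated with a small, dependency-free toy model. This is only a sketch: ToyConnection and its methods are hypothetical names invented for illustration and are not the real RcRef or PgClient code. It shows three subscriptions registered on one shared connection, and what happens to them once that connection is closed by something other than the subscribers.

```typescript
// Toy model only: hypothetical names, not the real RcRef or PgClient code.
type Handler = (payload: string) => void

class ToyConnection {
  private readonly handlers = new Map<string, Set<Handler>>()
  private open = true

  // Models `LISTEN channel`: the registration lives on this connection only.
  listen(channel: string, handler: Handler): void {
    const set = this.handlers.get(channel) ?? new Set<Handler>()
    set.add(handler)
    this.handlers.set(channel, set)
  }

  // Models a notification arriving; returns how many handlers were invoked.
  notify(channel: string, payload: string): number {
    if (!this.open) return 0
    const set = this.handlers.get(channel)
    if (set === undefined) return 0
    set.forEach((handler) => handler(payload))
    return set.size
  }

  // Models the pool closing the connection (idle timeout, TTL, rotation).
  // Nothing is thrown and no handler is told, so the loss is silent.
  close(): void {
    this.open = false
    this.handlers.clear()
  }
}

const channelNames = ["channel-1", "channel-2", "channel-3"]

// One shared connection for every subscription.
const shared = new ToyConnection()
const received: string[] = []
for (const channel of channelNames) {
  shared.listen(channel, (payload) => received.push(`${channel}: ${payload}`))
}

// While the connection is open, a notification reaches its subscriber.
const deliveredBefore = shared.notify("channel-2", "hello")

// After the connection is closed, every channel stops delivering at once.
shared.close()
const deliveredAfter = channelNames
  .map((channel) => shared.notify(channel, "lost"))
  .reduce((total, count) => total + count, 0)
```

In this model the subscribers have no way to learn that close() happened, which mirrors the silent loss described in point 4.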

What is the expected behavior?

PostgreSQL LISTEN operations should have a dedicated, stable connection that:

  1. Is not subject to regular pool management (TTL, idle timeout)
  2. Maintains subscriptions reliably for long-running applications
  3. Can be configured independently from the query pool

Expected API:

PgClient.layer({
  maxConnections: 20,
  minConnections: 5,
  
  // Dedicated listen connection option
  dedicatedListenConnection: true,
  dedicatedListenConfig: {
    connectionTTL: Duration.hours(24),
    idleTimeout: Duration.hours(24),
    applicationNameSuffix: '-listener'
  }
})

This would create two independently managed sets of connections:

  • Query pool: Normal settings for queries/transactions
  • Listen connection: Stable, long-lived connection for LISTEN operations
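
One way the proposed option could be resolved internally is sketched below. Everything here is an assumption for illustration: PoolSettings, DedicatedListenConfig and resolveListenSettings are hypothetical names, the fallback values (24 hours, "-listener") simply mirror the example above, and durations are plain milliseconds so the sketch has no dependencies. None of it is part of the current @effect/sql-pg API.

```typescript
// Hypothetical shapes based on the proposal; not the @effect/sql-pg API.
interface PoolSettings {
  readonly applicationName: string
  readonly maxConnections: number
  readonly minConnections: number
  readonly connectionTTLMillis: number
  readonly idleTimeoutMillis: number
}

interface DedicatedListenConfig {
  readonly connectionTTLMillis?: number
  readonly idleTimeoutMillis?: number
  readonly applicationNameSuffix?: string
}

const MINUTE = 60 * 1000
const HOUR = 60 * MINUTE

// Derive the listener settings from the query pool settings.
// Assumed defaults: one connection, 24 hour TTL and idle timeout.
const resolveListenSettings = (
  pool: PoolSettings,
  dedicated: DedicatedListenConfig = {}
): PoolSettings => ({
  applicationName:
    pool.applicationName + (dedicated.applicationNameSuffix ?? "-listener"),
  maxConnections: 1,
  minConnections: 1,
  connectionTTLMillis: dedicated.connectionTTLMillis ?? 24 * HOUR,
  idleTimeoutMillis: dedicated.idleTimeoutMillis ?? 24 * HOUR
})

// The query pool keeps its normal settings.
const queryPool: PoolSettings = {
  applicationName: "my-app",
  maxConnections: 20,
  minConnections: 5,
  connectionTTLMillis: 30 * MINUTE,
  idleTimeoutMillis: 10 * MINUTE
}

// The listener inherits the application name and overrides only what it needs.
const listener = resolveListenSettings(queryPool, {
  applicationNameSuffix: "-events"
})
```

Deriving the listener settings from the pool settings keeps a single source of configuration, so users would not have to repeat the connection details for the listen connection.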

What do you see instead?

Current behavior:

  • Single shared connection for all LISTEN operations (via RcRef)
  • Connection subject to pool timeout/rotation
  • Silent subscription losses in production
  • Event-driven architectures become unreliable

Current workaround (what users must do now):

import { PgClient } from "@effect/sql-pg"
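import { Config, Duration, Effect, Layer, Redacted } from "effect"

// databaseUrl, DatabaseServiceLayer, and DatabaseServiceListen are
// application-specific and assumed to be defined elsewhere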

// Main database service
const PgClientLive = (appId: string) =>
  PgClient.layer({
    applicationName: appId,
    maxConnections: 20,
    minConnections: 5,
    connectionTTL: Duration.minutes(30),
    idleTimeout: Duration.minutes(10),
    url: Redacted.make(databaseUrl)
  })

// Dedicated listen client (manual workaround)
const PgListenClientLive = (appId: string) =>
  PgClient.layer({
    applicationName: `${appId}-listener`,
    maxConnections: 1,
    minConnections: 1,
    connectionTTL: Duration.hours(24),
    idleTimeout: Duration.hours(24),
    url: Redacted.make(databaseUrl)
  })

// Users must manually combine layers
export const DatabaseLayer = (appId: string) =>
  Layer.provideMerge(
    DatabaseServiceLayer,
    Layer.merge(
      Layer.effect(DatabaseServiceListen, /* ... */).pipe(
        // Layers are wired together with Layer.provide, not Effect.provide
        Layer.provide(PgListenClientLive(appId))
      ),
      PgClientLive(appId)
    )
  )

This requires:

  • Manual layer management
  • Duplicate configuration
  • Deep understanding of RcRef internals
  • Extra boilerplate in every project

Additional information

Source code location:

  • File: packages/sql-pg/src/PgClient.ts
  • Lines: 314-338 (listenClient creation and usage)

// Lines 314-317
const listenClient = yield* RcRef.make({
  acquire: reserveRaw
})

// Line 338 - All listen() calls use the same connection
const client = yield* RcRef.get(listenClient)

Why this matters:

  • PostgreSQL LISTEN state is connection-specific
  • When the connection is lost, its subscriptions are lost without notification
  • Production event-driven architectures (multiple channels) are affected
  • Common pattern in microservices (broker channels, event notifications, etc.)
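
Because LISTEN registrations belong to a PostgreSQL session, a replacement connection starts with none of them, and each channel has to be registered again. The sketch below only builds the statements a fresh connection would need. The helper names quoteIdentifier and relistenStatements are hypothetical, and this does not describe how @effect/sql-pg itself issues LISTEN.

```typescript
// Quote a channel name as a PostgreSQL identifier so its case is preserved.
// Embedded double quotes are escaped by doubling them.
const quoteIdentifier = (name: string): string =>
  `"${name.replace(/"/g, '""')}"`

// Build one LISTEN statement per channel for a freshly opened connection.
const relistenStatements = (channels: ReadonlyArray<string>): string[] =>
  channels.map((channel) => `LISTEN ${quoteIdentifier(channel)}`)

// Channel names taken from this issue.
const brokerChannels = [
  "BROKER_USER_INSERT",
  "BROKER_USER_ORDER_INSERT",
  "BROKER_USER_ORDER_UPDATE",
  "BROKER_USER_SESSION_INSERT"
]

const statements = relistenStatements(brokerChannels)
```

Unquoted identifiers are folded to lower case by PostgreSQL, so whichever form is used here has to match the channel name used on the NOTIFY side.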

Real-world usage:
Our application listens on 4+ channels:

  • BROKER_USER_INSERT
  • BROKER_USER_ORDER_INSERT
  • BROKER_USER_ORDER_UPDATE
  • BROKER_USER_SESSION_INSERT

All share one connection that times out after 10 minutes (default idle timeout).

Proposed solution:

See the full PR proposal in this repository (I can submit it as a separate PR with an implementation).

Options:

  1. Documentation only: Add warnings and workaround examples
  2. Configuration option (recommended): Add dedicatedListenConnection config
  3. Separate API: Expose PgClient.listenLayer() for dedicated listen clients

I'm happy to submit a PR implementing Option 2 (configuration option) with full backward compatibility.
