When something “simple” like a database connection starts failing in production, it usually isn’t because you forgot a username. It’s because your app needs a repeatable way to open connections, reuse them safely, handle timeouts, rotate credentials, run migrations, and keep transactions predictable under load.
That’s why I reach for SQLAlchemy when working with PostgreSQL in Python. SQLAlchemy’s Engine gives you a managed connection pool, consistent transaction boundaries, and a clean abstraction over different drivers. In 2026, the ergonomics are even better with SQLAlchemy 2.x’s typed APIs and modern PostgreSQL drivers like psycopg (v3).
In this post I’ll show you how I connect PostgreSQL to SQLAlchemy in real projects: building the connection URL safely, creating the Engine, using Sessions correctly, running a small ORM model end-to-end, and hardening the setup for production (pooling, timeouts, SSL). I’ll also include an async path, because many teams now mix sync and async services.
## The mental model: Engine, connections, and “lazy” connect
SQLAlchemy has a few concepts that are easy to mix up, so here’s the model I keep in my head:
- The `Engine` is the long-lived, app-level object. It owns a connection pool.
- A `Connection` is a short-lived wrapper around a DBAPI connection from the pool.
- A `Session` (ORM) is a unit-of-work manager that opens connections when needed, tracks changes, and commits or rolls back.
One detail that matters: create_engine(...) does not immediately connect to PostgreSQL. The first real network connection happens the first time you actually use the Engine (for example engine.connect() or executing a statement). This “connect on first use” behavior is a form of lazy initialization.
That’s why I usually do a small, explicit health check at startup (or in a readiness probe) rather than assuming Engine creation proves connectivity.
### What actually happens when you “connect”
When you call engine.connect() (or the ORM needs a connection), SQLAlchemy roughly does this:
- Ask the pool for an available DBAPI connection.
- If there isn’t one, open a new DBAPI connection (up to your overflow limit).
- Wrap it in a SQLAlchemy `Connection` object.
- Optionally run “pre-ping” (if enabled) to ensure it’s alive.
- Hand it to you, then return it to the pool when you exit the context manager.
This matters because your app’s “connection behavior” is mostly pool behavior. If you size the pool badly, you get confusing failures (timeouts, too many connections, stale sessions) that look like “PostgreSQL is flaky” but are actually lifecycle issues.
## Picking a driver in 2026: psycopg v3 vs psycopg2 vs asyncpg
PostgreSQL speaks “the PostgreSQL wire protocol,” but SQLAlchemy still needs a Python DBAPI driver to do the talking. Your driver choice affects performance characteristics, async support, and how future-proof your stack is.
Here’s how I think about it today:
| What I’d pick now | When I still avoid it |
| --- | --- |
| psycopg (v3) | Rare edge cases where a legacy extension requires psycopg2 |
| psycopg2 | New projects (unless you’re constrained) |
| asyncpg or psycopg async | Simple scripts/ETL where sync is simpler |

For most new sync services, I recommend psycopg (v3). For async, asyncpg remains a popular choice, and SQLAlchemy supports it well.
Install options (pick one sync driver):

```shell
python -m pip install sqlalchemy "psycopg[binary]"
```

or (legacy):

```shell
python -m pip install sqlalchemy psycopg2-binary
```

(The quotes around `psycopg[binary]` matter in shells like zsh, where square brackets trigger globbing.)

If you plan to run migrations:

```shell
python -m pip install alembic
```
### A quick note on “binary” vs source builds
When you see something like psycopg[binary], it’s a convenience that typically installs prebuilt wheels so you don’t need a compiler toolchain locally. In containers and CI, this can be the difference between a 30-second install and a broken build.
If you’re in a strict environment (some production distros, certain security policies), you may prefer source builds and pin exact versions. The important thing is consistency: match what you build/test against with what you deploy.
## Building a safe PostgreSQL URL (without leaking passwords)
A PostgreSQL connection string used by SQLAlchemy looks like this:
```
dialect+driver://username:password@host:port/database_name
```
The pieces mean:
- `dialect`: the database type, like `postgresql`
- `driver`: which DBAPI driver SQLAlchemy should load, like `psycopg` or `psycopg2`
- `username` / `password`: database credentials
- `host` / `port`: where PostgreSQL lives (often `localhost:5432` in dev)
- `database_name`: the database to connect to
Two security rules I follow:
1) Never hardcode passwords in source control.
2) Always URL-encode passwords if you do build a URL from parts (special characters like @ or : will break URLs).
In practice, I use an environment variable like DATABASE_URL and keep secrets in a secret manager or at least a .env file that never gets committed.
Example `.env` (don’t commit this):

```
DATABASE_URL=postgresql+psycopg://appuser:REDACTED@localhost:5432/app_db
```
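However the URL is provided, I want startup to fail loudly if it’s missing rather than limp along to a confusing connection error later. A minimal sketch (the helper name and error message are my own convention):

```python
import os

def require_database_url() -> str:
    """Return DATABASE_URL or fail fast with a clear startup error."""
    url = os.environ.get("DATABASE_URL")
    if not url:
        raise RuntimeError(
            "DATABASE_URL is not set; load it from your secret manager or .env"
        )
    return url
```

Calling this once at process startup turns a vague mid-request failure into an obvious deploy-time one.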
If you must assemble the URL yourself (for example from separate env vars), encode the password:
```python
import os
import urllib.parse

user = os.environ["DB_USER"]
password = urllib.parse.quote_plus(os.environ["DB_PASSWORD"])
host = os.environ.get("DB_HOST", "localhost")
port = int(os.environ.get("DB_PORT", "5432"))
dbname = os.environ["DB_NAME"]

database_url = f"postgresql+psycopg://{user}:{password}@{host}:{port}/{dbname}"
print(database_url)
```
If you want to avoid string formatting entirely, SQLAlchemy can build URLs safely:
```python
import os

from sqlalchemy import URL

url = URL.create(
    drivername="postgresql+psycopg",
    username=os.environ.get("DB_USER"),
    password=os.environ.get("DB_PASSWORD"),
    host=os.environ.get("DB_HOST", "localhost"),
    port=int(os.environ.get("DB_PORT", "5432")),
    database=os.environ.get("DB_NAME"),
)
```

The resulting `url` object renders safely and handles escaping when needed.
### URL patterns I use in practice
Here are a few real-world URL shapes (sanitized) and what they imply:
- Local dev: `postgresql+psycopg://user:pass@localhost:5432/app_db`
- Docker Compose: `postgresql+psycopg://user:pass@postgres:5432/app_db` (where `postgres` is the service name)
- Managed DB with TLS: `postgresql+psycopg://user:pass@<your-db-host>:5432/app_db?sslmode=require`
When a URL gets complicated, I stop inlining it and use URL.create(...) so I can add/query parameters without accidental string bugs.
### Don’t log full URLs
If you log your configuration at startup, be careful: URLs often contain credentials. I’ll log the host, port, database name, and driver—but never the raw DATABASE_URL.
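SQLAlchemy can do the redaction for you: a `URL` object knows how to render itself with the password masked. A small sketch:

```python
from sqlalchemy.engine import make_url

url = make_url("postgresql+psycopg://appuser:supersecret@localhost:5432/app_db")

# Renders the URL with the password replaced by ***
safe = url.render_as_string(hide_password=True)
print(safe)
```

This is what I log at startup: host, port, database, and driver are all visible, the credential is not.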
## Creating the Engine (the right defaults for real apps)
The Engine is your central connection factory and pool manager. I typically create exactly one Engine per database per process.
Here’s a runnable example that creates an Engine and performs a quick connectivity check:
```python
import os

from sqlalchemy import create_engine, text

DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+psycopg://appuser:apppassword@localhost:5432/app_db",
)

engine = create_engine(
    DATABASE_URL,
    # 2.0-style behavior (a no-op on SQLAlchemy 2.x, where it's the default)
    future=True,
    # Pool tuning (good baseline; adjust per workload)
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)

def check_db() -> None:
    # text() is the safe way to run literal SQL in SQLAlchemy
    with engine.connect() as conn:
        value = conn.execute(text("select 1")).scalar_one()
        assert value == 1

if __name__ == "__main__":
    check_db()
    print("Database connection OK")
```
What each pool setting is doing (in plain terms):
- `pool_size`: how many open connections SQLAlchemy keeps ready to reuse
- `max_overflow`: how many extra connections it can open briefly during spikes
- `pool_timeout`: how long you’ll wait for a free connection before erroring
- `pool_recycle`: refresh connections periodically (helpful with certain proxies/load balancers)
- `pool_pre_ping`: checks a connection is still alive before using it (reduces “stale connection” surprises)

If you’ve ever seen errors that only happen after the app sits idle for 30–60 minutes, `pool_pre_ping=True` is often the fix.
### `future=True` and SQLAlchemy 2.x style
If you’re on SQLAlchemy 2.x, you’ll often see “2.0 style” usage with select(...) and context managers everywhere. In most modern code, I treat that style as the default:
- Use `select()` rather than older query patterns.
- Use `engine.begin()` to get an explicit transaction scope.
- Use `Session` as a context manager.
It’s not just aesthetics—it makes transaction behavior obvious.
### Engine creation in an app (don’t create per request)
In web apps, I create the Engine at process startup and share it:
- One Engine per process
- One pool per Engine
- Many short-lived Sessions per request
If you create an Engine per request, you effectively create a connection pool per request, which is a fast path to “too many connections.”
## ORM Sessions done right: transactions, CRUD, and a complete example
A common mistake I see is treating SQLAlchemy Sessions like global singletons. Don’t do that.
- Create a single Engine globally.
- Create a `sessionmaker` globally.
- Create a Session per unit of work (per request in a web app, per job in a worker).
Below is a complete, runnable script that:
- defines an ORM model
- creates a table
- inserts data
- queries data
- demonstrates commit/rollback safety
```python
import os
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import String, DateTime, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker

DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+psycopg://appuser:apppassword@localhost:5432/app_db",
)

engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,
)

# The autocommit and future flags are gone in SQLAlchemy 2.x
SessionLocal = sessionmaker(bind=engine, autoflush=False)

class Base(DeclarativeBase):
    pass

class Customer(Base):
    __tablename__ = "customers"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(320), unique=True, index=True)
    full_name: Mapped[str] = mapped_column(String(200))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),  # utcnow() is deprecated
    )

    def __repr__(self) -> str:
        return f"Customer(id={self.id}, email={self.email!r})"

def init_db() -> None:
    Base.metadata.create_all(bind=engine)

def create_customer(session: Session, *, email: str, full_name: str) -> Customer:
    customer = Customer(email=email, full_name=full_name)
    session.add(customer)
    # Flush assigns PKs without committing
    session.flush()
    return customer

def get_customer_by_email(session: Session, email: str) -> Optional[Customer]:
    stmt = select(Customer).where(Customer.email == email)
    return session.execute(stmt).scalar_one_or_none()

if __name__ == "__main__":
    init_db()

    # Unit of work: one session, one transaction boundary
    with SessionLocal() as session:
        try:
            customer = create_customer(
                session,
                email="maria@example.com",
                full_name="Maria Chen",
            )
            session.commit()
            print("Created:", customer)
        except Exception:
            session.rollback()
            raise

    with SessionLocal() as session:
        found = get_customer_by_email(session, "maria@example.com")
        print("Found:", found)
```
A few “I learned this the hard way” notes:
- I always wrap work in `try/except` and call `rollback()` on errors. If you don’t, the Session can get stuck in a failed transaction state.
- `flush()` is your friend when you want IDs without committing yet.
- If you’re building an API, the Session lifecycle should match the request lifecycle. In frameworks like FastAPI, that usually means one dependency-injected Session per request.
### A practical “session per request” pattern
Even if you’re not using a framework, I like to make the lifecycle explicit with a small helper.
```python
from contextlib import contextmanager
from typing import Iterator

from sqlalchemy.orm import Session

@contextmanager
def get_session() -> Iterator[Session]:
    with SessionLocal() as session:
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
```
Then your code reads like “do work; commit on success; rollback on failure,” which is exactly what I want.
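Here is that pattern end to end, using an in-memory SQLite engine as a stand-in so the snippet runs without a server (in the real app, `SessionLocal` is the PostgreSQL-bound factory from earlier):

```python
from contextlib import contextmanager
from typing import Iterator

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, sessionmaker

engine = create_engine("sqlite://")  # stand-in for the PostgreSQL engine
SessionLocal = sessionmaker(bind=engine)

@contextmanager
def get_session() -> Iterator[Session]:
    with SessionLocal() as session:
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise

# Commit on clean exit, rollback on exception — nothing to remember at call sites
with get_session() as session:
    value = session.execute(text("select 1")).scalar_one()
    print(value)
```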
### When to use `engine.begin()` vs `Session`
If I’m doing plain SQL (no ORM identity map, no object tracking), I often use Core with engine.begin():
```python
from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(text("insert into audit_log(message) values (:m)"), {"m": "hello"})
```
That engine.begin() context manager starts a transaction and commits automatically when the block exits cleanly.
If I’m using ORM objects (create, update, relationships), I use Session.
## Production connection settings: pooling, SSL, timeouts, and statement behavior
Local development is forgiving. Production PostgreSQL is not.
Here are the knobs I check when an app moves from “works on my laptop” to “works at 500 RPS.”
### Pool sizing that matches your database limits
PostgreSQL has a finite number of connections. If you deploy 20 app instances and each instance can open 15 connections, you can accidentally request 300 connections.
I usually start with:
- `pool_size=5` and `max_overflow=10` per process for typical web apps
- then I scale based on observed concurrency and DB limits
If you’re using a connection pooler like PgBouncer, you’ll likely want different settings (and you may need to be careful with server-side prepared statements).
#### A quick mental math check
I do this before shipping:
- `total_possible_connections = processes * (pool_size + max_overflow)`
- compare that to Postgres `max_connections` minus the headroom you need for admin tools, migrations, and background jobs
It’s not perfect—real usage is bursty—but it prevents “we accidentally DDoS’ed our own database.”
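The arithmetic is trivial, but I like it written down where a code review can see it. The numbers below are assumptions to plug your own values into:

```python
def worst_case_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    # Every process can hold pool_size + max_overflow connections at peak
    return processes * (pool_size + max_overflow)

peak = worst_case_connections(processes=20, pool_size=5, max_overflow=10)
print(peak)  # 300

max_connections = 200  # check SHOW max_connections on your server
headroom = 25          # admin tools, migrations, background jobs (assumed)
if peak > max_connections - headroom:
    print("Over budget: shrink the pool or add a pooler like PgBouncer")
```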
### Timeouts you can explain
If you don’t set timeouts, your service can hang until upstream retries pile up.
- `pool_timeout`: fails fast when the pool is exhausted
- `connect_args={"connect_timeout": 5}`: fails fast when the DB is unreachable
Example:
```python
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg://appuser:apppassword@localhost:5432/app_db",
    future=True,
    pool_pre_ping=True,
    pool_timeout=30,
    connect_args={
        "connect_timeout": 5,
        # psycopg understands many libpq-style settings
        # "application_name": "billing-api",
    },
)
```
#### Statement timeouts: preventing “one query takes down the app”
Connection timeouts help when the database is unreachable. Statement timeouts help when the database is reachable but a query is slow (bad plan, missing index, lock contention).
PostgreSQL supports `statement_timeout`. I often set it per connection with a startup command. With SQLAlchemy, a straightforward pattern is to run it whenever the pool creates a new DBAPI connection, via the `connect` event.
```python
from sqlalchemy import event
from sqlalchemy.engine import Engine

@event.listens_for(Engine, "connect")
def set_postgres_settings(dbapi_connection, connection_record) -> None:
    cursor = dbapi_connection.cursor()
    cursor.execute("SET statement_timeout = '5s'")
    cursor.execute("SET lock_timeout = '2s'")
    cursor.close()
```
I keep these values conservative and specific to the service. A reporting job can tolerate longer queries than a latency-sensitive API.
### SSL / TLS for managed databases
Most managed PostgreSQL providers require TLS.
Common patterns:
- Add `?sslmode=require` to the URL
- Or pass SSL-related values via `connect_args` (varies by driver/provider)
Example URL style:
```python
import os

from sqlalchemy import create_engine

DATABASE_URL = os.environ["DATABASE_URL"]

engine = create_engine(
    # Assumes DATABASE_URL has no query string yet; otherwise use URL.create()
    DATABASE_URL + "?sslmode=require",
    future=True,
    pool_pre_ping=True,
)
```
#### Prefer verification when possible
sslmode=require encrypts traffic but may not validate certificates depending on provider settings. In environments where I can manage CA certificates properly, I aim for verification (often verify-full semantics) to reduce MITM risk.
The exact recipe depends on your provider and driver, but the core idea is: encryption + verification.
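As one sketch of that idea with psycopg (v3): libpq-style keywords pass straight through `connect_args`. The CA bundle path below is hypothetical; your provider documents where the real one lives.

```python
from sqlalchemy import create_engine

# Hypothetical CA bundle path; substitute your provider's bundle
ssl_connect_args = {
    "sslmode": "verify-full",
    "sslrootcert": "/etc/ssl/certs/db-ca.pem",
}

engine = create_engine(
    "postgresql+psycopg://appuser:apppassword@localhost:5432/app_db",
    pool_pre_ping=True,
    connect_args=ssl_connect_args,
)
```

With `verify-full`, the client checks both the certificate chain and that the hostname matches, which is what actually defeats a man-in-the-middle.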
### Health checks that catch real failures
I like a health check that actually executes a trivial query. It catches:
- wrong credentials
- wrong database name
- network ACL issues
- TLS misconfiguration
This is also where I set an application_name so DB logs show which service generated traffic.
```python
from sqlalchemy import text

def readiness_check() -> None:
    with engine.connect() as conn:
        conn.execute(text("select 1"))
```
I keep “liveness” checks separate: liveness should usually be “the process is alive,” while readiness means “the process can do useful work,” which includes DB access for DB-backed services.
## Local development that mirrors production (without being painful)
If you only ever connect to localhost, production failures will feel mysterious. I like a local setup that approximates:
- hostname-based networking (like containers or k8s)
- environment variable configuration
- migrations on startup
A simple Docker Compose approach is common, but even without it, you can simulate production-like behavior by:
- using a non-default user
- creating a dedicated database
- setting `application_name`
- enabling TLS if your production requires it (optional but useful)
The goal isn’t to perfectly clone production. It’s to catch the “obvious” problems early.
## Migrations: wiring SQLAlchemy with Alembic
In real projects, “connecting to PostgreSQL” isn’t just about reading and writing data—it’s also about maintaining schema changes safely.
My baseline migration workflow:
- SQLAlchemy models define intent.
- Alembic versions define history.
- CI runs migrations against a blank database.
The most important connection detail: Alembic needs the same database URL and driver as your app.
A minimal pattern that keeps config in one place:
- Set `DATABASE_URL` in the environment.
- Have both the app and Alembic read it.
In alembic.ini, I usually avoid hardcoding the URL and instead set it in env.py:
```python
import os

from alembic import context

config = context.config
config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])
```
That way:
- local dev uses local creds
- staging uses staging creds
- production uses production creds
…and the code doesn’t change.
## Async SQLAlchemy with PostgreSQL (when concurrency matters)
If you’re building a highly concurrent service (lots of time waiting on IO), async can pay off. I don’t push async everywhere, but for websocket-heavy services, ingestion pipelines, or high fan-out workflows, it’s often worth it.
Install:
```shell
python -m pip install sqlalchemy asyncpg
```
Runnable async example using SQLAlchemy’s async engine and asyncpg:
```python
import asyncio
import os

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

DATABASE_URL = os.environ.get(
    "DATABASE_URL_ASYNC",
    "postgresql+asyncpg://appuser:apppassword@localhost:5432/app_db",
)

async def main() -> None:
    engine = create_async_engine(
        DATABASE_URL,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
    )

    async with engine.connect() as conn:
        result = await conn.execute(text("select 1"))
        value = result.scalar_one()
        assert value == 1

    await engine.dispose()
    print("Async database connection OK")

if __name__ == "__main__":
    asyncio.run(main())
```
Two caveats I always call out:
- Don’t call blocking libraries from async request handlers. If your DB calls are async but your CPU-bound work is blocking, you’ll still bottleneck.
- Keep your async and sync Engines separate. Mixing them in one module creates confusing bugs.
### Async ORM: AsyncSession and a unit-of-work pattern
A lot of async examples stop at “select 1.” In practice you’ll want the ORM too.
Here’s a compact but realistic pattern with AsyncSession:
```python
import asyncio
import os
from datetime import datetime, timezone

from sqlalchemy import String, DateTime, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker

DATABASE_URL = os.environ.get(
    "DATABASE_URL_ASYNC",
    "postgresql+asyncpg://appuser:apppassword@localhost:5432/app_db",
)

engine = create_async_engine(DATABASE_URL, pool_pre_ping=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class Event(Base):
    __tablename__ = "events"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(200))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

async def init_db() -> None:
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

async def create_event(name: str) -> int:
    async with AsyncSessionLocal() as session:
        try:
            event = Event(name=name)
            session.add(event)
            await session.flush()
            await session.commit()
            return event.id
        except Exception:
            await session.rollback()
            raise

async def get_event(event_id: int) -> Event | None:
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Event).where(Event.id == event_id))
        return result.scalar_one_or_none()

async def main() -> None:
    await init_db()
    event_id = await create_event("signup")
    event = await get_event(event_id)
    print(event_id, event)

if __name__ == "__main__":
    asyncio.run(main())
```
The patterns mirror sync code:
- session per unit of work
- `flush()` to get IDs
- commit/rollback explicitly

I also set `expire_on_commit=False` because it makes returned objects easier to use after a commit in async services.
## Common mistakes I see (and how I fix them fast)
When PostgreSQL + SQLAlchemy “doesn’t connect,” the error message usually points to one of these.
### 1) Wrong driver name in the URL
If you install psycopg but write postgresql+psycopg2://..., SQLAlchemy will fail to load the driver.
Fix: match the installed package.
- psycopg v3: `postgresql+psycopg://...`
- psycopg2: `postgresql+psycopg2://...`
- asyncpg: `postgresql+asyncpg://...`
### 2) Password contains special characters and breaks the URL
Symptoms: authentication fails even with the correct password.
Fix: URL-encode the password via urllib.parse.quote_plus, or use URL.create(...).
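A quick check of what encoding does to a troublesome password (the password is made up):

```python
from urllib.parse import quote_plus

password = "p@ss:word"  # made-up password with URL-breaking characters
encoded = quote_plus(password)
print(encoded)  # p%40ss%3Aword

url = f"postgresql+psycopg://appuser:{encoded}@localhost:5432/app_db"
```

Without the encoding, the `@` in the password would be parsed as the start of the hostname.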
### 3) “Too many connections” under load
Symptoms: errors from PostgreSQL like “remaining connection slots are reserved…”
Fixes I try in order:
- reduce `pool_size` and `max_overflow`
- make sure every Session is closed (use context managers)
- check that background jobs aren’t leaking sessions
- consider PgBouncer for connection pooling across many app instances
### 4) Stale connections after idle time
Symptoms: it works after deploy, then randomly fails later.
Fix: `pool_pre_ping=True` and possibly `pool_recycle=1800`.
### 5) Confusing transaction behavior
Symptoms: reads don’t see writes, or partial changes persist.
Fixes:
- Always `commit()` on success and `rollback()` on exceptions.
- Don’t reuse a Session across independent tasks.
- Use `with SessionLocal() as session:` everywhere you can.
### 6) Running raw SQL unsafely
If you concatenate user input into SQL strings, you will eventually ship SQL injection.
Fix: use bound parameters.
```python
from sqlalchemy import text

stmt = text("select * from customers where email = :email")
rows = session.execute(stmt, {"email": "maria@example.com"}).all()
```
If you need dynamic SQL (optional filters, sort options), build it with SQLAlchemy expressions rather than string concatenation.
### 7) Mixing autocommit expectations with transaction scopes
PostgreSQL is transactional by default. That’s good—until you assume each statement commits automatically.
If you run:
- `session.add(...)`
- `session.flush()`
…but never call session.commit(), your changes will be rolled back when the session closes.
In scripts, this can look like “it ran with no errors, but the data isn’t there.” The fix is simply making commit boundaries explicit.
## Connection hardening checklist (what I do before shipping)
If I’m responsible for a production service, I want to be able to answer these questions quickly:
- How many connections can this service open at peak?
- What happens when the DB is down for 60 seconds?
- What happens when queries get slow?
- Are credentials rotated without redeploying code?
- Can we identify this service in Postgres logs?
Here’s how that translates to concrete SQLAlchemy choices.
### Set an application name
This makes it easier to trace traffic and locks to a service.
```python
engine = create_engine(
    DATABASE_URL,
    future=True,
    pool_pre_ping=True,
    connect_args={
        "application_name": "orders-api",
        "connect_timeout": 5,
    },
)
```
### Make pool behavior explicit
I avoid leaving pooling to “whatever default” in services that handle real traffic.
```python
engine = create_engine(
    DATABASE_URL,
    future=True,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_pre_ping=True,
)
```
### Decide on isolation level intentionally
PostgreSQL defaults are usually good, but it’s worth understanding isolation when you see strange concurrency behavior.
- `READ COMMITTED` (default) is fine for most apps.
- `REPEATABLE READ` can make reads more consistent inside a transaction.
- `SERIALIZABLE` can prevent anomalies but may increase contention.
If I change isolation, I do it because I can explain why.
```python
engine = create_engine(
    DATABASE_URL,
    future=True,
    isolation_level="READ COMMITTED",
)
```
## Observability: logging slow queries and pool pressure
Connections are a runtime resource. If you don’t observe them, you’ll eventually get paged by them.
### Turn on SQL logging (carefully)
SQLAlchemy can log statements, but I treat it as a debugging tool, not a default production setting, because:
- it can log sensitive data
- it can generate a lot of logs
For local debugging, I might use:
```python
engine = create_engine(DATABASE_URL, echo=True)
```
For production, I prefer structured logging at the app level plus database-side monitoring.
### Track pool checkouts and timeouts
When I suspect pool pressure, I add lightweight pool event hooks temporarily.
```python
import time

from sqlalchemy import event
from sqlalchemy.engine import Engine

@event.listens_for(Engine, "checkout")
def on_checkout(dbapi_connection, connection_record, connection_proxy) -> None:
    connection_record.info["checkout_time"] = time.time()

@event.listens_for(Engine, "checkin")
def on_checkin(dbapi_connection, connection_record) -> None:
    started = connection_record.info.pop("checkout_time", None)
    if started is not None:
        held = time.time() - started
        # Replace print with your logger
        if held > 1.0:
            print(f"Connection held for {held:.2f}s")
```
This isn’t perfect, but it quickly reveals patterns like “a request handler holds a connection while doing non-DB work.”
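Before wiring events, I sometimes just snapshot the pool: `QueuePool` exposes its counters directly. The in-memory SQLite engine below is a stand-in so the snippet runs without a server; a PostgreSQL engine exposes the same pool API.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Stand-in engine; with PostgreSQL, engine.pool behaves the same way
engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=2, max_overflow=1)

print(engine.pool.status())      # human-readable pool summary
print(engine.pool.checkedout())  # connections currently in use

with engine.connect() as conn:
    # While the connection is held, the checkout counter reflects it
    assert engine.pool.checkedout() == 1
```

Logging that status line during an incident quickly answers “are we leaking connections or just saturated?”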
## Testing your connection code (so “it connects” isn’t a guess)
The fastest way to gain confidence is to write a small integration test that:
- connects
- runs a transaction
- proves rollback works
A minimal sync test pattern looks like this:
```python
from sqlalchemy import text

def test_db_round_trip() -> None:
    with engine.begin() as conn:
        value = conn.execute(text("select 123")).scalar_one()
        assert value == 123
```
If you can run this test in CI against a temporary PostgreSQL instance, you catch misconfigurations early (wrong driver, wrong URL, missing TLS params).
## Alternative approaches (and when I use them)
SQLAlchemy is my default, but I don’t force it everywhere.
### When I use raw psycopg
If I’m writing a tiny script that does one query and exits, I might skip the ORM to reduce complexity.
But even then, the habits carry over:
- set timeouts
- handle exceptions
- close connections
### When I use a higher-level framework integration
In frameworks like FastAPI, Django, or Flask, there are established patterns and helpers.
I still like to understand the underlying SQLAlchemy pieces because it makes debugging much easier when:
- pooling behaves oddly
- transactions don’t match expectations
- timeouts or SSL settings differ between environments
## Summary: the “boring” connection that stays boring
The goal of a PostgreSQL connection setup isn’t to be clever. It’s to be boring and reliable.
If you take one thing from this post, let it be this: treat database connectivity as a lifecycle problem.
- One Engine per process.
- A pool sized to match your deployment and database limits.
- Short-lived Sessions with clear commit/rollback boundaries.
- Explicit timeouts and TLS settings.
- A health check that actually runs a query.
Once those are in place, connecting PostgreSQL with SQLAlchemy becomes something you don’t think about—which is exactly how it should be.