I keep bumping into Python teams who can model their data beautifully yet get slowed down by query mistakes: forgetting to parameterize, fetching too much, or leaking connections when something crashes. In 2026, with Python 3.14 around the corner and PostgreSQL 18 rolling out, knowing how to query sanely from Python is still a career‑long skill. In this guide I’m going to show you how I approach querying with psycopg2, why I still reach for it first even as psycopg3 gets shinier, and the small patterns that keep my production services fast, predictable, and kind to the database. We’ll start with the gritty bits—installing the driver on today’s stacks—then move through cursor usage, streaming, transactions, profiling, connection pooling, COPY, prepared statements, observability, and modern async considerations. Along the way I’ll share code you can paste directly into your project and the footguns I’ve seen bite real teams.
Why I still reach for psycopg2 when querying
- Stability matters: psycopg2 2.9.11 (released October 10, 2025) supports CPython 3.9–3.14 and is battle‑tested with PostgreSQL 9.6 through current releases. (data.safetycli.com)
- Ecosystem gravity: ORM drivers (SQLAlchemy, Django) still default to psycopg2 under the hood for sync paths; swapping drivers in 2026 is possible but rarely worth the churn unless you need async end‑to‑end.
- Tooling familiarity: DBAs, SREs, and ops scripts already expect psycopg2 error messages and diagnostics. When minutes count, that shared vocabulary saves time.
Beyond those obvious reasons, there’s a less glamorous one: most production problems around querying are not caused by the driver, but by how the app uses it. I’ve seen psycopg2 blamed for slow requests that were actually caused by missing indexes, unbounded fetches, and a dozen long‑lived idle transactions. In that environment, a “stable and boring” driver is a feature. It lets you focus on discipline—parameterization, short transactions, correct fetch patterns, safe pooling—without re‑learning the whole API surface.
Installing in 2026 without the usual potholes
You can still pip install psycopg2-binary==2.9.11 on Linux, macOS, and Windows. I pin the binary wheel in production images so builds never fall back to compiling from source (which demands pg_config). The binary wheel exists for CPython 3.14, so future upgrades stay smooth. (pypi.org)
If you do hit pg_config executable not found, it usually means pip tried to build from source because it couldn’t find a matching wheel. Add --only-binary psycopg2-binary or pin the exact version to force wheels; otherwise install the PostgreSQL client libs so pg_config is on PATH. (reddit.com)
My install checklist
- Pin: pip install --upgrade psycopg2-binary==2.9.11
- Alpine images: apk add postgresql-libs and libpq if you must compile.
- macOS ARM: prefer Homebrew PostgreSQL to supply pg_config when wheels are missing.
- Type hints: pip install types-psycopg2==2.9.21.20251012 to keep MyPy happy. (data.safetycli.com)
Production packaging reality
I treat psycopg2 like a system dependency even when it’s pip‑installed. That means I explicitly document it in the Dockerfile and ensure the base image has the correct glibc. Why? The “works on my laptop” psycopg2 import errors still happen in 2026, especially with slim containers or odd Linux distros. If your CI is building wheels on one environment and deploying on another, either pin a manylinux wheel or bake psycopg2 into the same base image you ship. It’s not sexy, but it’s the difference between a predictable deployment and a 2 AM rollback.
Connecting cleanly and keeping credentials safe
I keep a tiny connection helper that favors environment variables so secrets stay out of code:
import os
import psycopg2
from psycopg2.extras import RealDictCursor
DSN = """dbname={db} user={user} password={pwd} host={host} port={port} connect_timeout=5""".format(
db=os.getenv("PGDATABASE", "postgres"),
user=os.getenv("PGUSER", "postgres"),
pwd=os.getenv("PGPASSWORD", "postgres"),
host=os.getenv("PGHOST", "127.0.0.1"),
port=os.getenv("PGPORT", "5432"),
)
class PgConn:
    def __enter__(self):
        self.conn = psycopg2.connect(DSN, cursor_factory=RealDictCursor)
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        if exc:
            self.conn.rollback()
        else:
            self.conn.commit()
        self.conn.close()
I prefer RealDictCursor for API responses so column names map to dict keys—no more tuple indexing errors. A 5‑second connect_timeout guards against hung sockets when the DB is firewalled.
DSN variants you’ll actually use
- Full TLS: append sslmode=require sslrootcert=/etc/ssl/certs/ca.pem when your org mandates encrypted hops.
- App role separation: application_name={service} aids pg_stat_activity triage.
- Read replicas: keep a READ_DSN env var; route analytics to it to avoid punishing primaries.
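The variants above can be folded into one small builder. This is a sketch, not a standard: the env-var names READ_DSN and PG_SSLMODE are my own illustrative choices layered on top of the usual libpq PG* variables.

```python
import os

def build_dsn(service="myservice", read_replica=False):
    """Assemble a libpq DSN string from environment variables.

    READ_DSN and PG_SSLMODE are illustrative names, not a standard."""
    if read_replica and os.getenv("READ_DSN"):
        return os.environ["READ_DSN"]
    parts = {
        "dbname": os.getenv("PGDATABASE", "postgres"),
        "user": os.getenv("PGUSER", "postgres"),
        "password": os.getenv("PGPASSWORD", "postgres"),
        "host": os.getenv("PGHOST", "127.0.0.1"),
        "port": os.getenv("PGPORT", "5432"),
        "connect_timeout": "5",
        "application_name": service,
    }
    if os.getenv("PG_SSLMODE"):  # e.g. "require"
        parts["sslmode"] = os.environ["PG_SSLMODE"]
    return " ".join(f"{k}={v}" for k, v in parts.items())
```

Keeping DSN assembly in one function means the analytics path and the primary path can differ by a single flag instead of duplicated string formatting.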
A more production‑ready connection helper
The tiny helper works for scripts, but for services I like a little more structure: explicit autocommit control, context‑managed cursors, and safe logging of connection details.
import os
import psycopg2
from psycopg2.extras import RealDictCursor
class Pg:
    def __init__(self):
        self.dsn = os.getenv("PG_DSN")
        if not self.dsn:
            self.dsn = (
                "dbname={db} user={user} password={pwd} host={host} port={port} connect_timeout=5 "
                "application_name=myservice"
            ).format(
                db=os.getenv("PGDATABASE", "postgres"),
                user=os.getenv("PGUSER", "postgres"),
                pwd=os.getenv("PGPASSWORD", "postgres"),
                host=os.getenv("PGHOST", "127.0.0.1"),
                port=os.getenv("PGPORT", "5432"),
            )

    def connect(self, autocommit=False):
        conn = psycopg2.connect(self.dsn, cursor_factory=RealDictCursor)
        conn.set_session(autocommit=autocommit)
        return conn

    def cursor(self, conn):
        return conn.cursor()
The key is not the class, it’s the discipline: one place to manage configuration, one place to change connection defaults. That cuts down on “why do some requests behave differently?” issues.
Designing safer queries: parameters over f‑strings
String interpolation is still the fastest way to write a SQL injection. psycopg2 lets me pass parameters separately so the driver handles quoting:
with PgConn() as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT id, email, plan FROM accounts WHERE created_at >= %s AND plan = ANY(%s)",
            ("2025-12-01", ["pro", "team"]),
        )
        rows = cur.fetchall()
Key points:
- %s is the only placeholder psycopg2 understands, regardless of column type.
- The second argument is a tuple; lists (for ANY or IN) are fine because psycopg2 adapts them to Postgres arrays.
- Never concatenate identifiers either; use psycopg2.sql.Identifier when you must vary table or column names.
Dynamic identifiers the safe way
from psycopg2 import sql
def fetch_column(table, column):
    with PgConn() as conn, conn.cursor() as cur:
        query = sql.SQL("SELECT {col} FROM {tbl} LIMIT 5").format(
            col=sql.Identifier(column),
            tbl=sql.Identifier(table),
        )
        cur.execute(query)
        return cur.fetchall()
Common parameterization edge cases
- IN lists with zero items: WHERE id = ANY(%s) with an empty list returns no rows. That’s usually what you want, but don’t accidentally fall back to IN () by string building. If you need “no filter,” handle that in Python.
- Array vs scalar: if you pass a list where a scalar is expected, Postgres might throw a type error or adapt it to ARRAY[...] unexpectedly. I keep list parameters only where I explicitly use ANY or = ANY.
- Identifiers vs values: parameters are for values only. If you need a dynamic table or column, use psycopg2.sql and validate against an allowlist.
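To make the “no filter vs match nothing” distinction explicit, I sometimes wrap it in a tiny helper. A sketch under my own naming (clause_for_ids is not a psycopg2 API):

```python
def clause_for_ids(ids):
    """Return (sql_fragment, params) for an optional id filter.

    None means "no filter"; an empty list means "match nothing",
    which = ANY(%s) with an empty array already handles correctly."""
    if ids is None:
        return "TRUE", []  # no filtering at all
    return "id = ANY(%s)", [list(ids)]
```

The fragment is composed server-side by us, never from user input, so interpolating it into the query text is safe; the values still go through %s parameters.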
Validate and normalize before the query
Parameterization is not a replacement for validation. If I accept sort_by or direction from a client, I still map it through a whitelist:
ALLOWED_SORTS = {"created_at": "created_at", "email": "email"}
ALLOWED_DIRS = {"asc": "ASC", "desc": "DESC"}

def query_accounts(sort_by, direction):
    col = ALLOWED_SORTS.get(sort_by, "created_at")
    dir_ = ALLOWED_DIRS.get(direction, "DESC")
    with PgConn() as conn, conn.cursor() as cur:
        q = sql.SQL("SELECT id, email FROM accounts ORDER BY {col} {dir}").format(
            col=sql.Identifier(col),
            dir=sql.SQL(dir_),
        )
        cur.execute(q)
        return cur.fetchall()
I treat any dynamic identifier as untrusted unless it is mapped to a predefined token.
Fetch patterns: choosing fetchone, fetchmany, fetchall
I pick the fetch method based on expected cardinality and memory footprint.
with PgConn() as conn:
    with conn.cursor(name="stream") as cur:  # server-side cursor when named
        cur.itersize = 500
        cur.execute("SELECT * FROM events ORDER BY occurred_at DESC LIMIT 100000")
        first = cur.fetchone()
        batch = cur.fetchmany(500)
        rest = cur.fetchall()
- fetchone() advances the cursor like a queue; calling it twice gets row 1 then row 2.
- fetchmany(n) is my default for paged APIs; it keeps latency steady and memory flat.
- fetchall() is fine only when the result set is naturally small (think 10–100 rows).
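The fetchmany pattern generalizes into a small generator so callers never hold the full result set. A sketch that works with any DB-API cursor, including psycopg2’s:

```python
def iter_rows(cur, size=500):
    """Yield rows from a DB-API cursor in fetchmany-sized batches.

    Memory stays bounded at `size` rows regardless of total result size."""
    while True:
        batch = cur.fetchmany(size)
        if not batch:
            return
        for row in batch:
            yield row
```

Callers just write `for row in iter_rows(cur): ...` and the batching detail disappears from business code.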
Streaming large result sets without flooding RAM
Server‑side cursors are lifesavers when exporting millions of rows. Naming the cursor keeps results on the server and fetches in chunks:
with PgConn() as conn:
    with conn.cursor(name="export", cursor_factory=RealDictCursor) as cur:
        cur.itersize = 2000
        cur.execute("SELECT * FROM ledger WHERE closed_at >= %s", ("2025-01-01",))
        for row in cur:
            process(row)
Notes:
- Avoid ORDER BY random() or complex sorts with server cursors; they may materialize the whole set anyway.
- Keep transactions short; long-lived cursors can hold MVCC snapshots and bloat vacuum. Commit after each chunk when consistency rules allow.
When fetchall is okay
- Small reference tables (countries, enum lookups).
- Admin screens with pagination already enforced at the app layer.
- Unit tests where you intentionally want to assert the entire result.
Fetching just enough: “fat rows” vs “thin rows”
A quiet performance killer is fetching far more columns than you need. If your service only needs id and email, do not SELECT *. The CPU cost of decompression, network transfer, and Python object creation adds up. I do a quick pass through each query and ask, “Which columns does this call actually use?” It’s a low‑effort, high‑impact improvement.
Transactions, retries, and error handling
I treat every connection as a transaction scope. psycopg2 auto‑opens a transaction on first command. That means a SELECT without commit() still holds locks. Two patterns I rely on:
1) Autocommit for DDL or LISTEN/NOTIFY
with PgConn() as conn:
    conn.set_session(autocommit=True)
    with conn.cursor() as cur:
        cur.execute("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_ts ON events(occurred_at)")
2) Retry on transient errors (e.g., 40001 serialization failures)
import time
from psycopg2 import OperationalError, errors
def run_with_retry(fn, attempts=3, backoff=0.2):
    for i in range(attempts):
        try:
            return fn()
        except errors.SerializationFailure:
            if i == attempts - 1:
                raise
            time.sleep(backoff * (2 ** i))
        except OperationalError:
            if i == attempts - 1:
                raise
            time.sleep(backoff * (2 ** i))
I avoid catching ProgrammingError because it often signals a bug in my SQL, not a transient condition.
Savepoints for partial success
When a loop of operations should partly succeed, wrap each unit in a savepoint instead of a new transaction:
with PgConn() as conn:
    with conn.cursor() as cur:
        for row in payload:
            cur.execute("SAVEPOINT sp")
            try:
                cur.execute("INSERT INTO items(id, body) VALUES (%s, %s)", row)
            except Exception:
                cur.execute("ROLLBACK TO SAVEPOINT sp")
Understanding psycopg2 transaction behavior
I wish more teams knew this: in psycopg2, the connection starts a transaction at the first execute() and keeps it open until commit() or rollback(). That means a simple SELECT can become a long‑lived transaction if you forget to commit or close the connection. The symptoms are classic: autovacuum lag, long idle in transaction sessions, and tuples that never get cleaned up. The fix is simple: either commit after reads (yes, it’s safe) or set autocommit for read‑only handlers.
Read‑only transactions to reduce risk
If you have read‑only endpoints that never write, you can enforce safety at the DB level:
with PgConn() as conn:
    conn.set_session(readonly=True)
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM accounts WHERE id = %s", (42,))
        row = cur.fetchone()
Postgres will reject any write attempt, which turns accidental updates into clear errors.
Idempotency and retry boundaries
Retries are useful, but only for idempotent operations. If your query modifies external state (sending emails, pushing to a queue), make sure the retry happens before those side effects. A clean pattern is: write a row with a unique key, then let a worker handle side effects. The DB becomes your source of truth and retries are safe.
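One way to get that stable unique key is to derive it deterministically from the payload. A sketch under my own convention (the key derivation and column name are assumptions, not a library API); the row is then inserted with ON CONFLICT DO NOTHING on a unique index over the key, and side effects run only for rows actually inserted:

```python
import hashlib
import json

def idempotency_key(payload: dict) -> str:
    """Derive a deterministic key from a payload so retried inserts
    collide on a unique index instead of duplicating side effects."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()
```

Because the serialization is canonical (sorted keys, fixed separators), two retries of the same logical request always hash to the same key.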
Async in 2026: when psycopg3 is worth it
psycopg3 3.3 landed December 1, 2025 with a polished async API, template string queries tied to Python 3.14, and improved binary type handling. (postgresql.org) If your stack is already async (FastAPI, Trio, asyncio tasks), running psycopg2 in threads is serviceable, but swapping to psycopg3 removes thread pools and sheds latency. For pure querying, though, psycopg2 remains faster to get running and pairs seamlessly with existing sync codebases.
Traditional (psycopg2) strengths:
- Sync apps, cron jobs
- Stable wheels through CPython 3.14
- Mature server-side cursors, COPY
- Drop-in for Django/SQLAlchemy
My rule: if your request/response path is async end‑to‑end, start new projects on psycopg3; otherwise stay on psycopg2 and revisit during a larger architecture change.
Mixing sync and async responsibly
If your team is halfway through an async migration, don’t try to force psycopg2 into the event loop. Wrap it in a thread pool, keep the DB work synchronous, and put a deadline on fully switching. The “hybrid forever” approach creates the worst of both worlds: sync drivers in async code, plus confusion about where blocking is happening.
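The thread-pool wrapper is one line with asyncio.to_thread (available since Python 3.9). A sketch with a stand-in blocking function; fetch_account_sync is a placeholder for your real psycopg2 call:

```python
import asyncio

def fetch_account_sync(account_id):
    """Stand-in for a blocking psycopg2 query."""
    return {"id": account_id, "plan": "pro"}

async def fetch_account(account_id):
    # Run the sync DB call in the default thread pool so the event
    # loop is never blocked by socket I/O inside psycopg2.
    return await asyncio.to_thread(fetch_account_sync, account_id)
```

Keeping all DB work behind one async facade like this also makes the eventual swap to psycopg3 a one-module change.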
Connection pooling that doesn’t bite
Opening a new TCP/TLS session for every request hurts latency. psycopg2 ships pool primitives; in production I usually prefer pgbouncer in transaction mode plus a lightweight pool in the app.
Simple threaded pool inside your app
from psycopg2 import pool

POOL = pool.ThreadedConnectionPool(minconn=2, maxconn=10, dsn=DSN)

def get_conn():
    return POOL.getconn()

def put_conn(conn):
    POOL.putconn(conn)
Use this for worker processes with predictable concurrency. Avoid it in short-lived CLI scripts; the plain PgConn() context manager is simpler there.
Pgbouncer + psycopg2 settings I set by default
- server_reset_query = DISCARD ALL to keep sessions clean.
- Use transaction pooling; statement pooling can break prepared statements.
- On the Python side, set options='-c statement_timeout=120000' per connection so long queries fail early.
Pooling pitfalls to watch for
- Leaked connections: If you forget to return a connection to the pool, you can exhaust it under load. That often looks like “the database is down” but is really a pool mis‑use.
- Long transactions: A pool doesn’t solve long transactions; it amplifies them. If each request holds a connection for 800 ms instead of 80 ms, your pool needs to be 10x larger to avoid saturation.
- Forking with pools: If you run Gunicorn or forked workers, create the pool after the fork. Sharing a pool across processes leads to corrupted state.
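The sizing point above can be put in numbers with Little’s law: connections in flight ≈ request rate × connection hold time. A back-of-envelope helper (the headroom factor is my own rule of thumb):

```python
import math

def pool_size_needed(requests_per_sec, hold_ms, headroom=1.5):
    """Estimate connections needed to serve a load without queueing.

    Little's law: in-flight = arrival rate * time each request holds
    a connection; headroom covers bursts."""
    in_flight = requests_per_sec * (hold_ms / 1000.0)
    return math.ceil(in_flight * headroom)
```

At 100 req/s, dropping hold time from 800 ms to 80 ms cuts the required pool by roughly 10x, which is exactly the amplification effect described above.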
A small pattern for safe pooling
I use a context manager so every getconn has a putconn:
from contextlib import contextmanager
from psycopg2 import pool

POOL = pool.ThreadedConnectionPool(minconn=2, maxconn=10, dsn=DSN)

@contextmanager
def pooled_conn():
    conn = POOL.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        POOL.putconn(conn)
That pattern alone removes most connection leaks I see in production.
COPY for bulk data moves
COPY beats INSERT loops by orders of magnitude. psycopg2 exposes both text and binary COPY. Binary is faster but less readable; start with text and upgrade when needed.
COPY FROM CSV
import io

rows = [(1, "a"), (2, "b")]
buffer = io.StringIO()
for r in rows:
    buffer.write(f"{r[0]},{r[1]}\n")
buffer.seek(0)

with PgConn() as conn, conn.cursor() as cur:
    # copy_from defaults to tab-delimited text, so pass sep="," for comma data
    cur.copy_from(buffer, "letters", sep=",", columns=("id", "val"))
COPY TO stream to S3/Kafka
with PgConn() as conn, conn.cursor() as cur:
    with open("/tmp/ledger.csv", "w") as f:
        cur.copy_expert("COPY ledger TO STDOUT WITH CSV", f)
Tip: Keep COPY transactions short; they hold locks until completion.
Using COPY with a generator
When your data is already in memory as Python objects, it’s cleaner to stream rows than build a massive buffer:
import io

def row_iter(rows):
    for r in rows:
        yield f"{r['id']},{r['email']}\n"

with PgConn() as conn, conn.cursor() as cur:
    buffer = io.StringIO("".join(row_iter(source_rows)))
    buffer.seek(0)
    cur.copy_expert("COPY accounts (id,email) FROM STDIN WITH CSV", buffer)
For truly massive data, I avoid "".join(...) and instead use an iterator with a custom file‑like wrapper. It’s a little more code, but it prevents enormous memory spikes.
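A minimal version of that file-like wrapper might look like this. It is a sketch under the assumption that copy_expert only calls read() on the object it is given, which holds for psycopg2’s COPY ... FROM STDIN path:

```python
class IterStream:
    """File-like adapter over an iterator of strings, so COPY can
    consume rows lazily instead of one giant joined buffer."""

    def __init__(self, it):
        self.it = iter(it)
        self.leftover = ""

    def read(self, size=-1):
        # Accumulate pieces until we have `size` chars (or everything
        # if size is negative), carrying any surplus to the next call.
        chunks = [self.leftover]
        total = len(self.leftover)
        self.leftover = ""
        for piece in self.it:
            chunks.append(piece)
            total += len(piece)
            if size >= 0 and total >= size:
                break
        data = "".join(chunks)
        if size >= 0 and len(data) > size:
            data, self.leftover = data[:size], data[size:]
        return data
```

Peak memory is now bounded by the read chunk size rather than the full export, which is the whole point of streaming into COPY.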
Prepared statements and server‑side re‑use
If your service issues the same query thousands of times, prepare it once per connection:
with PgConn() as conn, conn.cursor() as cur:
    cur.execute("PREPARE get_user AS SELECT id, email FROM accounts WHERE id = $1")
    cur.execute("EXECUTE get_user (%s)", (42,))
The first execute incurs a parse/plan; subsequent runs reuse the plan, trimming latency for hot paths. Be careful behind pgbouncer: a PREPARE lives in one backend session, and transaction pooling can hand the next transaction to a different backend, so either re-prepare per transaction or avoid explicit PREPARE behind a transaction-pooled bouncer.
When prepared statements hurt
Prepared statements can be slower when parameter values change shape drastically (e.g., highly selective vs very broad values). Postgres can pick a generic plan that’s safe but not optimal. If you see performance regressions, consider:
- Using a plain parameterized execute() without PREPARE for dynamic filters.
- Adjusting server settings for plan caching (plan_cache_mode on Postgres 12+).
- Letting the driver handle simple parameterized statements without explicit PREPARE.
In short: prepare for “hot, repeatable” queries, not for every query.
Profiling queries from Python
I keep a tiny timer wrapper to log slow statements without touching the Postgres config:
import time
import logging
log = logging.getLogger("query")
class TimedCursor:
    def __init__(self, cur, threshold_ms=50):
        self.cur = cur
        self.threshold_ms = threshold_ms

    def execute(self, sql, params=None):
        start = time.perf_counter_ns()
        self.cur.execute(sql, params)
        elapsed_ms = (time.perf_counter_ns() - start) / 1_000_000
        if elapsed_ms > self.threshold_ms:
            log.warning("%sms %s params=%s", round(elapsed_ms, 2), sql, params)
        return self

    def fetchone(self):
        return self.cur.fetchone()

    def fetchmany(self, n):
        return self.cur.fetchmany(n)

    def fetchall(self):
        return self.cur.fetchall()
Wrap it and surface N+1 patterns without noisy database logs.
pg_stat_statements from the app side
Even if you can’t enable the extension in prod, mimic its value: hash your SQL strings and log a SHA256 plus timing. Later, group logs by hash to find the worst offenders without storing full query text.
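A sketch of that hashing; the whitespace-collapsing normalization is my own convention, cruder than what pg_stat_statements does server-side, but good enough for log grouping:

```python
import hashlib
import re

def query_fingerprint(sql: str) -> str:
    """Collapse whitespace and lowercase before hashing, so formatting
    differences don't split the same logical query across log buckets."""
    normalized = re.sub(r"\s+", " ", sql).strip().lower()
    return hashlib.sha256(normalized.encode()).hexdigest()[:16]
```

Log the fingerprint next to the timing, then aggregate by fingerprint offline to find the worst offenders without ever storing full query text.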
The “two clocks” idea
I track two timings: the query time and the post‑query processing time. If a query returns 5,000 rows and then Python spends 150 ms serializing them, your “DB time” might look fine while the request is still slow. When I add instrumentation, I log both. It quickly reveals whether I need indexes or just better response shaping.
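Instrumented naively, the two clocks look like this. A sketch: cur_execute and process are passed in as callables (process stands in for your serialization step) so the helper stays testable without a database:

```python
import time

def timed_query(cur_execute, process, sql, params=None):
    """Run a query and its post-processing, timing each phase separately."""
    t0 = time.perf_counter()
    rows = cur_execute(sql, params)   # "DB clock"
    t1 = time.perf_counter()
    result = process(rows)            # "app clock": serialization, shaping
    t2 = time.perf_counter()
    return result, {"db_ms": (t1 - t0) * 1000, "app_ms": (t2 - t1) * 1000}
```

If app_ms dominates, the fix is response shaping (fewer columns, lazy decoding), not another index.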
Patterns for everyday CRUD queries
Read with filters and paging
with PgConn() as conn:
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(
            """
            SELECT id, email, plan
            FROM accounts
            WHERE active = TRUE AND plan = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """,
            ("pro", 50, 0),
        )
        rows = cur.fetchall()
Parameterized IN with many items
cur.execute(
    "SELECT id FROM orders WHERE id = ANY(%s)",
    (uuid_list,),  # pass the list itself, not wrapped in another list
)
Write with RETURNING
cur.execute(
    "INSERT INTO accounts (email, plan) VALUES (%s, %s) RETURNING id",
    ("[email protected]", "team"),
)
new_id = cur.fetchone()[0]
Bulk insert with execute_values for speed
from psycopg2.extras import execute_values
rows = [("[email protected]", "pro"), ("[email protected]", "pro")]
execute_values(
    cur,
    "INSERT INTO accounts (email, plan) VALUES %s",
    rows,
    page_size=500,
)
UPDATE with audit trail
cur.execute(
    """
    UPDATE accounts
    SET plan = %s,
        updated_at = NOW()
    WHERE id = %s
    RETURNING id, updated_at
    """,
    ("enterprise", 42),
)
Capture updated_at to feed caches or CDC pipelines.
DELETE with safety guards
I avoid “naked deletes” in production:
cur.execute(
    "DELETE FROM sessions WHERE user_id = %s AND created_at < NOW() - INTERVAL '30 days'",
    (user_id,),
)
I keep it explicit and time‑bounded so accidental broad deletions are less likely.
Working safely with JSON, UUID, and timestamptz
psycopg2 adapts common Python types automatically, but I still take care:
- Use psycopg2.extras.Json when inserting dicts; it ensures jsonb stores what you expect.
- Prefer Python’s uuid.UUID objects to plain strings; Postgres validates the format either way, but UUID objects fail faster client-side.
- Always store timestamps in UTC (timestamptz). Attach timezone.utc to datetime before passing to queries to avoid implicit conversion surprises.
De/serializing JSON efficiently
When reading JSON columns in hot paths, request RealDictCursor plus json.loads only where needed. One bad habit is to auto‑decode every JSON field for every row even when your API only returns a subset. Decode on demand:
import json

def read_user(cur, user_id):
    # Cast to text so psycopg2's default jsonb typecaster doesn't
    # decode the column before we decide we actually need it.
    cur.execute("SELECT id, settings::text AS settings FROM users WHERE id = %s", (user_id,))
    row = cur.fetchone()
    if row is None:
        return None
    settings = json.loads(row["settings"]) if row["settings"] else {}
    return {"id": row["id"], "settings": settings}
Timestamps: don’t let the DB guess
I pass timezone‑aware datetimes and specify UTC explicitly:
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
cur.execute("INSERT INTO events (occurred_at) VALUES (%s)", (now,))
If you mix naive and aware timestamps, you’ll eventually see “can’t compare offset‑naive and offset‑aware datetimes” bugs or silent shifts across timezones.
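I guard this with a small normalizer at the query boundary. A sketch; the name ensure_utc is my own:

```python
from datetime import datetime, timezone

def ensure_utc(dt: datetime) -> datetime:
    """Reject naive datetimes early; convert aware ones to UTC."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime passed to the query layer")
    return dt.astimezone(timezone.utc)
```

Failing loudly at the boundary turns a silent timezone shift into an immediate, debuggable error.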
Indexing and query shape: the hidden half of performance
Query code can be perfect and still slow if the database has to scan large tables. I treat query work as 50% Python, 50% SQL structure and indexing. A few practical checks I run:
- If a query filters on a column, I check whether there’s an index on that column.
- If a query uses ILIKE or LOWER(...) patterns, I use an index on a computed expression.
- If the query joins two big tables, I verify the join keys are indexed.
Example: predictable pagination
Offset pagination (LIMIT 50 OFFSET 10000) gets slower as you page deeper because the DB still has to scan the skipped rows. For large datasets I switch to keyset pagination:
cur.execute(
    """
    SELECT id, created_at, email
    FROM accounts
    WHERE created_at < %s
    ORDER BY created_at DESC
    LIMIT 50
    """,
    (last_seen_created_at,),
)
Keyset pagination is more stable under load and plays nicely with indexes.
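At the API layer, I hand clients the keyset boundary as an opaque cursor token instead of raw timestamps. A sketch; the token format (base64 of a JSON pair) is my own convention:

```python
import base64
import json

def encode_cursor(created_at_iso: str, row_id: int) -> str:
    """Pack the keyset boundary into an opaque, URL-safe token."""
    raw = json.dumps([created_at_iso, row_id]).encode()
    return base64.urlsafe_b64encode(raw).decode()

def decode_cursor(token: str):
    """Recover (created_at_iso, row_id) to feed the keyset WHERE clause."""
    created_at_iso, row_id = json.loads(base64.urlsafe_b64decode(token))
    return created_at_iso, row_id
```

Opaque tokens let you change the pagination key later (say, to a composite of created_at and id for tie-breaking) without breaking API clients.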
Guardrails for production querying
Here are the guardrails I actually set in production services:
- statement_timeout: kills runaway queries. I prefer 30–120 seconds depending on the endpoint.
- idle_in_transaction_session_timeout: avoids sessions that sit in a transaction forever.
- lock_timeout: fails fast instead of waiting minutes on a lock.
- application_name: tags every connection so I can search pg_stat_activity by service.
In psycopg2, these are just connection options:
DSN = "... options='-c statement_timeout=120000 -c lock_timeout=5000' application_name=myservice"
These guardrails save more time than any micro‑optimization.
Error handling and diagnostics
psycopg2 errors include rich diagnostics, but only if you surface them. I extract and log key fields when queries fail:
import psycopg2

try:
    cur.execute("SELECT * FROM missing_table")
except psycopg2.Error as e:
    diag = e.diag
    info = {
        "pgcode": e.pgcode,
        "message": e.pgerror,
        "schema": diag.schema_name,
        "table": diag.table_name,
        "column": diag.column_name,
        "constraint": diag.constraint_name,
    }
    raise RuntimeError(info) from e
This makes it far easier to debug schema issues and constraint violations without guesswork.
Query composition without ORMs
Sometimes you want composability without a full ORM. I use small functions that build SQL fragments safely. A minimal example:
from psycopg2 import sql
def filter_clause(filters):
    clauses = []
    params = []
    if "plan" in filters:
        clauses.append(sql.SQL("plan = %s"))
        params.append(filters["plan"])
    if "active" in filters:
        clauses.append(sql.SQL("active = %s"))
        params.append(filters["active"])
    where = sql.SQL(" WHERE ") + sql.SQL(" AND ").join(clauses) if clauses else sql.SQL("")
    return where, params

with PgConn() as conn, conn.cursor() as cur:
    where, params = filter_clause({"plan": "pro", "active": True})
    query = sql.SQL("SELECT id, email FROM accounts") + where
    cur.execute(query, params)
    rows = cur.fetchall()
It’s not a full query builder, but it scales to complex endpoints without becoming unreadable.
Handling large writes without pain
For big writes I use two tiers:
- execute_values for up to tens of thousands of rows.
- COPY for anything bigger.
The difference is dramatic. execute_values is easy; COPY is faster but requires formatting and careful error handling. In practice I start with execute_values and switch to COPY only when profiling proves it’s needed.
Chunked inserts to control memory
from psycopg2.extras import execute_values

def insert_users(cur, rows, chunk=1000):
    for i in range(0, len(rows), chunk):
        batch = rows[i:i + chunk]
        execute_values(cur, "INSERT INTO users(email, plan) VALUES %s", batch)
Chunking keeps memory stable and avoids exceeding statement length limits.
Testing queries without mocking the world
I’m a fan of using a real Postgres container for tests. SQLite is fast, but it hides Postgres behaviors around JSON, arrays, and concurrency. A real DB gives you confidence that the query shapes, types, and constraints work as expected.
When testing query functions, I use:
- A single test DB per suite.
- Transaction rollbacks in fixtures to keep tests isolated.
- Seed data for realistic edge cases (nulls, empty arrays, large text).
The payoff is fewer production surprises and fewer “works in tests but fails in prod” bugs.
Security checklist for querying
This is my minimum bar for query safety:
- Use parameters for every value.
- Use psycopg2.sql and allowlists for identifiers.
- Avoid dynamic SQL string concatenation.
- Keep read‑only endpoints in read‑only transactions when possible.
- Enforce timeouts so malicious or accidental expensive queries can’t run forever.
- Monitor failed logins and repeated errors.
Security is not just about injection—timeouts and least‑privilege roles matter too.
Observability: beyond timing
Timing is a start, but I also log:
- Query names (logical identifiers, not raw SQL).
- Row counts for large responses.
- Connection acquisition time (pool wait time).
This lets me answer questions like: “Is the query slow or is the pool saturated?” If 80 ms of a 120 ms request is waiting for a connection, I need more connections or shorter transactions, not faster SQL.
Lightweight query tagging
One simple pattern: add comments to SQL for traceability.
cur.execute("/* get_active_accounts */ SELECT id, email FROM accounts WHERE active = TRUE")
Postgres exposes that comment in pg_stat_activity, which makes real-time debugging easier.
Common pitfalls I see in the wild
Here are mistakes I still see in otherwise well‑run teams:
- Forgotten commits after SELECTs: leads to idle in transaction sessions.
- SELECT * on large tables: unnecessary payload and Python overhead.
- Using OFFSET for deep pagination: gets slower the deeper you go.
- Building IN strings manually: risky and brittle.
- Long-lived server-side cursors: they hold snapshots and block vacuum.
- No timeouts: one runaway query can hurt the whole system.
The fix is usually just a small habit change, not a full rewrite.
Alternative approaches when psycopg2 isn’t ideal
I love psycopg2, but it’s not always the best fit. Here’s how I decide:
- If I need async end‑to‑end: psycopg3 or asyncpg.
- If I want a SQL toolkit with migration support: SQLAlchemy Core.
- If I need a quick admin interface: Django ORM.
- If I need cross‑database support: a higher‑level ORM or query builder.
But even then, I still think direct SQL + psycopg2 is the fastest path to understanding performance and behavior. The database is where your truth lives; seeing the SQL is good for your instincts.
Putting it together: a practical querying module
Here’s a compact, production‑friendly module that combines the patterns above:
import os
import time
import logging
from contextlib import contextmanager
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
log = logging.getLogger("db")
DSN = os.getenv("PG_DSN", "dbname=postgres user=postgres password=postgres host=127.0.0.1 port=5432")
@contextmanager
def conn_ctx(readonly=False):
    conn = psycopg2.connect(DSN, cursor_factory=RealDictCursor)
    conn.set_session(readonly=readonly)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

@contextmanager
def cursor_ctx(conn):
    cur = conn.cursor()
    try:
        yield cur
    finally:
        cur.close()
class Timed:
    def __init__(self, cur, threshold_ms=50):
        self.cur = cur
        self.threshold_ms = threshold_ms

    def execute(self, sql, params=None):
        start = time.perf_counter_ns()
        self.cur.execute(sql, params)
        ms = (time.perf_counter_ns() - start) / 1_000_000
        if ms > self.threshold_ms:
            log.warning("slow_query=%sms sql=%s", round(ms, 2), sql)
        return self

    def fetchall(self):
        return self.cur.fetchall()

    def fetchone(self):
        return self.cur.fetchone()
Example usage
def list_active_accounts(limit=50):
    with conn_ctx(readonly=True) as conn:
        with cursor_ctx(conn) as cur:
            t = Timed(cur)
            t.execute(
                "SELECT id, email FROM accounts WHERE active = TRUE ORDER BY created_at DESC LIMIT %s",
                (limit,),
            )
            return t.fetchall()

def bulk_insert_accounts(rows):
    with conn_ctx() as conn:
        with cursor_ctx(conn) as cur:
            execute_values(cur, "INSERT INTO accounts (email, plan) VALUES %s", rows, page_size=500)
This is still small, but it includes the core patterns: scoped connections, safe cursors, read‑only transactions, and slow query logging.
Final thoughts
Querying data in PostgreSQL from Python is not just about “making the query work.” It’s about building habits that keep your services stable under real traffic: parameterization, scoped transactions, fetch discipline, thoughtful pooling, and sensible timeouts. psycopg2 is still my default because it’s predictable, widely supported, and well understood by the ecosystem. But the deeper lesson is this: your driver won’t save you from poor query patterns. The good news is that you don’t need heroics to fix them. A handful of low‑effort patterns—most of them in this guide—turn messy querying into clean, reliable, and fast systems.
If you adopt just two changes this week, make them these: parameterize every query and set a timeout on every connection. Those two habits alone prevent most of the production incidents I see. Everything else in this guide is a multiplier on top of that foundation.