PostgreSQL + Python Querying Guide (2026 Edition)

I keep bumping into Python teams who can model their data beautifully yet get slowed down by query mistakes: forgetting to parameterize, fetching too much, or leaking connections when something crashes. In 2026, with Python 3.14 around the corner and PostgreSQL 18 rolling out, knowing how to query sanely from Python is still a career‑long skill. In this guide I’m going to show you how I approach querying with psycopg2, why I still reach for it first even as psycopg3 gets shinier, and the small patterns that keep my production services fast, predictable, and kind to the database. We’ll start with the gritty bits—installing the driver on today’s stacks—then move through cursor usage, streaming, transactions, profiling, connection pooling, COPY, prepared statements, observability, and modern async considerations. Along the way I’ll share code you can paste directly into your project and the footguns I’ve seen bite real teams.

Why I still reach for psycopg2 when querying

  • Stability matters: psycopg2 2.9.11 (released October 10, 2025) supports CPython 3.9–3.14 and is battle‑tested with PostgreSQL 9.6 through current releases. (data.safetycli.com)
  • Ecosystem gravity: ORM drivers (SQLAlchemy, Django) still default to psycopg2 under the hood for sync paths; swapping drivers in 2026 is possible but rarely worth the churn unless you need async end‑to‑end.
  • Tooling familiarity: DBAs, SREs, and ops scripts already expect psycopg2 error messages and diagnostics. When minutes count, that shared vocabulary saves time.

Beyond those obvious reasons, there’s a less glamorous one: most production problems around querying are not caused by the driver, but by how the app uses it. I’ve seen psycopg2 blamed for slow requests that were actually caused by missing indexes, unbounded fetches, and a dozen long‑lived idle transactions. In that environment, a “stable and boring” driver is a feature. It lets you focus on discipline—parameterization, short transactions, correct fetch patterns, safe pooling—without re‑learning the whole API surface.

Installing in 2026 without the usual potholes

You can still pip install psycopg2-binary==2.9.11 on Linux, macOS, and Windows. I pin the binary wheel in production images so builds never fall back to compiling from source (which demands pg_config). The binary wheel exists for CPython 3.14, so future upgrades stay smooth. (pypi.org)

If you do hit "pg_config executable not found", it usually means pip tried to build from source because it couldn’t find a matching wheel. Add --only-binary psycopg2-binary or pin the exact version to force wheels; otherwise install the PostgreSQL client development package so pg_config is on PATH. (reddit.com)

My install checklist

  • Pin: pip install --upgrade psycopg2-binary==2.9.11
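  • Alpine images: if you must compile, apk add postgresql-dev gcc musl-dev python3-dev in the build stage; the runtime image only needs libpq (packaged as libpq or postgresql-libs, depending on the Alpine release).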
  • macOS ARM: prefer Homebrew PostgreSQL to supply pg_config when wheels are missing.
  • Type hints: pip install types-psycopg2==2.9.21.20251012 to keep MyPy happy. (data.safetycli.com)

Production packaging reality

I treat psycopg2 like a system dependency even when it’s pip‑installed. That means I explicitly document it in the Dockerfile and ensure the base image has the correct glibc. Why? The “works on my laptop” psycopg2 import errors still happen in 2026, especially with slim containers or odd Linux distros. If your CI is building wheels on one environment and deploying on another, either pin a manylinux wheel or bake psycopg2 into the same base image you ship. It’s not sexy, but it’s the difference between a predictable deployment and a 2 AM rollback.

Connecting cleanly and keeping credentials safe

I keep a tiny connection helper that favors environment variables so secrets stay out of code:

import os

import psycopg2
from psycopg2.extras import RealDictCursor

# Build a libpq DSN from the standard PG* environment variables.
DSN = "dbname={db} user={user} password={pwd} host={host} port={port} connect_timeout=5".format(
    db=os.getenv("PGDATABASE", "postgres"),
    user=os.getenv("PGUSER", "postgres"),
    pwd=os.getenv("PGPASSWORD", "postgres"),
    host=os.getenv("PGHOST", "127.0.0.1"),
    port=os.getenv("PGPORT", "5432"),
)


class PgConn:
    """Context manager: commit on success, roll back on error, always close."""

    def __enter__(self):
        self.conn = psycopg2.connect(DSN, cursor_factory=RealDictCursor)
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        try:
            if exc_type is not None:
                self.conn.rollback()
            else:
                self.conn.commit()
        finally:
            # Close even if commit/rollback itself fails, so the socket never leaks.
            self.conn.close()

I prefer RealDictCursor for API responses so column names map to dict keys—no more tuple indexing errors. A 5‑second connect_timeout guards against hung sockets when the DB is firewalled.

DSN variants you’ll actually use

  • Full TLS: append sslmode=require sslrootcert=/etc/ssl/certs/ca.pem when your org mandates encrypted hops.
  • App role separation: application_name={service} aids pg_stat_activity triage.
  • Read replicas: keep a READ_DSN env var; route analytics to it to avoid punishing primaries.

A more production‑ready connection helper

The tiny helper works for scripts, but for services I like a little more structure: explicit autocommit control, context‑managed cursors, and safe logging of connection details.

import os

import psycopg2
from psycopg2.extras import RealDictCursor


class Pg:
    def __init__(self):
        # Prefer one full DSN from the environment; fall back to PG* variables.
        self.dsn = os.getenv("PG_DSN")
        if not self.dsn:
            self.dsn = (
                "dbname={db} user={user} password={pwd} host={host} port={port} connect_timeout=5 "
                "application_name=myservice"
            ).format(
                db=os.getenv("PGDATABASE", "postgres"),
                user=os.getenv("PGUSER", "postgres"),
                pwd=os.getenv("PGPASSWORD", "postgres"),
                host=os.getenv("PGHOST", "127.0.0.1"),
                port=os.getenv("PGPORT", "5432"),
            )

    def connect(self, autocommit=False):
        conn = psycopg2.connect(self.dsn, cursor_factory=RealDictCursor)
        conn.set_session(autocommit=autocommit)
        return conn

    def cursor(self, conn):
        return conn.cursor()

The key is not the class, it’s the discipline: one place to manage configuration, one place to change connection defaults. That cuts down on “why do some requests behave differently?” issues.

Designing safer queries: parameters over f‑strings

String interpolation is still the fastest way to write a SQL injection. psycopg2 lets me pass parameters separately so the driver handles quoting:

with PgConn() as conn:
    with conn.cursor() as cur:
        # Values travel separately from the SQL text; the driver quotes them.
        cur.execute(
            "SELECT id, email, plan FROM accounts WHERE created_at >= %s AND plan = ANY(%s)",
            ("2025-12-01", ["pro", "team"]),
        )
        rows = cur.fetchall()

Key points:

  • %s (positional) and %(name)s (named) are the only placeholders psycopg2 understands, regardless of column type; never %d or ?.
  • The second argument is a tuple; lists (for ANY or IN) are fine because psycopg2 adapts them to Postgres arrays.
  • Never concatenate identifiers either—use psycopg2.sql.Identifier when you must vary table or column names.

Dynamic identifiers the safe way

from psycopg2 import sql


def fetch_column(table, column):
    with PgConn() as conn, conn.cursor() as cur:
        # Identifier() quotes names safely; callers should still allowlist them.
        query = sql.SQL("SELECT {col} FROM {tbl} LIMIT 5").format(
            col=sql.Identifier(column),
            tbl=sql.Identifier(table),
        )
        cur.execute(query)
        return cur.fetchall()

Common parameterization edge cases

  • IN lists with zero items: WHERE id = ANY(%s) with an empty list returns no rows. That’s usually what you want, but don’t accidentally fall back to IN () by string building. If you need “no filter,” handle that in Python.
  • Array vs scalar: If you pass a list where a scalar is expected, Postgres might throw a type error or adapt to ARRAY[...] unexpectedly. I keep list parameters only where I explicitly use ANY or = ANY.
  • Identifiers vs values: Parameters are for values only. If you need a dynamic table or column, use psycopg2.sql and validate against an allowlist.

Validate and normalize before the query

Parameterization is not a replacement for validation. If I accept sort_by or direction from a client, I still map it through a whitelist:

from psycopg2 import sql

# Client-facing names map to real column names and fixed keywords.
ALLOWED_SORTS = {"created_at": "created_at", "email": "email"}
ALLOWED_DIRS = {"asc": "ASC", "desc": "DESC"}


def query_accounts(sort_by, direction):
    # Unknown input falls back to a safe default instead of reaching the SQL.
    col = ALLOWED_SORTS.get(sort_by, "created_at")
    dir_ = ALLOWED_DIRS.get(direction, "DESC")
    with PgConn() as conn, conn.cursor() as cur:
        q = sql.SQL("SELECT id, email FROM accounts ORDER BY {col} {dir}").format(
            col=sql.Identifier(col),
            dir=sql.SQL(dir_),
        )
        cur.execute(q)
        return cur.fetchall()

I treat any dynamic identifier as untrusted unless it is mapped to a predefined token.

Fetch patterns: choosing fetchone, fetchmany, fetchall

I pick the fetch method based on expected cardinality and memory footprint.

with PgConn() as conn:
    with conn.cursor(name="stream") as cur:  # server-side cursor when named
        cur.itersize = 500
        cur.execute("SELECT * FROM events ORDER BY occurred_at DESC LIMIT 100000")
        first = cur.fetchone()       # one row
        batch = cur.fetchmany(500)   # the next 500 rows
        rest = cur.fetchall()        # everything that is left

  • fetchone() advances the cursor like a queue; calling twice gets row 1 then row 2.
  • fetchmany(n) is my default for paged APIs; it keeps latency steady and memory flat.
  • fetchall() is fine only when the result set is naturally small (think 10–100 rows).
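
To make the fetchmany habit concrete, here is a minimal sketch of a batch generator. It only assumes a DB-API style cursor that has already executed a query; iter_batches and FakeCursor are hypothetical names for illustration, and the fake cursor exists purely so the example runs without a database.

```python
from typing import Any, Iterator, List


def iter_batches(cur: Any, batch_size: int = 500) -> Iterator[List[Any]]:
    """Yield lists of rows from an already-executed DB-API cursor."""
    while True:
        batch = cur.fetchmany(batch_size)
        if not batch:  # an empty list means the result set is exhausted
            return
        yield batch


class FakeCursor:
    """Stand-in for a real cursor so the sketch runs without a database."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        head, self._rows = self._rows[:size], self._rows[size:]
        return head


sizes = [len(b) for b in iter_batches(FakeCursor(range(1200)), batch_size=500)]
print(sizes)  # [500, 500, 200]
```

With a real psycopg2 cursor the call site would look the same: execute the query, then loop over iter_batches(cur). Memory stays bounded by batch_size only when the cursor is server-side (named); a default client-side cursor has already pulled the full result set by the time execute returns.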

Streaming large result sets without flooding RAM

Server‑side cursors are lifesavers when exporting millions of rows. Naming the cursor keeps results on the server and fetches in chunks:

with PgConn() as conn:
    with conn.cursor(name="export", cursor_factory=RealDictCursor) as cur:
        cur.itersize = 2000  # rows pulled per network round trip while iterating
        cur.execute("SELECT * FROM ledger WHERE closed_at >= %s", ("2025-01-01",))
        for row in cur:
            process(row)  # process() is your own row handler

Notes:

  • Avoid ORDER BY random() or complex sorts with server cursors; they may materialize the whole set anyway.
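  • Keep transactions short; long‑lived cursors hold MVCC snapshots and keep vacuum from cleaning up dead rows. A named cursor lives only as long as its transaction, so a commit mid‑iteration closes it unless it was created with withhold=True. For very long exports, consider paging by key in separate transactions when consistency rules allow.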

When fetchall is okay

  • Small reference tables (countries, enum lookups).
  • Admin screens with pagination already enforced at the app layer.
  • Unit tests where you intentionally want to assert the entire result.

Fetching just enough: “fat rows” vs “thin rows”

A quiet performance killer is fetching far more columns than you need. If your service only needs id and email, do not SELECT *. The CPU cost of decompression, network transfer, and Python object creation adds up. I do a quick pass through each query and ask, “Which columns does this call actually use?” It’s a low‑effort, high‑impact improvement.

Transactions, retries, and error handling

I treat every connection as a transaction scope. psycopg2 auto‑opens a transaction on first command. That means a SELECT without commit() still holds locks. Two patterns I rely on:

1) Autocommit for DDL or LISTEN/NOTIFY

with PgConn() as conn:
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
    conn.set_session(autocommit=True)
    with conn.cursor() as cur:
        cur.execute("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_ts ON events(occurred_at)")

2) Retry on transient errors (e.g., 40001 serialization failures)

import time

from psycopg2 import OperationalError, errors


def run_with_retry(fn, attempts=3, backoff=0.2):
    # fn should open its own transaction so each attempt starts clean.
    for i in range(attempts):
        try:
            return fn()
        except (errors.SerializationFailure, OperationalError):
            if i == attempts - 1:
                raise  # out of attempts: surface the error instead of returning None
            time.sleep(backoff * (2 ** i))  # exponential backoff: 0.2s, 0.4s, ...

I avoid catching ProgrammingError because it often signals a bug in my SQL, not a transient condition.

Savepoints for partial success

When a loop of operations should partly succeed, wrap each unit in a savepoint instead of a new transaction:

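import psycopg2

with PgConn() as conn:
    with conn.cursor() as cur:
        for row in payload:  # payload: an iterable of (id, body) tuples
            cur.execute("SAVEPOINT sp")
            try:
                cur.execute("INSERT INTO items(id, body) VALUES (%s, %s)", row)
            except psycopg2.Error:
                # Undo only this row; earlier rows in the transaction survive.
                cur.execute("ROLLBACK TO SAVEPOINT sp")
            else:
                cur.execute("RELEASE SAVEPOINT sp")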

Understanding psycopg2 transaction behavior

I wish more teams knew this: in psycopg2, the connection starts a transaction at the first execute() and keeps it open until commit() or rollback(). That means a simple SELECT can become a long‑lived transaction if you forget to commit or close the connection. The symptoms are classic: autovacuum lag, long idle in transaction sessions, and tuples that never get cleaned up. The fix is simple: either commit after reads (yes, it’s safe) or set autocommit for read‑only handlers.

Read‑only transactions to reduce risk

If you have read‑only endpoints that never write, you can enforce safety at the DB level:

with PgConn() as conn:
    conn.set_session(readonly=True)
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM accounts WHERE id = %s", (42,))
        row = cur.fetchone()

Postgres will reject any write attempt, which turns accidental updates into clear errors.

Idempotency and retry boundaries

Retries are useful, but only for idempotent operations. If your query modifies external state (sending emails, pushing to a queue), make sure the retry happens before those side effects. A clean pattern is: write a row with a unique key, then let a worker handle side effects. The DB becomes your source of truth and retries are safe.

Async in 2026: when psycopg3 is worth it

psycopg3 3.3 landed December 1, 2025 with a polished async API, template string queries tied to Python 3.14, and improved binary type handling. (postgresql.org) If your stack is already async (FastAPI, Trio, asyncio tasks), running psycopg2 in threads is serviceable, but swapping to psycopg3 removes thread pools and sheds latency. For pure querying, though, psycopg2 remains faster to get running and pairs seamlessly with existing sync codebases.

| Need | Traditional (psycopg2) | Modern (psycopg3) |
| --- | --- | --- |
| Runtime fit | Sync apps, cron jobs | Async web/APIs, background workers |
| Install risk | Stable wheels through CPython 3.14 | Young wheels; check distro lag |
| Feature gap | Mature server‑side cursors, COPY | Template strings, async cursors, pipeline mode |
| Change effort | Drop‑in for Django/SQLAlchemy | Requires async stack, new imports |

My rule: if your request/response path is async end‑to‑end, start new projects on psycopg3; otherwise stay on psycopg2 and revisit during a larger architecture change.

Mixing sync and async responsibly

If your team is halfway through an async migration, don’t try to force psycopg2 into the event loop. Wrap it in a thread pool, keep the DB work synchronous, and put a deadline on fully switching. The “hybrid forever” approach creates the worst of both worlds: sync drivers in async code, plus confusion about where blocking is happening.
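
A minimal sketch of that thread-pool bridge, using only the standard library. blocking_query is a hypothetical stand-in for a synchronous psycopg2 call (it sleeps instead of touching a database), and the timeout value is an assumption, not a recommendation.

```python
import asyncio
import time


def blocking_query(account_id: int) -> dict:
    """Hypothetical stand-in for a synchronous psycopg2 call."""
    time.sleep(0.05)  # simulates network + query latency
    return {"id": account_id, "plan": "pro"}


async def get_account(account_id: int, timeout_s: float = 2.0) -> dict:
    # Run the blocking call in the default thread pool so the event loop stays free.
    return await asyncio.wait_for(asyncio.to_thread(blocking_query, account_id), timeout_s)


async def main() -> list:
    # Three lookups overlap instead of running back to back.
    return await asyncio.gather(*(get_account(i) for i in range(3)))


results = asyncio.run(main())
print([r["id"] for r in results])  # [0, 1, 2]
```

One caveat with this design: asyncio.wait_for stops waiting on timeout, but the worker thread keeps running until the query returns, so a server-side statement_timeout is still the real safety net.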

Connection pooling that doesn’t bite

Opening a new TCP/TLS session for every request hurts latency. psycopg2 ships pool primitives; in production I usually prefer pgbouncer in transaction mode plus a lightweight pool in the app.

Simple threaded pool inside your app

from psycopg2 import pool

# DSN is the connection string built earlier.
POOL = pool.ThreadedConnectionPool(minconn=2, maxconn=10, dsn=DSN)


def get_conn():
    return POOL.getconn()


def put_conn(conn):
    POOL.putconn(conn)

Use this for worker processes with predictable concurrency. Avoid it in short‑lived CLI scripts—with PgConn() is simpler there.

Pgbouncer + psycopg2 settings I set by default

  • server_reset_query = DISCARD ALL to keep sessions clean.
  • Use transaction pooling; statement pooling forbids multi‑statement transactions, and both modes break session‑level state such as SQL PREPARE.
  • On the Python side set options='-c statement_timeout=120000' per connection so long queries fail early.

Pooling pitfalls to watch for

  • Leaked connections: If you forget to return a connection to the pool, you can exhaust it under load. That often looks like “the database is down” but is really a pool mis‑use.
  • Long transactions: A pool doesn’t solve long transactions; it amplifies them. If each request holds a connection for 800 ms instead of 80 ms, your pool needs to be 10x larger to avoid saturation.
  • Forking with pools: If you run Gunicorn or forked workers, create the pool after the fork. Sharing a pool across processes leads to corrupted state.
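
The 10x claim in the long-transactions bullet is just Little's law: the average number of connections in use equals the request rate times how long each request holds a connection. A small sketch of that arithmetic follows; connections_needed is a hypothetical helper, and 100 requests per second is an assumed load.

```python
import math


def connections_needed(requests_per_second: int, hold_time_ms: int) -> int:
    """Little's law: average connections in use = arrival rate x hold time."""
    return math.ceil(requests_per_second * hold_time_ms / 1000)


fast = connections_needed(100, 80)    # each request holds a connection for 80 ms
slow = connections_needed(100, 800)   # same traffic, 800 ms per request
print(fast, slow)  # 8 80
```

These are averages, so a real pool needs some headroom above them for bursts. The useful takeaway is the direction: shortening hold time shrinks the pool you need just as effectively as adding connections.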

A small pattern for safe pooling

I use a context manager so every getconn has a putconn:

from contextlib import contextmanager

from psycopg2 import pool

POOL = pool.ThreadedConnectionPool(minconn=2, maxconn=10, dsn=DSN)


@contextmanager
def pooled_conn():
    conn = POOL.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # Always hand the connection back, whatever happened above.
        POOL.putconn(conn)

That pattern alone removes most connection leaks I see in production.

COPY for bulk data moves

COPY beats INSERT loops by orders of magnitude. psycopg2 exposes both text and binary COPY. Binary is faster but less readable; start with text and upgrade when needed.

COPY FROM CSV

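import io

rows = [(1, "a"), (2, "b")]

# Build an in-memory, comma-separated buffer (values must not contain commas or newlines).
buffer = io.StringIO()
for r in rows:
    buffer.write(f"{r[0]},{r[1]}\n")
buffer.seek(0)

with PgConn() as conn, conn.cursor() as cur:
    # copy_from uses the text format and defaults to tab-separated input, so set sep.
    cur.copy_from(buffer, "letters", sep=",", columns=("id", "val"))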

COPY TO stream to S3/Kafka

with PgConn() as conn, conn.cursor() as cur:
    # A local file is the sink here; any writable file-like object works.
    with open("/tmp/ledger.csv", "w") as out:
        cur.copy_expert("COPY ledger TO STDOUT WITH CSV", out)

Tip: Keep COPY transactions short; they hold locks until completion.

Using COPY with a generator

When your data is already in memory as Python objects, it’s cleaner to stream rows than build a massive buffer:

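import io


def row_iter(rows):
    # Assumes values contain no commas, quotes, or newlines; use the csv module otherwise.
    for r in rows:
        yield f"{r['id']},{r['email']}\n"


with PgConn() as conn, conn.cursor() as cur:
    # source_rows: an iterable of dicts with "id" and "email" keys, defined elsewhere.
    buffer = io.StringIO("".join(row_iter(source_rows)))
    cur.copy_expert("COPY accounts (id,email) FROM STDIN WITH CSV", buffer)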

For truly massive data, I avoid "".join(...) and instead use an iterator with a custom file‑like wrapper. It’s a little more code, but it prevents enormous memory spikes.
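
Here is one way such a wrapper could look: a minimal sketch, not a hardened implementation. IterStream is a hypothetical name. It relies on my understanding that copy_expert only needs an object with read(size) (and readline for copy_from), and it assumes every item the iterator yields is already a fully formatted line of text.

```python
from typing import Iterable, Iterator, Optional


class IterStream:
    """Read-only, file-like view over an iterator of text lines."""

    def __init__(self, lines: Iterable[str]):
        self._lines: Iterator[str] = iter(lines)
        self._buf = ""

    def read(self, size: Optional[int] = -1) -> str:
        want_all = size is None or size < 0
        # Pull lines only until the request can be satisfied.
        while want_all or len(self._buf) < size:
            try:
                self._buf += next(self._lines)
            except StopIteration:
                break
        if want_all:
            out, self._buf = self._buf, ""
        else:
            out, self._buf = self._buf[:size], self._buf[size:]
        return out

    def readline(self) -> str:
        while "\n" not in self._buf:
            try:
                self._buf += next(self._lines)
            except StopIteration:
                break
        line, sep, rest = self._buf.partition("\n")
        self._buf = rest
        return line + sep


stream = IterStream(f"{i},user{i}\n" for i in range(3))
print(repr(stream.read(5)))  # '0,use'
```

In use, the stream would replace the StringIO buffer in the copy_expert call above, for example IterStream(row_iter(source_rows)). Only a chunk's worth of rows is buffered at a time, which is the point of the exercise.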

Prepared statements and server‑side re‑use

If your service issues the same query thousands of times, prepare it once per connection:

with PgConn() as conn, conn.cursor() as cur:
    # $1 is PostgreSQL's own placeholder inside PREPARE; %s is psycopg2's.
    cur.execute("PREPARE get_user AS SELECT id, email FROM accounts WHERE id = $1")
    cur.execute("EXECUTE get_user (%s)", (42,))

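PREPARE parses and analyzes the statement once; later EXECUTE calls skip that work and, after a few runs, may reuse a generic plan, trimming latency for hot paths. Prepared statements belong to a single backend session, so this pays off on long‑lived direct connections or PgBouncer session pooling. Under transaction pooling the next transaction may run on a different backend where get_user does not exist, and statement pooling is stricter still.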

When prepared statements hurt

Prepared statements can be slower when parameter values change shape drastically (e.g., highly selective vs very broad values). Postgres can pick a generic plan that’s safe but not optimal. If you see performance regressions, consider:

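  • Running dynamic filters through a plain parameterized cur.execute() instead of PREPARE/EXECUTE.
  • Adjusting plan caching on the server, for example plan_cache_mode = force_custom_plan (PostgreSQL 12+).
  • Reserving explicit PREPARE for statements whose best plan does not depend on the parameter values.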

In short: prepare for “hot, repeatable” queries, not for every query.

Profiling queries from Python

I keep a tiny timer wrapper to log slow statements without touching the Postgres config:

import logging
import time

log = logging.getLogger("query")


class TimedCursor:
    """Wrap a cursor and log any statement slower than threshold_ms."""

    def __init__(self, cur, threshold_ms=50):
        self.cur = cur
        self.threshold = threshold_ms

    def execute(self, sql, params=None):
        start = time.perf_counter_ns()
        self.cur.execute(sql, params)
        elapsed_ms = (time.perf_counter_ns() - start) / 1_000_000
        if elapsed_ms > self.threshold:
            log.warning("%sms %s params=%s", round(elapsed_ms, 2), sql, params)
        return self

    def fetchone(self):
        return self.cur.fetchone()

    def fetchmany(self, n):
        return self.cur.fetchmany(n)

    def fetchall(self):
        return self.cur.fetchall()

Wrap it and surface N+1 patterns without noisy database logs.

pg_stat_statements from the app side

Even if you can’t enable the extension in prod, mimic its value: hash your SQL strings and log a SHA256 plus timing. Later, group logs by hash to find the worst offenders without storing full query text.
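
A minimal sketch of that hashing step, using only the standard library. sql_fingerprint is a hypothetical helper, and the normalization (collapse whitespace, fold case) is my assumption about what counts as "the same query"; it is safe only because parameterized SQL carries no literal values in its text.

```python
import hashlib
import re


def sql_fingerprint(sql_text: str) -> str:
    """SHA-256 of the SQL with whitespace collapsed and case folded."""
    normalized = re.sub(r"\s+", " ", sql_text).strip().lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


a = sql_fingerprint("SELECT id FROM accounts WHERE plan = %s")
b = sql_fingerprint("""
    select id
    from accounts
    where plan = %s
""")
print(a == b)  # True
```

Logging the fingerprint next to the elapsed time lets you group by hash later. If the full digest is too noisy for log lines, a truncated prefix is a common compromise, at the cost of a small collision risk.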

The “two clocks” idea

I track two timings: the query time and the post‑query processing time. If a query returns 5,000 rows and then Python spends 150 ms serializing them, your “DB time” might look fine while the request is still slow. When I add instrumentation, I log both. It quickly reveals whether I need indexes or just better response shaping.
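
One way to keep the two clocks apart is a tiny timing context manager. This is a sketch under assumptions: clock and the key names db_ms and serialize_ms are hypothetical, and the list comprehension stands in for a real cur.fetchall() so the example runs without a database.

```python
import json
import time
from contextlib import contextmanager


@contextmanager
def clock(timings: dict, name: str):
    """Add the elapsed milliseconds of the with-block to timings[name]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[name] = timings.get(name, 0.0) + (time.perf_counter() - start) * 1000


timings = {}
with clock(timings, "db_ms"):
    rows = [{"id": i, "email": f"user{i}@example.com"} for i in range(5000)]  # stand-in for fetchall()
with clock(timings, "serialize_ms"):
    payload = json.dumps(rows)

print(sorted(timings))  # ['db_ms', 'serialize_ms']
```

Emitting both numbers on the same log line per request makes the comparison immediate: a large serialize_ms next to a small db_ms points at response shaping rather than indexing.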

Patterns for everyday CRUD queries

Read with filters and paging

with PgConn() as conn:
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(
            """
            SELECT id, email, plan
            FROM accounts
            WHERE active = TRUE AND plan = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """,
            ("pro", 50, 0),
        )
        rows = cur.fetchall()

Parameterized IN with many items

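cur.execute(
    # Pass the list itself as one parameter; wrapping it in another list
    # would send a nested array. The cast covers both UUID objects and strings.
    "SELECT id FROM orders WHERE id = ANY(%s::uuid[])",
    (uuid_list,),
)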

Write with RETURNING

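cur.execute(
    "INSERT INTO accounts (email, plan) VALUES (%s, %s) RETURNING id",
    ("user@example.com", "team"),  # placeholder address
)
# With RealDictCursor rows are dicts, so read the column by name (use [0] with the default cursor).
new_id = cur.fetchone()["id"]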

Bulk insert with execute_values for speed

from psycopg2.extras import execute_values

# Placeholder addresses for illustration.
rows = [("a@example.com", "pro"), ("b@example.com", "pro")]

execute_values(
    cur,
    "INSERT INTO accounts (email, plan) VALUES %s",
    rows,
    page_size=500,  # rows sent per generated statement
)

UPDATE with audit trail

cur.execute(
    """
    UPDATE accounts
    SET plan = %s,
        updated_at = NOW()
    WHERE id = %s
    RETURNING id, updated_at
    """,
    ("enterprise", 42),
)

Capture updated_at to feed caches or CDC pipelines.

DELETE with safety guards

I avoid “naked deletes” in production:

cur.execute(
    "DELETE FROM sessions WHERE user_id = %s AND created_at < NOW() - INTERVAL '30 days'",
    (user_id,),
)

I keep it explicit and time‑bounded so accidental broad deletions are less likely.

Working safely with JSON, UUID, and timestamptz

psycopg2 adapts common Python types automatically, but I still take care:

  • Use psycopg2.extras.Json when inserting dicts; it ensures jsonb stores what you expect.
  • Prefer Python’s uuid.UUID objects to plain strings; Postgres validates format either way, but UUID objects fail faster client‑side. Call psycopg2.extras.register_uuid() once at startup so psycopg2 can adapt them.
  • Always store timestamps in UTC (timestamptz). Attach timezone.utc to datetime before passing to queries to avoid implicit conversion surprises.

De/serializing JSON efficiently

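psycopg2 already decodes json and jsonb columns into Python objects as rows are fetched, so calling json.loads on those values raises a TypeError. One bad habit is paying that decode cost for every JSON field on every row even when your API only returns a subset. To decode on demand in hot paths, select the column as text and parse it only where needed:

import json


def read_user(cur, user_id):
    # Casting to text skips psycopg2's automatic JSON decoding for this column.
    cur.execute("SELECT id, settings::text AS settings FROM users WHERE id = %s", (user_id,))
    row = cur.fetchone()
    if row is None:
        return None
    settings = json.loads(row["settings"]) if row["settings"] else {}
    return {"id": row["id"], "settings": settings}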

Timestamps: don’t let the DB guess

I pass timezone‑aware datetimes and specify UTC explicitly:

from datetime import datetime, timezone

now = datetime.now(timezone.utc)  # timezone-aware, explicitly UTC
cur.execute("INSERT INTO events (occurred_at) VALUES (%s)", (now,))

If you mix naive and aware timestamps, you’ll eventually see “can’t compare offset‑naive and offset‑aware datetimes” bugs or silent shifts across timezones.
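
A small guard at the boundary catches naive datetimes before they reach a query. This is a minimal sketch; to_utc is a hypothetical helper, and the +05:30 offset is just an example zone.

```python
from datetime import datetime, timedelta, timezone


def to_utc(dt: datetime) -> datetime:
    """Reject naive datetimes; convert aware ones to UTC."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("naive datetime; attach a timezone before querying")
    return dt.astimezone(timezone.utc)


ist = timezone(timedelta(hours=5, minutes=30))
local = datetime(2026, 1, 15, 9, 0, tzinfo=ist)
print(to_utc(local).isoformat())  # 2026-01-15T03:30:00+00:00
```

Raising on naive input is a deliberate choice here: silently assuming UTC (or local time) is exactly the guess this section warns against.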

Indexing and query shape: the hidden half of performance

Query code can be perfect and still slow if the database has to scan large tables. I treat query work as 50% Python, 50% SQL structure and indexing. A few practical checks I run:

  • If a query filters on a column, I check whether there’s an index on that column.
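  • If a query filters on LOWER(...), I add an expression index on that expression; for ILIKE patterns I look at a pg_trgm index instead, since a plain B‑tree index generally cannot serve them.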
  • If the query joins two big tables, I verify the join keys are indexed.

Example: predictable pagination

Offset pagination (LIMIT 50 OFFSET 10000) gets slower as you page deeper because the DB still has to scan the skipped rows. For large datasets I switch to keyset pagination:

cur.execute(
    """
    SELECT id, created_at, email
    FROM accounts
    WHERE created_at < %s
    ORDER BY created_at DESC
    LIMIT 50
    """,
    (last_seen_created_at,),  # created_at of the last row on the previous page
)

Keyset pagination is more stable under load and plays nicely with indexes.
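
One gap in the query above: if two rows share the same created_at, paging on that column alone can skip or repeat rows at page boundaries. A common remedy is to add a unique tie‑breaker and compare both columns as a row value. The sketch below only builds the SQL and parameters; keyset_page is a hypothetical helper, and it assumes id is unique and that an index on (created_at, id) exists.

```python
from typing import Optional, Tuple

BASE = "SELECT id, created_at, email FROM accounts"
ORDER = " ORDER BY created_at DESC, id DESC LIMIT %s"


def keyset_page(last_seen: Optional[Tuple[object, int]], page_size: int = 50):
    """Return (sql, params) for the next page; last_seen is (created_at, id) or None."""
    if last_seen is None:
        # First page: no cursor position yet.
        return BASE + ORDER, (page_size,)
    created_at, row_id = last_seen
    # Row-value comparison keeps ordering consistent when created_at ties.
    where = " WHERE (created_at, id) < (%s, %s)"
    return BASE + where + ORDER, (created_at, row_id, page_size)


query, params = keyset_page(("2026-01-01T00:00:00+00:00", 42), page_size=10)
print(params)  # ('2026-01-01T00:00:00+00:00', 42, 10)
```

The caller would pass query and params straight to cur.execute and remember the (created_at, id) of the last row returned as the position for the next call.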

Guardrails for production querying

Here are the guardrails I actually set in production services:

  • statement_timeout: kills runaway queries. I prefer 30–120 seconds depending on the endpoint.
  • idle_in_transaction_session_timeout: avoids sessions that sit in a transaction forever.
  • lock_timeout: fails fast instead of waiting minutes on a lock.
  • application_name: tags every connection so I can search pg_stat_activity by service.

In psycopg2, these are just connection options:

DSN = "... options='-c statement_timeout=120000 -c lock_timeout=5000' application_name=myservice"

These guardrails save more time than any micro‑optimization.
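
If the timeouts live in configuration, a small helper can assemble the options fragment so the quoting stays consistent. This is a sketch: build_options is a hypothetical helper, the values are assumed to be integer milliseconds, and the 60000 used for idle_in_transaction_session_timeout is an illustrative number rather than a recommendation.

```python
import re

_SETTING_NAME = re.compile(r"^[a-z_]+$")


def build_options(settings: dict) -> str:
    """Render {setting: milliseconds} as a libpq options='...' DSN fragment."""
    parts = []
    for name, value in settings.items():
        if not _SETTING_NAME.match(name):
            raise ValueError(f"unexpected setting name: {name!r}")
        parts.append(f"-c {name}={int(value)}")  # int() rejects anything non-numeric
    return "options='" + " ".join(parts) + "'"


fragment = build_options({
    "statement_timeout": 120000,
    "lock_timeout": 5000,
    "idle_in_transaction_session_timeout": 60000,
})
print(fragment)
```

The fragment is appended to the rest of the DSN with a space. Validating names and coercing values to int keeps configuration typos from turning into malformed connection strings.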

Error handling and diagnostics

psycopg2 errors include rich diagnostics, but only if you surface them. I extract and log key fields when queries fail:

import psycopg2

try:
    cur.execute("SELECT * FROM missing_table")
except psycopg2.Error as e:
    diag = e.diag
    # Pull the structured fields the server sent along with the error.
    info = {
        "pgcode": e.pgcode,
        "message": e.pgerror,
        "schema": diag.schema_name,
        "table": diag.table_name,
        "column": diag.column_name,
        "constraint": diag.constraint_name,
    }
    raise RuntimeError(info) from e

This makes it far easier to debug schema issues and constraint violations without guesswork.

Query composition without ORMs

Sometimes you want composability without a full ORM. I use small functions that build SQL fragments safely. A minimal example:

from psycopg2 import sql


def filter_clause(filters):
    # Each clause carries a %s placeholder; the matching value goes into params.
    clauses = []
    params = []
    if "plan" in filters:
        clauses.append(sql.SQL("plan = %s"))
        params.append(filters["plan"])
    if "active" in filters:
        clauses.append(sql.SQL("active = %s"))
        params.append(filters["active"])
    where = sql.SQL(" WHERE ") + sql.SQL(" AND ").join(clauses) if clauses else sql.SQL("")
    return where, params


with PgConn() as conn, conn.cursor() as cur:
    where, params = filter_clause({"plan": "pro", "active": True})
    query = sql.SQL("SELECT id, email FROM accounts") + where
    cur.execute(query, params)
    rows = cur.fetchall()

It’s not a full query builder, but it scales to complex endpoints without becoming unreadable.

Handling large writes without pain

For big writes I use two tiers:

  • execute_values for up to tens of thousands of rows.
  • COPY for anything bigger.

The difference is dramatic. execute_values is easy; COPY is faster but requires formatting and careful error handling. In practice I start with execute_values and switch to COPY only when profiling proves it’s needed.

Chunked inserts to control memory

from psycopg2.extras import execute_values


def insert_users(cur, rows, chunk=1000):
    # Send the rows in fixed-size slices so no single statement grows unbounded.
    for i in range(0, len(rows), chunk):
        batch = rows[i:i + chunk]
        execute_values(cur, "INSERT INTO users(email, plan) VALUES %s", batch)

Chunking keeps memory stable and avoids exceeding statement length limits.

Testing queries without mocking the world

I’m a fan of using a real Postgres container for tests. SQLite is fast, but it hides Postgres behaviors around JSON, arrays, and concurrency. A real DB gives you confidence that the query shapes, types, and constraints work as expected.

When testing query functions, I use:

  • A single test DB per suite.
  • Transaction rollbacks in fixtures to keep tests isolated.
  • Seed data for realistic edge cases (nulls, empty arrays, large text).

The payoff is fewer production surprises and fewer “works in tests but fails in prod” bugs.

Security checklist for querying

This is my minimum bar for query safety:

  • Use parameters for every value.
  • Use psycopg2.sql and allowlists for identifiers.
  • Avoid dynamic SQL string concatenation.
  • Keep read‑only endpoints in read‑only transactions when possible.
  • Enforce timeouts so malicious or accidental expensive queries can’t run forever.
  • Monitor failed logins and repeated errors.

Security is not just about injection—timeouts and least‑privilege roles matter too.

Observability: beyond timing

Timing is a start, but I also log:

  • Query names (logical identifiers, not raw SQL).
  • Row counts for large responses.
  • Connection acquisition time (pool wait time).

This lets me answer questions like: “Is the query slow or is the pool saturated?” If 80 ms of a 120 ms request is waiting for a connection, I need more connections or shorter transactions, not faster SQL.
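
A minimal sketch of measuring connection acquisition separately from query time. timed_conn and FakePool are hypothetical; the fake pool only mimics the getconn/putconn shape of psycopg2's pools so the example runs without a database.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_conn(pool, metrics: dict):
    """Borrow a connection and record how long the checkout took."""
    start = time.perf_counter()
    conn = pool.getconn()
    metrics["acquire_ms"] = (time.perf_counter() - start) * 1000
    try:
        yield conn
    finally:
        pool.putconn(conn)  # always return the connection


class FakePool:
    """Hypothetical stand-in with the getconn/putconn shape of psycopg2's pools."""

    def __init__(self):
        self.returned = 0

    def getconn(self):
        time.sleep(0.01)  # pretend checkout is slow
        return object()

    def putconn(self, conn):
        self.returned += 1


metrics = {}
fake = FakePool()
with timed_conn(fake, metrics) as conn:
    pass  # run queries here

print(fake.returned)  # 1
```

A caveat on interpretation: as far as I know, psycopg2's built‑in pools raise PoolError when exhausted rather than waiting, so with them this number mostly reflects connection setup. It captures real queueing when a blocking pool sits behind the same interface.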

Lightweight query tagging

One simple pattern: add comments to SQL for traceability.

cur.execute("/* get_active_accounts */ SELECT id, email FROM accounts WHERE active = TRUE")

Postgres exposes that comment in pg_stat_activity, which makes real‑time debugging easier.

Common pitfalls I see in the wild

Here are mistakes I still see in otherwise well‑run teams:

  • Forgotten commits after SELECTs: leads to idle in transaction sessions.
  • SELECT * on large tables: unnecessary payload and Python overhead.
  • Using OFFSET for deep pagination: gets slower the deeper you go.
  • Building IN strings manually: risky and brittle.
  • Long‑lived server‑side cursors: they hold snapshots and block vacuum.
  • No timeouts: one runaway query can hurt the whole system.

The fix is usually just a small habit change, not a full rewrite.

Alternative approaches when psycopg2 isn’t ideal

I love psycopg2, but it’s not always the best fit. Here’s how I decide:

  • If I need async end‑to‑end: psycopg3 or asyncpg.
  • If I want a SQL toolkit with migration support: SQLAlchemy Core.
  • If I need a quick admin interface: Django ORM.
  • If I need cross‑database support: a higher‑level ORM or query builder.

But even then, I still think direct SQL + psycopg2 is the fastest path to understanding performance and behavior. The database is where your truth lives; seeing the SQL is good for your instincts.

Putting it together: a practical querying module

Here’s a compact, production‑friendly module that combines the patterns above:

import logging
import os
import time
from contextlib import contextmanager

import psycopg2
from psycopg2.extras import RealDictCursor, execute_values

log = logging.getLogger("db")

DSN = os.getenv("PG_DSN", "dbname=postgres user=postgres password=postgres host=127.0.0.1 port=5432")


@contextmanager
def conn_ctx(readonly=False):
    """Open a connection; commit on success, roll back on error, always close."""
    conn = psycopg2.connect(DSN, cursor_factory=RealDictCursor)
    conn.set_session(readonly=readonly)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


@contextmanager
def cursor_ctx(conn):
    cur = conn.cursor()
    try:
        yield cur
    finally:
        cur.close()


class Timed:
    """Wrap a cursor and log any statement slower than threshold_ms."""

    def __init__(self, cur, threshold_ms=50):
        self.cur = cur
        self.threshold_ms = threshold_ms

    def execute(self, sql, params=None):
        start = time.perf_counter_ns()
        self.cur.execute(sql, params)
        ms = (time.perf_counter_ns() - start) / 1_000_000
        if ms > self.threshold_ms:
            log.warning("slow_query=%sms sql=%s", round(ms, 2), sql)
        return self

    def fetchall(self):
        return self.cur.fetchall()

    def fetchone(self):
        return self.cur.fetchone()

Example usage

def list_active_accounts(limit=50):
    with conn_ctx(readonly=True) as conn:
        with cursor_ctx(conn) as cur:
            t = Timed(cur)
            t.execute(
                "SELECT id, email FROM accounts WHERE active = TRUE ORDER BY created_at DESC LIMIT %s",
                (limit,),
            )
            return t.fetchall()


def bulk_insert_accounts(rows):
    with conn_ctx() as conn:
        with cursor_ctx(conn) as cur:
            execute_values(cur, "INSERT INTO accounts (email, plan) VALUES %s", rows, page_size=500)

This is still small, but it includes the core patterns: scoped connections, safe cursors, read‑only transactions, and slow query logging.

Final thoughts

Querying data in PostgreSQL from Python is not just about “making the query work.” It’s about building habits that keep your services stable under real traffic: parameterization, scoped transactions, fetch discipline, thoughtful pooling, and sensible timeouts. psycopg2 is still my default because it’s predictable, widely supported, and well understood by the ecosystem. But the deeper lesson is this: your driver won’t save you from poor query patterns. The good news is that you don’t need heroics to fix them. A handful of low‑effort patterns—most of them in this guide—turn messy querying into clean, reliable, and fast systems.

If you adopt just two changes this week, make them these: parameterize every query and set a timeout on every connection. Those two habits alone prevent most of the production incidents I see. Everything else in this guide is a multiplier on top of that foundation.
