The first time event-driven code really “clicked” for me was while diagnosing a production service that looked idle—CPU was low, memory was stable—yet requests were timing out. The issue wasn’t raw horsepower. The service was waiting: on sockets, on downstream APIs, on timers, on a database connection pool. A traditional linear flow turned those waits into wasted time.
Event-driven programming flips the mental model. Instead of marching step-by-step and blocking whenever the world is slow, you arrange your program around events—“a socket became readable”, “a timer fired”, “a message arrived”, “a user clicked”—and you attach handlers that run when those events happen. In Python, the modern center of gravity for this style is asyncio: an event loop that schedules coroutines, resumes them when I/O is ready, and keeps your app responsive under load.
If you’ve used async/await but still feel unsure about event loops, tasks, cancellation, or how to structure a real app (not just toy sleep() examples), I’ll walk you through the model I use in 2026: the practical mechanics, the architecture patterns, and the mistakes I see even experienced developers make.
Event-Driven Thinking: Events, Handlers, and “Waiting Without Blocking”
Event-driven programming is less about “asynchronous code” and more about who owns time in your process.
In a synchronous script, your code owns time:
- You call requests.get(...).
- The thread blocks.
- Your code continues when the response arrives.
In an event-driven app, the event loop owns time:
- You start an operation (connect, read, sleep, wait for a message).
- Your coroutine yields control (via await).
- The loop runs other work.
- When an event occurs (I/O ready, timer fired), the loop resumes your coroutine.
A helpful analogy: think of the event loop as an air-traffic controller. Coroutines are flights. Each flight can’t occupy the runway forever; it must yield when it’s waiting. The controller decides what moves next.
This is why event-driven code scales so well for I/O-bound workloads:
- A single process can juggle thousands of open sockets when each handler is cooperative.
- Latency spikes in one dependency don’t necessarily freeze unrelated requests.
But it also comes with a strict rule:
- If you block the loop (CPU-heavy work, time.sleep, blocking file/network I/O), you freeze everything that depends on it.
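Both halves of that rule fit in a few lines. Here's a minimal sketch (the timings and names are mine, not from any library): two cooperative waits overlap, while swapping in time.sleep would serialize them and stall the loop for everyone else.

```python
import asyncio
import time

async def cooperative_wait(delay_s: float) -> None:
    # await yields to the loop; other coroutines run during the wait.
    await asyncio.sleep(delay_s)

async def main() -> float:
    start = time.monotonic()
    # Two 0.2s waits overlap, so the total is ~0.2s, not 0.4s.
    # Replace asyncio.sleep with time.sleep and they serialize, and the
    # loop is frozen for everything else while each one runs.
    await asyncio.gather(cooperative_wait(0.2), cooperative_wait(0.2))
    return time.monotonic() - start

if __name__ == "__main__":
    print(f"elapsed: {asyncio.run(main()):.2f}s")
```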
What counts as an “event” in Python services?
When I say “event”, I don’t just mean UI clicks. In Python backend services, the most common event sources I design around are:
- Socket readiness: a client connection has data to read, or the kernel send buffer can accept more bytes.
- Timers: deadlines, intervals, retries, periodic work.
- Queue messages: a job arrives from a broker, another subsystem, or an in-process queue.
- Subprocess signals / lifecycle: SIGTERM for shutdown, child process exit, health checks.
- Internal domain events: “user signed up”, “invoice paid”, “cache warmed”.
Once you start seeing these as events, application structure becomes clearer: you’re mostly writing small pieces of logic that react to the world, plus a coordinator that keeps the whole thing moving.
The Event Loop in Modern Python (and the Patterns I Actually Use)
In 2026, I treat asyncio.run(...) as the default entry point for asyncio programs. It creates and manages the event loop lifecycle correctly.
Here’s the canonical “Hello/World after 1 second” example, written in the modern style:
import asyncio

async def main() -> None:
    print("Hello")
    await asyncio.sleep(1)
    print("World")

if __name__ == "__main__":
    asyncio.run(main())
A few practical notes that matter in real services:
- asyncio.sleep(...) is an example of a timer event. The coroutine yields, and the loop resumes it later.
- In libraries, prefer asyncio.get_running_loop() inside a coroutine when you truly need the loop (for low-level calls like call_later).
- Avoid the old pattern of grabbing a global loop with asyncio.get_event_loop() at import time. In modern Python, that pattern is increasingly fragile because loop ownership is explicit.
Traditional vs event-driven: what changes in your design
Here’s how I explain the shift to teams that are moving from synchronous services.
| | Traditional (blocking) | Event-driven (asyncio) |
| --- | --- | --- |
| Waiting | Thread blocks | Coroutine yields (await) |
| Unit of concurrency | Threads/processes | Tasks on one loop |
| Failure mode | A stuck call can tie up a worker thread | A blocking call can stall the whole loop |
| Debugging | "Call stack tells the story" | The story spans many tasks |
| Scaling knob | # of threads/workers | # of concurrent tasks |
If you’re building APIs, bots, socket servers, webhook processors, or message consumers, event-driven is usually the simplest way to get concurrency without spawning a thread per request.
The 3 rules I keep repeating to myself
When an asyncio system misbehaves, it almost always violates one of these:
- Never block the loop. If you must do blocking work, push it to a thread or process.
- Bound everything. Timeouts, queue sizes, concurrency limits.
- Make shutdown a design feature. Cancellation and cleanup paths should be intentional, not accidental.
Coroutines, Tasks, and Futures: The Trio You Must Understand
People often lump these together, but I find it easier to treat them as different layers.
Coroutines: the thing you write
A coroutine is an async def function. It can pause at await points.
import asyncio

async def greet_customer(customer_name: str) -> None:
    print(f"Hello {customer_name}")
    await asyncio.sleep(1)
    print(f"Goodbye {customer_name}")

if __name__ == "__main__":
    asyncio.run(greet_customer("Alice"))
Tasks: how coroutines run concurrently
If a coroutine is a “plan”, a Task is “the plan, scheduled and running”. You create tasks when you want concurrency.
import asyncio

async def send_welcome_email(user_id: int) -> None:
    await asyncio.sleep(1)
    print(f"Sent welcome email to user={user_id}")

async def warm_cache(user_id: int) -> None:
    await asyncio.sleep(2)
    print(f"Warmed cache for user={user_id}")

async def main() -> None:
    email_task = asyncio.create_task(send_welcome_email(42))
    cache_task = asyncio.create_task(warm_cache(42))
    # Await both; if one fails, you want to see it.
    await asyncio.gather(email_task, cache_task)

if __name__ == "__main__":
    asyncio.run(main())
In 2026, for multi-task orchestration, I strongly prefer structured concurrency with asyncio.TaskGroup (Python 3.11+). It makes cancellation and error propagation more predictable.
import asyncio

async def fetch_profile(user_id: int) -> dict:
    await asyncio.sleep(0.3)
    return {"user_id": user_id, "plan": "pro"}

async def fetch_usage(user_id: int) -> dict:
    await asyncio.sleep(0.5)
    return {"user_id": user_id, "requests_24h": 1280}

async def main() -> None:
    results: dict[str, dict] = {}
    async with asyncio.TaskGroup() as tg:
        profile_task = tg.create_task(fetch_profile(7))
        usage_task = tg.create_task(fetch_usage(7))
    # TaskGroup waits for completion before exiting.
    results["profile"] = profile_task.result()
    results["usage"] = usage_task.result()
    print(results)

if __name__ == "__main__":
    asyncio.run(main())
Futures: the low-level placeholder
A Future is a “result that will exist later”. Most application code doesn’t need to instantiate Futures directly, but understanding them explains a lot of asyncio’s behavior.
This example creates a Future and completes it from a timer callback (a classic event-loop pattern):
import asyncio

async def main() -> None:
    loop = asyncio.get_running_loop()
    future: asyncio.Future[str] = loop.create_future()

    def complete_future() -> None:
        # This callback runs on the event loop thread.
        if not future.done():
            future.set_result("inventory refreshed")

    loop.call_later(1.0, complete_future)
    message = await future
    print(f"Result: {message}")

if __name__ == "__main__":
    asyncio.run(main())
Where Futures show up in practice:
- You’re integrating with callback-style libraries.
- You’re bridging threads and asyncio.
- You’re implementing custom protocols.
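For the thread-bridging case, the pattern I reach for is the loop's thread-safe entry point. A minimal sketch (the names are illustrative): a plain thread completes a Future by scheduling the call onto the loop, because Futures themselves are not thread-safe.

```python
import asyncio
import threading

async def main() -> str:
    loop = asyncio.get_running_loop()
    future: asyncio.Future[str] = loop.create_future()

    def worker() -> None:
        # Runs on a plain thread. Futures aren't thread-safe, so hand the
        # result back through the loop's thread-safe scheduling call.
        loop.call_soon_threadsafe(future.set_result, "done in thread")

    threading.Thread(target=worker).start()
    return await future

if __name__ == "__main__":
    print(asyncio.run(main()))
```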
A word on @asyncio.coroutine
You may still see legacy coroutine syntax (@asyncio.coroutine + yield from). In modern Python, I treat it as historical context. Use async def unless you’re stuck maintaining old code.
A Practical Architecture: An In-Process Event Bus You Can Grow Into
Most “event-driven” tutorials stop at asyncio.gather. Real systems need a way to route events to handlers, apply backpressure, and keep shutdown predictable.
When I’m building a single-process service (CLI daemon, webhook worker, small socket service), I often start with an in-process event bus:
- Events are typed objects.
- Producers publish to a queue.
- Consumers dispatch to handler functions.
Here’s a runnable example that you can paste into event_bus.py.
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Type

@dataclass(frozen=True)
class UserSignedUp:
    user_id: int
    email: str

@dataclass(frozen=True)
class PaymentFailed:
    user_id: int
    reason: str

Event = UserSignedUp | PaymentFailed
Handler = Callable[[Event], Awaitable[None]]

class EventBus:
    def __init__(self, *, max_queue_size: int = 1000) -> None:
        self._queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=max_queue_size)
        self._handlers: Dict[Type[object], List[Handler]] = {}
        self._stopping = asyncio.Event()

    def subscribe(self, event_type: Type[object], handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event) -> None:
        # Backpressure: if queue is full, publishers will await here.
        await self._queue.put(event)

    async def stop(self) -> None:
        self._stopping.set()

    async def run(self) -> None:
        while not self._stopping.is_set():
            try:
                event = await asyncio.wait_for(self._queue.get(), timeout=0.2)
            except asyncio.TimeoutError:
                continue
            try:
                await self._dispatch(event)
            finally:
                self._queue.task_done()

    async def _dispatch(self, event: Event) -> None:
        event_type = type(event)
        handlers = self._handlers.get(event_type, [])
        # Fan-out handlers concurrently; isolate failures.
        # In bigger systems, you may want retries or a dead-letter queue.
        tasks = [asyncio.create_task(h(event)) for h in handlers]
        if not tasks:
            return
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"Handler error: {result!r}")

async def send_onboarding_email(event: Event) -> None:
    assert isinstance(event, UserSignedUp)
    await asyncio.sleep(0.1)
    print(f"Email sent to {event.email}")

async def provision_default_workspace(event: Event) -> None:
    assert isinstance(event, UserSignedUp)
    await asyncio.sleep(0.2)
    print(f"Workspace provisioned for user={event.user_id}")

async def alert_billing_team(event: Event) -> None:
    assert isinstance(event, PaymentFailed)
    await asyncio.sleep(0.1)
    print(f"Billing alert for user={event.user_id}: {event.reason}")

async def main() -> None:
    bus = EventBus(max_queue_size=100)
    bus.subscribe(UserSignedUp, send_onboarding_email)
    bus.subscribe(UserSignedUp, provision_default_workspace)
    bus.subscribe(PaymentFailed, alert_billing_team)
    runner = asyncio.create_task(bus.run())

    await bus.publish(UserSignedUp(user_id=101, email="[email protected]"))
    await bus.publish(PaymentFailed(user_id=101, reason="card_declined"))

    # Wait until all events currently in the queue are processed.
    await bus._queue.join()
    await bus.stop()
    await runner

if __name__ == "__main__":
    asyncio.run(main())
Why I like starting here:
- You get clear boundaries between producers and consumers.
- You can add metrics, tracing, retries, and batching without rewriting call chains.
- Backpressure is explicit via Queue(maxsize=...).
How it evolves:
- Replace the in-process queue with a broker (Redis streams, NATS, Kafka, SQS) when you need durability or multi-host scaling.
- Keep the same handler shape (async def handle(event)).
Making the event bus “production-shaped” without overengineering
That example is intentionally small, but if I’m going to use it beyond a toy, I usually add four features early:
1) Explicit shutdown and draining
- Stop accepting new events.
- Drain what’s queued.
- Cancel in-flight handler tasks.
2) A concurrency limit for handlers
Even “async” systems can drown themselves by spawning too many tasks. A semaphore is a simple, effective throttle.
3) A dead-letter queue (DLQ) concept
If an event cannot be processed after retries, I want a place to put it. Even if that “place” is just a file or a database table in the first version.
4) Idempotency awareness
If you retry handlers, make sure the handler can safely run more than once (or ensure dedupe upstream).
I’ll revisit these in the reliability section, because this is where real systems either get calm—or get chaotic.
Real I/O Events: Webhooks, Sockets, and Timers Without the Pain
Event-driven programming shines when the outside world is unpredictable. Here are a few patterns I reach for.
Pattern 1: A tiny TCP server (event-driven by default)
Python’s asyncio can accept many concurrent connections in one process.
import asyncio

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    peer = writer.get_extra_info("peername")
    print(f"Client connected: {peer}")
    try:
        while True:
            line = await reader.readline()
            if not line:
                break
            message = line.decode("utf-8").rstrip("\n")
            response = f"ack: {message}\n"
            writer.write(response.encode("utf-8"))
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"Client disconnected: {peer}")

async def main() -> None:
    server = await asyncio.start_server(handle_client, host="127.0.0.1", port=9000)
    addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
    print(f"Listening on {addrs}")
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())
This is event-driven because:
- “Socket readable” events wake reader.readline().
- “Socket writable” events let writer.drain() proceed.
#### Edge cases I actually care about in socket handlers
- Slowloris-style clients: a client that sends bytes painfully slowly. Use timeouts around reads.
- Unbounded messages: never accept unlimited line sizes; set maximums.
- Backpressure: if your response can be large, writer.drain() is your friend. If you ignore it, you can buffer too much in memory.
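The first two edge cases can be sketched together. This is an illustrative helper (read_line_bounded, the limits, and the in-memory StreamReader demo are my assumptions, not part of the server above):

```python
import asyncio

MAX_LINE_BYTES = 4096  # cap message size (assumption for this sketch)
READ_TIMEOUT_S = 5.0   # defeat painfully slow (Slowloris-style) clients

async def read_line_bounded(reader: asyncio.StreamReader) -> bytes:
    # wait_for bounds how long we'll sit waiting on a slow client.
    line = await asyncio.wait_for(reader.readline(), timeout=READ_TIMEOUT_S)
    if len(line) > MAX_LINE_BYTES:
        raise ValueError("line too long")
    return line

async def main() -> bytes:
    # Feed a StreamReader directly to demo without a real socket.
    reader = asyncio.StreamReader()
    reader.feed_data(b"hello\n")
    return await read_line_bounded(reader)

if __name__ == "__main__":
    print(asyncio.run(main()))
```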
Pattern 2: Timers as events (periodic work)
For periodic jobs, I avoid crons inside a service unless there’s a clear reason. A timer loop with cancellation support is often enough.
import asyncio
import time

async def emit_heartbeat() -> None:
    while True:
        print(f"heartbeat ts={time.time():.0f}")
        await asyncio.sleep(5)

async def main() -> None:
    task = asyncio.create_task(emit_heartbeat())
    # Let it run for a bit, then cancel (demo for shutdown).
    await asyncio.sleep(12)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("heartbeat stopped")

if __name__ == "__main__":
    asyncio.run(main())
In real services, that cancellation path matters. Your shutdown sequence should stop timers, stop accepting new work, and drain what’s in-flight.
#### A better periodic loop: fixed-rate scheduling (avoids drift)
A common bug: you do work, then sleep(interval). That drifts because “work time” adds to the interval.
If you want “run roughly every 10 seconds” regardless of work time, schedule against monotonic time:
import asyncio
import time

async def periodic(interval_s: float) -> None:
    # Monotonic loop time is a good default for scheduling logic.
    loop = asyncio.get_running_loop()
    next_run = loop.time()
    while True:
        next_run += interval_s
        # Do work.
        print(f"tick wall={time.time():.0f}")
        # Sleep until the next scheduled time (or 0 if we fell behind).
        delay = max(0.0, next_run - loop.time())
        await asyncio.sleep(delay)

async def main() -> None:
    task = asyncio.create_task(periodic(2.0))
    await asyncio.sleep(7)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

if __name__ == "__main__":
    asyncio.run(main())
This pattern is especially useful for pollers, refreshers, and batch flushers.
Pattern 3: An async worker pool (queue + concurrency limit)
If you consume events/jobs and do I/O per job, I like this structure:
- A producer puts jobs into an asyncio.Queue.
- N workers pull jobs.
- A semaphore limits expensive operations.
import asyncio
from dataclasses import dataclass

@dataclass(frozen=True)
class Job:
    job_id: int
    payload: str

async def process_job(job: Job, sem: asyncio.Semaphore) -> None:
    # Semaphore limits “hot” concurrency (e.g., outbound API calls).
    async with sem:
        await asyncio.sleep(0.2)
        print(f"processed job={job.job_id} payload={job.payload}")

async def worker(name: str, q: asyncio.Queue[Job], sem: asyncio.Semaphore) -> None:
    while True:
        job = await q.get()
        try:
            await process_job(job, sem)
        finally:
            q.task_done()

async def main() -> None:
    q: asyncio.Queue[Job] = asyncio.Queue(maxsize=100)
    sem = asyncio.Semaphore(10)
    workers = [asyncio.create_task(worker(f"w{i}", q, sem)) for i in range(5)]
    for i in range(20):
        await q.put(Job(job_id=i, payload="x" * (i % 5)))
    await q.join()  # Wait until all jobs are done.
    for t in workers:
        t.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(main())
This is event-driven in the most practical sense: the queue is your internal event stream, and workers react as items arrive. It scales nicely, and you can add retries/timeouts around process_job without changing the overall shape.
Reliability as a First-Class Event: Timeouts, Cancellation, and Backpressure
This is the part that separates “it works on my laptop” from “it survives Tuesday”.
Timeouts: don’t let one dependency hold the loop hostage
If an outbound call might hang, put a bound on it. I prefer asyncio.timeout(...) (newer style) when available, otherwise asyncio.wait_for.
import asyncio

async def call_partner_api() -> str:
    # Pretend the network is slow.
    await asyncio.sleep(2)
    return "ok"

async def main() -> None:
    try:
        async with asyncio.timeout(1.0):
            result = await call_partner_api()
        print(result)
    except TimeoutError:
        print("partner API timed out")

if __name__ == "__main__":
    asyncio.run(main())
What I’ve learned the hard way: timeouts are not just about “not waiting too long”. They are also:
- A protection against partial outages.
- A lever for load shedding.
- A forcing function for your code to handle cancellation and cleanup.
#### Timeout budgets: one deadline, many awaits
In real handlers, you might do multiple awaits (db + cache + http). If each has its own 1s timeout, you can accidentally allow a 3s+ request.
A pattern I like is a single request-level deadline:
import asyncio

async def step(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def handle_request() -> list[str]:
    results: list[str] = []
    async with asyncio.timeout(1.2):
        results.append(await step("cache", 0.4))
        results.append(await step("db", 0.5))
        results.append(await step("http", 0.6))  # This one will likely time out.
    return results

async def main() -> None:
    try:
        print(await handle_request())
    except TimeoutError:
        print("request deadline exceeded")

if __name__ == "__main__":
    asyncio.run(main())
That single deadline forces you to think in terms of budgets, which is how production systems actually behave.
Cancellation: treat it like real control flow, not an exception you ignore
Cancellation is how you reclaim resources and keep shutdown fast. The important rule:
- Don’t accidentally swallow asyncio.CancelledError.
The most common bug I see is overly broad exception handling:
# Bad pattern (don't do this):
try:
    ...
except BaseException:  # or a bare `except:` — both swallow cancellation
    ...

Since Python 3.8, CancelledError inherits from BaseException rather than Exception, so a plain except Exception no longer catches it, but a bare except: or except BaseException: still does. If you catch cancellation and keep going, your service may refuse to shut down.
Here’s how I write cancellation-aware loops:
import asyncio

async def run_forever() -> None:
    try:
        while True:
            await asyncio.sleep(1)
            print("tick")
    except asyncio.CancelledError:
        # Cleanup belongs here.
        print("got cancelled, cleaning up")
        raise  # Re-raise so the cancellation propagates.

async def main() -> None:
    t = asyncio.create_task(run_forever())
    await asyncio.sleep(2.5)
    t.cancel()
    await asyncio.gather(t, return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(main())
#### Cleanup rules I follow
When a task is cancelled, I want to answer three questions:
- What resources might be open? (sockets, files, db connections)
- What state might be half-written? (temp files, partially sent responses)
- Is it safe to retry? (idempotent operations)
If a handler needs “must-run cleanup” (like releasing a lock), put it in finally blocks.
#### When to shield from cancellation (rare, but real)
Sometimes you need a small section to complete even during shutdown (for example: a final flush of logs, or returning a borrowed connection to a pool). Cancellation shielding exists, but I use it sparingly because it can delay shutdown.
Conceptually:
- Allow cancellation for most operations.
- Shield only tiny, bounded cleanup steps.
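Here is a minimal sketch of that balance, assuming a hypothetical flush_logs cleanup step: the worker is freely cancellable, but the final flush runs under asyncio.shield so a second cancel can't interrupt it mid-flush.

```python
import asyncio

flushed: list[str] = []

async def flush_logs() -> None:
    # Imagine a small, bounded final flush (hypothetical cleanup step).
    await asyncio.sleep(0.02)
    flushed.append("flushed")

async def worker() -> None:
    try:
        await asyncio.sleep(60)  # normal work, freely cancellable
    except asyncio.CancelledError:
        # Shield only the tiny cleanup so further cancels can't tear it apart.
        await asyncio.shield(flush_logs())
        raise  # still propagate the cancellation

async def main() -> None:
    t = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        pass

if __name__ == "__main__":
    asyncio.run(main())
    print(flushed)
```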
Backpressure: the real secret behind stable event-driven systems
Backpressure means: when the system is overloaded, you slow down producers rather than letting memory explode or latency become infinite.
In asyncio, the cleanest backpressure primitive is still the queue:
- A bounded asyncio.Queue(maxsize=N) forces producers to wait when full.
- That waiting is cooperative and doesn’t block the whole process.
#### A practical backpressure pattern: “accept, enqueue, respond later”
If you’re handling incoming events faster than you can process:
- Don’t spawn unlimited tasks.
- Enqueue work.
- Let the queue limit act as your pressure valve.
Even in HTTP servers (framework-dependent), the equivalent is often:
- Return 429/503 when queues are too deep.
- Or apply per-tenant concurrency limits.
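A minimal sketch of the "shed instead of buffer" decision, using put_nowait (the boolean return stands in for whatever 202-vs-429 response your framework would send):

```python
import asyncio

async def try_accept(q: asyncio.Queue[int], job: int) -> bool:
    # Non-blocking enqueue: shed load instead of buffering without bound.
    try:
        q.put_nowait(job)
        return True   # your framework would answer 200/202 here
    except asyncio.QueueFull:
        return False  # ...and 429/503 here

async def main() -> tuple[int, int]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    accepted = sum([await try_accept(q, j) for j in range(5)])
    return accepted, 5 - accepted

if __name__ == "__main__":
    # With no consumer running, the bounded queue accepts 2 and sheds 3.
    print(asyncio.run(main()))
```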
#### Concurrency limits (Semaphore) vs queue size
These solve different problems:
- Queue size limits buffering (how much work you’re willing to hold).
- Semaphore limits simultaneous expensive operations.
I almost always use both.
Retries (with jitter) and simple circuit breaking
Event-driven systems are great at retries because they can “wait without blocking” between attempts.
Here’s a retry helper I use as a starting point:
import asyncio
import random
from collections.abc import Awaitable, Callable

async def retry(
    fn: Callable[[], Awaitable[str]],
    *,
    attempts: int = 4,
    base_delay_s: float = 0.2,
    max_delay_s: float = 2.0,
) -> str:
    last_exc: Exception | None = None
    for i in range(attempts):
        try:
            return await fn()
        except Exception as e:
            last_exc = e
            if i == attempts - 1:
                break
            # Exponential backoff with jitter.
            delay = min(max_delay_s, base_delay_s * (2 ** i))
            delay = delay * (0.5 + random.random())
            await asyncio.sleep(delay)
    assert last_exc is not None
    raise last_exc
Two production notes:
- Retries amplify traffic during outages; pair them with timeouts and (ideally) circuit breaking.
- Not everything should be retried. Validate whether failures are transient.
A minimal circuit-breaker idea (conceptual): if a dependency fails too often, “open the circuit” for a short window and fail fast, rather than stacking requests behind a failing service.
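To make that idea concrete, here's a minimal sketch of such a breaker (the class, thresholds, and half-open behavior are illustrative, not a production library): after N consecutive failures it fails fast for a cooldown window, then lets one trial call through.

```python
import asyncio
import time

class CircuitBreaker:
    """Tiny illustrative breaker: open after `max_failures` consecutive
    failures, fail fast for `cooldown_s`, then allow a trial call."""

    def __init__(self, *, max_failures: int = 3, cooldown_s: float = 5.0) -> None:
        self._max_failures = max_failures
        self._cooldown_s = cooldown_s
        self._failures = 0
        self._opened_at: float | None = None

    def _is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if time.monotonic() - self._opened_at >= self._cooldown_s:
            self._opened_at = None  # half-open: allow one trial call
            return False
        return True

    async def call(self, fn):
        if self._is_open():
            # Fail fast instead of stacking requests behind a failing service.
            raise RuntimeError("circuit open: failing fast")
        try:
            result = await fn()
        except Exception:
            self._failures += 1
            if self._failures >= self._max_failures:
                self._opened_at = time.monotonic()
            raise
        self._failures = 0  # success resets the streak
        return result
```

Usage would wrap each outbound call as `await breaker.call(lambda: client_call())`, paired with the timeouts and retries above.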
Structured Concurrency as an Architecture Tool (not just a language feature)
asyncio.TaskGroup is more than a convenience. It changes how I structure services.
Why TaskGroup reduces “ghost tasks”
The easiest way to leak tasks is:
- create tasks in many places
- forget to await them
- never cancel them on shutdown
TaskGroup encourages the opposite:
- tasks are created within a known scope
- leaving the scope waits for completion
- errors propagate in a predictable way
Pattern: fan-out work, cancel siblings on failure
If you need “all or nothing” behavior (e.g., you’re assembling a response from multiple dependencies), TaskGroup gives you a clean default: if one task fails, the group cancels the rest.
That behavior is often what you want in request handlers.
Pattern: supervisor tasks + worker tasks
For daemons, I often build:
- one supervisor TaskGroup for top-level tasks
- worker pools inside it
This lets me implement: “if any critical subsystem fails, shut down the whole service cleanly.”
Mixing Blocking Code with Event-Driven Code (Without Regret)
Most real Python programs live in a mixed world:
- Some libraries are async-native.
- Some are blocking (and may never become async).
The goal isn’t to eliminate blocking code. The goal is to isolate it so it doesn’t freeze the loop.
Use asyncio.to_thread for blocking calls
If you need to call a blocking function (CPU-light but waiting on I/O), run it in a thread:
import asyncio
import time

def blocking_io() -> str:
    time.sleep(1)  # Blocks a thread, not the event loop.
    return "done"

async def main() -> None:
    result = await asyncio.to_thread(blocking_io)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
When I use this:
- legacy SDKs
- blocking filesystem operations (sometimes)
- quick integrations
When I avoid this:
- CPU-heavy work (threads won’t help much due to the GIL; consider processes)
- extremely high-volume call paths (thread overhead can become noticeable)
Use a process pool for CPU-bound work
If your handler does heavy computation (compression, parsing huge blobs, ML inference without a native release of the GIL), consider processes.
Conceptually:
- keep the event loop for coordination
- offload heavy CPU work to a process pool
Anti-pattern: “just make it async” wrappers everywhere
A common failure mode is sprinkling to_thread everywhere until the program “seems fine.” You can end up with:
- unbounded thread creation (or an overloaded executor)
- harder debugging
- hidden latency
I prefer to:
- isolate blocking work behind a small adapter module
- apply concurrency limits around that adapter
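A minimal sketch of such an adapter (the module-level semaphore, run_blocking, and legacy_lookup are illustrative names, not a real SDK): one narrow entry point for blocking calls, with a hard cap on how many threads they may occupy at once.

```python
import asyncio
import time

# One narrow entry point for blocking calls, with a hard cap on the
# number of threads they may occupy at once.
_BLOCKING_LIMIT = asyncio.Semaphore(8)

async def run_blocking(fn, /, *args, **kwargs):
    async with _BLOCKING_LIMIT:
        return await asyncio.to_thread(fn, *args, **kwargs)

def legacy_lookup(key: str) -> str:
    time.sleep(0.02)  # stand-in for a blocking SDK call
    return f"value-for-{key}"

async def main() -> list[str]:
    # Callers stay async; the blocking detail is hidden behind the adapter.
    return await asyncio.gather(*(run_blocking(legacy_lookup, f"k{i}") for i in range(4)))

if __name__ == "__main__":
    print(asyncio.run(main()))
```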
Graceful Shutdown: Signals, Draining, and “Stop Accepting New Work”
In event-driven programs, shutdown is not a footnote. It’s part of correctness.
A shutdown sequence I trust looks like this:
- Receive shutdown request (signal, admin command, parent process).
- Stop accepting new incoming work.
- Cancel background tasks and timers.
- Drain queues (within a timeout).
- Close network servers and resources.
Here’s a pattern I use for daemons:
import asyncio
import signal

async def serve(stop: asyncio.Event) -> None:
    while not stop.is_set():
        await asyncio.sleep(0.5)
        print("serving...")

async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Signal handlers must be lightweight.
    for s in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(s, stop.set)
        except NotImplementedError:
            # Some platforms/event loops may not support this.
            pass
    task = asyncio.create_task(serve(stop))
    await stop.wait()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    print("shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
Two details I care about:
- The signal handler just flips an event. No heavy work.
- Cancellation is explicit and awaited.
Draining queues during shutdown
If you have an internal queue, a pragmatic approach is:
- stop producers
- wait for queue.join() with a deadline
- if deadline expires, cancel workers and exit
That way you don’t hang forever trying to be “perfect.”
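A minimal sketch of that drain-with-deadline step (the helper names are mine): wait for queue.join() under wait_for, then cancel the workers either way.

```python
import asyncio

async def worker(q: asyncio.Queue[int]) -> None:
    while True:
        await q.get()
        try:
            await asyncio.sleep(0.01)  # pretend to process the item
        finally:
            q.task_done()

async def drain_with_deadline(q: asyncio.Queue[int], workers: list[asyncio.Task], deadline_s: float) -> bool:
    # True if every queued item was processed before the deadline.
    drained = True
    try:
        await asyncio.wait_for(q.join(), timeout=deadline_s)
    except TimeoutError:
        drained = False
    for t in workers:
        t.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return drained

async def main() -> bool:
    q: asyncio.Queue[int] = asyncio.Queue()
    for i in range(5):
        q.put_nowait(i)
    workers = [asyncio.create_task(worker(q))]
    return await drain_with_deadline(q, workers, deadline_s=1.0)

if __name__ == "__main__":
    print("drained:", asyncio.run(main()))
```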
Observability in Event-Driven Python: Logging, Metrics, and “Where Did My Time Go?”
Async systems can feel harder to debug because the call stack is no longer a single story. That means I lean more on observability.
Logging: add correlation IDs (and don’t lose them)
In concurrent systems, you need to answer:
- which log lines belong to which request/job?
A common approach is a request ID carried through the workflow. In async Python, this often uses contextvars so the ID follows the task.
Even if you don’t implement full context propagation, the habit helps: include job_id, user_id, request_id in logs.
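A minimal contextvars sketch (request_id and the log helper are illustrative): each asyncio Task gets its own copy of the context, so an ID set at the boundary follows that task across awaits without leaking into siblings.

```python
import asyncio
import contextvars

# The ID travels with each task automatically; names are hypothetical.
request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")

def log(msg: str) -> str:
    line = f"[{request_id.get()}] {msg}"
    print(line)
    return line

async def handle_job(job_id: int) -> str:
    request_id.set(f"req-{job_id}")  # set once at the boundary
    await asyncio.sleep(0.01)        # crossing an await keeps the value
    return log("job done")

async def main() -> list[str]:
    # Each task gets its own copy of the context, so IDs don't leak.
    return await asyncio.gather(*(handle_job(i) for i in range(3)))

if __name__ == "__main__":
    asyncio.run(main())
```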
Metrics: measure queue depth and task latency
If I can only have a few metrics for a worker service, they are:
- queue depth (or backlog)
- time-in-queue (how long work waits before starting)
- handler duration
- timeout and error counts
These metrics make backpressure visible.
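Time-in-queue is cheap to measure if you enqueue a timestamp next to each item. A minimal sketch using loop.time() (the tuple shape is just an illustration of the idea):

```python
import asyncio

async def main() -> float:
    loop = asyncio.get_running_loop()
    q: asyncio.Queue[tuple[float, str]] = asyncio.Queue()

    # Producer side: record the enqueue time next to the payload.
    await q.put((loop.time(), "job-1"))
    await asyncio.sleep(0.05)  # simulated backlog before a worker gets to it

    enqueued_at, payload = await q.get()
    wait_s = loop.time() - enqueued_at  # time-in-queue for this item
    print(f"{payload} waited {wait_s * 1000:.0f}ms")
    return wait_s

if __name__ == "__main__":
    asyncio.run(main())
```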
Tracing: async makes it more valuable
Distributed tracing becomes more valuable in event-driven systems because a single “request” might trigger many concurrent tasks and downstream calls.
Even without a full tracing stack, you can fake the benefit by measuring durations around awaits and logging them with the same correlation ID.
Testing Event-Driven Code: Make the Loop Your Test Fixture
Async code is testable, but you need to test the right things:
- timeouts actually time out
- cancellation leaves no leaked tasks
- backpressure behaves under load
Test the smallest unit: the handler
If you have an event bus, test handlers as pure async functions:
- feed them a known event
- assert on outputs/state
Test orchestration: queue + workers
For orchestration code, I like tests that:
- enqueue N jobs
- run workers
- ensure all jobs processed
- cancel workers
- assert no pending tasks
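That recipe fits in one self-checking coroutine. A minimal sketch (plain asserts rather than a test framework): it drains a queue through workers, cancels them, and then verifies nothing is left running.

```python
import asyncio

async def worker(q: asyncio.Queue[int], out: list[int]) -> None:
    while True:
        item = await q.get()
        try:
            out.append(item)
        finally:
            q.task_done()

async def orchestration_test() -> None:
    q: asyncio.Queue[int] = asyncio.Queue()
    out: list[int] = []
    tasks = [asyncio.create_task(worker(q, out)) for _ in range(2)]
    for i in range(10):
        q.put_nowait(i)
    await q.join()  # all jobs processed
    assert sorted(out) == list(range(10))
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    # No leaked tasks besides the one running this test.
    pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    assert pending == []

if __name__ == "__main__":
    asyncio.run(orchestration_test())
    print("orchestration test passed")
```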
Test time as a dependency
If you have complex timer logic, consider injecting a clock or using loop time (loop.time()) rather than wall time. It reduces flakiness.
Common Pitfalls (and the fixes that actually work)
These are the mistakes I see repeatedly.
Pitfall 1: Blocking the loop accidentally
Symptoms:
- “random” latency spikes
- timeouts under load
- the whole service feels stuck
Causes:
- time.sleep() inside async def
- blocking HTTP clients / database drivers inside handlers
- CPU-heavy parsing/serialization inside the loop
Fix:
- replace blocking calls with async-native libraries
- or isolate them with asyncio.to_thread
- offload CPU-heavy work to processes
Pitfall 2: Unlimited task creation
Symptoms:
- memory climbs
- CPU climbs due to scheduling overhead
- downstream dependencies get hammered
Fix:
- bounded queue
- semaphore limits
- TaskGroup scopes
Pitfall 3: Swallowing cancellation
Symptoms:
- SIGTERM doesn’t stop the process
- deploys hang
- shutdown takes forever
Fix:
- never blanket-catch exceptions without re-raising CancelledError
- put cleanup in finally
Pitfall 4: “Async all the way down” pressure
Sometimes teams think everything must become async. That leads to churn and complexity.
Fix:
- only make the boundaries async where concurrency matters
- keep pure computation synchronous
- isolate blocking adapters rather than rewriting the world
When to Use Event-Driven Programming (and When Not To)
Event-driven is a tool, not a religion.
It’s a great fit when you have:
- lots of concurrent I/O (web servers, bots, socket services)
- many slow dependencies and you want to stay responsive
- streaming inputs/outputs
- queue-based processing
It’s not the best fit when:
- your workload is mostly CPU-bound and heavy
- you’re writing a small script that does one thing once
- your team/tooling ecosystem is heavily synchronous and the complexity cost outweighs benefits
In those cases, threads/processes or simple synchronous code can be the more honest solution.
Alternative Approaches (So You Know Your Options)
Even if you standardize on asyncio, it helps to know what else exists conceptually.
Threads
Threads are event-driven in their own way (preemptive scheduling). They can be simpler for retrofitting blocking libraries, but you pay with:
- higher overhead per concurrent unit
- harder shared-state correctness
Multiprocessing
Great for CPU-bound parallelism. Not a replacement for event-driven I/O, but complementary.
Other async ecosystems
There are other async frameworks and concurrency models in Python. Even if you never use them, the ideas (structured concurrency, cancellation semantics, nurseries) influence how you write asyncio code today.
A Practical “Bigger Example”: Event Bus + Worker Pool + Shutdown
To tie the ideas together, here’s what I consider a “starter production shape” for an in-process event-driven service:
- domain events
- bounded queue for backpressure
- worker tasks with concurrency limits
- timeouts and retries
- graceful shutdown
This is longer than a toy, but still small enough to understand in one sitting.
import asyncio
import random
from dataclasses import dataclass
from typing import Awaitable, Callable

@dataclass(frozen=True)
class UserSignedUp:
    user_id: int
    email: str

Event = UserSignedUp
Handler = Callable[[Event], Awaitable[None]]

class Service:
    def __init__(self, *, queue_size: int = 100, workers: int = 4) -> None:
        self._q: asyncio.Queue[Event] = asyncio.Queue(maxsize=queue_size)
        self._stop = asyncio.Event()
        self._workers_n = workers
        self._handlers: list[Handler] = []
        self._sem = asyncio.Semaphore(10)  # limit expensive operations

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    async def publish(self, event: Event) -> None:
        await self._q.put(event)

    async def stop(self) -> None:
        self._stop.set()

    async def run(self) -> None:
        workers = [asyncio.create_task(self._worker(i)) for i in range(self._workers_n)]
        try:
            await self._stop.wait()
            # Drain queued work with a deadline.
            try:
                async with asyncio.timeout(2.0):
                    await self._q.join()
            except TimeoutError:
                pass
        finally:
            for t in workers:
                t.cancel()
            await asyncio.gather(*workers, return_exceptions=True)

    async def _worker(self, idx: int) -> None:
        while True:
            event = await self._q.get()
            try:
                await self._dispatch(event)
            except Exception as exc:
                # Keep the worker alive; in real systems, route to a DLQ.
                print(f"dispatch error: {exc!r}")
            finally:
                self._q.task_done()

    async def _dispatch(self, event: Event) -> None:
        # Fan-out handlers concurrently, but keep it bounded.
        async with asyncio.TaskGroup() as tg:
            for h in self._handlers:
                tg.create_task(self._run_handler(h, event))

    async def _run_handler(self, h: Handler, event: Event) -> None:
        # Bound handler time. In real services, you might use per-handler budgets.
        async with asyncio.timeout(1.0):
            async with self._sem:
                await h(event)

async def send_email(event: Event) -> None:
    # Simulate flaky external dependency.
    await asyncio.sleep(0.05)
    if random.random() < 0.1:
        raise RuntimeError("email provider error")
    print(f"email to {event.email}")

async def provision_workspace(event: Event) -> None:
    await asyncio.sleep(0.08)
    print(f"workspace for user={event.user_id}")

async def main() -> None:
    svc = Service(queue_size=50, workers=3)
    svc.subscribe(send_email)
    svc.subscribe(provision_workspace)
    runner = asyncio.create_task(svc.run())
    for i in range(30):
        await svc.publish(UserSignedUp(user_id=i, email=f"u{i}@example.com"))
    # Let it work a moment then shut down.
    await asyncio.sleep(0.3)
    await svc.stop()
    await runner

if __name__ == "__main__":
    asyncio.run(main())
What this demonstrates:
- Backpressure: bounded queue.
- Concurrency limits: semaphore.
- Predictable cancellation: workers are cancelled at shutdown.
- Structured concurrency: TaskGroup per event dispatch.
What it doesn’t include (but you can add next):
- retries with jitter per handler
- DLQ for failed events
- persistence/durability for events
- per-tenant limits
- metrics and tracing hooks
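Of those, retries with jitter is the easiest to bolt on. A hedged sketch of a helper you could wrap handlers in (the names `retry` and `flaky` are illustrative, not part of the service above):

```python
import asyncio
import random

async def retry(func, *args, attempts: int = 3, base_delay: float = 0.05):
    # Exponential backoff with "full jitter": sleep a random amount up to
    # base_delay * 2**attempt, so many workers retrying the same flaky
    # dependency don't stampede it at the same instant.
    for attempt in range(attempts):
        try:
            return await func(*args)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the failure
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))

calls = {"n": 0}

async def flaky() -> str:
    # Fails twice, then succeeds, like a transient provider error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

result = asyncio.run(retry(flaky))
print(result, calls["n"])
```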
Performance Considerations (Practical, Not Magical)
Event-driven programming improves throughput and latency mainly by reducing wasted waiting. But it’s not free.
Where asyncio shines
I see the biggest wins when:
- average I/O wait time is high relative to CPU time
- concurrency is high (hundreds to tens of thousands of in-flight operations)
- you can keep handlers lightweight and cooperative
In these conditions, it’s common to see improvements like:
- lower tail latency (p95/p99) because one slow request doesn’t hog a worker thread
- better resource usage because you don’t need one thread per concurrent request
I intentionally think in ranges rather than promises: the win might be “noticeable” or “dramatic” depending on dependencies and workload.
Where asyncio can disappoint
- If you do heavy CPU work in handlers, you’ll bottleneck on the event loop.
- If you call blocking libraries without isolation, you lose the benefits.
- If you create too many tasks, scheduling overhead becomes a bottleneck.
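The first point is easy to see directly: while a coroutine does CPU work without awaiting, nothing else on the loop runs, including timers. A small demonstration, with 0.3 s of busy-waiting standing in for "heavy CPU work":

```python
import asyncio
import time

def busy(seconds: float) -> None:
    # CPU-bound loop: never yields to the event loop.
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        pass

async def heartbeat(intervals: list[float]) -> None:
    last = time.monotonic()
    for _ in range(3):
        await asyncio.sleep(0.05)  # wants to tick every 50ms
        now = time.monotonic()
        intervals.append(now - last)
        last = now

async def main() -> list[float]:
    intervals: list[float] = []
    hb = asyncio.create_task(heartbeat(intervals))
    await asyncio.sleep(0)  # let the heartbeat start its first sleep
    busy(0.3)  # the loop is stuck here; the 50ms timer can't fire
    await hb
    return intervals

intervals = asyncio.run(main())
print([f"{i * 1000:.0f}ms" for i in intervals])
```

The first tick arrives roughly 300 ms late, because the timer could not fire until the busy loop yielded control back to the event loop.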
The “real” tuning knobs
When a service is struggling, I usually tune these before anything else:
- timeouts and deadlines
- queue sizes
- semaphores / concurrency limits
- batching (do fewer, bigger operations)
- retry policy (including max attempts)
If you take nothing else from this: event-driven programming in Python isn’t about sprinkling async everywhere. It’s about designing around waiting, bounding your concurrency, and treating timeouts/cancellation/backpressure as part of your core correctness story. That’s what keeps services responsive when dependencies are slow, traffic is spiky, and the world is messy.


