Python Event-Driven Programming in Practice (asyncio in 2026)

The first time event-driven code really “clicked” for me was while diagnosing a production service that looked idle—CPU was low, memory was stable—yet requests were timing out. The issue wasn’t raw horsepower. The service was waiting: on sockets, on downstream APIs, on timers, on a database connection pool. A traditional linear flow turned those waits into wasted time.

Event-driven programming flips the mental model. Instead of marching step-by-step and blocking whenever the world is slow, you arrange your program around events—“a socket became readable”, “a timer fired”, “a message arrived”, “a user clicked”—and you attach handlers that run when those events happen. In Python, the modern center of gravity for this style is asyncio: an event loop that schedules coroutines, resumes them when I/O is ready, and keeps your app responsive under load.

If you’ve used async/await but still feel unsure about event loops, tasks, cancellation, or how to structure a real app (not just toy sleep() examples), I’ll walk you through the model I use in 2026: the practical mechanics, the architecture patterns, and the mistakes I see even experienced developers make.

Event-Driven Thinking: Events, Handlers, and “Waiting Without Blocking”

Event-driven programming is less about “asynchronous code” and more about who owns time in your process.

In a synchronous script, your code owns time:

  • You call requests.get(...).
  • The thread blocks.
  • Your code continues when the response arrives.

In an event-driven app, the event loop owns time:

  • You start an operation (connect, read, sleep, wait for a message).
  • Your coroutine yields control (via await).
  • The loop runs other work.
  • When an event occurs (I/O ready, timer fired), the loop resumes your coroutine.

A helpful analogy: think of the event loop as an air-traffic controller. Coroutines are flights. Each flight can’t occupy the runway forever; it must yield when it’s waiting. The controller decides what moves next.

This is why event-driven code scales so well for I/O-bound workloads:

  • A single process can juggle thousands of open sockets when each handler is cooperative.
  • Latency spikes in one dependency don’t necessarily freeze unrelated requests.

But it also comes with a strict rule:

  • If you block the loop (CPU-heavy work, time.sleep, blocking file/network I/O), you freeze everything that depends on it.
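To make that rule concrete, here is a minimal sketch using only the standard library. The helper names (cooperative_wait, blocking_wait, timed_pair) are made up for illustration. It runs two waits concurrently, once with asyncio.sleep and once with time.sleep, and measures the difference.

```python
import asyncio
import time


async def cooperative_wait(delay_s: float) -> None:
    # Yields to the event loop; other coroutines run during the wait.
    await asyncio.sleep(delay_s)


async def blocking_wait(delay_s: float) -> None:
    # Anti-pattern: time.sleep() holds the loop thread, so nothing else runs.
    time.sleep(delay_s)


async def timed_pair(waiter, delay_s: float = 0.2) -> float:
    """Run two waiters concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(waiter(delay_s), waiter(delay_s))
    return time.perf_counter() - start


async def main() -> None:
    # The cooperative pair overlaps; the blocking pair runs back to back.
    print(f"cooperative: {await timed_pair(cooperative_wait):.2f}s")
    print(f"blocking:    {await timed_pair(blocking_wait):.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The cooperative pair should finish in roughly one delay, while the blocking pair takes roughly two, because the second coroutine cannot even start until the first time.sleep returns.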

What counts as an “event” in Python services?

When I say “event”, I don’t just mean UI clicks. In Python backend services, the most common event sources I design around are:

  • Socket readiness: a client connection has data to read, or the kernel send buffer can accept more bytes.
  • Timers: deadlines, intervals, retries, periodic work.
  • Queue messages: a job arrives from a broker, another subsystem, or an in-process queue.
  • Subprocess signals / lifecycle: SIGTERM for shutdown, child process exit, health checks.
  • Internal domain events: “user signed up”, “invoice paid”, “cache warmed”.

Once you start seeing these as events, application structure becomes clearer: you’re mostly writing small pieces of logic that react to the world, plus a coordinator that keeps the whole thing moving.

The Event Loop in Modern Python (and the Patterns I Actually Use)

In 2026, I treat asyncio.run(...) as the default entry point for asyncio programs. It creates and manages the event loop lifecycle correctly.

Here’s the canonical “Hello/World after 1 second” example, written in the modern style:

import asyncio


async def main() -> None:
    print("Hello")
    await asyncio.sleep(1)
    print("World")


if __name__ == "__main__":
    asyncio.run(main())

A few practical notes that matter in real services:

  • asyncio.sleep(...) is an example of a timer event. The coroutine yields, and the loop resumes it later.
  • In libraries, prefer asyncio.get_running_loop() inside a coroutine when you truly need the loop (for low-level calls like call_later).
  • Avoid the old pattern of grabbing a global loop with asyncio.get_event_loop() at import time. In modern Python, that pattern is increasingly fragile because loop ownership is explicit.

Traditional vs event-driven: what changes in your design

Here’s how I explain the shift to teams that are moving from synchronous services.

| Concern | Traditional (blocking) | Event-driven (async) |
| --- | --- | --- |
| Waiting on I/O | Thread blocks | Coroutine yields (await) |
| Concurrency | Threads/processes | Tasks on one loop + optional threads |
| Failure isolation | A stuck call can tie up a worker thread | A stuck call can be timed out and cancelled without stalling unrelated tasks |
| Structure | “Call stack tells the story” | “State + events tell the story” |
| Tuning knob | # of threads/workers | Backpressure, task limits, timeouts |

If you’re building APIs, bots, socket servers, webhook processors, or message consumers, event-driven is usually the simplest way to get concurrency without spawning a thread per request.

The 3 rules I keep repeating to myself

When an asyncio system misbehaves, it almost always violates one of these:

  • Never block the loop. If you must do blocking work, push it to a thread or process.
  • Bound everything. Timeouts, queue sizes, concurrency limits.
  • Make shutdown a design feature. Cancellation and cleanup paths should be intentional, not accidental.

Coroutines, Tasks, and Futures: The Trio You Must Understand

People often lump these together, but I find it easier to treat them as different layers.

Coroutines: the thing you write

A coroutine is an async def function. It can pause at await points.

import asyncio


async def greet_customer(customer_name: str) -> None:
    print(f"Hello {customer_name}")
    await asyncio.sleep(1)
    print(f"Goodbye {customer_name}")


if __name__ == "__main__":
    asyncio.run(greet_customer("Alice"))

Tasks: how coroutines run concurrently

If a coroutine is a “plan”, a Task is “the plan, scheduled and running”. You create tasks when you want concurrency.

import asyncio


async def send_welcome_email(user_id: int) -> None:
    await asyncio.sleep(1)
    print(f"Sent welcome email to user={user_id}")


async def warm_cache(user_id: int) -> None:
    await asyncio.sleep(2)
    print(f"Warmed cache for user={user_id}")


async def main() -> None:
    email_task = asyncio.create_task(send_welcome_email(42))
    cache_task = asyncio.create_task(warm_cache(42))

    # Await both; if one fails, you want to see it.
    await asyncio.gather(email_task, cache_task)


if __name__ == "__main__":
    asyncio.run(main())

In 2026, for multi-task orchestration, I strongly prefer structured concurrency with asyncio.TaskGroup (Python 3.11+). It makes cancellation and error propagation more predictable.

import asyncio


async def fetch_profile(user_id: int) -> dict:
    await asyncio.sleep(0.3)
    return {"user_id": user_id, "plan": "pro"}


async def fetch_usage(user_id: int) -> dict:
    await asyncio.sleep(0.5)
    return {"user_id": user_id, "requests_24h": 1280}


async def main() -> None:
    results: dict[str, dict] = {}

    async with asyncio.TaskGroup() as tg:
        profile_task = tg.create_task(fetch_profile(7))
        usage_task = tg.create_task(fetch_usage(7))

    # TaskGroup waits for completion before exiting.
    results["profile"] = profile_task.result()
    results["usage"] = usage_task.result()
    print(results)


if __name__ == "__main__":
    asyncio.run(main())

Futures: the low-level placeholder

A Future is a “result that will exist later”. Most application code doesn’t need to instantiate Futures directly, but understanding them explains a lot of asyncio’s behavior.

This example creates a Future and completes it from a timer callback (a classic event-loop pattern):

import asyncio


async def main() -> None:
    loop = asyncio.get_running_loop()
    future: asyncio.Future[str] = loop.create_future()

    def complete_future() -> None:
        # This callback runs on the event loop thread.
        if not future.done():
            future.set_result("inventory refreshed")

    loop.call_later(1.0, complete_future)

    message = await future
    print(f"Result: {message}")


if __name__ == "__main__":
    asyncio.run(main())

Where Futures show up in practice:

  • You’re integrating with callback-style libraries.
  • You’re bridging threads and asyncio.
  • You’re implementing custom protocols.

A word on @asyncio.coroutine

You may still see legacy coroutine syntax (@asyncio.coroutine + yield from). In modern Python, I treat it as historical context. Use async def unless you’re stuck maintaining old code.

A Practical Architecture: An In-Process Event Bus You Can Grow Into

Most “event-driven” tutorials stop at asyncio.gather. Real systems need a way to route events to handlers, apply backpressure, and keep shutdown predictable.

When I’m building a single-process service (CLI daemon, webhook worker, small socket service), I often start with an in-process event bus:

  • Events are typed objects.
  • Producers publish to a queue.
  • Consumers dispatch to handler functions.

Here’s a runnable example that you can paste into event_bus.py.

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Type


@dataclass(frozen=True)
class UserSignedUp:
    user_id: int
    email: str


@dataclass(frozen=True)
class PaymentFailed:
    user_id: int
    reason: str


Event = UserSignedUp | PaymentFailed
Handler = Callable[[Event], Awaitable[None]]


class EventBus:
    def __init__(self, *, max_queue_size: int = 1000) -> None:
        self._queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=max_queue_size)
        self._handlers: Dict[Type[object], List[Handler]] = {}
        self._stopping = asyncio.Event()

    def subscribe(self, event_type: Type[object], handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event) -> None:
        # Backpressure: if queue is full, publishers will await here.
        await self._queue.put(event)

    async def stop(self) -> None:
        self._stopping.set()

    async def run(self) -> None:
        while not self._stopping.is_set():
            # Poll with a short timeout so the stop flag is checked regularly.
            try:
                event = await asyncio.wait_for(self._queue.get(), timeout=0.2)
            except asyncio.TimeoutError:
                continue

            try:
                await self._dispatch(event)
            finally:
                self._queue.task_done()

    async def _dispatch(self, event: Event) -> None:
        event_type = type(event)
        handlers = self._handlers.get(event_type, [])

        # Fan-out handlers concurrently; isolate failures.
        # In bigger systems, you may want retries or a dead-letter queue.
        tasks = [asyncio.create_task(h(event)) for h in handlers]
        if not tasks:
            return

        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"Handler error: {result!r}")


async def send_onboarding_email(event: Event) -> None:
    assert isinstance(event, UserSignedUp)
    await asyncio.sleep(0.1)
    print(f"Email sent to {event.email}")


async def provision_default_workspace(event: Event) -> None:
    assert isinstance(event, UserSignedUp)
    await asyncio.sleep(0.2)
    print(f"Workspace provisioned for user={event.user_id}")


async def alert_billing_team(event: Event) -> None:
    assert isinstance(event, PaymentFailed)
    await asyncio.sleep(0.1)
    print(f"Billing alert for user={event.user_id}: {event.reason}")


async def main() -> None:
    bus = EventBus(max_queue_size=100)
    bus.subscribe(UserSignedUp, send_onboarding_email)
    bus.subscribe(UserSignedUp, provision_default_workspace)
    bus.subscribe(PaymentFailed, alert_billing_team)

    runner = asyncio.create_task(bus.run())

    await bus.publish(UserSignedUp(user_id=101, email="user@example.com"))
    await bus.publish(PaymentFailed(user_id=101, reason="card_declined"))

    # Wait until all events currently in the queue are processed.
    await bus._queue.join()

    await bus.stop()
    await runner


if __name__ == "__main__":
    asyncio.run(main())

Why I like starting here:

  • You get clear boundaries between producers and consumers.
  • You can add metrics, tracing, retries, and batching without rewriting call chains.
  • Backpressure is explicit via Queue(maxsize=...).

How it evolves:

  • Replace the in-process queue with a broker (Redis streams, NATS, Kafka, SQS) when you need durability or multi-host scaling.
  • Keep the same handler shape (async def handle(event)).

Making the event bus “production-shaped” without overengineering

That example is intentionally small, but if I’m going to use it beyond a toy, I usually add four features early:

1) Explicit shutdown and draining

  • Stop accepting new events.
  • Drain what’s queued.
  • Cancel in-flight handler tasks.

2) A concurrency limit for handlers

Even “async” systems can drown themselves by spawning too many tasks. A semaphore is a simple, effective throttle.

3) A dead-letter queue (DLQ) concept

If an event cannot be processed after retries, I want a place to put it. Even if that “place” is just a file or a database table in the first version.

4) Idempotency awareness

If you retry handlers, make sure the handler can safely run more than once (or ensure dedupe upstream).

I’ll revisit these in the reliability section, because this is where real systems either get calm—or get chaotic.
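As a rough illustration of features 3 and 4, here is a sketch of a dispatcher that retries a handler, dead-letters the event when retries run out, and skips events it has already processed. ReliableDispatcher, the event_id argument, and the in-memory set and list are all assumptions made for the example; a real service would back both with durable storage.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ReliableDispatcher:
    """Hypothetical wrapper: retry a handler, dedupe by event ID, dead-letter on failure."""

    max_attempts: int = 3
    processed_ids: set = field(default_factory=set)    # in-memory only (assumption)
    dead_letters: list = field(default_factory=list)   # stand-in for a real DLQ

    async def dispatch(self, event_id: str, handler, payload) -> bool:
        if event_id in self.processed_ids:
            return True  # Idempotency: already handled, so skip the duplicate.

        for attempt in range(1, self.max_attempts + 1):
            try:
                await handler(payload)
            except Exception as exc:
                if attempt == self.max_attempts:
                    # Out of retries: park the event instead of losing it.
                    self.dead_letters.append((event_id, repr(exc)))
                    return False
                await asyncio.sleep(0.01 * attempt)  # Small linear backoff for the demo.
            else:
                self.processed_ids.add(event_id)
                return True
        return False


async def main() -> None:
    dispatcher = ReliableDispatcher(max_attempts=2)

    async def always_fails(payload: dict) -> None:
        raise ValueError("bad payload")

    ok = await dispatcher.dispatch("evt-1", always_fails, {"user_id": 101})
    print(ok, dispatcher.dead_letters)


if __name__ == "__main__":
    asyncio.run(main())
```

Recording the event ID only after the handler succeeds means a crash mid-handler leads to a retry rather than a silent skip, which is why the handler itself still needs to tolerate running twice.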

Real I/O Events: Webhooks, Sockets, and Timers Without the Pain

Event-driven programming shines when the outside world is unpredictable. Here are a few patterns I reach for.

Pattern 1: A tiny TCP server (event-driven by default)

Python’s asyncio can accept many concurrent connections in one process.

import asyncio


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    peer = writer.get_extra_info("peername")
    print(f"Client connected: {peer}")

    try:
        while True:
            line = await reader.readline()
            if not line:
                break

            message = line.decode("utf-8").rstrip("\n")
            response = f"ack: {message}\n"
            writer.write(response.encode("utf-8"))
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"Client disconnected: {peer}")


async def main() -> None:
    server = await asyncio.start_server(handle_client, host="127.0.0.1", port=9000)
    addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
    print(f"Listening on {addrs}")

    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

This is event-driven because:

  • “Socket readable” events wake reader.readline().
  • “Socket writable” events let writer.drain() proceed.

#### Edge cases I actually care about in socket handlers

  • Slowloris-style clients: a client that sends bytes painfully slowly. Use timeouts around reads.
  • Unbounded messages: never accept unlimited line sizes; set maximums.
  • Backpressure: if your response can be large, writer.drain() is your friend. If you ignore it, you can buffer too much in memory.
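The first two edge cases can be handled with a small read helper. This is a sketch, not a drop-in: read_line_bounded and the two constants are names I made up, and the values are placeholders. It relies on two standard behaviors: asyncio.wait_for bounds the wait, and StreamReader.readline() raises ValueError when a line exceeds the reader's buffer limit.

```python
import asyncio
from typing import Optional

MAX_LINE_BYTES = 4096   # assumed cap; pass as limit= when creating the server
READ_TIMEOUT_S = 10.0   # assumed per-line deadline


async def read_line_bounded(
    reader: asyncio.StreamReader, *, timeout_s: float = READ_TIMEOUT_S
) -> Optional[bytes]:
    """Return one line (b"" at EOF), or None if the client is too slow or the line too long."""
    try:
        return await asyncio.wait_for(reader.readline(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None  # Slow client: the caller should close the connection.
    except ValueError:
        return None  # The line exceeded the reader's limit.
```

In the handler above, this would replace the bare reader.readline() call, with the server created as asyncio.start_server(handle_client, host="127.0.0.1", port=9000, limit=MAX_LINE_BYTES) so the cap actually applies. Treating None as "drop this connection" keeps the policy in one place.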

Pattern 2: Timers as events (periodic work)

For periodic jobs, I avoid crons inside a service unless there’s a clear reason. A timer loop with cancellation support is often enough.

import asyncio
import time


async def emit_heartbeat() -> None:
    while True:
        # Wall-clock time is fine for display; prefer monotonic time for scheduling logic.
        print(f"heartbeat ts={time.time():.0f}")
        await asyncio.sleep(5)


async def main() -> None:
    task = asyncio.create_task(emit_heartbeat())

    # Let it run for a bit, then cancel (demo for shutdown).
    await asyncio.sleep(12)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("heartbeat stopped")


if __name__ == "__main__":
    asyncio.run(main())

In real services, that cancellation path matters. Your shutdown sequence should stop timers, stop accepting new work, and drain what’s in-flight.

#### A better periodic loop: fixed-rate scheduling (avoids drift)

A common bug: you do work, then sleep(interval). That drifts because “work time” adds to the interval.

If you want “run roughly every 10 seconds” regardless of work time, schedule against monotonic time:

import asyncio
import time


async def periodic(interval_s: float) -> None:
    loop = asyncio.get_running_loop()
    next_run = loop.time()

    while True:
        next_run += interval_s

        # Do work.
        print(f"tick wall={time.time():.0f}")

        # Sleep until the next scheduled time (or 0 if we fell behind).
        delay = max(0.0, next_run - loop.time())
        await asyncio.sleep(delay)


async def main() -> None:
    task = asyncio.create_task(periodic(2.0))
    await asyncio.sleep(7)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        pass


if __name__ == "__main__":
    asyncio.run(main())

This pattern is especially useful for pollers, refreshers, and batch flushers.

Pattern 3: An async worker pool (queue + concurrency limit)

If you consume events/jobs and do I/O per job, I like this structure:

  • A producer puts jobs into an asyncio.Queue.
  • N workers pull jobs.
  • A semaphore limits expensive operations.

import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Job:
    job_id: int
    payload: str


async def process_job(job: Job, sem: asyncio.Semaphore) -> None:
    # Semaphore limits “hot” concurrency (e.g., outbound API calls).
    async with sem:
        await asyncio.sleep(0.2)
        print(f"processed job={job.job_id} payload={job.payload}")


async def worker(name: str, q: asyncio.Queue[Job], sem: asyncio.Semaphore) -> None:
    while True:
        job = await q.get()
        try:
            await process_job(job, sem)
        finally:
            q.task_done()


async def main() -> None:
    q: asyncio.Queue[Job] = asyncio.Queue(maxsize=100)
    sem = asyncio.Semaphore(10)

    workers = [asyncio.create_task(worker(f"w{i}", q, sem)) for i in range(5)]

    for i in range(20):
        await q.put(Job(job_id=i, payload="x" * (i % 5)))

    await q.join()  # Wait until all jobs are done.

    for t in workers:
        t.cancel()
    await asyncio.gather(*workers, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())

This is event-driven in the most practical sense: the queue is your internal event stream, and workers react as items arrive. It scales nicely, and you can add retries/timeouts around process_job without changing the overall shape.

Reliability as a First-Class Event: Timeouts, Cancellation, and Backpressure

This is the part that separates “it works on my laptop” from “it survives Tuesday”.

Timeouts: don’t let one dependency hold the loop hostage

If an outbound call might hang, put a bound on it. I prefer asyncio.timeout(...) (newer style) when available, otherwise asyncio.wait_for.

import asyncio


async def call_partner_api() -> str:
    # Pretend the network is slow.
    await asyncio.sleep(2)
    return "ok"


async def main() -> None:
    try:
        async with asyncio.timeout(1.0):
            result = await call_partner_api()
            print(result)
    except TimeoutError:
        print("partner API timed out")


if __name__ == "__main__":
    asyncio.run(main())

What I’ve learned the hard way: timeouts are not just about “not waiting too long”. They are also:

  • A protection against partial outages.
  • A lever for load shedding.
  • A forcing function for your code to handle cancellation and cleanup.

#### Timeout budgets: one deadline, many awaits

In real handlers, you might do multiple awaits (db + cache + http). If each has its own 1s timeout, you can accidentally allow a 3s+ request.

A pattern I like is a single request-level deadline:

import asyncio


async def step(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def handle_request() -> list[str]:
    results: list[str] = []

    async with asyncio.timeout(1.2):
        results.append(await step("cache", 0.4))
        results.append(await step("db", 0.5))
        results.append(await step("http", 0.6))  # This one will likely time out.

    return results


async def main() -> None:
    try:
        print(await handle_request())
    except TimeoutError:
        print("request deadline exceeded")


if __name__ == "__main__":
    asyncio.run(main())

That single deadline forces you to think in terms of budgets, which is how production systems actually behave.

Cancellation: treat it like real control flow, not an exception you ignore

Cancellation is how you reclaim resources and keep shutdown fast. The important rule:

  • Don’t accidentally swallow asyncio.CancelledError.

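The most common bug I see is overly broad exception handling that catches the cancellation and carries on:

# Bad pattern (don’t do this):
try:
    ...
except BaseException:  # a bare "except:" behaves the same way
    ...

Cancellation is delivered as an asyncio.CancelledError raised at the current await. Since Python 3.8 it derives from BaseException, so a plain except Exception no longer catches it. A bare except, an except BaseException, or an except asyncio.CancelledError that doesn’t re-raise will still swallow it, and if you catch it and keep going, your service may refuse to shut down.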

Here’s how I write cancellation-aware loops:

import asyncio


async def run_forever() -> None:
    try:
        while True:
            await asyncio.sleep(1)
            print("tick")
    except asyncio.CancelledError:
        # Cleanup belongs here.
        print("got cancelled, cleaning up")
        raise  # Re-raise so the cancellation propagates.


async def main() -> None:
    t = asyncio.create_task(run_forever())
    await asyncio.sleep(2.5)
    t.cancel()
    await asyncio.gather(t, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())

#### Cleanup rules I follow

When a task is cancelled, I want to answer three questions:

  • What resources might be open? (sockets, files, db connections)
  • What state might be half-written? (temp files, partially sent responses)
  • Is it safe to retry? (idempotent operations)

If a handler needs “must-run cleanup” (like releasing a lock), put it in finally blocks.

#### When to shield from cancellation (rare, but real)

Sometimes you need a small section to complete even during shutdown (for example: a final flush of logs, or returning a borrowed connection to a pool). Cancellation shielding exists, but I use it sparingly because it can delay shutdown.

Conceptually:

  • Allow cancellation for most operations.
  • Shield only tiny, bounded cleanup steps.
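Here is one way that could look, as a sketch. finish_even_if_cancelled is a hypothetical helper name, and flush_logs stands in for whatever small cleanup step you need. The detail that is easy to miss: asyncio.shield only stops the cancellation from reaching the inner task, so the caller still has to wait for that task before re-raising, or the loop may shut down before it finishes.

```python
import asyncio


async def finish_even_if_cancelled(cleanup_coro) -> None:
    """Run a short cleanup coroutine to completion even if the caller is cancelled."""
    task = asyncio.ensure_future(cleanup_coro)  # Keep a reference to the inner task.
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        # The caller was cancelled, but the shielded task keeps running.
        # Wait for it (it must be short and bounded), then propagate the cancellation.
        await task
        raise


async def demo() -> list:
    log = []

    async def flush_logs() -> None:
        await asyncio.sleep(0.05)  # Stand-in for a small final write.
        log.append("flushed")

    async def worker() -> None:
        await finish_even_if_cancelled(flush_logs())

    t = asyncio.create_task(worker())
    await asyncio.sleep(0.01)  # Let the worker reach the shielded section.
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        log.append("worker cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch a second cancel() arriving during the inner wait would still interrupt the cleanup, which is a trade-off I accept in exchange for shutdown never hanging on a stuck flush.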

Backpressure: the real secret behind stable event-driven systems

Backpressure means: when the system is overloaded, you slow down producers rather than letting memory explode or latency become infinite.

In asyncio, the cleanest backpressure primitive is still the queue:

  • A bounded asyncio.Queue(maxsize=N) forces producers to wait when full.
  • That waiting is cooperative and doesn’t block the whole process.

#### A practical backpressure pattern: “accept, enqueue, respond later”

If you’re handling incoming events faster than you can process:

  • Don’t spawn unlimited tasks.
  • Enqueue work.
  • Let the queue limit act as your pressure valve.

Even in HTTP servers (framework-dependent), the equivalent is often:

  • Return 429/503 when queues are too deep.
  • Or apply per-tenant concurrency limits.
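A sketch of the "reject when the queue is too deep" variant, independent of any web framework. try_enqueue is a made-up helper, and the 202/503 integers stand in for whatever response your framework would send. It uses put_nowait, which raises asyncio.QueueFull instead of waiting.

```python
import asyncio


def try_enqueue(q: asyncio.Queue, job: dict) -> int:
    """Return an HTTP-style status: 202 if accepted, 503 if the queue is full."""
    try:
        q.put_nowait(job)  # Never waits; raises QueueFull at capacity.
    except asyncio.QueueFull:
        return 503  # Shed load instead of buffering without bound.
    return 202


async def main() -> None:
    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    statuses = [try_enqueue(q, {"n": n}) for n in range(3)]
    print(statuses)  # The third job is rejected because the queue holds only two.


if __name__ == "__main__":
    asyncio.run(main())
```

The choice between await q.put(...) and put_nowait is the choice between slowing the producer down and telling it to go away. For internal producers I usually want the former; for external callers with their own retry logic, the latter.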

#### Concurrency limits (Semaphore) vs queue size

These solve different problems:

  • Queue size limits buffering (how much work you’re willing to hold).
  • Semaphore limits simultaneous expensive operations.

I almost always use both.

Retries (with jitter) and simple circuit breaking

Event-driven systems are great at retries because they can “wait without blocking” between attempts.

Here’s a retry helper I use as a starting point:

import asyncio
import random
from collections.abc import Awaitable, Callable


async def retry(
    fn: Callable[[], Awaitable[str]],
    *,
    attempts: int = 4,
    base_delay_s: float = 0.2,
    max_delay_s: float = 2.0,
) -> str:
    last_exc: Exception | None = None

    for i in range(attempts):
        try:
            return await fn()
        except Exception as e:
            last_exc = e
            if i == attempts - 1:
                break

            # Exponential backoff with jitter.
            delay = min(max_delay_s, base_delay_s * (2 ** i))
            delay = delay * (0.5 + random.random())
            await asyncio.sleep(delay)

    assert last_exc is not None
    raise last_exc

Two production notes:

  • Retries amplify traffic during outages; pair them with timeouts and (ideally) circuit breaking.
  • Not everything should be retried. Validate whether failures are transient.

A minimal circuit-breaker idea (conceptual): if a dependency fails too often, “open the circuit” for a short window and fail fast, rather than stacking requests behind a failing service.
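To move that from concept to something you can poke at, here is a deliberately small sketch. CircuitBreaker, CircuitOpenError, call_with_breaker, and the threshold and cooldown parameters are all names and defaults I chose for illustration, not an established API. The clock is injected so the cooldown can be tested without sleeping.

```python
import asyncio
import time
from typing import Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Hypothetical breaker: opens after N consecutive failures, allows a trial after a cooldown."""

    def __init__(self, *, failure_threshold: int = 3, reset_after_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = failure_threshold
        self._reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self._opened_at is None:
            return True  # Closed: calls flow normally.
        # Open: allow a trial call only once the cooldown has elapsed.
        return self._clock() - self._opened_at >= self._reset_after_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            self._opened_at = self._clock()  # Open (or re-open) the circuit.


async def call_with_breaker(breaker: CircuitBreaker, fn):
    if not breaker.allow():
        raise CircuitOpenError("circuit open; failing fast")
    try:
        result = await fn()
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result


async def main() -> None:
    breaker = CircuitBreaker(failure_threshold=2, reset_after_s=1.0)

    async def failing_dependency() -> str:
        raise ConnectionError("dependency down")

    for _ in range(3):
        try:
            await call_with_breaker(breaker, failing_dependency)
        except Exception as exc:
            print(type(exc).__name__)


if __name__ == "__main__":
    asyncio.run(main())
```

This version has no separate half-open state and no limit on concurrent trial calls, so treat it as a starting point. It composes naturally with the retry helper: wrap the breaker call inside fn and skip retrying on CircuitOpenError.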

Structured Concurrency as an Architecture Tool (not just a language feature)

asyncio.TaskGroup is more than a convenience. It changes how I structure services.

Why TaskGroup reduces “ghost tasks”

The easiest way to leak tasks is:

  • create tasks in many places
  • forget to await them
  • never cancel them on shutdown

TaskGroup encourages the opposite:

  • tasks are created within a known scope
  • leaving the scope waits for completion
  • errors propagate in a predictable way

Pattern: fan-out work, cancel siblings on failure

If you need “all or nothing” behavior (e.g., you’re assembling a response from multiple dependencies), TaskGroup gives you a clean default: if one task fails, the group cancels the rest.

That behavior is often what you want in request handlers.

Pattern: supervisor tasks + worker tasks

For daemons, I often build:

  • one supervisor TaskGroup for top-level tasks
  • worker pools inside it

This lets me implement: “if any critical subsystem fails, shut down the whole service cleanly.”

Mixing Blocking Code with Event-Driven Code (Without Regret)

Most real Python programs live in a mixed world:

  • Some libraries are async-native.
  • Some are blocking (and may never become async).

The goal isn’t to eliminate blocking code. The goal is to isolate it so it doesn’t freeze the loop.

Use asyncio.to_thread for blocking calls

If you need to call a blocking function (CPU-light but waiting on I/O), run it in a thread:

import asyncio
import time


def blocking_io() -> str:
    time.sleep(1)  # Blocks a thread, not the event loop.
    return "done"


async def main() -> None:
    result = await asyncio.to_thread(blocking_io)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

When I use this:

  • legacy SDKs
  • blocking filesystem operations (sometimes)
  • quick integrations

When I avoid this:

  • CPU-heavy work (threads won’t help much due to the GIL; consider processes)
  • extremely high-volume call paths (thread overhead can become noticeable)

Use a process pool for CPU-bound work

If your handler does heavy computation (compression, parsing huge blobs, ML inference without a native release of the GIL), consider processes.

Conceptually:

  • keep the event loop for coordination
  • offload heavy CPU work to a process pool

Anti-pattern: “just make it async” wrappers everywhere

A common failure mode is sprinkling to_thread everywhere until the program “seems fine.” You can end up with:

  • unbounded thread creation (or an overloaded executor)
  • harder debugging
  • hidden latency

I prefer to:

  • isolate blocking work behind a small adapter module
  • apply concurrency limits around that adapter
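As a sketch of what such an adapter might look like: BlockingAdapter and legacy_lookup are hypothetical names, and the sleep stands in for a real blocking SDK call. The demo also records peak concurrency so you can see the cap working.

```python
import asyncio
import threading
import time


class BlockingAdapter:
    """Hypothetical adapter: the one place blocking calls enter threads, with a cap."""

    def __init__(self, *, max_concurrency: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrency)

    async def call(self, fn, *args):
        async with self._sem:  # At most max_concurrency blocking calls in flight.
            return await asyncio.to_thread(fn, *args)


_lock = threading.Lock()
_in_flight = 0
_peak_in_flight = 0


def legacy_lookup(key: int) -> int:
    """Stand-in for a blocking SDK call; also tracks how many run at once."""
    global _in_flight, _peak_in_flight
    with _lock:
        _in_flight += 1
        _peak_in_flight = max(_peak_in_flight, _in_flight)
    time.sleep(0.02)
    with _lock:
        _in_flight -= 1
    return key * 2


async def main() -> tuple:
    adapter = BlockingAdapter(max_concurrency=2)  # Built inside the running loop.
    results = await asyncio.gather(*(adapter.call(legacy_lookup, k) for k in range(6)))
    return list(results), _peak_in_flight


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The benefit is mostly organizational: when latency hides somewhere, there is exactly one module to instrument, and the concurrency limit is a single number rather than an emergent property of the default executor.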

Graceful Shutdown: Signals, Draining, and “Stop Accepting New Work”

In event-driven programs, shutdown is not a footnote. It’s part of correctness.

A shutdown sequence I trust looks like this:

  • Receive shutdown request (signal, admin command, parent process).
  • Stop accepting new incoming work.
  • Cancel background tasks and timers.
  • Drain queues (within a timeout).
  • Close network servers and resources.

Here’s a pattern I use for daemons:

import asyncio
import signal


async def serve(stop: asyncio.Event) -> None:
    while not stop.is_set():
        await asyncio.sleep(0.5)
        print("serving...")


async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()

    # Signal handlers must be lightweight.
    for s in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(s, stop.set)
        except NotImplementedError:
            # Some platforms/event loops may not support this.
            pass

    task = asyncio.create_task(serve(stop))
    await stop.wait()

    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    print("shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())

Two details I care about:

  • The signal handler just flips an event. No heavy work.
  • Cancellation is explicit and awaited.

Draining queues during shutdown

If you have an internal queue, a pragmatic approach is:

  • stop producers
  • wait for queue.join() with a deadline
  • if deadline expires, cancel workers and exit

That way you don’t hang forever trying to be “perfect.”
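Those three steps fit in one small helper. This is a sketch: drain_or_cancel and slow_worker are names I invented, and I use asyncio.wait_for here so it also runs on versions before asyncio.timeout existed.

```python
import asyncio


async def drain_or_cancel(q: asyncio.Queue, workers: list, *, deadline_s: float) -> bool:
    """Wait for q.join() up to deadline_s, then cancel workers. Returns True if fully drained."""
    try:
        await asyncio.wait_for(q.join(), timeout=deadline_s)
        drained = True
    except asyncio.TimeoutError:
        drained = False  # Give up on remaining work rather than hang.

    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return drained


async def slow_worker(q: asyncio.Queue, delay_s: float) -> None:
    while True:
        await q.get()
        try:
            await asyncio.sleep(delay_s)  # Stand-in for real job handling.
        finally:
            q.task_done()


async def run_demo(delay_s: float, deadline_s: float) -> bool:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    workers = [asyncio.create_task(slow_worker(q, delay_s))]
    return await drain_or_cancel(q, workers, deadline_s=deadline_s)


if __name__ == "__main__":
    print(asyncio.run(run_demo(0.01, 1.0)))   # Fast jobs: drains in time.
    print(asyncio.run(run_demo(0.5, 0.05)))   # Slow jobs: deadline expires first.
```

Returning the boolean lets the caller log or alert on an incomplete drain, which is the signal that the deadline is too tight or the queue was too deep at shutdown.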

Observability in Event-Driven Python: Logging, Metrics, and “Where Did My Time Go?”

Async systems can feel harder to debug because the call stack is no longer a single story. That means I lean more on observability.

Logging: add correlation IDs (and don’t lose them)

In concurrent systems, you need to answer:

  • which log lines belong to which request/job?

A common approach is a request ID carried through the workflow. In async Python, this often uses contextvars so the ID follows the task.

Even if you don’t implement full context propagation, the habit helps: include job_id, user_id, request_id in logs.
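A minimal sketch of the contextvars approach with the standard logging module. The variable name request_id_var, the RequestIdFilter class, and the log format are my choices for the example. Each asyncio task runs with its own copy of the context, so a value set in one handler does not leak into another.

```python
import asyncio
import contextvars
import logging

request_id_var = contextvars.ContextVar("request_id", default="-")


class RequestIdFilter(logging.Filter):
    """Copy the current task's request ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


def configure_logging() -> None:
    handler = logging.StreamHandler()
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(logging.Formatter("%(request_id)s %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)


async def handle(request_id: str, seen: list) -> None:
    request_id_var.set(request_id)  # Visible only within this task's context.
    await asyncio.sleep(0.01)       # Other tasks run here and set their own IDs.
    seen.append((request_id, request_id_var.get()))
    logging.getLogger("app").info("handled")


async def main() -> list:
    seen: list = []
    # gather() wraps each coroutine in its own task, each with its own context copy.
    await asyncio.gather(*(handle(f"req-{i}", seen) for i in range(3)))
    return seen


if __name__ == "__main__":
    configure_logging()
    asyncio.run(main())
```

Putting the filter on the handler rather than on individual loggers means library log lines emitted during the request pick up the ID as well.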

Metrics: measure queue depth and task latency

If I can only have a few metrics for a worker service, they are:

  • queue depth (or backlog)
  • time-in-queue (how long work waits before starting)
  • handler duration
  • timeout and error counts

These metrics make backpressure visible.

Tracing: async makes it more valuable

Distributed tracing becomes more valuable in event-driven systems because a single “request” might trigger many concurrent tasks and downstream calls.

Even without a full tracing stack, you can fake the benefit by measuring durations around awaits and logging them with the same correlation ID.

Testing Event-Driven Code: Make the Loop Your Test Fixture

Async code is testable, but you need to test the right things:

  • timeouts actually time out
  • cancellation leaves no leaked tasks
  • backpressure behaves under load

Test the smallest unit: the handler

If you have an event bus, test handlers as pure async functions:

  • feed them a known event
  • assert on outputs/state

Test orchestration: queue + workers

For orchestration code, I like tests that:

  • enqueue N jobs
  • run workers
  • ensure all jobs processed
  • cancel workers
  • assert no pending tasks
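Here is roughly how I would write such a test without any test framework, as a sketch. run_jobs and the test function name are invented; with pytest you would need an async plugin to run the coroutine. The leak check uses asyncio.all_tasks(), which returns only tasks that have not finished.

```python
import asyncio


async def run_jobs(n_jobs: int, n_workers: int) -> list:
    """Enqueue n_jobs, process them with n_workers, then shut the workers down."""
    q: asyncio.Queue = asyncio.Queue()
    done: list = []

    async def worker() -> None:
        while True:
            job = await q.get()
            try:
                await asyncio.sleep(0)  # Yield once, as real I/O would.
                done.append(job)
            finally:
                q.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    for i in range(n_jobs):
        q.put_nowait(i)

    await q.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


async def test_all_jobs_processed_and_no_leaks() -> None:
    done = await run_jobs(20, 4)
    assert sorted(done) == list(range(20))

    # Only the test's own task should still be alive.
    leftover = asyncio.all_tasks() - {asyncio.current_task()}
    assert not leftover, f"leaked tasks: {leftover}"


if __name__ == "__main__":
    asyncio.run(test_all_jobs_processed_and_no_leaks())
    print("ok")
```

The leftover-task assertion is the part that earns its keep over time: it catches the fire-and-forget create_task call that someone adds six months later.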

Test time as a dependency

If you have complex timer logic, consider injecting a clock or using loop time (loop.time()) rather than wall time. It reduces flakiness.

Common Pitfalls (and the fixes that actually work)

These are the mistakes I see repeatedly.

Pitfall 1: Blocking the loop accidentally

Symptoms:

  • “random” latency spikes
  • timeouts under load
  • the whole service feels stuck

Causes:

  • time.sleep() inside async def
  • blocking HTTP clients / database drivers inside handlers
  • CPU-heavy parsing/serialization inside the loop

Fix:

  • replace blocking calls with async-native libraries
  • or isolate them with asyncio.to_thread
  • offload CPU-heavy work to processes

Pitfall 2: Unlimited task creation

Symptoms:

  • memory climbs
  • CPU climbs due to scheduling overhead
  • downstream dependencies get hammered

Fix:

  • bounded queue
  • semaphore limits
  • TaskGroup scopes

Pitfall 3: Swallowing cancellation

Symptoms:

  • SIGTERM doesn’t stop the process
  • deploys hang
  • shutdown takes forever

Fix:

  • never use a bare except or except BaseException without re-raising CancelledError
  • put cleanup in finally

Pitfall 4: “Async all the way down” pressure

Sometimes teams think everything must become async. That leads to churn and complexity.

Fix:

  • only make the boundaries async where concurrency matters
  • keep pure computation synchronous
  • isolate blocking adapters rather than rewriting the world

When to Use Event-Driven Programming (and When Not To)

Event-driven is a tool, not a religion.

It’s a great fit when you have:

  • lots of concurrent I/O (web servers, bots, socket services)
  • many slow dependencies and you want to stay responsive
  • streaming inputs/outputs
  • queue-based processing

It’s not the best fit when:

  • your workload is mostly CPU-bound and heavy
  • you’re writing a small script that does one thing once
  • your team/tooling ecosystem is heavily synchronous and the complexity cost outweighs benefits

In those cases, threads/processes or simple synchronous code can be the more honest solution.

Alternative Approaches (So You Know Your Options)

Even if you standardize on asyncio, it helps to know what else exists conceptually.

Threads

Threads are event-driven in their own way (preemptive scheduling). They can be simpler for retrofitting blocking libraries, but you pay with:

  • higher overhead per concurrent unit
  • harder shared-state correctness

Multiprocessing

Great for CPU-bound parallelism. Not a replacement for event-driven I/O, but complementary.
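
A sketch of how the two combine, assuming the script is run directly (process pools need the function to be importable, hence the main guard). count_primes is a deliberately naive, hypothetical workload; loop.run_in_executor and ProcessPoolExecutor are the standard-library pieces.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound work: naive count of primes below `limit`."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Each call runs in a separate process; the loop stays free for I/O.
        futures = [
            loop.run_in_executor(pool, count_primes, n) for n in (1_000, 2_000)
        ]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Arguments and results are pickled across the process boundary, so this pays off for chunky computations, not for thousands of tiny calls.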

Other async ecosystems

There are other async frameworks and concurrency models in Python, such as Trio and AnyIO. Even if you never use them, their ideas (structured concurrency, cancellation semantics, nurseries) influence how you write asyncio code today.

A Practical “Bigger Example”: Event Bus + Worker Pool + Shutdown

To tie the ideas together, here’s what I consider a “starter production shape” for an in-process event-driven service:

  • domain events
  • bounded queue for backpressure
  • worker tasks with concurrency limits
  • timeouts and retries
  • graceful shutdown

This is longer than a toy, but still small enough to understand in one sitting.

import asyncio
import random
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class UserSignedUp:
    user_id: int
    email: str


Event = UserSignedUp
Handler = Callable[[Event], Awaitable[None]]


class Service:
    def __init__(self, *, queue_size: int = 100, workers: int = 4) -> None:
        self._q: asyncio.Queue[Event] = asyncio.Queue(maxsize=queue_size)
        self._stop = asyncio.Event()
        self._workers_n = workers
        self._handlers: list[Handler] = []
        self._sem = asyncio.Semaphore(10)  # limit expensive operations

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    async def publish(self, event: Event) -> None:
        # Waits when the queue is full; that wait is the backpressure.
        await self._q.put(event)

    async def stop(self) -> None:
        self._stop.set()

    async def run(self) -> None:
        workers = [
            asyncio.create_task(self._worker(i)) for i in range(self._workers_n)
        ]
        try:
            await self._stop.wait()
            # Drain queued work with a deadline.
            try:
                async with asyncio.timeout(2.0):
                    await self._q.join()
            except TimeoutError:
                pass
        finally:
            for t in workers:
                t.cancel()
            await asyncio.gather(*workers, return_exceptions=True)

    async def _worker(self, idx: int) -> None:
        try:
            while True:
                event = await self._q.get()
                try:
                    await self._dispatch(event)
                finally:
                    self._q.task_done()
        except asyncio.CancelledError:
            # Nothing to clean up here; re-raise so run() sees a cancelled task.
            raise

    async def _dispatch(self, event: Event) -> None:
        # Fan-out handlers concurrently, but keep it bounded.
        async with asyncio.TaskGroup() as tg:
            for h in self._handlers:
                tg.create_task(self._run_handler(h, event))

    async def _run_handler(self, h: Handler, event: Event) -> None:
        # Bound handler time. In real services, you might use per-handler budgets.
        try:
            async with asyncio.timeout(1.0):
                async with self._sem:
                    await h(event)
        except Exception as exc:
            # Log and move on. Letting this escape would cancel sibling handlers
            # in the TaskGroup and kill the worker. CancelledError is not caught.
            name = getattr(h, "__name__", repr(h))
            print(f"handler {name} failed for user={event.user_id}: {exc!r}")


async def send_email(event: Event) -> None:
    # Simulate flaky external dependency.
    await asyncio.sleep(0.05)
    if random.random() < 0.1:
        raise RuntimeError("email provider error")
    print(f"email to {event.email}")


async def provision_workspace(event: Event) -> None:
    await asyncio.sleep(0.08)
    print(f"workspace for user={event.user_id}")


async def main() -> None:
    svc = Service(queue_size=50, workers=3)
    svc.subscribe(send_email)
    svc.subscribe(provision_workspace)

    runner = asyncio.create_task(svc.run())

    for i in range(30):
        await svc.publish(UserSignedUp(user_id=i, email=f"u{i}@example.com"))

    # Let it work a moment then shut down.
    await asyncio.sleep(0.3)
    await svc.stop()
    await runner


if __name__ == "__main__":
    asyncio.run(main())

What this demonstrates:

  • Backpressure: bounded queue.
  • Concurrency limits: semaphore.
  • Predictable cancellation: after a bounded drain, workers are cancelled and awaited at shutdown.
  • Structured concurrency: TaskGroup per event dispatch.

What it doesn’t include (but you can add next):

  • retries with jitter per handler
  • DLQ for failed events
  • persistence/durability for events
  • per-tenant limits
  • metrics and tracing hooks
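
For the first item, this is roughly the shape I would start from: capped exponential backoff with a random delay on each attempt. The helper name retry and its parameters (attempts, base_delay, max_delay, retry_on) are my own illustration, not an asyncio API.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry(
    op: Callable[[], Awaitable[T]],
    *,
    attempts: int = 4,
    base_delay: float = 0.05,
    max_delay: float = 1.0,
    retry_on: tuple[type[Exception], ...] = (RuntimeError, TimeoutError),
) -> T:
    """Call op() up to `attempts` times, sleeping a jittered backoff between tries."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except retry_on:
            if attempt == attempts:
                raise  # budget exhausted: surface the last error
            # Random delay between 0 and the capped exponential backoff.
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")


if __name__ == "__main__":
    calls = {"n": 0}

    async def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient")
        return "ok"

    print(asyncio.run(retry(flaky)), "after", calls["n"], "calls")
```

Because CancelledError is not in retry_on, cancelling the caller stops the retry loop immediately instead of triggering another attempt. In the service above, a handler could wrap its flaky call with something like this, as long as the total retry time fits inside the handler's timeout.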

Performance Considerations (Practical, Not Magical)

Event-driven programming improves throughput and latency mainly by reducing wasted waiting. But it’s not free.

Where asyncio shines

I see the biggest wins when:

  • average I/O wait time is high relative to CPU time
  • concurrency is high (hundreds to tens of thousands of in-flight operations)
  • you can keep handlers lightweight and cooperative

In these conditions, it’s common to see improvements like:

  • lower tail latency (p95/p99) because one slow request doesn’t hog a worker thread
  • better resource usage because you don’t need one thread per concurrent request

I intentionally think in ranges rather than promises: the win might be “noticeable” or “dramatic” depending on dependencies and workload.

Where asyncio can disappoint

  • If you do heavy CPU work in handlers, you’ll bottleneck on the event loop.
  • If you call blocking libraries without isolation, you lose the benefits.
  • If you create too many tasks, scheduling overhead becomes a bottleneck.
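
When I suspect the loop itself is the bottleneck, I measure it directly. This is a sketch of a loop-lag probe; measure_loop_lag and badly_behaved_handler are hypothetical names, and the blocking time.sleep is there on purpose to simulate the mistake.

```python
import asyncio
import time


async def measure_loop_lag(samples: list[float], interval: float = 0.02) -> None:
    """Sleep for `interval` and record how much later than that we woke up."""
    loop = asyncio.get_running_loop()
    while True:
        start = loop.time()
        await asyncio.sleep(interval)
        samples.append(loop.time() - start - interval)


async def badly_behaved_handler() -> None:
    time.sleep(0.15)  # deliberate mistake: blocks the event loop


async def main() -> float:
    samples: list[float] = []
    monitor = asyncio.create_task(measure_loop_lag(samples))
    await asyncio.sleep(0.05)      # healthy period: lag stays near zero
    await badly_behaved_handler()  # the loop is frozen while this runs
    await asyncio.sleep(0.05)      # let the monitor observe the stall
    monitor.cancel()
    await asyncio.gather(monitor, return_exceptions=True)
    return max(samples)


if __name__ == "__main__":
    print(f"worst loop lag: {asyncio.run(main()):.3f}s")
```

In a real service I would export the lag as a metric rather than print it. For local debugging, asyncio's debug mode (asyncio.run(main(), debug=True)) also logs callbacks that run longer than the loop's slow_callback_duration.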

The “real” tuning knobs

When a service is struggling, I usually tune these before anything else:

  • timeouts and deadlines
  • queue sizes
  • semaphores / concurrency limits
  • batching (do fewer, bigger operations)
  • retry policy (including max attempts)

If you take nothing else from this: event-driven programming in Python isn’t about sprinkling async everywhere. It’s about designing around waiting, bounding your concurrency, and treating timeouts/cancellation/backpressure as part of your core correctness story. That’s what keeps services responsive when dependencies are slow, traffic is spiky, and the world is messy.
