A few years ago I debugged a "random" production bug that only appeared under load: a request counter sometimes went backward. The code looked innocent—just counter += 1 from a handful of worker threads. The failure mode was rare enough to slip past testing, but frequent enough to break dashboards and alerting.
The fix was a single lock, but the lesson was bigger: correctness in threaded code isn’t about writing clever logic—it’s about making the rules of shared access explicit. In Python’s threading module, Lock.acquire() is the primitive that lets you do exactly that.
You’ll leave this post knowing how acquire() behaves (blocking vs non-blocking vs timeout), how I structure lock usage so failures are obvious, how to avoid deadlocks, and when I skip locks entirely in favor of higher-level patterns. I’ll also show runnable examples you can paste into a file and execute.
What acquire() Really Buys You (And What It Doesn’t)
When multiple threads touch the same mutable state, you can hit race conditions: two threads interleave operations in a way you didn’t expect. The important detail is that race conditions are not only about “two writes at once”—they’re about read-modify-write sequences that must be treated as one logical action.
A classic case:
- Thread A reads shared_counter (say it is 41)
- Thread B reads shared_counter (still 41)
- Thread A writes 42
- Thread B writes 42
You ran two increments and got one. That’s a data loss bug.
Lock.acquire() is how you say: “Only one thread is allowed to execute this critical section at a time.” Once a thread acquires the lock, other threads attempting to acquire it will either:
- wait (blocking),
- fail immediately (non-blocking), or
- wait up to a limit (timeout).
One misconception I still see in code reviews: “Python has the GIL, so I’m safe.” The GIL does not make your high-level operations atomic. It prevents multiple threads from executing Python bytecode at the exact same instant, but threads can still switch between bytecode instructions, and C extensions may release the GIL during I/O or heavy computation. If correctness depends on “these two operations happen together,” you need explicit synchronization.
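You can see the problem directly with the standard library's dis module: a single counter += 1 compiles to several bytecode instructions, and a thread switch can land between any two of them. A quick illustrative sketch:

```python
import dis

counter = 0

def bump():
    global counter
    counter += 1  # one line of source, several bytecode instructions

# The read-modify-write is visible as separate load/add/store steps.
# A thread switch between any two of them loses an increment.
dis.dis(bump)
```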
On the flip side: locks don’t make code faster. Under contention, they serialize execution. You use them to make a shared invariant true—then you work to keep the locked region as small and predictable as possible.
A mental model that keeps me honest:
- A lock is not “a performance tool.” It’s a correctness boundary.
- The lock’s job is to guard an invariant (e.g., “this dict and this list must be updated together”).
- Once that invariant is stable, you can look for designs that reduce contention.
The Lock.acquire() Contract: Parameters, Return Value, Edge Rules
You create a lock like this:
import threading
lock = threading.Lock()
The signature you’ll see in documentation and docstrings is:
lock.acquire(blocking=True, timeout=-1) -> bool
Here’s how I think about each piece.
blocking (default True)
- blocking=True: wait until the lock is available, then acquire it.
- blocking=False: attempt to acquire it right now, without waiting.
If blocking=True and you don’t set a timeout, acquire() returns True once the lock is obtained: there is no failure value to check. The wait is interruptible (for example, a KeyboardInterrupt during a wait in a CLI tool).
timeout (default -1)
- timeout=-1 means “wait forever.”
- timeout can be a float or int number of seconds to wait.
Two rules that matter in real code:
1) timeout must be non-negative, unless it is -1.
2) If you pass blocking=False, you cannot pass a timeout.
This raises a ValueError:

import threading

lock = threading.Lock()
lock.acquire(False, 0.1)
# ValueError: can't specify a timeout for a non-blocking call
Return value
- True means you acquired the lock.
- False means you didn’t acquire it (typical with blocking=False or a timeout).
That return value is not “nice to have.” It’s the difference between code that degrades gracefully and code that silently runs without the protection it assumed.
If you only remember one thing: when you use timeouts or non-blocking mode, you must treat the False case as a first-class branch, not as an afterthought.
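As a sketch of what treating False as a first-class branch looks like (the timeout and fallback value here are illustrative, not prescriptive):

```python
import threading

lock = threading.Lock()
stale_value = 'cached-from-last-time'  # hypothetical fallback data

def read_fresh_or_stale():
    # The False branch is designed behavior, not a forgotten error path.
    if lock.acquire(timeout=0.25):
        try:
            return 'fresh-value'
        finally:
            lock.release()
    # Degrade gracefully instead of pretending we hold the lock.
    return stale_value

print(read_fresh_or_stale())
```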
A subtle but important behavior: ownership
A plain threading.Lock is a primitive lock. It is not “owned” by a particular thread. That means:
- Any thread may call release() (even a different one from the thread that acquired it).
- Calling release() on an unlocked lock raises RuntimeError.
I consider “release from a different thread” a code smell even if it’s technically allowed, because it becomes very hard to reason about. If you truly need ownership semantics (only the owning thread can release, and re-entrancy is supported), that’s exactly what threading.RLock is for.
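The RuntimeError case is easy to demonstrate:

```python
import threading

lock = threading.Lock()
try:
    lock.release()  # releasing a lock that was never acquired
except RuntimeError as exc:
    print('RuntimeError:', exc)
```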
A Counter Example: The Buggy Version, the Correct Version, and the Cleanest Version
When I teach locks, I start with something small and real: a shared counter.
Bug: increments without a lock
This version may print a final value smaller than expected under enough contention. On some machines it “looks fine” most of the time—which is exactly why it’s dangerous.
import threading

shared_counter = 0

def increment_without_lock():
    global shared_counter
    # This looks like one operation, but it's a read-modify-write sequence.
    shared_counter += 1

threads = [threading.Thread(target=increment_without_lock) for _ in range(50_000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('Final counter value:', shared_counter)
Correct: explicit acquire() + try/finally
If I’m writing low-level code where I need the explicit True/False branch, I reach for this pattern:
import threading

lock = threading.Lock()
shared_counter = 0

def increment_with_lock():
    global shared_counter
    acquired = lock.acquire()  # blocking=True by default
    if not acquired:
        # In practice, this branch won't happen in the default blocking mode,
        # but it's a good habit when you later introduce timeouts.
        raise RuntimeError('Failed to acquire lock unexpectedly')
    try:
        shared_counter += 1
    finally:
        lock.release()

threads = [threading.Thread(target=increment_with_lock) for _ in range(50_000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('Final counter value:', shared_counter)
The key is the finally. If an exception occurs in your critical section (including a bug you haven’t hit yet), you still release the lock.
Cleanest when you always block: with lock:
In codebases I maintain, I prefer the context manager form when timeouts aren’t needed:
import threading

lock = threading.Lock()
shared_counter = 0

def increment_with_context_manager():
    global shared_counter
    # Under the hood, this does an acquire() then release() reliably.
    with lock:
        shared_counter += 1

threads = [threading.Thread(target=increment_with_context_manager) for _ in range(50_000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('Final counter value:', shared_counter)
So why talk about acquire() at all if with lock: exists? Because the moment you care about non-blocking behavior, timeouts, backoff, or instrumentation, you’re back to an explicit acquire() and an explicit decision.
Non-Blocking and Timed acquire(): Patterns I Use to Avoid Thread Pileups
Blocking forever is sometimes correct—and sometimes a reliability hazard. In systems that must stay responsive, I like to encode a deadline: “I’ll try to take the lock, but if I can’t, I’ll do something else.”
Pattern 1: Best-effort updates (non-blocking)
Imagine you have a shared in-memory cache of diagnostics that is “nice to have,” not mission-critical. If the lock is busy, you can skip updating that cache rather than stalling a worker.
import threading
import time

lock = threading.Lock()
diagnostics = {'last_seen_ms': None, 'updates': 0}

def record_heartbeat_best_effort():
    # Non-blocking: if another thread is updating diagnostics, skip.
    if not lock.acquire(blocking=False):
        return
    try:
        diagnostics['last_seen_ms'] = int(time.time() * 1000)
        diagnostics['updates'] += 1
    finally:
        lock.release()

threads = [threading.Thread(target=record_heartbeat_best_effort) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('Diagnostics:', diagnostics)
This is a good fit when:
- correctness doesn’t require every update,
- you’d rather lose a sample than block a worker.
Pattern 2: Timeouts to preserve responsiveness
If skipping work isn’t acceptable, but waiting forever is risky, timeouts give you a controlled failure mode.
import threading
import time

lock = threading.Lock()

def guarded_operation(name: str, timeout_s: float) -> None:
    acquired = lock.acquire(timeout=timeout_s)
    if not acquired:
        print(f'[{name}] lock busy after {timeout_s:.2f}s; falling back')
        # Fallback could be: return cached data, enqueue for later, or report "busy".
        return
    try:
        print(f'[{name}] acquired lock; doing protected work')
        time.sleep(0.2)  # Simulate a slow critical section
    finally:
        lock.release()
        print(f'[{name}] released lock')

threads = [
    threading.Thread(target=guarded_operation, args=(f'worker-{i}', 0.05))
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
You’ll see some workers fail fast instead of stacking up.
Pattern 3: Timed acquire + backoff (reduces contention)
When many threads are competing, immediate retries can make things worse. A small jittered backoff often helps:
import random
import threading
import time

lock = threading.Lock()

def try_with_backoff(max_attempts: int = 5) -> bool:
    for attempt in range(1, max_attempts + 1):
        if lock.acquire(timeout=0.05):
            return True
        # Backoff with jitter: spreads retries out.
        sleep_s = (0.01 * attempt) + random.random() * 0.01
        time.sleep(sleep_s)
    return False

def worker(name: str) -> None:
    if not try_with_backoff():
        print(f'[{name}] gave up acquiring lock')
        return
    try:
        print(f'[{name}] acquired lock')
        time.sleep(0.1)
    finally:
        lock.release()

threads = [threading.Thread(target=worker, args=(f'worker-{i}',)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
The point isn’t perfection—it’s creating a system that behaves predictably under overload.
Deadlocks and “Lock Hygiene”: How I Keep Threading Bugs Boring
If race conditions are “threads interleave in a surprising way,” deadlocks are “threads stop forever in a boring way.” They happen when two or more threads each hold a lock and wait for another lock held by someone else.
The classic two-lock deadlock
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def task_1():
    lock_a.acquire()
    try:
        time.sleep(0.05)
        lock_b.acquire()
        try:
            print('task_1 got both locks')
        finally:
            lock_b.release()
    finally:
        lock_a.release()

def task_2():
    lock_b.acquire()
    try:
        time.sleep(0.05)
        lock_a.acquire()
        try:
            print('task_2 got both locks')
        finally:
            lock_a.release()
    finally:
        lock_b.release()

threading.Thread(target=task_1).start()
threading.Thread(target=task_2).start()
This can deadlock because the lock acquisition order is inconsistent.
Fix 1: Enforce a global lock order
In production, I standardize the rule: if code must take multiple locks, it must take them in a single canonical order (by name, by object id, by an explicit rank). The simplest version is: always acquire lock_a then lock_b.
In larger codebases, I’ll literally encode this by giving locks a rank and only allowing acquisition in increasing order. If that feels heavy, I at least document it in a short section near the lock definitions.
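One minimal way to encode a canonical order is a helper that sorts locks by a consistent key before acquiring. Here I use id() as that key, which is one possible choice (a name or explicit rank works too, as long as every call site uses the same one):

```python
import threading
from contextlib import contextmanager

@contextmanager
def acquire_all(*locks):
    # Sort by id() to get one canonical global order, regardless of
    # the order the caller passed the locks in.
    ordered = sorted(locks, key=id)
    for lock in ordered:
        lock.acquire()
    try:
        yield
    finally:
        for lock in reversed(ordered):
            lock.release()

lock_a, lock_b = threading.Lock(), threading.Lock()

# Every call site acquires in the same order, so two threads calling
# acquire_all(lock_a, lock_b) and acquire_all(lock_b, lock_a) cannot deadlock.
with acquire_all(lock_b, lock_a):
    print('holding both locks safely')
```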
Fix 2: Use timeouts and fail fast
Timeouts won’t prevent deadlocks, but they turn “hang forever” into “error path you can see.” If you add logging and metrics on failure-to-acquire, you get a fighting chance to diagnose.
A practical pattern is: time out, log the lock name, include the thread name, and surface a clear error. That gives you searchable signals in logs.
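A minimal sketch of that pattern, with a hypothetical helper name (acquire_or_report) and the logging module standing in for whatever metrics system you use:

```python
import logging
import threading

log = logging.getLogger(__name__)

def acquire_or_report(lock, lock_name: str, timeout_s: float) -> bool:
    # Turns a silent hang into a searchable log line with the lock
    # name and the thread name.
    if lock.acquire(timeout=timeout_s):
        return True
    log.error('lock %r busy after %.2fs (thread=%s)',
              lock_name, timeout_s, threading.current_thread().name)
    return False
```

Callers still own the try/finally release; this helper only makes the failure visible.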
Fix 3: Avoid nested locks when you can
My favorite deadlock prevention technique is design: avoid needing multiple locks at once.
- Move slow work (I/O, JSON parsing, compression) outside the lock.
- Copy the data you need while holding the lock, then release and process.
- Use a queue and a single owner thread (message passing) when you can.
Re-entrancy trap: acquiring the same lock twice
threading.Lock is not re-entrant. If the same thread tries to acquire() it twice, it will block forever (or until timeout).
If you truly need the same thread to re-enter a locked region (for example, recursion or layered helpers that must share one lock), you likely want threading.RLock instead. I treat that as a design smell worth investigating, but it’s sometimes the right tool.
Here’s a quick repro that shows why re-entrancy matters:
import threading

lock = threading.Lock()

def outer():
    lock.acquire()
    try:
        inner()
    finally:
        lock.release()

def inner():
    # This will deadlock because outer() already acquired lock.
    lock.acquire()
    try:
        print('inner acquired lock')
    finally:
        lock.release()

threading.Thread(target=outer).start()
Switching to threading.RLock() makes this safe (but again: I’d ask why the code needs re-entrancy and whether it can be refactored).
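For completeness, here is the same shape with threading.RLock, which tracks the owning thread and an acquisition count, so the re-entrant call succeeds:

```python
import threading

rlock = threading.RLock()

def outer():
    with rlock:          # first acquisition by this thread
        inner()

def inner():
    with rlock:          # same thread re-enters; count goes to 2
        print('inner re-entered safely')

t = threading.Thread(target=outer)
t.start()
t.join()
```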
Diagnostics I actually use
When threading issues hit production, you want thread dumps. Two handy tricks:
- Enable faulthandler to dump tracebacks on demand or after a timeout.
- Name your threads meaningfully (threading.Thread(name='cache-refresher', ...)) so logs are readable.
Even a small log line like print(threading.current_thread().name) in a repro script can save you an hour.
If I’m chasing a real hang, I’ll often add a watchdog thread that prints stack traces if the program doesn’t make progress. This doesn’t fix anything, but it makes the bug diagnosable.
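A minimal faulthandler sketch (the 5-second timeout is an arbitrary example value):

```python
import faulthandler
import sys

# Dump every thread's stack to stderr right now. Useful from a signal
# handler or a debug endpoint when the process seems stuck.
faulthandler.dump_traceback(file=sys.stderr)

# Or arm a watchdog: if the process is still running in 5 seconds,
# dump all thread stacks automatically.
faulthandler.dump_traceback_later(5.0, repeat=False)
faulthandler.cancel_dump_traceback_later()  # cancel once progress resumes
```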
Picking the Right Primitive in 2026: Locks, But Also Better Building Blocks
Locks are foundational, but they shouldn’t be your only tool. When I review code, I push for the highest-level primitive that preserves clarity.
Here’s a quick map of common choices:

- Mutual exclusion around shared mutable state: threading.Lock
- Re-entrant acquisition by the same thread: threading.RLock
- Limiting concurrency to at most N at once: threading.Semaphore
- Waiting for a predicate, with notification: threading.Condition
- One-shot signaling between threads: threading.Event
- Message passing to a single owner: queue.Queue
And there’s a bigger architectural decision: threads vs async.
- If you are coordinating CPU-bound work in Python, threads won’t scale linearly because of interpreter constraints. You might need processes or native extensions.
- If your workload is mostly I/O (network, disk), async frameworks can be a better match.
That said, don’t mix models casually:
threading.Lockis for threads.asyncio.Lockis for coroutines.
If you call threading.Lock.acquire() inside an async event loop thread, you can block the loop and stall unrelated tasks.
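By contrast, asyncio.Lock is awaited inside a coroutine, so other tasks keep running while one waits its turn. A minimal sketch:

```python
import asyncio

async def guarded():
    lock = asyncio.Lock()
    # Awaiting the lock suspends only this task; the event loop
    # keeps driving other tasks in the meantime.
    async with lock:
        await asyncio.sleep(0.01)
        return 'done'

print(asyncio.run(guarded()))
```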
Traditional vs modern patterns (what I recommend now)
- Instead of manual acquire() + try/finally, prefer with lock: when blocking forever is fine.
- Instead of blocking forever on a busy lock, prefer acquire(timeout=...) plus an explicit fallback.
- Instead of a lock around every shared dict/list, prefer message passing (queue.Queue) with a single owner.
- Instead of “seems fine” manual tests, prefer stress tests that deliberately force interleavings.
In 2026, I also see teams using AI-assisted workflows to generate concurrency stress harnesses and fuzz schedules. That’s helpful, but I keep the human rule: the locking policy must be understandable from reading the code. If the policy needs an AI to explain it, it’s too complex.
Performance and Testing: Making acquire() Safe and Cheap
Locks can be fast when uncontended. In many real systems, the “uncontended” case is the common case: most threads grab the lock immediately, do a tiny update, and leave.
The performance trouble starts when you make one (or more) of these mistakes:
- You hold a lock while doing slow work (network I/O, disk I/O, heavy parsing).
- You make a single global lock guard too many unrelated things.
- You have bursts where many threads wake up and stampede the same lock.
When I’m optimizing a threaded system, I’m usually not trying to shave microseconds off acquire(). I’m trying to reduce contention and tail latency.
Rule 1: Keep the critical section tiny
The best lock is a lock you hold for a short time.
A common refactor I do is: copy shared data under the lock, then process the copy outside the lock.
Bad (holds lock during slow work):
import json
import threading

lock = threading.Lock()
shared_payload = {'raw': '{"items": [1, 2, 3]}'}

def read_and_parse():
    with lock:
        raw = shared_payload['raw']
        # JSON parsing can be slow under load, and here it runs
        # while the lock is still held.
        return json.loads(raw)
Better (lock only protects the shared read):
import json
import threading

lock = threading.Lock()
shared_payload = {'raw': '{"items": [1, 2, 3]}'}

def read_and_parse():
    with lock:
        raw = shared_payload['raw']
    # Parse outside the lock.
    return json.loads(raw)
Rule 2: Reduce lock granularity (when it actually helps)
Sometimes one global lock is simplest and correct. But when the system grows, a single lock can become a throughput limiter.
A middle-ground pattern I like is striped locks: instead of one lock for a whole dict keyed by many IDs, you use (say) 64 locks and choose one based on the key’s hash. That spreads contention while keeping reasoning manageable.
Here’s a simple striped-lock counter map:
import threading

class StripedCounter:
    def __init__(self, stripes: int = 64):
        self._locks = [threading.Lock() for _ in range(stripes)]
        self._counts = {}

    def _lock_for(self, key):
        return self._locks[hash(key) % len(self._locks)]

    def inc(self, key, delta: int = 1):
        lock = self._lock_for(key)
        with lock:
            self._counts[key] = self._counts.get(key, 0) + delta

    def snapshot(self):
        # To keep this example simple, take all locks in a consistent order.
        for lock in self._locks:
            lock.acquire()
        try:
            return dict(self._counts)
        finally:
            for lock in reversed(self._locks):
                lock.release()
This is not free: snapshots are more expensive, and you must be careful about acquiring multiple locks (consistent order!). But for “hot keys” workloads, it can reduce lock contention dramatically.
Rule 3: Prefer queues for ownership (remove shared state)
If many threads mutate a shared structure, I ask: can we make a single thread the “owner” of that structure?
With message passing, worker threads send requests to the owner thread via queue.Queue, and only the owner touches the mutable state. That often removes the need for acquire() around the state entirely.
Example: a metrics aggregator that owns the counter dict.
import queue
import threading
import time

updates = queue.Queue()

class Aggregator(threading.Thread):
    def __init__(self):
        super().__init__(name='aggregator', daemon=True)
        self.counts = {}
        self.stop_flag = threading.Event()

    def run(self):
        while not self.stop_flag.is_set():
            try:
                key, delta = updates.get(timeout=0.1)
            except queue.Empty:
                continue
            self.counts[key] = self.counts.get(key, 0) + delta

agg = Aggregator()
agg.start()

Workers need no lock of their own:

def worker(name: str):
    for _ in range(1000):
        updates.put(('requests', 1))
    updates.put((f'done:{name}', 1))

threads = [threading.Thread(target=worker, args=(f'w{i}',)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Give the aggregator a moment to drain the queue:

time.sleep(0.2)
agg.stop_flag.set()
print('requests:', agg.counts.get('requests'))
For many systems, this is the highest-leverage “performance + correctness” win: fewer locks, fewer weird races, fewer deadlocks.
Testing strategy: make races more likely on purpose
Threading bugs love to hide. My testing goal is to make the scheduler interleave operations as often as possible and to make failures reproducible enough to diagnose.
Here are a few tactics I use:
- Use
threading.Barrierto start threads at the same time (creates contention). - Use small sleeps in strategic places (yes, it feels wrong; it forces interleavings).
- Run the test in a loop (hundreds or thousands of iterations).
- Keep assertions simple: totals match, invariants hold, no exceptions.
A small stress harness for a shared counter:
import threading
import time

def run_stress(n_threads: int, n_increments: int, use_lock: bool) -> int:
    lock = threading.Lock()
    counter = {'value': 0}
    barrier = threading.Barrier(n_threads)

    def worker():
        barrier.wait()
        for _ in range(n_increments):
            if use_lock:
                with lock:
                    counter['value'] += 1
            else:
                # Intentionally widen the race window.
                v = counter['value']
                time.sleep(0)
                counter['value'] = v + 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter['value']

expected = 20 * 500
for i in range(50):
    got = run_stress(n_threads=20, n_increments=500, use_lock=False)
    if got != expected:
        print('Race reproduced on iteration', i, 'got', got, 'expected', expected)
        break
else:
    print('Did not reproduce this time (try increasing loops/threads)')

print('With lock:', run_stress(n_threads=20, n_increments=500, use_lock=True))
This kind of test isn’t perfect, but it turns “I think it’s safe” into “I can see it fail without a lock.”
Measuring contention (without overcomplicating it)
If I suspect contention, I want a quick signal: are threads spending meaningful time waiting to acquire?
A pragmatic pattern is to time acquire() and record slow waits.
import threading
import time

lock = threading.Lock()

def with_timing(timeout_s: float):
    start = time.perf_counter()
    acquired = lock.acquire(timeout=timeout_s)
    waited = time.perf_counter() - start
    if waited > 0.01:
        print(f'Waited {waited:.4f}s to acquire (acquired={acquired})')
    return acquired
In real services, I’ll replace prints with metrics (histograms for wait time, counters for timeouts). The purpose is not to obsess over nanoseconds; it’s to notice when a lock becomes a hot spot.
Practical Scenarios: Where I Reach for acquire() in Real Code
The abstract examples are useful, but the real value shows up in everyday patterns.
Scenario 1: Protecting a multi-step invariant
I often see code like this:
- update a dict
- update a list
- update a timestamp
If those three updates must be consistent with each other, they belong in one critical section.
import threading
import time

lock = threading.Lock()
state = {
    'by_id': {},
    'recent_ids': [],
    'last_update_ms': 0,
}

def record_item(item_id: str, payload: dict):
    now_ms = int(time.time() * 1000)
    with lock:
        state['by_id'][item_id] = payload
        state['recent_ids'].append(item_id)
        state['last_update_ms'] = now_ms
The key is not “dict updates are thread-safe.” The key is that the relationship between these pieces of state must be kept consistent.
Scenario 2: Time-bounded cache refresh
Refreshing a shared cache is a classic place where I use timeouts.
Goal: only one thread performs the expensive refresh; other threads should either wait briefly or serve stale data.
import threading
import time

refresh_lock = threading.Lock()
cache = {'value': None, 'updated_at': 0.0}
TTL_S = 1.0

def expensive_fetch():
    time.sleep(0.2)
    return f'data@{time.time():.3f}'

def get_value():
    now = time.time()
    if cache['value'] is not None and now - cache['updated_at'] < TTL_S:
        return cache['value']
    # Cache is missing/stale; attempt refresh.
    acquired = refresh_lock.acquire(timeout=0.05)
    if not acquired:
        # Another thread is refreshing; serve stale if present.
        return cache['value']
    try:
        # Double-check after acquiring to avoid redundant refresh.
        now = time.time()
        if cache['value'] is not None and now - cache['updated_at'] < TTL_S:
            return cache['value']
        v = expensive_fetch()
        cache['value'] = v
        cache['updated_at'] = time.time()
        return v
    finally:
        refresh_lock.release()
This pattern gives you:
- correctness: only one refresh at a time
- responsiveness: callers don’t all block behind the refresh
- a graceful fallback: serve stale when busy
The “double-check” inside the lock matters. Without it, you can get a thundering herd where multiple threads decide to refresh, then line up and each refresh in turn.
Scenario 3: Limiting concurrency with Semaphore.acquire()
This post is about Lock.acquire(), but I want to call out a cousin: semaphores also have acquire(blocking=True, timeout=None)-style behavior and return a boolean.
If the real problem is “don’t do more than N things at once,” a semaphore is often clearer than a lock.
import threading
import time

limit = threading.Semaphore(3)

def do_work(i: int):
    if not limit.acquire(timeout=0.1):
        print('busy, dropping task', i)
        return
    try:
        print('start', i)
        time.sleep(0.2)
        print('end', i)
    finally:
        limit.release()

threads = [threading.Thread(target=do_work, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
I reach for this when talking to external systems: database connections, rate-limited APIs, GPU slots, etc.
Common Pitfalls With acquire() (The Stuff I Watch For in Reviews)
These are the mistakes I see most often, and the fixes I recommend.
Pitfall 1: Forgetting finally
If code calls acquire() and then returns early or raises before releasing, you’ve created a permanent lock hold. This can look like a random hang later.
Fix: use with lock: or try/finally.
Pitfall 2: Ignoring the False return
If you use non-blocking or timeout acquire and then proceed as if you acquired, you’ve basically deleted your lock.
Fix: treat False as a real branch and define behavior:
- return stale data
- enqueue for later
- raise a clear exception
- record a metric and short-circuit
Pitfall 3: Holding the lock across I/O
This is the biggest real-world performance killer.
Fix: move I/O outside the lock. If you need to publish results, reacquire to update shared state.
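A sketch of that shape, with a stand-in slow_io function in place of a real network call (the state layout here is hypothetical):

```python
import threading

lock = threading.Lock()
state = {'pending': ['job-1'], 'results': {}}

def slow_io(job):
    # Stand-in for a network call or disk read.
    return f'result-of-{job}'

def process_one():
    with lock:                        # short: just grab the work item
        job = state['pending'].pop()
    result = slow_io(job)             # I/O runs with the lock released
    with lock:                        # short again: publish the result
        state['results'][job] = result

process_one()
print(state['results'])
```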
Pitfall 4: Lock order not documented
Multiple locks without an order is a deadlock waiting to happen.
Fix: document a canonical order and enforce it in code review. If the project is large, consider a small helper that acquires locks in sorted order.
Pitfall 5: Using RLock as a band-aid
RLock is valid, but I don’t use it to hide unclear layering. If re-entrancy is required, I want a comment explaining why.
Pitfall 6: Using threading.Lock for cross-process coordination
threading.Lock coordinates threads within one process. It won’t coordinate multiple Python processes.
Fix: for processes, use multiprocessing primitives or OS-level locks (depending on your system and needs).
A Quick Note on Condition: How acquire() Shows Up Indirectly
Even if you don’t call Lock.acquire() directly, you might be using it through higher-level primitives.
A threading.Condition is built around a lock. The pattern is:
- acquire the condition’s lock
- check a predicate
- wait (which temporarily releases the lock)
- when notified, reacquire the lock and re-check
Here’s a small producer/consumer example that shows the shape:
import threading
import time

cond = threading.Condition()
queue = []

def producer():
    for i in range(5):
        time.sleep(0.05)
        with cond:
            queue.append(i)
            cond.notify()

def consumer():
    seen = 0
    while seen < 5:
        with cond:
            while not queue:
                cond.wait(timeout=0.2)
            item = queue.pop(0)
        print('consumed', item)
        seen += 1

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
Two acquire()-related takeaways:
- with cond: is acquiring the underlying lock.
- You must use a while loop around wait() because you can wake up without your predicate being true (timeouts, spurious wakeups, or multiple consumers).
When I Skip Locks Entirely
This is the part that surprises people: the more concurrency work I do, the more I try to avoid shared mutable state.
I skip locks when:
- I can make data immutable (create new objects instead of mutating shared ones).
- I can use message passing (queue.Queue) and a single owner.
- I can use thread-safe library primitives that already encode the right behavior.
If you’re thinking, “Isn’t that just moving complexity somewhere else?”—yes, but it’s often moving it into simpler, more testable places.
A few examples:
- Instead of a shared list + lock, use queue.Queue.
- Instead of shared counters, aggregate per-thread counts and merge periodically.
- Instead of “every worker updates a global dict,” give each worker its own dict and merge under a single lock at the end.
That last one is a great pattern when you want to reduce contention: do lots of work without locks, then do one short merge step.
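A minimal sketch of the per-thread-counts idea using collections.Counter (the thread and event counts are arbitrary example values):

```python
import threading
from collections import Counter

merge_lock = threading.Lock()
totals = Counter()

def worker(n_events: int):
    local = Counter()                # thread-local: no lock needed here
    for _ in range(n_events):
        local['requests'] += 1
    with merge_lock:                 # one short merge step at the end
        totals.update(local)

threads = [threading.Thread(target=worker, args=(1000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(totals['requests'])
```

Each thread does its counting lock-free and takes the merge lock exactly once, so contention stays tiny no matter how hot the counters are.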
My Personal Checklist for acquire() in Production Code
When I’m about to ship something that uses acquire(), I ask myself:
1) What invariant is this lock protecting? Can I describe it in one sentence?
2) Can the critical section be shorter?
3) Can I avoid nested locks? If not, is lock order clearly enforced?
4) Should this be time-bounded (timeout=) to preserve responsiveness?
5) If acquire() fails (False), what is the intended behavior?
6) Do we have visibility (logs/metrics) into wait time and timeouts?
7) Do we have a stress test that tries hard to break it?
If I can’t answer those questions, I usually pause and redesign.
Closing Thought
Lock.acquire() is small, but it’s one of those primitives that changes how you think. Instead of hoping threads behave, you declare the rules: who can touch shared state, when, and what happens under overload.
Once you treat the True/False return as part of your design—not as an awkward API detail—you can build threaded systems that are not only correct, but also resilient: they slow down gracefully, they fail loudly when they must, and they’re diagnosable when something goes wrong.
If you take nothing else from this post: the best threading code doesn’t look clever. It looks obvious.


