[S1.1] QSBR-based dictionary registry implementation#12
Merged
Conversation
Implement the dictionary lifecycle registry per design §4.4, replacing Phase 0 stubs with real QSBR logic. Same function order and names as Phase 0; only stub bodies replaced with implementation.
Changes:
- compression_registry.c: add/promote, retirement with worker-gen snapshot, grace-period GC, frame-ref accounting, worker quiescent API
- compression_registry.h: struct updated (frame_refs, retire_worker_gen), Add gets promote flag, new QSBR functions added, removed <stdatomic.h>
- src/unit/Makefile: add ZSTD_LIB to LD_LIBS for unit test linkage
- test_compression_header.cpp: init registry in fixture
- test_compression_registry.cpp: 15 new unit tests
All 308 unit tests + 10 integration tests pass locally.
ikolomi
reviewed
May 28, 2026
ikolomi
reviewed
May 28, 2026
    uint32_t next_id;
} registry;

static _Atomic(uint64_t) worker_quiescent_gen[COMPRESSION_DICT_MAX];
Owner
Why is COMPRESSION_DICT_MAX used for the thread count?
ikolomi
added a commit
that referenced
this pull request
May 28, 2026
Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real
pool: pthread-managed workers, unbounded inbox + bounded outbox, QSBR-compatible
worker contract, runtime resize, graceful shutdown. The per-job body
is a deliberate placeholder pass-through — workers accept jobs and
post empty results to the outbox without compressing — so end-to-end
plumbing (enqueue, dequeue, drain, shutdown ordering) is exercised
before ZSTD_compress_usingCDict integration lands in S2.5.
Architecture
------------
Inbox: mutexQueue (src/mutexqueue.h). Cond-var blocking gives us
worker parking when idle — the bio pattern. Right fit for a feature
whose default state is enabled-but-quiet, where busy-spin (the
io_threads pattern) would burn a core for no work. mutexQueue is
unbounded by construction, so enqueue does not have a back-pressure
return path in v1; the natural producer-side pacing comes from
compression-sweep-max-cpu-pct (§2.11 R2.11.2). If operational
experience argues for a bounded inbox, we'll revisit in a non-breaking way.
Outbox: mpscQueue from src/queues.h. Lock-free; the main thread polls
non-blocking from compressionAfterSleep. Bounded — workers retry on
full rather than drop (discarding completed work would orphan the
caller's incrRefCount on the value robj).
QSBR worker contract (§4.4 step 2-3): each worker carries a stable
worker_id ∈ [0, n_threads). After every job it calls
compressionWorkerReportQuiescent(worker_id) which is implemented in
compression_registry.c (PR #12). The registry's GC pass observes the
per-worker generation counters and frees retired dicts only after
all workers have crossed the retire-time snapshot.
Worker identity is reset on Resize (Stop+Start). The registry's
quiescent_gen counters are NOT reset — generations are monotonic; any
dict that captured an old worker's gen at retire time has either
been freed already (registry GC pass between Stop and Start) or
remains visible to the new worker through normal QSBR semantics.
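A minimal sketch of the per-worker generation scheme described above, assuming C11 atomics. The names `qsbr_report_quiescent`, `qsbr_snapshot`, `qsbr_can_free`, and `WORKERS_MAX` are hypothetical and are not the registry's actual API; the strict comparison follows the `gen > retire_worker_gen` rule quoted later in this thread.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define WORKERS_MAX 16 /* hypothetical bound; the thread mentions a cap of 16 */

/* One monotonic generation counter per worker slot (zero-initialized). */
static _Atomic(uint64_t) quiescent_gen[WORKERS_MAX];

/* Called by a worker after each job: it holds no dict pointers right now. */
void qsbr_report_quiescent(int worker_id) {
    assert(worker_id >= 0 && worker_id < WORKERS_MAX);
    atomic_fetch_add_explicit(&quiescent_gen[worker_id], 1, memory_order_release);
}

/* Called at retire time: record each worker's current generation. */
void qsbr_snapshot(uint64_t snap[], int n_workers) {
    for (int i = 0; i < n_workers; i++)
        snap[i] = atomic_load_explicit(&quiescent_gen[i], memory_order_acquire);
}

/* GC check: free only once every worker has advanced past its snapshot. */
int qsbr_can_free(const uint64_t snap[], int n_workers) {
    for (int i = 0; i < n_workers; i++) {
        uint64_t now = atomic_load_explicit(&quiescent_gen[i], memory_order_acquire);
        if (now <= snap[i]) return 0; /* worker i has not crossed the snapshot */
    }
    return 1;
}
```

Because the counters only ever increase, a snapshot taken before a Stop+Start cycle stays comparable afterwards, which is why no reset is needed on Resize.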
Why a separate COMPRESSION_WORKERS_MAX
--------------------------------------
The PR #12 registry sizes worker_quiescent_gen[] by COMPRESSION_DICT_MAX,
which is the dict registry cap (also 16). They happen to share the
value but mean different things and could diverge in v2. This PR
introduces COMPRESSION_WORKERS_MAX in compression_workers.h as the
authoritative bound; a follow-up alignment commit on the registry side
will switch worker_quiescent_gen[] to size by COMPRESSION_WORKERS_MAX.
For now the two values are equal so no functional issue.
Lifecycle wiring
----------------
- compressionInit (compression.c) now: registry init → workers Start.
- new compressionShutdown (compression.h, .c) calls Stop then Release,
in that order, satisfying the registry contract that workers must
be joined before the registry is freed.
- finishShutdown (server.c) calls compressionShutdown after
moduleUnloadAllModules so the compression path stays functional for
any module that holds a compressed value during cleanup.
- compressionAfterSleep drains up to 256 results per main-loop
iteration. Bound is internal safety, not a config knob; outbox
back-pressure surfaces operationally before we'd raise it.
- CONFIG SET compression-threads now applies via a new
applyCompressionThreads hook in config.c that calls
compressionWorkersResize. Resize is graceful (Stop + Start), so
in-flight items in the inbox are reclaimed cleanly.
Out of scope (later sub-tasks)
------------------------------
- Real ZSTD_compress_usingCDict in the worker → S2.5 (Encoder path).
- Worker-side atomic load of registry->active → S2.5 (worker
consumes job->dict_id snapshot captured at enqueue time, falls
back to active on dict-mismatch).
- compressionEnqueueCandidate caller in dbAdd/dbOverwrite → S2.7
(Write-path hook).
- Real install of dst into robj on outbox drain (createCompressedObject,
registry IncRef, decrRefCount of caller's pin) → S2.5.
- Tcl integration test exercising enqueue/drain end-to-end through a
running server → blocked on S2.7.
Tests
-----
src/unit/test_compression_workers.cpp — 14 gtest cases:
- StartZeroThreadsIsValid (compression-threads=0 disables the pool;
enqueue rejects).
- StartOneThread / StartFourThreads / StartMaxThreads.
- StopIsIdempotent (multiple Stops, including from no-Start).
- EnqueueRejectedBeforeStart.
- DrainBeforeStartReturnsZero.
- SingleJobRoundTrip (enqueue → drain delivers exactly one).
- BurstOf256JobsOneWorker (single-thread serializes).
- BurstOf1024JobsFourWorkers (four-thread concurrency).
- ResizeStartsPoolWhenNotInitialized / ResizeIsNoOpWhenSameCount /
ResizeUpAndDown / ResizeToZeroDisablesPool / ResizeRejectsOutOfRange /
ResizeAcrossEnqueuedJobs.
Verified locally
----------------
- make -j$(nproc) -C src builds clean (server binary).
- ./runtest --single unit/type/compression: 10/10 pass.
- Manual smoke test: server boots with compression-threads=2, accepts
CONFIG SET compression-threads {0, 1, 2, 4, 8} at runtime, shuts
down cleanly with all worker threads joined.
Not verified locally (CI will validate):
- gtest unit tests on Linux/macOS/32-bit (no libgtest-dev locally).
- clang-format-18 (not installed locally).
Address Ilia's 6 review comments:
1. Add inline field descriptions to compressionDictPair struct
2. Clarify SWEEP reference in TryGc doc comment
3. Add field descriptions to registry static struct
4. Replace memmove with swap-last in removeFromDicts (order not needed)
5. Add comment clarifying cap bounds retiring dicts (design §4.4 step 7)
6. Use plain 16 for worker gen array (TODO: shared constant in follow-up)
Additionally:
- Remove the retiring linked list (unnecessary with max 16 entries); GC now scans dicts[] directly checking state == RETIRING
- Update detailed-design.md to reflect removal of retiring_list
- Add bounds assertion on worker_id in QSBR worker API
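For reference, a sketch of the swap-last removal mentioned in the review fixes. `remove_swap_last` is a hypothetical helper operating on a plain pointer array, not the actual removeFromDicts body.

```c
#include <assert.h>
#include <stddef.h>

/* Remove slots[idx] in O(1) by overwriting it with the last element.
 * Element order is not preserved. Returns the new element count. */
size_t remove_swap_last(void *slots[], size_t count, size_t idx) {
    assert(idx < count);
    slots[idx] = slots[count - 1]; /* no-op copy when idx is already last */
    slots[count - 1] = NULL;       /* clear the vacated tail slot */
    return count - 1;
}
```

Compared with memmove, this touches two slots regardless of array size, at the cost of reordering, which is acceptable when callers never depend on position.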
78bbc56 to
0828b2f
Compare
Set server.verbosity = LL_NOTHING in test fixtures to prevent serverLog from accessing uninitialized server fields under ASAN.
0828b2f to
5c0008f
Compare
The 5th makeFakeDictPair() is rejected by compressionRegistryAdd when the cap is reached. Caller retains ownership on rejection — must free it explicitly. Fixes ASAN leak detection.
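A sketch of the ownership rule behind this fix, under stated assumptions: `registry_t`, `dict_pair`, `registry_add`, and a cap of 4 are hypothetical stand-ins chosen so that a fifth add is rejected, as in the test described above.

```c
#include <assert.h>
#include <stdlib.h>

#define REGISTRY_CAP 4 /* hypothetical cap, chosen so the 5th add is rejected */

typedef struct { int id; } dict_pair; /* stand-in for the real pair type */
typedef struct { dict_pair *slots[REGISTRY_CAP]; int count; } registry_t;

/* Returns 0 and takes ownership on success; returns -1 and leaves
 * ownership with the caller when the registry is full. */
int registry_add(registry_t *r, dict_pair *p) {
    assert(r != NULL && p != NULL);
    if (r->count == REGISTRY_CAP) return -1;
    r->slots[r->count++] = p;
    return 0;
}

/* Caller-side pattern: free the pair ourselves if the registry refused it.
 * Returns 1 if the pair was accepted, 0 if it was rejected and freed. */
int add_or_free(registry_t *r, dict_pair *p) {
    if (registry_add(r, p) == 0) return 1;
    free(p);
    return 0;
}
```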
ikolomi
approved these changes
May 31, 2026
ikolomi
added a commit
that referenced
this pull request
May 31, 2026
ikolomi
added a commit
that referenced
this pull request
Jun 2, 2026
Reflects what actually landed in PR #13 vs. the original one-liner plan entry for S2.4:
- mutexqueue dual API (Pop/PopAll keep contract; new PopWakable for wake-aware consumers)
- sentinel-based race-free shutdown
- retire_n_workers field on compressionDictPair for resize-aware canFree
- compressionWorkersGetThreadCount accessor
S1.1 marked as merged (PR #12) and noted that PR #13 extended its data structures (retire_n_workers + COMPRESSION_WORKERS_MAX usage).
Adds S2.11 (bounded inbox + per-caller back-pressure counters) as an explicit prerequisite before S2.7 (write-path hook) ships. Was mentioned in PR #13 description and SESSION_CHECKPOINT but not tracked in the plan. Squash this into the S2.4 commit at merge time.
ikolomi
added a commit
that referenced
this pull request
Jun 2, 2026
* feat(S2.4): compression worker pool
Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real
pool: pthread-managed workers, mutexQueue inbox + mpsc outbox, QSBR
worker contract, runtime resize via Stop+Start, race-free graceful
shutdown via sentinels, grace barriers via wake-all from
compressionRegistryRetire, resize-aware canFree bound. The per-job
body is a deliberate placeholder pass-through — workers accept jobs
and post empty results to the outbox without compressing — so
end-to-end plumbing (enqueue, dequeue, drain, shutdown ordering) is
exercised before ZSTD_compress_usingCDict integration lands in S2.5.
Architecture
============
Inbox: mutexQueue (src/mutexqueue.h). Cond-var blocking gives us
worker parking when idle. Right fit for a feature whose default state
is enabled-but-quiet, where busy-spin would burn a core for no work.
Outbox: mpscQueue from src/queues.h. Lock-free; the main thread polls
non-blocking from compressionAfterSleep. Workers retry on full rather
than drop completed work.
QSBR plumbing concerns surfaced during review and fixed in this commit
=====================================================================
QSBR grace barriers (mutexQueueWakeAll + new mutexQueuePopWakable)
------------------------------------------------------------------
The QSBR design (§4.4) requires that idle workers periodically advance
their per-thread quiescent_gen counter even when no jobs are flowing,
so the registry's GC pass can reclaim retired dicts. Without this, a
parked worker's gen stays frozen at the snapshot taken by
compressionRegistryRetire, blocking reclamation indefinitely.
mutexqueue.h is extended with **two new APIs** that preserve the
existing pop contract for current callers (bio):
- mutexQueueWakeAll(q): pthread_cond_broadcast under the mutex; wakes
every consumer parked in cond_wait without enqueuing anything.
- mutexQueuePopWakable(q, blocking=true): a wake-aware pop variant
that returns NULL once on a spurious wake or wake-all, instead of
re-parking. The compression worker uses this; bio continues to use
the unchanged mutexQueuePop.
The original mutexQueuePop / mutexQueuePopAll APIs keep their original
"never returns NULL when blocking" semantics — they re-park on
spurious wakes via the `while` loop. This addresses reviewer feedback
that adding mutexQueueWakeAll should not change behaviour for existing
callers.
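A rough sketch of how a wake-aware pop can differ from a re-parking pop, assuming a small fixed-capacity ring guarded by a pthread mutex and condition variable. The `wq_*` names and `WQ_CAP` are hypothetical; this is not the mutexqueue.h implementation.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define WQ_CAP 64 /* hypothetical fixed capacity to keep the sketch short */

typedef struct {
    pthread_mutex_t mu;
    pthread_cond_t cv;
    void *items[WQ_CAP];
    size_t head, len;
} wq_t;

void wq_init(wq_t *q) {
    assert(q != NULL);
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->cv, NULL);
    q->head = q->len = 0;
}

/* Returns 0 on success, -1 if the fixed-size ring is full. */
int wq_push(wq_t *q, void *item) {
    int rc = -1;
    pthread_mutex_lock(&q->mu);
    if (q->len < WQ_CAP) {
        q->items[(q->head + q->len) % WQ_CAP] = item;
        q->len++;
        pthread_cond_signal(&q->cv);
        rc = 0;
    }
    pthread_mutex_unlock(&q->mu);
    return rc;
}

/* Wake every parked consumer without enqueuing anything. */
void wq_wake_all(wq_t *q) {
    pthread_mutex_lock(&q->mu);
    pthread_cond_broadcast(&q->cv);
    pthread_mutex_unlock(&q->mu);
}

/* Wake-aware pop: waits at most once, so a wake-all (or a spurious wake)
 * surfaces to the caller as NULL instead of re-parking. A re-parking pop
 * would use `while (q->len == 0)` around the wait instead of `if`. */
void *wq_pop_wakable(wq_t *q, int blocking) {
    void *item = NULL;
    pthread_mutex_lock(&q->mu);
    if (q->len == 0 && blocking) pthread_cond_wait(&q->cv, &q->mu);
    if (q->len > 0) {
        item = q->items[q->head];
        q->head = (q->head + 1) % WQ_CAP;
        q->len--;
    }
    pthread_mutex_unlock(&q->mu);
    return item;
}
```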
compressionRegistryRetire calls compressionWorkersWakeAll() (a thin
wrapper) right after taking the gen snapshot. The compression worker
distinguishes wake reasons in its loop:
- mutexQueuePopWakable returns NULL → grace-barrier wake. Worker
advances its gen via compressionWorkerReportQuiescent and
continues.
- Returns &kShutdownSentinel → see below.
- Returns compressionJob* → real work.
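The three-way dispatch above could look roughly like this. `classify_pop` and `worker_step` are hypothetical helpers, and the counters stand in for the real calls (compressionWorkerReportQuiescent and the per-job body).

```c
#include <assert.h>
#include <stddef.h>

static int kShutdownSentinel; /* unique address, compared by pointer only */

typedef enum { WAKE_GRACE_BARRIER, WAKE_SHUTDOWN, WAKE_JOB } wake_reason;

/* Map the value returned by the wake-aware pop to a wake reason. */
wake_reason classify_pop(const void *popped) {
    if (popped == NULL) return WAKE_GRACE_BARRIER;
    if (popped == &kShutdownSentinel) return WAKE_SHUTDOWN;
    return WAKE_JOB;
}

/* One loop iteration. Returns 0 when the worker should exit, 1 otherwise. */
int worker_step(const void *popped, int *quiescent_reports, int *jobs_done) {
    wake_reason why = classify_pop(popped);
    if (why == WAKE_SHUTDOWN) return 0;
    if (why == WAKE_JOB) (*jobs_done)++; /* placeholder for the per-job body */
    (*quiescent_reports)++;              /* report quiescence after a job or a barrier wake */
    return 1;
}
```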
Race-free shutdown via sentinels (kShutdownSentinel)
-----------------------------------------------------
Wake-all alone for shutdown has a small race: a worker that has read
shutdown_requested == 0 at top-of-loop and is about to enter
mutexQueuePopWakable can be preempted; if Stop's broadcast fires in
that window, the broadcast hits no waiters and is lost; the worker
subsequently parks with nothing left to wake it, so pthread_join never returns.
Sentinels avoid this because they put DATA in the queue. Once Stop
has pushed N sentinels, the next mutexQueuePopWakable sees length>0
under the mutex and pops one without entering cond_wait. The sentinel
is a unique address (static int) distinguishable by pointer equality;
zero memory cost.
Stop pushes N sentinels (matching pool.n_threads). The Start-rollback
path on pthread_create failure does the same.
Wake-all is reserved for grace barriers because missed barrier wakes
are benign — the next event catches the worker. Shutdown is
correctness-critical and cannot tolerate the race.
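A self-contained sketch of sentinel-based shutdown, assuming a simple mutex-guarded ring in place of mutexQueue. All names here (`push`, `pop_blocking`, `pool_start_then_stop`, `MAX_WORKERS`, `QCAP`) are hypothetical; only the idea of one sentinel per worker comes from the description above.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_WORKERS 16 /* hypothetical bound */
#define QCAP 64        /* hypothetical ring capacity */

static int kShutdownSentinel; /* identified by address only */

static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
static void *items[QCAP];
static size_t head, len;
static int exited_via_sentinel;

static void push(void *p) {
    pthread_mutex_lock(&mu);
    assert(len < QCAP);
    items[(head + len) % QCAP] = p;
    len++;
    pthread_cond_signal(&cv);
    pthread_mutex_unlock(&mu);
}

static void *pop_blocking(void) {
    pthread_mutex_lock(&mu);
    /* If a sentinel is already queued, len > 0 and we never park. */
    while (len == 0) pthread_cond_wait(&cv, &mu);
    void *p = items[head];
    head = (head + 1) % QCAP;
    len--;
    pthread_mutex_unlock(&mu);
    return p;
}

static void *worker_main(void *arg) {
    (void)arg;
    for (;;) {
        void *p = pop_blocking();
        if (p == &kShutdownSentinel) {
            pthread_mutex_lock(&mu);
            exited_via_sentinel++;
            pthread_mutex_unlock(&mu);
            return NULL;
        }
        /* a real job would be processed here */
    }
}

/* Start n workers, then stop them by pushing exactly n sentinels.
 * Returns how many workers exited by consuming a sentinel. */
int pool_start_then_stop(int n) {
    pthread_t tids[MAX_WORKERS];
    assert(n >= 0 && n <= MAX_WORKERS);
    exited_via_sentinel = 0;
    for (int i = 0; i < n; i++) {
        int rc = pthread_create(&tids[i], NULL, worker_main, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < n; i++) push(&kShutdownSentinel);
    for (int i = 0; i < n; i++) pthread_join(tids[i], NULL);
    return exited_via_sentinel;
}
```

Each worker consumes exactly one sentinel before exiting, so n sentinels are enough for n workers regardless of whether a worker was parked or about to park when Stop ran.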
Resize-aware canFree (retire_n_workers)
=======================================
After a resize-up post-retire, the registry's snapshot may have been
taken with N workers but canFree iterates N+M. Slots beyond N are
zcalloc-default (retire_worker_gen[i]=0) and the new worker starts
at gen[i]=0 — strict-inequality canFree (`gen > retire_worker_gen`)
fails on these slots, blocking reclamation until normal traffic
advances the gen.
compressionDictPair gains an int retire_n_workers field. startRetirement
captures server.compression_threads alongside the gen snapshot. canFree
iterates `min(retire_n_workers, server.compression_threads)`:
- resize-up after retire: don't check new slots. Safe — the new
worker spawned AFTER retire and cannot have observed this dict
(compressionRegistryActive() returns the current active, never
a retiring one).
- resize-down after retire: don't check dead slots. Safe — the
dead worker is joined and cannot hold a pointer.
This eliminates the only theoretical liveness issue with QSBR + resize.
No gen-bumping at Start needed — keeps the QSBR invariant that gens
advance only via the worker reaching quiescence.
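The bound described above can be sketched as follows. This is an illustration under assumptions: `retired_dict`, `can_free`, and `WORKERS_MAX` are hypothetical names, and the generation array is passed in as plain integers rather than read atomically.

```c
#include <assert.h>
#include <stdint.h>

#define WORKERS_MAX 16 /* hypothetical; mirrors the role of COMPRESSION_WORKERS_MAX */

typedef struct {
    uint64_t retire_worker_gen[WORKERS_MAX]; /* snapshot taken at retire */
    int retire_n_workers;                    /* pool size at retire time */
} retired_dict;

/* Check only the slots that existed at retire time AND still exist now. */
int can_free(const retired_dict *d, const uint64_t gen[], int live_threads) {
    int n = d->retire_n_workers < live_threads ? d->retire_n_workers : live_threads;
    assert(n >= 0 && n <= WORKERS_MAX);
    for (int i = 0; i < n; i++) {
        if (gen[i] <= d->retire_worker_gen[i]) return 0; /* worker i not yet quiescent */
    }
    return 1;
}
```

With a snapshot of two workers, growing the pool to four leaves slots 2 and 3 unchecked, and shrinking it to one leaves slot 1 unchecked, matching the two cases listed above.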
Other plumbing
==============
- compressionInit (compression.c): registry init → workers Start.
- compressionShutdown (new): Stop then Release, in that order, before
any registry teardown. Called from finishShutdown in server.c after
moduleUnloadAllModules.
- compressionAfterSleep drains up to 256 results per main-loop tick.
Comment explains the 256 budget rationale (~2.5 ms install time, fits
inside event-loop budget; surfaceable via outbox-backpressure
counter if undersized).
- CONFIG SET compression-threads applies via applyCompressionThreads
→ compressionWorkersResize. Resize is graceful (Stop + Start).
- compressionWorkersGetThreadCount(): test/introspection accessor for
the live pool size — used by unit tests to verify Resize actually
creates/destroys threads.
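A sketch of the budgeted drain from the list above, with a plain counter standing in for the real outbox. `outbox_t`, `outbox_try_pop`, and `drain_tick` are hypothetical names. If the quoted figure of roughly 2.5 ms covers a full batch of 256, that works out to about 10 µs per installed result.

```c
#include <assert.h>
#include <stddef.h>

#define DRAIN_BUDGET 256 /* per-tick cap quoted in the description */

/* Hypothetical outbox: a counter standing in for the real queue. */
typedef struct { size_t pending; } outbox_t;

/* Non-blocking pop: returns 1 if a result was taken, 0 if empty. */
static int outbox_try_pop(outbox_t *o) {
    if (o->pending == 0) return 0;
    o->pending--;
    return 1;
}

/* Drain at most DRAIN_BUDGET results; returns how many were handled. */
size_t drain_tick(outbox_t *o) {
    size_t handled = 0;
    assert(o != NULL);
    while (handled < DRAIN_BUDGET && outbox_try_pop(o)) handled++;
    return handled;
}
```

Anything left over after a tick stays queued for the next main-loop iteration, so a backlog is worked off in bounded slices rather than stalling the event loop.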
Worker contract refinements (review feedback)
=============================================
- compressionWorkersEnqueue dropped the `dict_id` parameter. Per the
design (§4.6), the worker loads the active dict at compress time;
the dict_id field on the job struct is filled by the worker after
compression so the resulting dict_id can be carried into the
compressed-frame header.
- Header documents the ownership contract for `key`/`src` sds
pointers: they are borrowed by the pool for the job's lifetime.
In production (S2.5+) the caller's incrRefCount on the value robj
pins the sds until the drain runs decrRefCount. In the unit-test
fixture the test owns and frees the sds directly.
- compression_registry.c uses COMPRESSION_WORKERS_MAX for sizing
worker_quiescent_gen[] instead of the bare constant 16.
Tests
=====
src/unit/test_compression_workers.cpp — extended:
- Lifecycle: Start{Zero,One,Four,Max}Threads + StopIsIdempotent +
ThreadCountIsZeroBeforeStart sanity check; every transition asserts
compressionWorkersGetThreadCount().
- Enqueue/drain edge cases: EnqueueRejectedBeforeStart,
DrainBeforeStartReturnsZero.
- End-to-end: SingleJobRoundTrip, BurstOf256JobsOneWorker,
BurstOf1024JobsFourWorkers — each asserts post-condition that the
outbox is empty after the expected count surfaced.
- Resize: ResizeStartsPoolWhenNotInitialized, ResizeIsNoOpWhenSameCount,
ResizeUpAndDown (each transition checks thread count),
ResizeToZeroDisablesPool, ResizeRejectsOutOfRange,
ResizeAcrossEnqueuedJobs (drop contract made explicit in comment +
asserted via EXPECT_LE/GE).
- Out-of-range Start is documented as assertion-based (caller-bug);
out-of-range public surface is exercised via Resize.
Design doc
==========
§4.4 and §4.6 in detailed-design.md reflect the wake-all/sentinel
split and the retire_n_workers field.
Verified locally
================
- make -j builds clean.
- ./runtest --single unit/type/compression: 10/10 pass.
- Manual smoke test: server boots with compression-threads=2,
CONFIG SET compression-threads {0, 1, 2, 4, 8} all work at runtime,
shutdown joins all workers cleanly via the sentinel path.
Not verified locally (CI validates):
- gtest unit tests on Linux/macOS/32-bit (no libgtest-dev locally).
- clang-format-18 (not installed locally).
* docs(plan): record S2.4 delivery; add S2.11 bounded-inbox follow-up
Reflects what actually landed in PR #13 vs. the original one-liner plan
entry for S2.4:
- mutexqueue dual API (Pop/PopAll keep contract; new PopWakable for
wake-aware consumers)
- sentinel-based race-free shutdown
- retire_n_workers field on compressionDictPair for resize-aware
canFree
- compressionWorkersGetThreadCount accessor
S1.1 marked as merged (PR #12) and noted that PR #13 extended its
data structures (retire_n_workers + COMPRESSION_WORKERS_MAX usage).
Adds S2.11 — bounded inbox + per-caller back-pressure counters — as
an explicit prerequisite before S2.7 (write-path hook) ships. Was
mentioned in PR #13 description and SESSION_CHECKPOINT but not
tracked in the plan. Squash this into the S2.4 commit at merge time.
Summary
Implements the dictionary lifecycle registry per design §4.4, replacing Phase 0 stubs with real QSBR (Quiescent-State-Based Reclamation) logic.
Review guidance
Same function order and names as Phase 0 — only stub bodies replaced with implementation. No renames, no reordering.
What changed
- src/compression_registry.c: add/promote, retirement with worker-gen snapshot, grace-period GC, frame-ref accounting, worker quiescent API
- src/compression_registry.h: struct updated (frame_refs, retire_worker_gen[]), compressionRegistryAdd gets promote flag, new QSBR functions at end, removed <stdatomic.h>
- src/unit/Makefile: add ZSTD_LIB to LD_LIBS (unit test binary needs libzstd for registry ZSTD calls)
- src/unit/test_compression_header.cpp: init registry in fixture
- src/unit/test_compression_registry.cpp: 15 new unit tests
Testing
- All 308 unit tests pass locally
- 10 integration tests pass locally (./runtest --single unit/type/compression)
Note for @ikolomi
- test_compression_header.cpp now initializes the registry in SetUp, needed because compressionRegistryIncRef is no longer a stub.
- compressionRegistryAdd signature changed: added int promote parameter (0=retiring for RDB load, 1=active for training/import).