Skip to content

[S1.1] QSBR-based dictionary registry implementation#12

Merged
ikolomi merged 5 commits into
unstablefrom
gilboa/s1-dict-registry-qsbr
May 31, 2026
Merged

[S1.1] QSBR-based dictionary registry implementation#12
ikolomi merged 5 commits into
unstablefrom
gilboa/s1-dict-registry-qsbr

Conversation

@GilboaAWS

Copy link
Copy Markdown
Collaborator

Summary

Implements the dictionary lifecycle registry per design §4.4, replacing Phase 0 stubs with real QSBR (Quiescent-State-Based Reclamation) logic.

Review guidance

Same function order and names as Phase 0 — only stub bodies replaced with implementation. No renames, no reordering.

What changed

File Change
src/compression_registry.c Stub bodies → QSBR implementation (add/promote, retirement, GC, frame-ref accounting, worker quiescent API)
src/compression_registry.h Struct updated (frame_refs, retire_worker_gen[]), compressionRegistryAdd gets promote flag, new QSBR functions at end, removed <stdatomic.h>
src/unit/Makefile Added ZSTD_LIB to LD_LIBS (unit test binary needs libzstd for registry ZSTD calls)
src/unit/test_compression_header.cpp Added registry init to fixture (IncRef is no longer a no-op)
src/unit/test_compression_registry.cpp 15 new unit tests

Testing

  • All 308 unit tests pass locally (including Ilia's CompressionHeaderTest)
  • 10/10 integration tests pass (./runtest --single unit/type/compression)

Note for @ikolomi

  • test_compression_header.cpp now initializes the registry in SetUp — needed because compressionRegistryIncRef is no longer a stub.
  • compressionRegistryAdd signature changed: added int promote parameter (0=retiring for RDB load, 1=active for training/import).

Implement the dictionary lifecycle registry per design §4.4, replacing
Phase 0 stubs with real QSBR logic. Same function order and names as
Phase 0; only stub bodies replaced with implementation.

Changes:
- compression_registry.c: add/promote, retirement with worker-gen
  snapshot, grace-period GC, frame-ref accounting, worker quiescent API
- compression_registry.h: struct updated (frame_refs, retire_worker_gen),
  Add gets promote flag, new QSBR functions added, removed <stdatomic.h>
- src/unit/Makefile: add ZSTD_LIB to LD_LIBS for unit test linkage
- test_compression_header.cpp: init registry in fixture
- test_compression_registry.cpp: 15 new unit tests

All 308 unit tests + 10 integration tests pass locally.
@GilboaAWS GilboaAWS requested a review from ikolomi May 28, 2026 14:12
@GilboaAWS GilboaAWS marked this pull request as ready for review May 28, 2026 14:18
Comment thread src/compression_registry.h Outdated
Comment thread src/compression_registry.h Outdated
Comment thread src/compression_registry.c Outdated
Comment thread src/compression_registry.c Outdated
Comment thread src/compression_registry.c Outdated
Comment thread src/compression_registry.c Outdated
uint32_t next_id;
} registry;

static _Atomic(uint64_t) worker_quiescent_gen[COMPRESSION_DICT_MAX];

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why COMPRESSION_DICT_MAX is used for threads count?

ikolomi added a commit that referenced this pull request May 28, 2026
Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real
pool: pthread-managed workers, bounded inbox + outbox, QSBR-compatible
worker contract, runtime resize, graceful shutdown. The per-job body
is a deliberate placeholder pass-through — workers accept jobs and
post empty results to the outbox without compressing — so end-to-end
plumbing (enqueue, dequeue, drain, shutdown ordering) is exercised
before ZSTD_compress_usingCDict integration lands in S2.5.

Architecture
------------

Inbox: mutexQueue (src/mutexqueue.h). Cond-var blocking gives us
worker parking when idle — the bio pattern. Right fit for a feature
whose default state is enabled-but-quiet, where busy-spin (the
io_threads pattern) would burn a core for no work. mutexQueue is
unbounded by construction, so enqueue does not have a back-pressure
return path in v1; the natural producer-side pacing comes from
compression-sweep-max-cpu-pct (§2.11 R2.11.2). If operational
experience argues for a bounded inbox we'll revisit non-breakingly.

Outbox: mpscQueue from src/queues.h. Lock-free; the main thread polls
non-blocking from compressionAfterSleep. Bounded — workers retry on
full rather than drop (discarding completed work would orphan the
caller's incrRefCount on the value robj).

QSBR worker contract (§4.4 step 2-3): each worker carries a stable
worker_id ∈ [0, n_threads). After every job it calls
compressionWorkerReportQuiescent(worker_id) which is implemented in
compression_registry.c (PR #12). The registry's GC pass observes the
per-worker generation counters and frees retired dicts only after
all workers have crossed the retire-time snapshot.

Worker identity is reset on Resize (Stop+Start). The registry's
quiescent_gen counters are NOT reset — generations are monotonic; any
dict that captured an old worker's gen at retire time has either
been freed already (registry GC pass between Stop and Start) or
remains visible to the new worker through normal QSBR semantics.

Why a separate COMPRESSION_WORKERS_MAX
--------------------------------------

The PR #12 registry sizes worker_quiescent_gen[] by COMPRESSION_DICT_MAX,
which is the dict registry cap (also 16). They happen to share the
value but mean different things and could diverge in v2. This PR
introduces COMPRESSION_WORKERS_MAX in compression_workers.h as the
authoritative bound; a follow-up alignment commit on the registry side
will switch worker_quiescent_gen[] to size by COMPRESSION_WORKERS_MAX.
For now the two values are equal so no functional issue.

Lifecycle wiring
----------------

- compressionInit (compression.c) now: registry init → workers Start.
- new compressionShutdown (compression.h, .c) calls Stop then Release,
  in that order, satisfying the registry contract that workers must
  be joined before the registry is freed.
- finishShutdown (server.c) calls compressionShutdown after
  moduleUnloadAllModules so any module that held a compressed value
  during cleanup has the path still functional.
- compressionAfterSleep drains up to 256 results per main-loop
  iteration. Bound is internal safety, not a config knob; outbox
  back-pressure surfaces operationally before we'd raise it.
- CONFIG SET compression-threads now applies via a new
  applyCompressionThreads hook in config.c that calls
  compressionWorkersResize. Resize is graceful (Stop + Start), so
  in-flight items in the inbox are reclaimed cleanly.

Out of scope (later sub-tasks)
------------------------------

- Real ZSTD_compress_usingCDict in the worker → S2.5 (Encoder path).
- Worker-side atomic load of registry->active → S2.5 (worker
  consumes job->dict_id snapshot captured at enqueue time, falls
  back to active on dict-mismatch).
- compressionEnqueueCandidate caller in dbAdd/dbOverwrite → S2.7
  (Write-path hook).
- Real install of dst into robj on outbox drain (createCompressedObject,
  registry IncRef, decrRefCount of caller's pin) → S2.5.
- Tcl integration test exercising enqueue/drain end-to-end through a
  running server → blocked on S2.7.

Tests
-----

src/unit/test_compression_workers.cpp — 14 gtest cases:

- StartZeroThreadsIsValid (compression-threads=0 disables the pool;
  enqueue rejects).
- StartOneThread / StartFourThreads / StartMaxThreads.
- StopIsIdempotent (multiple Stops, including from no-Start).
- EnqueueRejectedBeforeStart.
- DrainBeforeStartReturnsZero.
- SingleJobRoundTrip (enqueue → drain delivers exactly one).
- BurstOf256JobsOneWorker (single-thread serializes).
- BurstOf1024JobsFourWorkers (four-thread concurrency).
- ResizeStartsPoolWhenNotInitialized / ResizeIsNoOpWhenSameCount /
  ResizeUpAndDown / ResizeToZeroDisablesPool / ResizeRejectsOutOfRange /
  ResizeAcrossEnqueuedJobs.

Verified locally
----------------

- make -j$(nproc) -C src builds clean (server binary).
- ./runtest --single unit/type/compression: 10/10 passes.
- Manual smoke test: server boots with compression-threads=2, accepts
  CONFIG SET compression-threads {0, 1, 2, 4, 8} at runtime, shuts
  down cleanly with all worker threads joined.

Not verified locally (CI will validate):
- gtest unit tests on Linux/macOS/32-bit (no libgtest-dev locally).
- clang-format-18 (not installed locally).
GilboaAWS added 2 commits May 31, 2026 08:23
Address Ilia's 6 review comments:
1. Add inline field descriptions to compressionDictPair struct
2. Clarify SWEEP reference in TryGc doc comment
3. Add field descriptions to registry static struct
4. Replace memmove with swap-last in removeFromDicts (order not needed)
5. Add comment clarifying cap bounds retiring dicts (design §4.4 step 7)
6. Use plain 16 for worker gen array (TODO: shared constant in follow-up)

Additionally:
- Remove the retiring linked list — unnecessary with max 16 entries,
  GC now scans dicts[] directly checking state == RETIRING
- Update detailed-design.md to reflect removal of retiring_list
- Add bounds assertion on worker_id in QSBR worker API
@GilboaAWS GilboaAWS force-pushed the gilboa/s1-dict-registry-qsbr branch from 78bbc56 to 0828b2f Compare May 31, 2026 08:47
Set server.verbosity = LL_NOTHING in test fixtures to prevent
serverLog from accessing uninitialized server fields under ASAN.
@GilboaAWS GilboaAWS force-pushed the gilboa/s1-dict-registry-qsbr branch from 0828b2f to 5c0008f Compare May 31, 2026 08:59
The 5th makeFakeDictPair() is rejected by compressionRegistryAdd when
the cap is reached. Caller retains ownership on rejection — must free
it explicitly. Fixes ASAN leak detection.
@ikolomi ikolomi merged commit 6dcf52b into unstable May 31, 2026
79 checks passed
ikolomi added a commit that referenced this pull request May 31, 2026
Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real
pool: pthread-managed workers, bounded inbox + outbox, QSBR-compatible
worker contract, runtime resize, graceful shutdown. The per-job body
is a deliberate placeholder pass-through — workers accept jobs and
post empty results to the outbox without compressing — so end-to-end
plumbing (enqueue, dequeue, drain, shutdown ordering) is exercised
before ZSTD_compress_usingCDict integration lands in S2.5.

Architecture
------------

Inbox: mutexQueue (src/mutexqueue.h). Cond-var blocking gives us
worker parking when idle — the bio pattern. Right fit for a feature
whose default state is enabled-but-quiet, where busy-spin (the
io_threads pattern) would burn a core for no work. mutexQueue is
unbounded by construction, so enqueue does not have a back-pressure
return path in v1; the natural producer-side pacing comes from
compression-sweep-max-cpu-pct (§2.11 R2.11.2). If operational
experience argues for a bounded inbox we'll revisit non-breakingly.

Outbox: mpscQueue from src/queues.h. Lock-free; the main thread polls
non-blocking from compressionAfterSleep. Bounded — workers retry on
full rather than drop (discarding completed work would orphan the
caller's incrRefCount on the value robj).

QSBR worker contract (§4.4 step 2-3): each worker carries a stable
worker_id ∈ [0, n_threads). After every job it calls
compressionWorkerReportQuiescent(worker_id) which is implemented in
compression_registry.c (PR #12). The registry's GC pass observes the
per-worker generation counters and frees retired dicts only after
all workers have crossed the retire-time snapshot.

Worker identity is reset on Resize (Stop+Start). The registry's
quiescent_gen counters are NOT reset — generations are monotonic; any
dict that captured an old worker's gen at retire time has either
been freed already (registry GC pass between Stop and Start) or
remains visible to the new worker through normal QSBR semantics.

Why a separate COMPRESSION_WORKERS_MAX
--------------------------------------

The PR #12 registry sizes worker_quiescent_gen[] by COMPRESSION_DICT_MAX,
which is the dict registry cap (also 16). They happen to share the
value but mean different things and could diverge in v2. This PR
introduces COMPRESSION_WORKERS_MAX in compression_workers.h as the
authoritative bound; a follow-up alignment commit on the registry side
will switch worker_quiescent_gen[] to size by COMPRESSION_WORKERS_MAX.
For now the two values are equal so no functional issue.

Lifecycle wiring
----------------

- compressionInit (compression.c) now: registry init → workers Start.
- new compressionShutdown (compression.h, .c) calls Stop then Release,
  in that order, satisfying the registry contract that workers must
  be joined before the registry is freed.
- finishShutdown (server.c) calls compressionShutdown after
  moduleUnloadAllModules so any module that held a compressed value
  during cleanup has the path still functional.
- compressionAfterSleep drains up to 256 results per main-loop
  iteration. Bound is internal safety, not a config knob; outbox
  back-pressure surfaces operationally before we'd raise it.
- CONFIG SET compression-threads now applies via a new
  applyCompressionThreads hook in config.c that calls
  compressionWorkersResize. Resize is graceful (Stop + Start), so
  in-flight items in the inbox are reclaimed cleanly.

Out of scope (later sub-tasks)
------------------------------

- Real ZSTD_compress_usingCDict in the worker → S2.5 (Encoder path).
- Worker-side atomic load of registry->active → S2.5 (worker
  consumes job->dict_id snapshot captured at enqueue time, falls
  back to active on dict-mismatch).
- compressionEnqueueCandidate caller in dbAdd/dbOverwrite → S2.7
  (Write-path hook).
- Real install of dst into robj on outbox drain (createCompressedObject,
  registry IncRef, decrRefCount of caller's pin) → S2.5.
- Tcl integration test exercising enqueue/drain end-to-end through a
  running server → blocked on S2.7.

Tests
-----

src/unit/test_compression_workers.cpp — 14 gtest cases:

- StartZeroThreadsIsValid (compression-threads=0 disables the pool;
  enqueue rejects).
- StartOneThread / StartFourThreads / StartMaxThreads.
- StopIsIdempotent (multiple Stops, including from no-Start).
- EnqueueRejectedBeforeStart.
- DrainBeforeStartReturnsZero.
- SingleJobRoundTrip (enqueue → drain delivers exactly one).
- BurstOf256JobsOneWorker (single-thread serializes).
- BurstOf1024JobsFourWorkers (four-thread concurrency).
- ResizeStartsPoolWhenNotInitialized / ResizeIsNoOpWhenSameCount /
  ResizeUpAndDown / ResizeToZeroDisablesPool / ResizeRejectsOutOfRange /
  ResizeAcrossEnqueuedJobs.

Verified locally
----------------

- make -j$(nproc) -C src builds clean (server binary).
- ./runtest --single unit/type/compression: 10/10 passes.
- Manual smoke test: server boots with compression-threads=2, accepts
  CONFIG SET compression-threads {0, 1, 2, 4, 8} at runtime, shuts
  down cleanly with all worker threads joined.

Not verified locally (CI will validate):
- gtest unit tests on Linux/macOS/32-bit (no libgtest-dev locally).
- clang-format-18 (not installed locally).
ikolomi added a commit that referenced this pull request Jun 2, 2026
Reflects what actually landed in PR #13 vs. the original one-liner plan
entry for S2.4:

  - mutexqueue dual API (Pop/PopAll keep contract; new PopWakable for
    wake-aware consumers)
  - sentinel-based race-free shutdown
  - retire_n_workers field on compressionDictPair for resize-aware
    canFree
  - compressionWorkersGetThreadCount accessor

S1.1 marked as merged (PR #12) and noted that PR #13 extended its
data structures (retire_n_workers + COMPRESSION_WORKERS_MAX usage).

Adds S2.11 — bounded inbox + per-caller back-pressure counters — as
an explicit prerequisite before S2.7 (write-path hook) ships. Was
mentioned in PR #13 description and SESSION_CHECKPOINT but not
tracked in the plan. Squash this into the S2.4 commit at merge time.
ikolomi added a commit that referenced this pull request Jun 2, 2026
* feat(S2.4): compression worker pool

Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real
pool: pthread-managed workers, mutexQueue inbox + mpsc outbox, QSBR
worker contract, runtime resize via Stop+Start, race-free graceful
shutdown via sentinels, grace barriers via wake-all from
compressionRegistryRetire, resize-aware canFree bound. The per-job
body is a deliberate placeholder pass-through — workers accept jobs
and post empty results to the outbox without compressing — so
end-to-end plumbing (enqueue, dequeue, drain, shutdown ordering) is
exercised before ZSTD_compress_usingCDict integration lands in S2.5.

Architecture
============

Inbox: mutexQueue (src/mutexqueue.h). Cond-var blocking gives us
worker parking when idle. Right fit for a feature whose default state
is enabled-but-quiet, where busy-spin would burn a core for no work.

Outbox: mpscQueue from src/queues.h. Lock-free; the main thread polls
non-blocking from compressionAfterSleep. Workers retry on full rather
than drop completed work.

QSBR plumbing concerns surfaced during review and fixed in this commit
=====================================================================

QSBR grace barriers (mutexQueueWakeAll + new mutexQueuePopWakable)
------------------------------------------------------------------

The QSBR design (§4.4) requires that idle workers periodically advance
their per-thread quiescent_gen counter even when no jobs are flowing,
so the registry's GC pass can reclaim retired dicts. Without this, a
parked worker's gen stays frozen at the snapshot taken by
compressionRegistryRetire, blocking reclamation indefinitely.

mutexqueue.h is extended with **two new APIs** that preserve the
existing pop contract for current callers (bio):

  - mutexQueueWakeAll(q): pthread_cond_broadcast under the mutex; wakes
    every consumer parked in cond_wait without enqueuing anything.
  - mutexQueuePopWakable(q, blocking=true): a wake-aware pop variant
    that returns NULL once on a spurious wake or wake-all, instead of
    re-parking. The compression worker uses this; bio continues to use
    the unchanged mutexQueuePop.

The original mutexQueuePop / mutexQueuePopAll APIs keep their original
"never returns NULL when blocking" semantics — they re-park on
spurious wakes via the `while` loop. This addresses reviewer feedback
that adding mutexQueueWakeAll should not change behaviour for existing
callers.

compressionRegistryRetire calls compressionWorkersWakeAll() (a thin
wrapper) right after taking the gen snapshot. The compression worker
distinguishes wake reasons in its loop:

  - mutexQueuePopWakable returns NULL → grace-barrier wake. Worker
    advances its gen via compressionWorkerReportQuiescent and
    continues.
  - Returns &kShutdownSentinel → see below.
  - Returns compressionJob* → real work.

Race-free shutdown via sentinels (kShutdownSentinel)
-----------------------------------------------------

Wake-all alone for shutdown has a small race: a worker that has read
shutdown_requested == 0 at top-of-loop and is about to enter
mutexQueuePopWakable can be preempted; if Stop's broadcast fires in
that window, the broadcast hits no waiters and is lost; the worker
subsequently parks and deadlocks pthread_join.

Sentinels avoid this because they put DATA in the queue. Once Stop
has pushed N sentinels, the next mutexQueuePopWakable sees length>0
under the mutex and pops one without entering cond_wait. The sentinel
is a unique address (static int) distinguishable by pointer equality;
zero memory cost.

Stop pushes N sentinels (matching pool.n_threads). The Start-rollback
path on pthread_create failure does the same.

Wake-all is reserved for grace barriers because missed barrier wakes
are benign — the next event catches the worker. Shutdown is
correctness-critical and cannot tolerate the race.

Resize-aware canFree (retire_n_workers)
=======================================

After a resize-up post-retire, the registry's snapshot may have been
taken with N workers but canFree iterates N+M. Slots beyond N are
zcalloc-default (retire_worker_gen[i]=0) and the new worker starts
at gen[i]=0 — strict-inequality canFree (`gen > retire_worker_gen`)
fails on these slots, blocking reclamation until normal traffic
advances the gen.

compressionDictPair gains an int retire_n_workers field. startRetirement
captures server.compression_threads alongside the gen snapshot. canFree
iterates `min(retire_n_workers, server.compression_threads)`:

  - resize-up after retire: don't check new slots. Safe — the new
    worker spawned AFTER retire and cannot have observed this dict
    (compressionRegistryActive() returns the current active, never
    a retiring one).
  - resize-down after retire: don't check dead slots. Safe — the
    dead worker is joined and cannot hold a pointer.

This eliminates the only theoretical liveness issue with QSBR + resize.
No gen-bumping at Start needed — keeps the QSBR invariant that gens
advance only via the worker reaching quiescence.

Other plumbing
==============

- compressionInit (compression.c): registry init → workers Start.
- compressionShutdown (new): Stop then Release, in that order, before
  any registry teardown. Called from finishShutdown in server.c after
  moduleUnloadAllModules.
- compressionAfterSleep drains up to 256 results per main-loop tick.
  Comment explains the 256 budget rationale (~2.5 ms install time, fits
  inside event-loop budget; surfaceable via outbox-backpressure
  counter if undersized).
- CONFIG SET compression-threads applies via applyCompressionThreads
  → compressionWorkersResize. Resize is graceful (Stop + Start).
- compressionWorkersGetThreadCount(): test/introspection accessor for
  the live pool size — used by unit tests to verify Resize actually
  creates/destroys threads.

Worker contract refinements (review feedback)
=============================================

- compressionWorkersEnqueue dropped the `dict_id` parameter. Per the
  design (§4.6), the worker loads the active dict at compress time;
  the dict_id field on the job struct is filled by the worker after
  compression so the resulting dict_id can be carried into the
  compressed-frame header.
- Header documents the ownership contract for `key`/`src` sds
  pointers: they are borrowed by the pool for the job's lifetime.
  In production (S2.5+) the caller's incrRefCount on the value robj
  pins the sds until the drain runs decrRefCount. In the unit-test
  fixture the test owns and frees the sds directly.
- compression_registry.c uses COMPRESSION_WORKERS_MAX for sizing
  worker_quiescent_gen[] instead of the bare constant 16.

Tests
=====

src/unit/test_compression_workers.cpp — extended:
- Lifecycle: Start{Zero,One,Four,Max}Threads + StopIsIdempotent +
  ThreadCountIsZeroBeforeStart sanity check; every transition asserts
  compressionWorkersGetThreadCount().
- Enqueue/drain edge cases: EnqueueRejectedBeforeStart,
  DrainBeforeStartReturnsZero.
- End-to-end: SingleJobRoundTrip, BurstOf256JobsOneWorker,
  BurstOf1024JobsFourWorkers — each asserts post-condition that the
  outbox is empty after the expected count surfaced.
- Resize: ResizeStartsPoolWhenNotInitialized, ResizeIsNoOpWhenSameCount,
  ResizeUpAndDown (each transition checks thread count),
  ResizeToZeroDisablesPool, ResizeRejectsOutOfRange,
  ResizeAcrossEnqueuedJobs (drop contract made explicit in comment +
  asserted via EXPECT_LE/GE).
- Out-of-range Start is documented as assertion-based (caller-bug);
  out-of-range public surface is exercised via Resize.

Design doc
==========

§4.4 and §4.6 in detailed-design.md reflect the wake-all/sentinel
split and the retire_n_workers field.

Verified locally
================

- make -j builds clean.
- ./runtest --single unit/type/compression: 10/10 passes.
- Manual smoke test: server boots with compression-threads=2,
  CONFIG SET compression-threads {0, 1, 2, 4, 8} all work at runtime,
  shutdown joins all workers cleanly via the sentinel path.

Not verified locally (CI validates):
- gtest unit tests on Linux/macOS/32-bit (no libgtest-dev locally).
- clang-format-18 (not installed locally).

* docs(plan): record S2.4 delivery; add S2.11 bounded-inbox follow-up

Reflects what actually landed in PR #13 vs. the original one-liner plan
entry for S2.4:

  - mutexqueue dual API (Pop/PopAll keep contract; new PopWakable for
    wake-aware consumers)
  - sentinel-based race-free shutdown
  - retire_n_workers field on compressionDictPair for resize-aware
    canFree
  - compressionWorkersGetThreadCount accessor

S1.1 marked as merged (PR #12) and noted that PR #13 extended its
data structures (retire_n_workers + COMPRESSION_WORKERS_MAX usage).

Adds S2.11 — bounded inbox + per-caller back-pressure counters — as
an explicit prerequisite before S2.7 (write-path hook) ships. Was
mentioned in PR #13 description and SESSION_CHECKPOINT but not
tracked in the plan. Squash this into the S2.4 commit at merge time.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants