Skip to content

[S2.4] Compression worker pool#13

Merged
ikolomi merged 2 commits into
unstablefrom
ikolomi/s2-worker-pool
Jun 2, 2026
Merged

[S2.4] Compression worker pool#13
ikolomi merged 2 commits into
unstablefrom
ikolomi/s2-worker-pool

Conversation

@ikolomi

@ikolomi ikolomi commented May 28, 2026

Copy link
Copy Markdown
Owner

Summary

Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real pool: pthread-managed workers, mutexQueue inbox + mpsc outbox, QSBR worker contract, runtime resize via Stop+Start, race-free graceful shutdown via sentinels, grace barriers via wake-all from compressionRegistryRetire, resize-aware canFree bound. The per-job body is a deliberate placeholder pass-through — workers accept jobs and post empty results to the outbox without compressing — so end-to-end plumbing (enqueue, dequeue, drain, shutdown ordering) is exercised before ZSTD_compress_usingCDict integration lands in S2.5.

Architecture

Component Implementation Why
Inbox mutexQueue (cond-var blocking pop, FIFO) Workers park when idle — bio pattern. Right fit for enabled-but-quiet default state.
Outbox mpscQueue (lock-free) Main thread polls non-blocking from compressionAfterSleep.
Worker contract Atomic load of active dict; ZSTD compress (placeholder in S2.4); MPSC post; compressionWorkerReportQuiescent QSBR per §4.4. Workers never touch robj or registry state directly.
Pool sizing compression-threads (0..16) Matches R2.11.1. 0 disables the pool; resize via Stop+Start.

Two QSBR plumbing concerns surfaced during review and fixed in this commit

Grace barriers (mutexQueueWakeAll)

The QSBR design (§4.4) requires that idle workers periodically advance their per-thread quiescent_gen counter even when no jobs are flowing, so the registry's GC pass can reclaim retired dicts. Without this, a parked worker's gen stays frozen at the snapshot taken by compressionRegistryRetire, blocking reclamation indefinitely.

mutexqueue.h gains mutexQueueWakeAll: a pthread_cond_broadcast on the queue's cond var that wakes all parked consumers without enqueuing anything. mutexQueuePop and mutexQueuePopAll change while (length==0)if (length==0) so they may return NULL on a spurious wake; callers using mutexQueueWakeAll must handle the NULL.

bio.c gets a defensive if (!job) continue; (bio doesn't currently call wake-all but the contract change makes the null path observable in principle).

compressionRegistryRetire calls compressionWorkersWakeAll() (a thin wrapper) right after taking the gen snapshot.

Race-free shutdown (kShutdownSentinel)

Wake-all alone for shutdown has a small race: a worker that has read shutdown_requested == 0 at top-of-loop and is about to enter mutexQueuePop can be preempted; if Stop's broadcast fires in that window, the broadcast hits no waiters and is lost; the worker subsequently parks and deadlocks pthread_join.

Sentinels avoid this because they put DATA in the queue. Once Stop has pushed N sentinels, the next mutexQueuePop sees length>0 under the mutex and pops one without entering cond_wait. The sentinel is a unique address (static int) distinguishable by pointer equality; zero memory cost.

Stop pushes N sentinels (matching pool.n_threads). Worker loop distinguishes:

  • NULL → grace barrier wake → ReportQuiescent + continue.
  • &kShutdownSentinel → shutdown → ReportQuiescent + break.
  • compressionJob* → process.

Wake-all is reserved for grace barriers because missed barrier wakes are benign — the next event catches the worker. Shutdown is correctness-critical and cannot tolerate the race.

Resize-aware canFree (retire_n_workers)

After a resize-up post-retire, the registry's snapshot may have been taken with N workers but canFree would iterate N+M. Slots beyond N are zcalloc-default (retire_worker_gen[i]=0) and the new worker starts at gen[i]=0 — strict-inequality canFree (gen > retire_worker_gen) fails on these slots, blocking reclamation.

compressionDictPair gains an int retire_n_workers field. startRetirement captures server.compression_threads alongside the gen snapshot. canFree iterates min(retire_n_workers, server.compression_threads):

  • resize-up after retire: don't check new slots. Safe — the new worker spawned AFTER retire and cannot have observed this dict (compressionRegistryActive() returns the current active, never a retiring one).
  • resize-down after retire: don't check dead slots. Safe — the dead worker is joined and cannot hold a pointer.

This eliminates the only theoretical liveness issue with QSBR + resize. No gen-bumping at Start needed — keeps the QSBR invariant that gens advance only via the worker reaching quiescence.

Other plumbing

  • compressionInit (compression.c): registry init → workers Start.
  • compressionShutdown (new): Stop then Release, in that order, before any registry teardown. Called from finishShutdown in server.c after moduleUnloadAllModules.
  • compressionAfterSleep drains up to 256 results per main-loop tick.
  • CONFIG SET compression-threads applies via applyCompressionThreadscompressionWorkersResize. Resize is graceful (Stop + Start).

Out of scope (later sub-tasks)

  • Real ZSTD_compress_usingCDict in the worker → S2.5 (Encoder path).
  • compressionEnqueueCandidate caller in dbAdd/dbOverwriteS2.7.
  • Real install of dst into robj on outbox drain → S2.5.
  • Bounded inbox + per-caller back-pressure counters → S2.x.

Tests

src/unit/test_compression_workers.cpp — 14 gtest cases:

  • Lifecycle: StartZeroThreadsIsValid, StartOneThread, StartFourThreads, StartMaxThreads, StopIsIdempotent.
  • Enqueue/drain edge cases: EnqueueRejectedBeforeStart, DrainBeforeStartReturnsZero.
  • End-to-end: SingleJobRoundTrip, BurstOf256JobsOneWorker, BurstOf1024JobsFourWorkers.
  • Resize: ResizeStartsPoolWhenNotInitialized, ResizeIsNoOpWhenSameCount, ResizeUpAndDown, ResizeToZeroDisablesPool, ResizeRejectsOutOfRange, ResizeAcrossEnqueuedJobs.

Verified locally

  • make -j builds clean.
  • ./runtest --single unit/type/compression: 10/10 passes.
  • Manual smoke test: server boots with compression-threads=2, CONFIG SET compression-threads {0, 1, 2, 4, 8} all work at runtime, shutdown joins all workers cleanly via the sentinel path.

Not verified locally (CI validates):

  • gtest unit tests on Linux/macOS/32-bit.
  • clang-format-18.

@ikolomi ikolomi requested a review from GilboaAWS May 28, 2026 19:49
@ikolomi ikolomi force-pushed the ikolomi/s2-worker-pool branch from d05f564 to 03bb927 Compare May 28, 2026 20:13
Comment thread src/compression_workers.c Outdated
@ikolomi ikolomi force-pushed the ikolomi/s2-worker-pool branch 2 times, most recently from 90e0d24 to 8cd7ac5 Compare June 2, 2026 07:31
@ikolomi ikolomi changed the title Ikolomi/s2 worker pool [S2.4] Compression worker pool Jun 2, 2026
Comment thread src/mutexqueue.h
Comment thread src/compression.c
Comment thread src/compression_workers.h
Comment thread src/compression_workers.c
Comment thread src/unit/test_compression_workers.cpp
Comment thread src/unit/test_compression_workers.cpp
Comment thread src/unit/test_compression_workers.cpp
Comment thread src/unit/test_compression_workers.cpp
@ikolomi

ikolomi commented Jun 2, 2026

Copy link
Copy Markdown
Owner Author

Also need to check that all the design docs (in .agents/planning/realtime-data-compression folder are alligned/updated)

@ikolomi ikolomi force-pushed the ikolomi/s2-worker-pool branch from 8cd7ac5 to f26fcf0 Compare June 2, 2026 09:53
@ikolomi

ikolomi commented Jun 2, 2026

Copy link
Copy Markdown
Owner Author

Updated. detailed-design.md §4.4 now describes the retire_n_workers field on compressionDictPair, and §4.6 has been rewritten to reflect the dual mutexqueue API (the standard mutexQueuePop/PopAll keep their original 'never NULL on blocking' contract; the new mutexQueuePopWakable is the wake-aware variant) plus the sentinel-based shutdown design. Both are part of the amended commit on this PR.

Replaces the Phase 0 stub at src/compression_workers.{c,h} with a real
pool: pthread-managed workers, mutexQueue inbox + mpsc outbox, QSBR
worker contract, runtime resize via Stop+Start, race-free graceful
shutdown via sentinels, grace barriers via wake-all from
compressionRegistryRetire, resize-aware canFree bound. The per-job
body is a deliberate placeholder pass-through — workers accept jobs
and post empty results to the outbox without compressing — so
end-to-end plumbing (enqueue, dequeue, drain, shutdown ordering) is
exercised before ZSTD_compress_usingCDict integration lands in S2.5.

Architecture
============

Inbox: mutexQueue (src/mutexqueue.h). Cond-var blocking gives us
worker parking when idle. Right fit for a feature whose default state
is enabled-but-quiet, where busy-spin would burn a core for no work.

Outbox: mpscQueue from src/queues.h. Lock-free; the main thread polls
non-blocking from compressionAfterSleep. Workers retry on full rather
than drop completed work.

QSBR plumbing concerns surfaced during review and fixed in this commit
=====================================================================

QSBR grace barriers (mutexQueueWakeAll + new mutexQueuePopWakable)
------------------------------------------------------------------

The QSBR design (§4.4) requires that idle workers periodically advance
their per-thread quiescent_gen counter even when no jobs are flowing,
so the registry's GC pass can reclaim retired dicts. Without this, a
parked worker's gen stays frozen at the snapshot taken by
compressionRegistryRetire, blocking reclamation indefinitely.

mutexqueue.h is extended with **two new APIs** that preserve the
existing pop contract for current callers (bio):

  - mutexQueueWakeAll(q): pthread_cond_broadcast under the mutex; wakes
    every consumer parked in cond_wait without enqueuing anything.
  - mutexQueuePopWakable(q, blocking=true): a wake-aware pop variant
    that returns NULL once on a spurious wake or wake-all, instead of
    re-parking. The compression worker uses this; bio continues to use
    the unchanged mutexQueuePop.

The original mutexQueuePop / mutexQueuePopAll APIs keep their original
"never returns NULL when blocking" semantics — they re-park on
spurious wakes via the `while` loop. This addresses reviewer feedback
that adding mutexQueueWakeAll should not change behaviour for existing
callers.

compressionRegistryRetire calls compressionWorkersWakeAll() (a thin
wrapper) right after taking the gen snapshot. The compression worker
distinguishes wake reasons in its loop:

  - mutexQueuePopWakable returns NULL → grace-barrier wake. Worker
    advances its gen via compressionWorkerReportQuiescent and
    continues.
  - Returns &kShutdownSentinel → see below.
  - Returns compressionJob* → real work.

Race-free shutdown via sentinels (kShutdownSentinel)
-----------------------------------------------------

Wake-all alone for shutdown has a small race: a worker that has read
shutdown_requested == 0 at top-of-loop and is about to enter
mutexQueuePopWakable can be preempted; if Stop's broadcast fires in
that window, the broadcast hits no waiters and is lost; the worker
subsequently parks and deadlocks pthread_join.

Sentinels avoid this because they put DATA in the queue. Once Stop
has pushed N sentinels, the next mutexQueuePopWakable sees length>0
under the mutex and pops one without entering cond_wait. The sentinel
is a unique address (static int) distinguishable by pointer equality;
zero memory cost.

Stop pushes N sentinels (matching pool.n_threads). The Start-rollback
path on pthread_create failure does the same.

Wake-all is reserved for grace barriers because missed barrier wakes
are benign — the next event catches the worker. Shutdown is
correctness-critical and cannot tolerate the race.

Resize-aware canFree (retire_n_workers)
=======================================

After a resize-up post-retire, the registry's snapshot may have been
taken with N workers but canFree iterates N+M. Slots beyond N are
zcalloc-default (retire_worker_gen[i]=0) and the new worker starts
at gen[i]=0 — strict-inequality canFree (`gen > retire_worker_gen`)
fails on these slots, blocking reclamation until normal traffic
advances the gen.

compressionDictPair gains an int retire_n_workers field. startRetirement
captures server.compression_threads alongside the gen snapshot. canFree
iterates `min(retire_n_workers, server.compression_threads)`:

  - resize-up after retire: don't check new slots. Safe — the new
    worker spawned AFTER retire and cannot have observed this dict
    (compressionRegistryActive() returns the current active, never
    a retiring one).
  - resize-down after retire: don't check dead slots. Safe — the
    dead worker is joined and cannot hold a pointer.

This eliminates the only theoretical liveness issue with QSBR + resize.
No gen-bumping at Start needed — keeps the QSBR invariant that gens
advance only via the worker reaching quiescence.

Other plumbing
==============

- compressionInit (compression.c): registry init → workers Start.
- compressionShutdown (new): Stop then Release, in that order, before
  any registry teardown. Called from finishShutdown in server.c after
  moduleUnloadAllModules.
- compressionAfterSleep drains up to 256 results per main-loop tick.
  Comment explains the 256 budget rationale (~2.5 ms install time, fits
  inside event-loop budget; surfaceable via outbox-backpressure
  counter if undersized).
- CONFIG SET compression-threads applies via applyCompressionThreads
  → compressionWorkersResize. Resize is graceful (Stop + Start).
- compressionWorkersGetThreadCount(): test/introspection accessor for
  the live pool size — used by unit tests to verify Resize actually
  creates/destroys threads.

Worker contract refinements (review feedback)
=============================================

- compressionWorkersEnqueue dropped the `dict_id` parameter. Per the
  design (§4.6), the worker loads the active dict at compress time;
  the dict_id field on the job struct is filled by the worker after
  compression so the resulting dict_id can be carried into the
  compressed-frame header.
- Header documents the ownership contract for `key`/`src` sds
  pointers: they are borrowed by the pool for the job's lifetime.
  In production (S2.5+) the caller's incrRefCount on the value robj
  pins the sds until the drain runs decrRefCount. In the unit-test
  fixture the test owns and frees the sds directly.
- compression_registry.c uses COMPRESSION_WORKERS_MAX for sizing
  worker_quiescent_gen[] instead of the bare constant 16.

Tests
=====

src/unit/test_compression_workers.cpp — extended:
- Lifecycle: Start{Zero,One,Four,Max}Threads + StopIsIdempotent +
  ThreadCountIsZeroBeforeStart sanity check; every transition asserts
  compressionWorkersGetThreadCount().
- Enqueue/drain edge cases: EnqueueRejectedBeforeStart,
  DrainBeforeStartReturnsZero.
- End-to-end: SingleJobRoundTrip, BurstOf256JobsOneWorker,
  BurstOf1024JobsFourWorkers — each asserts post-condition that the
  outbox is empty after the expected count surfaced.
- Resize: ResizeStartsPoolWhenNotInitialized, ResizeIsNoOpWhenSameCount,
  ResizeUpAndDown (each transition checks thread count),
  ResizeToZeroDisablesPool, ResizeRejectsOutOfRange,
  ResizeAcrossEnqueuedJobs (drop contract made explicit in comment +
  asserted via EXPECT_LE/GE).
- Out-of-range Start is documented as assertion-based (caller-bug);
  out-of-range public surface is exercised via Resize.

Design doc
==========

§4.4 and §4.6 in detailed-design.md reflect the wake-all/sentinel
split and the retire_n_workers field.

Verified locally
================

- make -j builds clean.
- ./runtest --single unit/type/compression: 10/10 passes.
- Manual smoke test: server boots with compression-threads=2,
  CONFIG SET compression-threads {0, 1, 2, 4, 8} all work at runtime,
  shutdown joins all workers cleanly via the sentinel path.

Not verified locally (CI validates):
- gtest unit tests on Linux/macOS/32-bit (no libgtest-dev locally).
- clang-format-18 (not installed locally).
@ikolomi ikolomi force-pushed the ikolomi/s2-worker-pool branch from f26fcf0 to ac8631d Compare June 2, 2026 10:20
Reflects what actually landed in PR #13 vs. the original one-liner plan
entry for S2.4:

  - mutexqueue dual API (Pop/PopAll keep contract; new PopWakable for
    wake-aware consumers)
  - sentinel-based race-free shutdown
  - retire_n_workers field on compressionDictPair for resize-aware
    canFree
  - compressionWorkersGetThreadCount accessor

S1.1 marked as merged (PR #12) and noted that PR #13 extended its
data structures (retire_n_workers + COMPRESSION_WORKERS_MAX usage).

Adds S2.11 — bounded inbox + per-caller back-pressure counters — as
an explicit prerequisite before S2.7 (write-path hook) ships. Was
mentioned in PR #13 description and SESSION_CHECKPOINT but not
tracked in the plan. Squash this into the S2.4 commit at merge time.
@ikolomi ikolomi merged commit 73cff46 into unstable Jun 2, 2026
58 checks passed
ikolomi added a commit that referenced this pull request Jun 2, 2026
Reconcile the planning docs with the implementation-phase changes
that landed after the original design walkthrough.

  - idea-honing.md Q9: add a "Superseded during implementation"
    annotation pointing to the new training-knob model from PR #14
    (compression-dict-min-training-keys / -max-training-keys /
    compression-training-buffer-size). The original walkthrough text
    is preserved below the annotation for historical record.

  - detailed-design.md §7.1 transparency-mode example: replace
    --compression-dict-first-training-keys-count (removed in PR #14)
    with --compression-dict-min-training-keys.

  - detailed-design.md §2.12 heading: "Advanced knobs (12)" → "(11)".
    Off-by-one count introduced when PR #14 reshuffled the table.

  - summary.md: add a new "What changed during implementation"
    section between the walkthrough section and the plan summary,
    capturing the three substantive post-walkthrough refinements
    (PR #14 training rewrite, PR #15 R2.5.6 read-hot gap, PR #13
    QSBR plumbing).

Doc-only; no code changes.
ikolomi added a commit that referenced this pull request Jun 7, 2026
Two reviewer threads addressed:

Thread #1 (T-3369017721) — production code carrying test concerns

  The drain handler had a `if (job->value == NULL)` branch that only
  existed to handle test-only jobs from
  testOnlyCompressionWorkersEnqueueRaw. Reviewer correctly pointed out
  that production code shouldn't carry test-only branches.

  Fix: replaced with serverAssert(job->value != NULL) at the top of
  the per-job loop. Production drain assumes every job has a real
  pinned robj; tests must extract their value=NULL jobs via
  testOnlyCompressionWorkersDrainOutbox before this drain runs.

  Side effect: removed the conditional `if (job->value != NULL)`
  guards around decrRefCount and the install branch — the top-of-loop
  assert means every code path can assume value is non-NULL.

Thread #2 (T-3356207626) — design doc out of sync with implementation

  Design §4.6 still described the original version-counter approach
  for staleness detection (`uint64_t version` field on compressionJob,
  "if version counter moved, discard"). The implementation has used
  pointer equality + the incrRefCount-pin since S2.4 PR #13.

  Fix: updated §4.6 to:
    - compressionJob struct: drop `version`, drop `robj *key`, add
      `robj *value` (pinned via incrRefCount), and `sds src` and
      `int dbid` separately, matching the actual struct.
    - Concurrency notes: replaced the "version counter moved" bullet
      with the pointer-equality + ABA-safety reasoning, naming the
      incrRefCount-reserves-the-address invariant as the protection
      mechanism (same property explained in PR #18 review).

Verified locally:
  - make -j2 -C src              → clean
  - ./runtest --single unit/type/compression  → 10/10 pass
ikolomi added a commit that referenced this pull request Jun 7, 2026
* [S2.7] Compression write-path hook

Wires compressionEnqueueCandidate into dbAddInternal and dbSetValue,
and replaces the TODO(S2.7) placeholder in the drain handler with a
real install path. With this change, writes to eligible STRING values
get queued for background compression and the result is installed back
into the kvstore as an OBJ_ENCODING_COMPRESSED robj.

The decoder (S2.6) is shipped but not yet wired into read paths (S2.8),
so as long as compression-enabled stays no (default), behavior is
unchanged. Once an operator turns the switch on, written values get
compressed, but reads return the compressed bytes until S2.8 lands.
Existing transparency tests verify no regression in the default-off
configuration.

Producer side (compression.c, db.c)

  Two seams in db.c — end of dbAddInternal and end of dbSetValue —
  call compressionEnqueueCandidate(key, value, db->id). The candidate
  function applies four guards:
    1. Master switch (compression_enabled, via compressionIsEligible).
    2. R2.2 eligibility (type/encoding/size/hot-key — also via predicate).
    3. R2.1.5 active-dict check — saves an allocator round-trip when
       compression-enabled=yes but training hasn't completed.
    4. incrRefCount(value) — pins the bytes for the worker AND
       reserves the robj address for the drain handler's pointer-
       equality stale check (ABA-safe per R2.4.4 + the lifetime
       discussion in PR #18).

  If the worker pool refuses (not started; future S2.11 inbox full),
  the pin is released immediately. RDB-load enqueue is deliberately
  skipped — TODO(S2.10): the sweep tick will rediscover RDB-loaded
  values without hammering the inbox during load.

API change: compressionWorkersEnqueue

  Old: compressionWorkersEnqueue(sds key, int dbid, uint64_t version, sds src)
  New: compressionWorkersEnqueue(robj *value, int dbid)

  The new form requires a pinned robj; the worker reads
  objectGetVal(value) once at enqueue (captured into job->src) and
  never touches the robj afterwards (R2.11.4 intact). The drain
  handler uses job->value for the kvstore lookup and the pointer-
  equality stale check.

  The version field is gone — pointer equality, made ABA-safe by the
  pin, is sufficient. R2.4.4 explains why: holding incrRefCount(value)
  prevents the allocator from reusing the address while the job is
  in flight.

Drain install (compression_workers.c)

  New compressionInstall() helper:
    1. void **slot = kvstoreHashtableFindRef(db->keys, didx, key_sds);
    2. If slot == NULL OR *slot != job->value: stale (overwrite, expire,
       or COW). Discard.
    3. Else: createCompressedObject(OBJ_STRING, job->dst, job->dst_len);
       dbReplaceValue installs.
    4. compressionRegistryIncRef(job->dict_id) on success.

  dbReplaceValue routes through dbSetValue(..., overwrite=0, ...),
  which does NOT call signalModifiedKey, moduleNotifyKeyUnlink, or
  signalDeletedKeyAsReady. Background compression is a storage-only
  change per R2.9.2 — no WATCH dirty_cas, no client-side-caching
  invalidations, no keyspace notifications.

  Pin released on every drain completion path (success, stale-discard,
  net-savings reject, ZSTD error, no-active-dict). Test-mode jobs
  (job->value == NULL) skip both install and decRef.

Test migration

  The 15 existing test-fixture call sites passed raw sds + dummy
  version. Migrated to a new testOnlyCompressionWorkersEnqueueRaw(src,
  dbid) that sets job->value = NULL. Tests extract jobs via
  testOnlyCompressionWorkersDrainOutbox before the production drain
  runs, so production-only paths (install, decRef) are never reached
  by the value=NULL sentinel.

  No new gtest cases for the install path itself — that requires a
  fully-initialized server.db / kvstore that the unit-test environment
  doesn't construct. End-to-end coverage will come from the Tcl
  transparency harness once S2.8 wires the read path.

TODO(S4.1) markers added at:
  - compressionInstall: compression_compressions_per_sec, EMA fold,
    compression_compressed_objects.
  - compressionEnqueueCandidate: compression_candidates_dropped_total
    when S2.11 lands (today the pool-not-started rejection is a
    config state, not back-pressure).

Verified locally:
  - make -j2 -C src              → clean (BUILD_ZSTD=yes default).
  - make -j2 -C src BUILD_ZSTD=no → clean.
  - ./runtest --single unit/type/compression → 10/10 pass.

  gtest unit tests not runnable locally; CI validates.

Diff stat:

  .../implementation/plan.md                |   4 +-
  src/compression.c                         |  35 +++-
  src/compression.h                         |  27 ++-
  src/compression_workers.c                 | 185 +++++++++++++++------
  src/compression_workers.h                 |  56 +++----
  src/db.c                                  |  14 ++
  src/unit/test_compression_workers.cpp     |  31 ++--
  7 files changed, 244 insertions(+), 108 deletions(-)

* [S2.7] PR #19 review: assert + design-doc alignment

Two reviewer threads addressed:

Thread #1 (T-3369017721) — production code carrying test concerns

  The drain handler had a `if (job->value == NULL)` branch that only
  existed to handle test-only jobs from
  testOnlyCompressionWorkersEnqueueRaw. Reviewer correctly pointed out
  that production code shouldn't carry test-only branches.

  Fix: replaced with serverAssert(job->value != NULL) at the top of
  the per-job loop. Production drain assumes every job has a real
  pinned robj; tests must extract their value=NULL jobs via
  testOnlyCompressionWorkersDrainOutbox before this drain runs.

  Side effect: removed the conditional `if (job->value != NULL)`
  guards around decrRefCount and the install branch — the top-of-loop
  assert means every code path can assume value is non-NULL.

Thread #2 (T-3356207626) — design doc out of sync with implementation

  Design §4.6 still described the original version-counter approach
  for staleness detection (`uint64_t version` field on compressionJob,
  "if version counter moved, discard"). The implementation has used
  pointer equality + the incrRefCount-pin since S2.4 PR #13.

  Fix: updated §4.6 to:
    - compressionJob struct: drop `version`, drop `robj *key`, add
      `robj *value` (pinned via incrRefCount), and `sds src` and
      `int dbid` separately, matching the actual struct.
    - Concurrency notes: replaced the "version counter moved" bullet
      with the pointer-equality + ABA-safety reasoning, naming the
      incrRefCount-reserves-the-address invariant as the protection
      mechanism (same property explained in PR #18 review).

Verified locally:
  - make -j2 -C src              → clean
  - ./runtest --single unit/type/compression  → 10/10 pass

* [S2.7] Fix CI: remove erroneous & on server.db indexing

build-32bit (and the 30+ downstream cells, all CI cells use -Werror):

  compression_workers.c:531:20: error: initialization of 'serverDb *'
    from incompatible pointer type 'serverDb **'
    [-Werror=incompatible-pointer-types]

`server.db` is `serverDb **` (array of pointers, one per DB). So
`server.db[i]` is already `serverDb *` — the address-of operator was
redundant and produced `serverDb **`.

Fix: drop the `&`. Matches the pattern used everywhere else in the
codebase (db.c, server.c, etc.).

Local make didn't catch this — the default SERVER_CFLAGS doesn't
include -Werror. CI does. Built locally with `make SERVER_CFLAGS=-Werror`
to confirm clean.

* [S2.7] Fix CI: tests must use testOnly drain for value=NULL jobs

5 gtest cases failed on build-32bit (and would on every test cell)
with the new production-drain serverAssert(job->value != NULL):

  ASSERTION FAILED: compression_workers.c:591 'job->value != NULL'

  in: SingleJobRoundTrip, BurstOf256JobsOneWorker,
      BurstOf1024JobsFourWorkers, ResizeAcrossEnqueuedJobs,
      NetSavingsGuardRejectsIncompressible

Root cause: the previous commit's reviewer-driven hardening (PR #19
review thread #1) made the production drain assert that every job
has a non-NULL pinned robj. The premise was "tests use the testOnly
drain to extract jobs before the production drain runs". That premise
was wrong — many tests ALSO call compressionWorkersDrainOutbox
directly to consume-and-dispose test-mode jobs (the drainUntil helper
is the most-used path).

Fix: add testOnlyCompressionWorkersDrainAndDispose(budget) — pulls
jobs via the existing testOnlyCompressionWorkersDrainOutbox, frees
them via testOnlyCompressionWorkersFreeJob, returns count. Migrate
the test fixture's drainUntil helper and all 8 direct
compressionWorkersDrainOutbox call sites in the test file to the
new helper.

Production drain stays clean — no test concerns. Reviewer thread #1
intent preserved.

Verified locally:
  - make -j2 -C src SERVER_CFLAGS=-Werror   → clean
  - ./runtest --single unit/type/compression → 10/10 pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant