Skip to content

[S2.6] Compression decoder path#18

Merged
ikolomi merged 3 commits into
unstablefrom
ikolomi/s2-decoder-path
Jun 4, 2026
Merged

[S2.6] Compression decoder path#18
ikolomi merged 3 commits into
unstablefrom
ikolomi/s2-decoder-path

Conversation

@ikolomi

@ikolomi ikolomi commented Jun 3, 2026

Copy link
Copy Markdown
Owner

Summary

Replaces the stub objectGetUncompressedView in src/compression.c with a real synchronous main-thread decoder. The decoder is not yet wired into any read path (getCommand, replication feed, etc.) — that's S2.8 (read-path hook). This PR delivers the helper and its gtest coverage so S2.8 can plumb it in without touching the decompression logic itself.

Signature deviation from design (R2.5.2 + §4.3 updated in this PR)

Original objectGetUncompressedView(robj *o, sds *scratch)
Now objectGetUncompressedView(robj *o, sds *scratch, robj *view_out)

A heap-allocated view robj would cost ~16 ns/read even at jemalloc fast path. With ~41k decompressions/sec from §5.6 that's ~0.7 ms/sec of CPU on the main thread, just to allocate-and-free a scratch container that the caller is going to discard immediately. Stack-allocated view_out (refcount = OBJ_STATIC_REFCOUNT) eliminates that allocation. Matches the existing initStaticStringObject pattern in server.h.

Caller pattern (will land in S2.8):

robj  view;
sds   scratch = NULL;
robj *u = objectGetUncompressedView(o, &scratch, &view);
if (u == NULL) { addReplyError(c, "..."); sdsfree(scratch); return; }
addReplyBulk(c, u);
if (u != o) sdsfree(scratch);

Architecture

Concern Choice Why
DCtx lifecycle File-static main-thread singleton, lazy-allocated on first call, freed in compressionShutdown ZSTD_DCtx is not thread-safe, but v1 decompression is sync-on-main-thread (R2.5.1), so one instance suffices. Reusing the DCtx across calls amortizes the per-context setup cost — same pattern the per-worker CCtx uses on the encoder side.
Error model Return NULL on any failure Corruption-class events; operators can't act differently on them. All failures log at LL_WARNING and have TODO(S4.1): markers naming the counter contributions. Caller (S2.8) translates NULL into -ERR compressed value corrupt per §6.2.
USE_ZSTD gating Whole decompression branch behind #ifdef; uncompressed fast path always compiled With USE_ZSTD off, the encoding-not-COMPRESSED fast path is the only reachable case. Reaching the decompression branch with a compressed robj indicates either memory corruption or a buffer surviving a build-mode change → serverPanic.
Hot path Uncompressed values pay one branch and zero allocations Compressed values go through seven validation branches (header length, magic, length match, dict lookup, DCtx alloc, ZSTD_decompress, size match) — all with explicit error logs.
Scratch ownership Caller owns *scratch; may be NULL on first call (helper allocs); helper grows on demand via sdsMakeRoomFor Lets replication feed reuse a scratch across many decompressions. Matches design §2.5 verbatim.

What this PR does NOT do

  • Wire objectGetUncompressedView into getCommand, the replication feed, AOF writer, or RDB save path. S2.8 (read-path hook).
  • INFO counters (compression_errors_total, compression_decompressions_per_sec). Three TODO(S4.1): markers in the decoder name the counter contributions S4.1 must wire.
  • Async / worker-side decompression. Explicit v2 (Appendix C.2).
  • Latency-monitor decompress-sync event. S4.3 (Gilboa's track).

Decoder body — the seven validation branches

robj *objectGetUncompressedView(robj *o, sds *scratch, robj *view_out) {
    serverAssert(scratch != NULL);
    serverAssert(view_out != NULL);

    if (o->encoding != OBJ_ENCODING_COMPRESSED) return o;  /* hot path */

#ifdef USE_ZSTD
    /* 1. Buffer big enough for header? */
    /* 2. compressionHeaderDecode → magic OK? */
    /* 3. header.compressed_len + HEADER == buf_len? */
    /* 4. registry lookup by dict_id → DDict found? */
    /* 5. DCtx lazy alloc → success? */
    /* 6. ZSTD_decompress_usingDDict → no error? */
    /* 7. got == header.uncompressed_len? */
    /* All seven branches: serverLog(LL_WARNING, ...) + TODO(S4.1) marker + return NULL */

    /* Build view_out (refcount=OBJ_STATIC_REFCOUNT, encoding=RAW). */
    return view_out;
#else
    serverPanic("OBJ_ENCODING_COMPRESSED encountered with USE_ZSTD disabled");
#endif
}

Tests

src/unit/test_compression_workers.cpp — 6 new gtest cases gated #ifdef USE_ZSTD:

Test What it verifies
DecoderRoundTripsEncoder Encode via worker, decode via helper, byte-equal source. Reuses S2.5's installSyntheticDict.
DecoderPassthroughOnUncompressed Non-COMPRESSED robj returns itself, scratch and view_out untouched.
DecoderRejectsBadAlgMagic Hand-crafted bad magic → NULL.
DecoderRejectsMissingDict Encode, wipe registry, decode → NULL.
DecoderReusesScratchAcrossCalls Three back-to-back decodes with the same scratch sds; verify all decode correctly and scratch grows without leaking.
DecoderAllocatesScratchOnFirstCall *scratch = NULL handled gracefully (helper does sdsempty()).

Verified locally

  • make -j2 -C src → clean (BUILD_ZSTD=yes default).
  • make -j2 -C src BUILD_ZSTD=no → clean.
  • ./runtest --single unit/type/compression → 10/10 pass.

Not verified locally (CI gates)

  • gtest unit tests (libgtest-dev not installed in this dev environment).
  • clang-format-18.

Diff stat

.../design/detailed-design.md             |  10 +-
.../implementation/plan.md                |   2 +-
src/compression.c                         | 201 +++++++++++-
src/compression.h                         |  48 ++-
src/unit/test_compression_workers.cpp     | 353 ++++++++++++++++++
5 files changed, 593 insertions(+), 21 deletions(-)

Replaces the stub objectGetUncompressedView in compression.c with a
real synchronous main-thread decoder. The decoder is NOT yet wired
into any read path (getCommand, replication feed, etc.) — that's
S2.8 (read-path hook). This PR delivers the helper and its gtest
coverage so S2.8 can plumb it in without touching the decompression
logic itself.

Signature deviation from design (R2.5.2 + §4.3 updated):

  Original:  objectGetUncompressedView(robj *o, sds *scratch);
  Now:       objectGetUncompressedView(robj *o, sds *scratch, robj *view_out);

Rationale: a heap-allocated view robj would cost ~16 ns/read even at
jemalloc fast path. With ~41k decompressions/sec from §5.6 that's
~0.7 ms/sec of CPU on the main thread, just to allocate-and-free a
scratch container that the caller is going to discard immediately.
Stack-allocated view_out (refcount=OBJ_STATIC_REFCOUNT) eliminates
that allocation. Matches the existing initStaticStringObject pattern
in server.h.

Caller pattern (will land in S2.8):

  robj  view;
  sds   scratch = NULL;
  robj *u = objectGetUncompressedView(o, &scratch, &view);
  if (u == NULL) { addReplyError(c, "..."); sdsfree(scratch); return; }
  addReplyBulk(c, u);
  if (u != o) sdsfree(scratch);

Architecture

  Per-thread DCtx → file-static main-thread singleton. ZSTD_DCtx is
  not thread-safe, but v1 decompression is sync-on-main-thread per
  R2.5.1, so one instance suffices. Lazy-allocated on first call;
  freed in compressionShutdown (added cleanup hook).

  Error model → return NULL on any failure (corrupt header, missing
  dict, ZSTD error, OOM). All failures log at LL_WARNING and have
  TODO(S4.1) markers naming the counter contributions
  (compression_errors_total). Caller (S2.8) translates NULL into
  -ERR compressed value corrupt per §6.2. Helper deliberately
  doesn't know about clients.

  USE_ZSTD gating → entire decompression branch behind #ifdef. With
  USE_ZSTD off, the encoding!=COMPRESSED fast path is the only
  reachable case; reaching the decompression branch with a
  compressed robj indicates either memory corruption or a buffer
  surviving a build-mode change (RDB load with USE_ZSTD off would
  have decompressed inline per R2.6.3). serverPanic in that case —
  silently returning NULL would mask the inconsistency.

  Hot path → uncompressed values pay one branch and zero
  allocations. Compressed values go through 7 branches:
    1. Header length sanity check
    2. compressionHeaderDecode (validates alg_magic)
    3. Buffer length matches header's compressed_len + HEADER_SIZE
    4. compressionRegistryLookup (dict_id → DDict)
    5. Lazy DCtx allocation (only on first call)
    6. ZSTD_decompress_usingDDict
    7. Decompressed size matches header's uncompressed_len
  All seven paths have explicit error logs with descriptive text.

Tests

  test_compression_workers.cpp — 6 new gtest cases gated #ifdef USE_ZSTD:

    DecoderRoundTripsEncoder — encode via worker, decode via helper,
      byte-equal source. Reuses S2.5's installSyntheticDict.
    DecoderPassthroughOnUncompressed — non-COMPRESSED robj returns
      itself, scratch and view_out untouched.
    DecoderRejectsBadAlgMagic — hand-crafted bad magic → NULL.
    DecoderRejectsMissingDict — encode, wipe registry, decode → NULL.
    DecoderReusesScratchAcrossCalls — three back-to-back decodes
      with the same scratch sds, verify all decode and scratch grows
      without leaking.
    DecoderAllocatesScratchOnFirstCall — *scratch=NULL handled
      gracefully (helper does sdsempty()).

Verified locally:

  make -j2 -C src                  → clean (BUILD_ZSTD=yes default).
  make -j2 -C src BUILD_ZSTD=no    → clean.
  ./runtest --single unit/type/compression → 10/10 pass.

  gtest unit tests not runnable locally (libgtest-dev missing); CI
  validates. clang-format-18 also CI-validated.

Diff stat:

  .../design/detailed-design.md             |  10 +-
  .../implementation/plan.md                |   2 +-
  src/compression.c                         | 201 +++++++++++-
  src/compression.h                         |  48 ++-
  src/unit/test_compression_workers.cpp     | 353 ++++++++++++++++++
  5 files changed, 593 insertions(+), 21 deletions(-)
@ikolomi ikolomi requested a review from GilboaAWS June 3, 2026 12:50
ikolomi added 2 commits June 4, 2026 10:58
Reviewer asked: does compressionRegistryLookup bump a refcount?
What protects against UAF between lookup and ZSTD_decompress_usingDDict?

The answer is: lookup does NOT bump a refcount, and the protection is
the single-threaded main-thread cooperative model — registry mutations
(Add / Retire / IncRef / DecRef / TryGc / dictPairFree) are all
main-thread-only, and no code between the lookup and the ZSTD call
yields to the event loop. Same invariant that protects every
lookupKey()-then-use sequence in Valkey.

The previous comment misrepresented this in two ways:
  - Cited the QSBR contract — that's the worker-side mechanism for
    workers holding a CDict pointer across queue operations. Doesn't
    apply to main-thread readers.
  - Said NULL lookup "should not happen in a healthy server" — but
    a frame referencing a long-retired dict_id (e.g. an RDB-loaded
    value whose dict drained to frame_refs==0 and was GC'd) is a
    legitimate failure case the decoder must handle, not a bug.

New comment states the actual invariant explicitly, names the v2
async-decompression path that would need to revisit it (Appendix C.2),
and reframes NULL as legitimate.

Pure documentation change; no behavior change.
build-32bit and build-macos-latest fail with -Werror=sign-compare:

  test_compression_workers.cpp:874:5: error: comparison of integers
    of different signs: 'const int' and 'const unsigned int'
    EXPECT_EQ(OBJ_ENCODING_RAW, u->encoding);
    EXPECT_EQ(OBJ_STRING, u->type);

robj's `encoding` and `type` are 4-bit unsigned bitfields (server.h);
they get integer-promoted to `unsigned int` when read. The OBJ_*
macros are signed int literals (e.g. OBJ_ENCODING_RAW == 0). gtest's
EXPECT_EQ instantiates CmpHelperEQ<int, unsigned int>, and the
mixed-sign `lhs == rhs` inside that helper trips -Wsign-compare /
-Werror.

Same root cause as the explicit `(int)u->refcount` cast on the next
line (OBJ_STATIC_REFCOUNT is also a signed-int macro and the
refcount field is unsigned). Apply the same cast to encoding and
type for consistency.

The 64-bit Linux gcc cell happens to type-promote without the
warning, which is why x86_64 passed and 32-bit + macOS clang did not.

The third failing cell (test-ubuntu-latest-compatibility, 8.1.4) is
unrelated — wget on download.valkey.io got HTTP 403 fetching the
8.1.4 release tarball. CDN/server transient issue, not a code issue.

Verified locally:
  - make -j2 -C src              → clean
  - ./runtest --single unit/type/compression  → 10/10 pass
@ikolomi ikolomi merged commit b16594d into unstable Jun 4, 2026
79 checks passed
ikolomi added a commit that referenced this pull request Jun 7, 2026
Two reviewer threads addressed:

Thread #1 (T-3369017721) — production code carrying test concerns

  The drain handler had a `if (job->value == NULL)` branch that only
  existed to handle test-only jobs from
  testOnlyCompressionWorkersEnqueueRaw. Reviewer correctly pointed out
  that production code shouldn't carry test-only branches.

  Fix: replaced with serverAssert(job->value != NULL) at the top of
  the per-job loop. Production drain assumes every job has a real
  pinned robj; tests must extract their value=NULL jobs via
  testOnlyCompressionWorkersDrainOutbox before this drain runs.

  Side effect: removed the conditional `if (job->value != NULL)`
  guards around decrRefCount and the install branch — the top-of-loop
  assert means every code path can assume value is non-NULL.

Thread #2 (T-3356207626) — design doc out of sync with implementation

  Design §4.6 still described the original version-counter approach
  for staleness detection (`uint64_t version` field on compressionJob,
  "if version counter moved, discard"). The implementation has used
  pointer equality + the incrRefCount-pin since S2.4 PR #13.

  Fix: updated §4.6 to:
    - compressionJob struct: drop `version`, drop `robj *key`, add
      `robj *value` (pinned via incrRefCount), and `sds src` and
      `int dbid` separately, matching the actual struct.
    - Concurrency notes: replaced the "version counter moved" bullet
      with the pointer-equality + ABA-safety reasoning, naming the
      incrRefCount-reserves-the-address invariant as the protection
      mechanism (same property explained in PR #18 review).

Verified locally:
  - make -j2 -C src              → clean
  - ./runtest --single unit/type/compression  → 10/10 pass
ikolomi added a commit that referenced this pull request Jun 7, 2026
* [S2.7] Compression write-path hook

Wires compressionEnqueueCandidate into dbAddInternal and dbSetValue,
and replaces the TODO(S2.7) placeholder in the drain handler with a
real install path. With this change, writes to eligible STRING values
get queued for background compression and the result is installed back
into the kvstore as an OBJ_ENCODING_COMPRESSED robj.

The decoder (S2.6) is shipped but not yet wired into read paths (S2.8),
so as long as compression-enabled stays no (default), behavior is
unchanged. Once an operator turns the switch on, written values get
compressed, but reads return the compressed bytes until S2.8 lands.
Existing transparency tests verify no regression in the default-off
configuration.

Producer side (compression.c, db.c)

  Two seams in db.c — end of dbAddInternal and end of dbSetValue —
  call compressionEnqueueCandidate(key, value, db->id). The candidate
  function applies four guards:
    1. Master switch (compression_enabled, via compressionIsEligible).
    2. R2.2 eligibility (type/encoding/size/hot-key — also via predicate).
    3. R2.1.5 active-dict check — saves an allocator round-trip when
       compression-enabled=yes but training hasn't completed.
    4. incrRefCount(value) — pins the bytes for the worker AND
       reserves the robj address for the drain handler's pointer-
       equality stale check (ABA-safe per R2.4.4 + the lifetime
       discussion in PR #18).

  If the worker pool refuses (not started; future S2.11 inbox full),
  the pin is released immediately. RDB-load enqueue is deliberately
  skipped — TODO(S2.10): the sweep tick will rediscover RDB-loaded
  values without hammering the inbox during load.

API change: compressionWorkersEnqueue

  Old: compressionWorkersEnqueue(sds key, int dbid, uint64_t version, sds src)
  New: compressionWorkersEnqueue(robj *value, int dbid)

  The new form requires a pinned robj; the worker reads
  objectGetVal(value) once at enqueue (captured into job->src) and
  never touches the robj afterwards (R2.11.4 intact). The drain
  handler uses job->value for the kvstore lookup and the pointer-
  equality stale check.

  The version field is gone — pointer equality, made ABA-safe by the
  pin, is sufficient. R2.4.4 explains why: holding incrRefCount(value)
  prevents the allocator from reusing the address while the job is
  in flight.

Drain install (compression_workers.c)

  New compressionInstall() helper:
    1. void **slot = kvstoreHashtableFindRef(db->keys, didx, key_sds);
    2. If slot == NULL OR *slot != job->value: stale (overwrite, expire,
       or COW). Discard.
    3. Else: createCompressedObject(OBJ_STRING, job->dst, job->dst_len);
       dbReplaceValue installs.
    4. compressionRegistryIncRef(job->dict_id) on success.

  dbReplaceValue routes through dbSetValue(..., overwrite=0, ...),
  which does NOT call signalModifiedKey, moduleNotifyKeyUnlink, or
  signalDeletedKeyAsReady. Background compression is a storage-only
  change per R2.9.2 — no WATCH dirty_cas, no client-side-caching
  invalidations, no keyspace notifications.

  Pin released on every drain completion path (success, stale-discard,
  net-savings reject, ZSTD error, no-active-dict). Test-mode jobs
  (job->value == NULL) skip both install and decRef.

Test migration

  The 15 existing test-fixture call sites passed raw sds + dummy
  version. Migrated to a new testOnlyCompressionWorkersEnqueueRaw(src,
  dbid) that sets job->value = NULL. Tests extract jobs via
  testOnlyCompressionWorkersDrainOutbox before the production drain
  runs, so production-only paths (install, decRef) are never reached
  by the value=NULL sentinel.

  No new gtest cases for the install path itself — that requires a
  fully-initialized server.db / kvstore that the unit-test environment
  doesn't construct. End-to-end coverage will come from the Tcl
  transparency harness once S2.8 wires the read path.

TODO(S4.1) markers added at:
  - compressionInstall: compression_compressions_per_sec, EMA fold,
    compression_compressed_objects.
  - compressionEnqueueCandidate: compression_candidates_dropped_total
    when S2.11 lands (today the pool-not-started rejection is a
    config state, not back-pressure).

Verified locally:
  - make -j2 -C src              → clean (BUILD_ZSTD=yes default).
  - make -j2 -C src BUILD_ZSTD=no → clean.
  - ./runtest --single unit/type/compression → 10/10 pass.

  gtest unit tests not runnable locally; CI validates.

Diff stat:

  .../implementation/plan.md                |   4 +-
  src/compression.c                         |  35 +++-
  src/compression.h                         |  27 ++-
  src/compression_workers.c                 | 185 +++++++++++++++------
  src/compression_workers.h                 |  56 +++----
  src/db.c                                  |  14 ++
  src/unit/test_compression_workers.cpp     |  31 ++--
  7 files changed, 244 insertions(+), 108 deletions(-)

* [S2.7] PR #19 review: assert + design-doc alignment

Two reviewer threads addressed:

Thread #1 (T-3369017721) — production code carrying test concerns

  The drain handler had a `if (job->value == NULL)` branch that only
  existed to handle test-only jobs from
  testOnlyCompressionWorkersEnqueueRaw. Reviewer correctly pointed out
  that production code shouldn't carry test-only branches.

  Fix: replaced with serverAssert(job->value != NULL) at the top of
  the per-job loop. Production drain assumes every job has a real
  pinned robj; tests must extract their value=NULL jobs via
  testOnlyCompressionWorkersDrainOutbox before this drain runs.

  Side effect: removed the conditional `if (job->value != NULL)`
  guards around decrRefCount and the install branch — the top-of-loop
  assert means every code path can assume value is non-NULL.

Thread #2 (T-3356207626) — design doc out of sync with implementation

  Design §4.6 still described the original version-counter approach
  for staleness detection (`uint64_t version` field on compressionJob,
  "if version counter moved, discard"). The implementation has used
  pointer equality + the incrRefCount-pin since S2.4 PR #13.

  Fix: updated §4.6 to:
    - compressionJob struct: drop `version`, drop `robj *key`, add
      `robj *value` (pinned via incrRefCount), and `sds src` and
      `int dbid` separately, matching the actual struct.
    - Concurrency notes: replaced the "version counter moved" bullet
      with the pointer-equality + ABA-safety reasoning, naming the
      incrRefCount-reserves-the-address invariant as the protection
      mechanism (same property explained in PR #18 review).

Verified locally:
  - make -j2 -C src              → clean
  - ./runtest --single unit/type/compression  → 10/10 pass

* [S2.7] Fix CI: remove erroneous & on server.db indexing

build-32bit (and the 30+ downstream cells, all CI cells use -Werror):

  compression_workers.c:531:20: error: initialization of 'serverDb *'
    from incompatible pointer type 'serverDb **'
    [-Werror=incompatible-pointer-types]

`server.db` is `serverDb **` (array of pointers, one per DB). So
`server.db[i]` is already `serverDb *` — the address-of operator was
redundant and produced `serverDb **`.

Fix: drop the `&`. Matches the pattern used everywhere else in the
codebase (db.c, server.c, etc.).

Local make didn't catch this — the default SERVER_CFLAGS doesn't
include -Werror. CI does. Built locally with `make SERVER_CFLAGS=-Werror`
to confirm clean.

* [S2.7] Fix CI: tests must use testOnly drain for value=NULL jobs

5 gtest cases failed on build-32bit (and would on every test cell)
with the new production-drain serverAssert(job->value != NULL):

  ASSERTION FAILED: compression_workers.c:591 'job->value != NULL'

  in: SingleJobRoundTrip, BurstOf256JobsOneWorker,
      BurstOf1024JobsFourWorkers, ResizeAcrossEnqueuedJobs,
      NetSavingsGuardRejectsIncompressible

Root cause: the previous commit's reviewer-driven hardening (PR #19
review thread #1) made the production drain assert that every job
has a non-NULL pinned robj. The premise was "tests use the testOnly
drain to extract jobs before the production drain runs". That premise
was wrong — many tests ALSO call compressionWorkersDrainOutbox
directly to consume-and-dispose test-mode jobs (the drainUntil helper
is the most-used path).

Fix: add testOnlyCompressionWorkersDrainAndDispose(budget) — pulls
jobs via the existing testOnlyCompressionWorkersDrainOutbox, frees
them via testOnlyCompressionWorkersFreeJob, returns count. Migrate
the test fixture's drainUntil helper and all 8 direct
compressionWorkersDrainOutbox call sites in the test file to the
new helper.

Production drain stays clean — no test concerns. Reviewer thread #1
intent preserved.

Verified locally:
  - make -j2 -C src SERVER_CFLAGS=-Werror   → clean
  - ./runtest --single unit/type/compression → 10/10 pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant