Skip to content

[S2.5] Compression encoder path#17

Merged
ikolomi merged 4 commits into
unstablefrom
ikolomi/s2-encoder-path
Jun 3, 2026
Merged

[S2.5] Compression encoder path#17
ikolomi merged 4 commits into
unstablefrom
ikolomi/s2-encoder-path

Conversation

@ikolomi

@ikolomi ikolomi commented Jun 2, 2026

Copy link
Copy Markdown
Owner

Summary

Replaces the S2.4 placeholder pass-through worker body with real ZSTD_compress_usingCDict, and replaces the placeholder drain handler with the post-compression net-savings guard (R2.4.3). The full install path (createCompressedObject + dbOverwrite + compressionRegistryIncRef + caller-pin decrRefCount) remains stubbed — that lands with the write-path hook in S2.7 when there's a real production caller emitting compression candidates.

Commits in this PR

c298fbdb3 [S2.5] Encoder path — initial implementation (real ZSTD encoder + net-savings guard + 4 new gtest cases + test-only accessors)
b5c9773a3 [S2.5] PR review responses — testOnly* seam refactor (matched quicklist.c convention; removed the test-only block from the public header), TODO(S4.1)/TODO(S2.7) markers in drain handler, plan tweak
33a1e1f3a [S2.5] Fix CIBUILD_ZSTD=no compatibility (gated all ZSTD usage with #ifdef USE_ZSTD), unit/Makefile propagates -DUSE_ZSTD and -I../deps/zstd to the gtest build, clang-format-18 fixes, missing #include "compression_header.h" in test
0fd814c9e [S2.5] Fix CI — initialize server.compression_dict_max_versions in the test fixture (mirrors what test_compression_registry.cpp already does); fixes the runtime gtest failures

Architecture

Concern Choice Why
CCtx lifecycle One ZSTD_CCtx per worker thread, allocated at start, reused for every job ZSTD_CCtx is not thread-safe (zstd API contract); per-thread allocation keeps allocator pressure off the per-job hot path
Dict acquisition Worker atomically loads compressionRegistryActive() per job QSBR contract (§4.4) guarantees pointer validity until the worker reports quiescent at end-of-iteration; no per-job refcount management on the worker side
Buffer ownership Worker zmallocs dst, drain handler zfrees Unambiguous ownership transfer via the outbox; worker never touches robj (R2.11.4)
Buffer layout Single allocation: [16-byte header][ZSTD frame] Compress directly into buf + HEADER_SIZE, write header in-place after measuring; matches the zero-copy contract documented in compression_header.h
Slack reclaim zrealloc to actual size after compression Avoids leaking ZSTD_compressBound over-allocation into used_memory (per design §5.2)
Error encoding job->err is signed; ZSTD errors → negative values, worker-policy → positive values (err=1 = no active dict) Lets the drain handler distinguish ZSTD errors from worker-policy decisions without a separate flag
BUILD_ZSTD=no mode All ZSTD usage gated by #ifdef USE_ZSTD; worker takes the no-active-dict branch unconditionally Production library and gtest binary stay consistent; with no ZSTD compiled in, every job is dispositioned without compression, identical to the empty-registry case
Test seam testOnly* symbols defined in .c only, not declared in the public header Matches the established Valkey convention (see quicklist.c / intset.c) — production-callable surface stays free of test entry points

What this PR does NOT do

Concern Where it lands
Hook into dbAdd/dbSetValue/dbOverwrite S2.7 (write-path hook) — gated on S2.11 (bounded inbox + back-pressure counters)
objectGetUncompressedView for read paths S2.6 (decoder path)
Real install: createCompressedObject + dbOverwrite + dict frame-ref accounting + caller-pin decrRefCount S2.7 — needs a real production caller first
Sweep + cron integration S2.10
INFO compression counters: compression_skipped_incompressible, compression_errors_total, compression_live_ratio_10m, compression_compressions_per_sec S4.1 (Gilboa's track) — drain handler has TODO(S4.1): markers at the three branches naming each contribution

Worker body — key decisions

compressionDictPair *active = compressionRegistryActive();
if (active == NULL) {
    /* "compression-enabled yes but no active dict yet" state (R2.1.5).
     * Same branch when USE_ZSTD is not compiled in: registry is empty,
     * worker simply marks every job not-compressed.
     * err=1 is the worker-policy sentinel; no allocation. */
    job->err = 1;
}
#ifdef USE_ZSTD
else {
    void *buf = zmalloc(COMPRESSION_HEADER_SIZE + ZSTD_compressBound(src_len));
    size_t got = ZSTD_compress_usingCDict(cctx, buf + HEADER, ..., active->cdict);
    if (ZSTD_isError(got)) {
        zfree(buf); job->err = (int)(ssize_t)got;
    } else {
        void *shrunk = zrealloc(buf, HEADER + got);  /* slack reclaim */
        compressionHeaderEncode(shrunk, ALG_ZSTD_MAGIC, active->dict_id,
                                src_len, got);
        job->dict_id = active->dict_id;
        job->dst = shrunk; job->dst_len = HEADER + got;
    }
}
#endif

The if (shrunk == NULL) defensive path keeps the original buf and proceeds; zrealloc on shrink is allocator-specific behavior. We pay the slack cost rather than fail the job.

Drain handler — three branches

if (job->err != 0 || job->dst == NULL) {
    /* No-dict / ZSTD-error: dispose. (TODO(S4.1) markers name the
     * counter contributions for the err<0 vs err>0 cases.) */
} else if (job->dst_len >= uncompressed_len * (1 - savings_pct/100)) {
    /* Net-savings guard rejects: dispose.
     * TODO(S4.1): compression_skipped_incompressible++ + fold actual
     * measured ratio into compression_live_ratio_10m EMA per R2.3.5. */
} else {
    /* Install path is S2.7. Placeholder dispose for now.
     * TODO(S2.7): createCompressedObject + dbOverwrite +
     * compressionRegistryIncRef + decrRefCount on caller's pin.
     * TODO(S4.1): compressions_per_sec rate update + fold success
     * ratio into compression_live_ratio_10m EMA. */
}

The integer-form net-savings comparison avoids floating point. job->dst_len already includes the 16-byte header, so it IS the on-heap footprint we want to compare against.

Test seam (refactored after PR review)

Production code carries no test-only symbols in its public surface. Following the established Valkey convention (see quicklist.c / intset.c):

/* Defined in compression_workers.c only, declared by gtest test files
 * locally in their extern "C" block. */
int  testOnlyCompressionWorkersDrainOutbox(void **jobs_out, int budget);
void testOnlyCompressionWorkersFreeJob(void *job_ptr);
void testOnlyCompressionWorkersJobRead(void *job_ptr,
                                       const char **out_src,
                                       void **out_dst,
                                       size_t *out_dst_len,
                                       uint32_t *out_dict_id,
                                       int *out_err);

The compressionJob struct stays file-private. The single testOnlyCompressionWorkersJobRead function projects its fields into caller-provided out-pointers; the gtest test code defines its own flat CompressionJobView struct that the read function fills.

Tests

src/unit/test_compression_workers.cpp — 4 new gtest cases (gated behind #ifdef USE_ZSTD):

Test What it verifies
RealCompressionRoundTrip Install synthetic dict, enqueue 1 KB compressible JSON, drain via testing accessor, decompress with same DDict, byte-equal source
NetSavingsGuardRejectsIncompressible Pseudo-random bytes (deterministic seed for reproducibility), drain through production handler, verify no crash on rejection path. TODO(S4.1): extends to assert counter increments.
NoActiveDictMarksJobNotCompressed No dict installed, enqueue, verify err != 0 and dst == NULL (R2.1.5 path)
CompressionFromMultipleWorkersIsConsistent 4 workers × 100 jobs of varying sizes/contents, each compressed buffer round-trips correctly

The synthetic-dict helper (installSyntheticDict) generates ~100 KB of training corpus (1500 samples × ~70 B) — ZSTD's documented minimum is ~100x dict size. Goes through the production registry add/promote path; only the corpus source differs from real training (S1.2 plumbs main-thread kvstore iteration + bio).

The 14 existing gtest cases from S2.4 stayed encoder-agnostic — they assert only on job count and thread-count transitions, so they continue passing without modification (verified by running ./runtest --single unit/type/compression).

Build-system change

src/unit/Makefile now propagates -DUSE_ZSTD=1 -I../../deps/zstd to the gtest build (mirroring the existing JEMALLOC_CFLAGS pattern). Conditional on -DUSE_ZSTD being present in PREV_FINAL_CFLAGS (which the parent's BUILD_ZSTD=yes sets via .make-settings). Without ZSTD, ZSTD_LIB is dropped from LD_LIBS so the gtest binary doesn't try to link a non-existent libzstd.a.

Verified locally

  • make -j2 -C src clean (BUILD_ZSTD=yes default).
  • make -j2 -C src BUILD_ZSTD=no clean.
  • ./runtest --single unit/type/compression — 10/10 transparency tests pass under both BUILD_ZSTD=yes and BUILD_ZSTD=no.
  • Server boots cleanly with --compression-enabled yes --compression-threads 2.

Not verified locally (CI gates)

  • gtest unit tests (libgtest-dev not installed in this dev environment).
  • clang-format-18.

Diff stat

.../implementation/plan.md             |   2 +-
src/compression_workers.c              | 298 +++++++++++++--
src/unit/Makefile                      |  15 +-
src/unit/test_compression_workers.cpp  | 441 +++++++++++++++++++++-
4 files changed, 719 insertions(+), 37 deletions(-)

…rd in drain

Replaces the placeholder pass-through worker body with real
ZSTD_compress_usingCDict against the active dictionary, and replaces
the placeholder drain handler with the post-compression net-savings
guard (R2.4.3). The full install path (createCompressedObject +
dbOverwrite + compressionRegistryIncRef + caller-pin decRefCount)
remains stubbed — that lands with the write-path hook in S2.7 when
there's a real production caller emitting compression candidates.

Worker body (compression_workers.c):
  - Per-worker ZSTD_CCtx allocated once at thread start, reused for
    every job (CCtx is not thread-safe; one per worker keeps allocator
    pressure off the per-job hot path).
  - Atomically loads the active dict via compressionRegistryActive().
    Per the QSBR contract (§4.4) the pointer stays valid until the
    worker reports quiescent at end-of-iteration.
  - "compression-enabled yes but no active dict yet" state (R2.1.5)
    short-circuits to err=1 / dst=NULL — no allocation, no encode.
  - Compresses directly into a single zmalloc'd buffer at offset
    HEADER_SIZE; writes the header in-place after measuring the
    actual compressed size; shrinks via zrealloc to avoid leaking
    ZSTD_compressBound slack into used_memory (per design §5.2).
  - Preserves ZSTD error codes via signed downcast; positive err
    values are reserved for worker-policy decisions (no-dict, etc.).

Drain handler (compression_workers.c):
  - err != 0 || dst == NULL: dispose, no install.
  - dst_len >= uncompressed * (1 - savings_ratio_pct/100): net-savings
    guard rejects, dispose. INFO counter
    compression_skipped_incompressible++ comes in S4.1.
  - Otherwise: install path is S2.7 (placeholder dispose for now).

Test-only accessors (compression_workers.h + .c):
  - compressionWorkersDrainOutboxForTesting: peek without freeing.
  - compressionWorkersFreeJobForTesting: caller-side cleanup.
  - Field readers for compressionJob (file-private struct).
  Production code consumes jobs via compressionWorkersDrainOutbox.

New gtest cases (test_compression_workers.cpp):
  - RealCompressionRoundTrip: install synthetic dict, encode 1 KB
    of compressible JSON, decompress with same DDict, byte-equal.
  - NetSavingsGuardRejectsIncompressible: random bytes, drain through
    production handler, verify no crash on rejection path.
  - NoActiveDictMarksJobNotCompressed: enqueue without a dict, verify
    err != 0 and dst == NULL.
  - CompressionFromMultipleWorkersIsConsistent: 4 workers x 100 jobs,
    each compressed buffer round-trips correctly.

Synthetic dict helper (installSyntheticDict, test-private): generates
~100 KB of training corpus from a small JSON-shaped sample list,
runs ZDICT_trainFromBuffer, creates CDict/DDict, adds via
compressionRegistryAdd. ~100x dict-size of training data is the ZSTD-
recommended minimum.

Verified locally:
  - make -j2 -C src clean.
  - ./runtest --single unit/type/compression: 10/10 pass.
  - Server boots cleanly with --compression-enabled yes
    --compression-threads 2 (worker pool starts, awaits jobs).

Not verified locally (CI):
  - gtest unit tests (libgtest-dev not installed locally).
  - clang-format-18 (not installed locally).

Plan.md S2.5 marked complete.
@ikolomi ikolomi requested a review from GilboaAWS June 3, 2026 08:12
Comment thread .agents/planning/realtime-data-compression/implementation/plan.md Outdated
Comment thread src/compression_workers.h Outdated
Comment thread src/compression_workers.c
Comment thread src/unit/test_compression_workers.cpp
Comment thread src/unit/test_compression_workers.cpp Outdated
ikolomi added 3 commits June 3, 2026 12:50
Addresses the 5 review threads on PR #17.

T-3346992511 (plan.md L174): drop "(PR pending)" parenthetical from
the S2.5 sub-task. Eventually merged is the default state.

T-3347191182 (drain handler): add TODO(S4.1) markers at the three
drain-handler branches naming the exact counter contributions S4.1
will wire in. Per design R2.10.1 / R2.3.5:
  - err < 0 (real ZSTD error): compression_errors_total++ + rate-
    limited LL_WARNING per R6.1.
  - err > 0 (worker policy, no-dict): no counter — benign R2.1.5
    state, tracked via compression_state.
  - Net-savings reject: compression_skipped_incompressible++ AND
    fold the actual measured ratio into compression_live_ratio_10m
    (rejection ratios in [0.9, 1.05] inflate the EMA and naturally
    trip the drift threshold per R2.3.5).
  - Successful install (S2.7+): compressions_per_sec rate update +
    fold success ratio into the EMA.
Also added a TODO(S2.7) marker spelling out the install steps that
land with the write-path hook.

T-3347060548 (test seam in public header): refactor to match the
established Valkey convention (see quicklist.c / intset.c testOnly*
functions).
  - Renamed compressionWorkers*ForTesting → testOnlyCompressionWorkers*.
  - Removed the entire test-only block from compression_workers.h
    (~36 lines deleted). Production-callable surface now contains
    no test seams.
  - Definitions live only in compression_workers.c; the gtest test
    file declares what it needs locally in its own extern "C" block.
  - Replaced the 5 field accessors with a single
    testOnlyCompressionWorkersJobRead that fills caller-provided
    out-pointers. The result-projection struct (CompressionJobView)
    is defined in the gtest test file; production code carries no
    record of the projection layout.

T-3347222201 (callout for @GilboaAWS): no code change; informational
note about the synthetic-dict helper being portable to S1.x training
tests.

T-3347268445 (test:587 reject-path comment): prefixed the existing
"comes in S4.1" comment with TODO(S4.1) so grep can find it when
S4.1 wires the rejection counter.

Convention adopted: TODO(<step-id>) comments mark code that
intentionally defers behaviour to a future implementation step.
Grep-friendly, ties deferred work to a specific plan step. Documented
in SESSION_CHECKPOINT.md (working artifact).

Verified locally:
  - make -j2 -C src clean.
  - ./runtest --single unit/type/compression: 10/10 pass.
  - No remaining ForTesting symbols anywhere in src/.

Plan.md S2.5 line tightened.
…nclude

Addresses all CI failures on the previous push.

1. BUILD_ZSTD=no compatibility (build-32bit, test-ubuntu-latest-compression-off,
   build-macos-latest, Analyze (cpp), test-sanitizer-address, etc. — same
   root cause):
   compression_workers.c made unconditional ZSTD calls that broke when
   the parent is built without ZSTD. Gated all ZSTD usage behind
   #ifdef USE_ZSTD:
     - <zstd.h> include
     - per-worker ZSTD_CCtx alloc/free at thread start/exit
     - the encoder branch inside the per-job loop (the no-active-dict
       branch is now also taken when USE_ZSTD is not compiled in)
   When BUILD_ZSTD=no the worker pool runs but every job takes the
   "not compressed" path, consistent with the registry being empty.

2. test_compression_workers.cpp BUILD_ZSTD=no compatibility:
     - Wrapped <zstd.h> + <zdict.h> includes with #ifdef USE_ZSTD.
     - Wrapped the entire S2.5 encoder-path test section + helpers
       (installSyntheticDict, decompressBuffer) with #ifdef USE_ZSTD.
     - Added #include "compression_header.h" so COMPRESSION_HEADER_SIZE
       is defined (root cause of the test-ubuntu-latest compile error).

3. unit/Makefile: propagate USE_ZSTD to the gtest build:
     - Detect -DUSE_ZSTD in PREV_FINAL_CFLAGS (set by parent's
       BUILD_ZSTD=yes via .make-settings).
     - Adds ZSTD_CFLAGS = -DUSE_ZSTD=1 -I../../deps/zstd to the C++
       compile rule, mirroring the existing JEMALLOC_CFLAGS pattern.
     - Drops ZSTD_LIB from LD_LIBS when USE_ZSTD is not propagated
       (libzstd.a is not built when BUILD_ZSTD=no, so the link
       referencing it would fail).

4. clang-format-check: three trivial fixes flagged by clang-format-18:
     - Trailing-comment alignment in compression_workers.c (off by one
       space after a column shift).
     - Extra space in `int  testOnly...` declaration in test file.
     - Single-line `if (...) { ... }` in test file expanded to
       multi-line per the project style.

Verified locally:
  - make -j2 -C src clean (BUILD_ZSTD=yes default).
  - make -j2 -C src BUILD_ZSTD=no clean.
  - ./runtest --single unit/type/compression: 10/10 pass under both
    BUILD_ZSTD=yes and BUILD_ZSTD=no.

CI cells expected to recover (all share the same root causes above):
  - clang-format-check
  - build-32bit
  - build-macos-latest
  - test-ubuntu-latest
  - test-ubuntu-latest-cmake-tls
  - test-ubuntu-latest-compression-off
  - test-ubuntu-latest-compatibility (7.2.11, 8.0.6, 8.1.4)
  - test-sanitizer-address
  - Analyze (cpp)  [CodeQL]
The 4 still-failing CI cells (test-ubuntu-latest, test-sanitizer-address,
build-32bit, code-coverage) all hit the same 3 gtest failures:

  FAILED: CompressionWorkersTest.RealCompressionRoundTrip
  FAILED: CompressionWorkersTest.NetSavingsGuardRejectsIncompressible
  FAILED: CompressionWorkersTest.CompressionFromMultipleWorkersIsConsistent

Server log on the failing tests:

  Compression: dictionary registry cap reached (0).
  Run COMPRESSION SWEEP or raise compression-dict-max-versions.

Root cause: server.compression_dict_max_versions defaults to 0 in the
gtest harness (no real config layer), and the registry rejects every
promotion attempt when the cap is 0. installSyntheticDict() therefore
returns dict_id=0, and the tests' ASSERT_NE(dict_id, 0u) fails.

Mirrors what test_compression_registry.cpp already does in its SetUp
(line 29 sets the same field to 4). Added matching initialization
to the CompressionWorkersTest::SetUp.

Verified locally:
  - make -j2 -C src clean.
  - ./runtest --single unit/type/compression: 10/10 pass.

build-debian-old failure is unrelated and genuinely flaky (apt
fetched a debian package and got "Connection reset by peer") — will
self-resolve on retry.
@ikolomi ikolomi merged commit af8a57a into unstable Jun 3, 2026
79 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant