Streaming Compression support for Replication #3853
Add streaming LZ4-backed RDB compression with rio decorators, stream envelope handling, integration changes, and the follow-up fixes and config cleanup needed on top of unstable. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- Remove dead code: rdbIsValidMagic() and unused #include <string.h> in rdb.h
- Remove redundant first RIO_FLAG_SKIP_RDB_CHECKSUM set in rdbSaveInternal
- Remove unrelated changes: config_parse_depth, USE_FAST_FLOAT, write-make-settings
- Validate full 8-byte VKCS envelope in aof.c rdbFileUsesStreamingCompression
- Add SAFETY comment for rdbRioHasCorruptCompressedInput cast invariant
- Rename all snake_case identifiers to camelCase per Valkey conventions: types (compression_algo_t -> compressionAlgo, stream_compressor_t -> streamCompressor, compress_rio_t -> compressRio, etc.), functions (stream_writer_create -> streamWriterCreate, compress_rio_finish -> compressRioFinish, write_vkcs_envelope -> writeVkcsEnvelope, etc.), and static variables (compression_lz4_codec_impl -> compressionLz4CodecImpl)
- Drop _t suffix from all types to match Valkey convention
- Fix typo: streamWriterIsErrord -> streamWriterIsErrored
- Replace silent dummy buffer allocation with assert(needed > 0) in streamWriterEnsureOutBuf

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
* Address streaming RDB compression review
* Skip RDB CRC for streaming compression
* Remove brittle 32-bit compression unit test

Co-authored-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- rdbSaveInternal: the old comment claimed per-string LZF was disabled 'when algo != LZF', but the actual gate is RIO_FLAG_STREAMING_COMPRESSION on the wrapper rio. Standalone rios (DUMP, AOF rewrite, diskless) keep using LZF regardless of algo.
- rdbInputStreamPrepare: flag the synchronous probe IO so a future non-blocking caller (replication) doesn't accidentally block the loop.
- rdbRioHasCorruptCompressedInput: the cast is sound today only because one producer sets RIO_FLAG_STREAMING_DECOMPRESSION. Replace the 'SAFETY' assertion with a note telling the next person to add a type discriminator before adding a second producer.
- rdbSaveRawString: minor wording cleanup on the per-string LZF gate.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Match the surrounding Valkey style: drop comments that restate the code, drop the Ownership/Threading/Returns boilerplate from headers, collapse repeated 'capacity-shortage is retriable' notes into one explanation per function. Behavior is unchanged. Net -285 lines; integration tests (21/21) and build are clean. Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Adds replication wire compression on top of valkey-io#3531 with lz4 as the first supported codec for the incremental replication stream. The replication stream from primary to replica is wrapped in a VKCS envelope (using STREAM_KIND_REPL) and compressed as a single long-lived frame at the per-replica buffer layer.

Default behavior is unchanged with 'replcompression no'; existing replicas without the new capability stay uncompressed. Negotiation is per-replica via the existing PSYNC handshake; a new REPLICA_CAPA_COMPRESSION capability lets each side opt in independently.

Compression runs inline on the IO thread that owns the replica's write job; no dedicated compression thread, no IPC, no reordering. Optional sticky thread affinity (lazy ownership + event-driven rebalance) keeps the long-lived LZ4 frame state on a single IO thread for cache locality.

Configs:
  replcompression                    bool, default no
  repl-compression-thread-affinity   bool, default yes

Internal constants:
  REPLICA_CAPA_COMPRESSION         (1 << 4)
  REPL_COMPRESSION_ALGO            ALGO_LZ4
  REPL_COMPRESSION_LEVEL           0 (LZ4 fast mode)
  REPL_COMPRESSION_BATCH_LIMIT     1 MB raw input per dispatch
  REPL_STREAM_DECODER_OUTPUT_MAX   256 MB

INFO replication per-replica fields: compression=lz4, compressed_bytes, uncompressed_bytes, compression_ratio, compression_errors, compression_cpu_usec, debug_compression_pending_drains, debug_thread_switches

INFO replication server-level (replica side): repl_decompression_errors, repl_decompression_cpu_usec, repl_decompressed_bytes_total, repl_apply_cpu_usec, repl_apply_batches

CI adds a test-replication-compression job that runs the replication-tagged integration tests with replcompression=yes to exercise compression across the broader replication test surface.

Tests: 18 streamReader push-mode unit tests + 3 replCompression unit tests + 27 integration tests.
Performance (BlockMesh tweets, 3M keys x ~315 byte JSON values, 1,073 MB uncompressed per replica, 30 clients, pipeline 50, 2 cross-region replicas):
- LZ4 level 0 (default): 0.48 ratio, 52% bandwidth saved, 2.5s compression CPU per replica, <1% throughput overhead vs uncompressed baseline.
- Affinity ON vs OFF: throughput unchanged (118.6K vs 118.1K keys/s) but thread switches drop from ~800K to ~30 per replica.

ZSTD support follows in valkey-io#3798. Related to valkey-io#3531.

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
📝 Walkthrough
Adds LZ4-based streaming compression support, wires it into RDB and replication transport paths, updates config and build integration, and adds tests plus a CI job for the new compression scenarios.
Changes: Streaming compression and replication transport
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 12
🧹 Nitpick comments (7)
.github/workflows/ci.yml (1)
212-220: ⚡ Quick win: Consider setting `persist-credentials: false` for supply-chain hardening.
The checkout actions at lines 213 and 220 do not explicitly set `persist-credentials: false`. While not a functional issue, setting this option prevents the action from configuring Git credentials that could leak in logs or be misused by malicious code.
🔒 Proposed fix to add persist-credentials: false
     - name: Install libbacktrace
       uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
       with:
         repository: ianlancetaylor/libbacktrace
         ref: b9e40069c0b47a722286b94eb5231f7f05c08713
         path: libbacktrace
    +    persist-credentials: false
     - run: cd libbacktrace && ./configure && make && sudo make install
     - name: Checkout Valkey
       uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
    +  with:
    +    persist-credentials: false
src/compression.h (1)
47-55: ⚡ Quick win: Add function-level docs for the remaining public declarations.
`compressionAlgoSupportsStreaming`, `compressionAlgoName`, and the init/destroy APIs are currently undocumented. Please add brief contract comments (return semantics, ownership/lifecycle expectations) for these declarations to keep this public header fully self-describing.
As per coding guidelines: "Document why code exists, not just what it does; document all functions in C code".
src/rdb.c (1)
3151-3192: ⚡ Quick win: Document the new input-stream helper lifecycle.
`rdbInputStreamInit()`, `rdbInputStreamDestroy()`, and `rdbInputStreamValidateEnd()` add a non-trivial wrapper lifecycle, but only `rdbInputStreamPrepare()` is documented. Please add short contract comments so callers know the required call order and ownership rules.
As per coding guidelines, "Document all functions in C code".
src/rdb.h (1)
190-197: ⚡ Quick win: Add declaration-level contract docs for the `rdbInputStream` lifecycle API.
Please document ownership and required call order (`Init -> Prepare -> ValidateEnd -> Destroy`) in the header so callers can't misuse the stream wrapper.
♻️ Suggested header comment shape
    +/* Wraps a raw input rio and exposes a logical RDB byte stream.
    + * Lifecycle: Init -> Prepare -> (use input.rdb_rio) -> ValidateEnd -> Destroy.
    + * `raw_rio` ownership remains with caller.
    + */
     typedef struct {
         rio *raw_rio;
         rio *rdb_rio;
         decompressRio decompressor;
         streamReaderInfo stream_info;
         bool initialized;
     } rdbInputStream;

    +/* Initialize wrapper state around `raw_rio` (no stream inspection yet). */
     void rdbInputStreamInit(rdbInputStream *input, rio *raw_rio);
    +/* Prepare input stream; may attach decompression rio depending on envelope. */
     decompressRioInitResult rdbInputStreamPrepare(rdbInputStream *input);
    +/* Validate stream terminates cleanly at logical end. */
     int rdbInputStreamValidateEnd(rdbInputStream *input);
    +/* Release wrapper/decompression resources. */
     void rdbInputStreamDestroy(rdbInputStream *input);
As per coding guidelines "Document why code exists, not just what it does; document all functions in C code".
Also applies to: 234-238
tests/integration/rdb-compression.tcl (1)
273-275: ⚡ Quick win: Strengthen corruption test assertions to validate the failure reason.
These checks only match `*Error*`, which is too broad. Prefer asserting specific error/log patterns per scenario (like the existing `verify_log_message` usage) so false positives don't mask regressions.
As per coding guidelines "Use clear assertions with meaningful error messages in tests".
Also applies to: 294-296, 350-352, 368-369
src/replication.c (1)
90-219: Please pull in `@core-team` for this `replication.c` change set.
This patch changes replication protocol negotiation, state transitions, and IO-thread interaction in a file the repo treats as architecture-review-required.
As per coding guidelines, "Request `@core-team` architectural review for changes to cluster*.c, replication.c, rdb.c, or aof.c".
src/server.h (1)
3048-3055: ⚡ Quick win: Add brief header docs for the new replication-compression API.
These are new exported entry points, but the lifecycle/ownership split is not obvious from the names alone. A short comment block here describing per-replica vs singleton state, expected call order, and return semantics would make this interface much safer to consume.
As per coding guidelines, "Document all functions in C code" and "Use comments for non-obvious behavior and rationale, not for restating code".
📝 Example
    +/* Primary-side per-replica compression lifecycle. Returns C_OK/C_ERR. */
     void markCompressedReplicasForDisconnect(void);
     int replInitCompression(client *c, compressionAlgo algo, int level);
     void replDestroyCompression(client *c);
     void replBalanceAffinity(void);
    +
    +/* Replica-side compressed stream decode path. `new_data_start` is the first
    + * newly appended byte in `c->querybuf`. Returns C_OK/C_ERR. */
     int replDecompressQueryBuf(client *c, size_t new_data_start);
    +
    +/* Replica-side singleton decompressor lifecycle. */
     int replInitDecompression(void);
     void replDestroyDecompression(void);
     void replRefreshDecompression(void);
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@deps/lz4/lz4frame.c`:
- Around line 1619-1620: The code computes srcStart = (const BYTE*)srcBuffer and
srcEnd = srcStart + *srcSizePtr even when srcBuffer can be NULL (seen when
calling LZ4F_decompress(dctx, NULL, &o, NULL, &i, NULL)), which causes undefined
behavior; instead of patching the vendored file inline, update/re-vendor to an
upstream LZ4 release that contains the fix, and while doing so verify that the
logic around LZ4F_decompress, srcBuffer, srcSizePtr, srcStart and srcEnd
guards/skips the pointer arithmetic when srcBuffer == NULL or *srcSizePtr == 0
(i.e., ensure the code checks for NULL/zero before computing srcEnd or uses a
safe branch), then run the decompression unit tests that exercise the NULL input
path to confirm the upstream fix resolves the UB.
In `@src/aof.c`:
- Around line 1014-1022: The code currently treats a short read (0 < nread <
sizeof(header)) as success by returning 0, which lets a truncated RDB header be
considered uncompressed; change this so any partial read is treated as an error:
after reading into header from fd (variables nread, header, sizeof(header), fd),
if nread == -1 restore read_errno and return -1 (as already done), and if nread
< (ssize_t)sizeof(header) also set errno to EIO (or restore read_errno if
appropriate) and return -1 so the caller will treat the probe as a failure and
trigger the fallback path instead of accepting a truncated header.
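A minimal sketch of the probe behavior described above, assuming an 8-byte envelope (the size mentioned in the commit notes). The helper name `probeHeader`, the `PROBE_HEADER_LEN` constant, and the caller-supplied `magic` bytes are hypothetical and are not taken from `src/aof.c`. Unlike a single `read()`, this variant retries partial reads and fails only when end-of-file arrives before the full header.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

#define PROBE_HEADER_LEN 8 /* assumed envelope size; illustrative only */

/* Hypothetical probe: returns 1 when the first PROBE_HEADER_LEN bytes of fd
 * equal `magic`, 0 when a full header was read but does not match, and -1
 * (with errno set) on a read error or when the file ends early. */
static int probeHeader(int fd, const unsigned char *magic) {
    unsigned char header[PROBE_HEADER_LEN];
    size_t got = 0;

    assert(magic != NULL);
    while (got < sizeof(header)) {
        ssize_t nread = read(fd, header + got, sizeof(header) - got);
        if (nread == -1) {
            if (errno == EINTR) continue; /* interrupted, retry */
            return -1;                    /* errno already set by read() */
        }
        if (nread == 0) {
            errno = EIO; /* truncated header is an error, not "uncompressed" */
            return -1;
        }
        got += (size_t)nread;
    }
    return memcmp(header, magic, sizeof(header)) == 0 ? 1 : 0;
}
```

With this shape a caller can tell "definitely not an envelope" (0) apart from "could not tell" (-1) and take the fallback path only in the second case.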
In `@src/compression_stream.c`:
- Around line 818-820: The cap check can overflow because effective_len + len
may wrap; update the check in the push/feed logic (where effective_len is
computed from sdslen(t->feed_queue) - t->feed_head) to first guard against
addition overflow and oversized single inputs by verifying either len >
t->feed_cap or that effective_len is greater than t->feed_cap - len (or
equivalently SIZE_MAX - effective_len < len) before calling
streamReaderSetError(t, STREAM_READER_ERROR_IO); this ensures you never perform
wrapping addition and correctly detect when the new data would exceed
t->feed_cap.
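The wrap-free comparison suggested above can be sketched as a standalone predicate. The function name `feedWouldExceedCap` and its parameters are hypothetical stand-ins for `effective_len`, `len`, and `t->feed_cap`; this is an illustration of the arithmetic, not the code in `src/compression_stream.c`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: true when appending `len` bytes to `queued` pending
 * bytes would exceed `cap`. The sum queued + len is never computed, so the
 * check cannot wrap around SIZE_MAX. */
static bool feedWouldExceedCap(size_t queued, size_t len, size_t cap) {
    if (len > cap) return true; /* a single oversized input */
    assert(len <= cap);         /* so cap - len below cannot underflow */
    return queued > cap - len;
}
```

For example, with `queued = SIZE_MAX - 1` and `len = 2` the naive `queued + len > cap` wraps to `0 > cap` and passes, while the form above reports the overflow.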
In `@src/config.c`:
- Around line 2590-2596: The updateReplCompression apply-hook is performing an
irreversible action (markCompressedReplicasForDisconnect) during CONFIG SET,
which can be rolled back by configSetCommand; change updateReplCompression to
avoid side effects: instead of calling markCompressedReplicasForDisconnect
directly, record the intent (e.g., set a new flag or push affected replica IDs
into a pending list) and return success; then invoke
markCompressedReplicasForDisconnect only from the post-commit path in
configSetCommand (or a dedicated commit callback) after all option apply-hooks
succeed, using the new flag/pending list; keep the function signature of
updateReplCompression and use symbols updateReplCompression,
markCompressedReplicasForDisconnect, and configSetCommand to locate and wire the
deferred action.
In `@src/networking.c`:
- Around line 4598-4606: c->querybuf is being mutated with sdscatlen() which may
realloc the SDS and corrupt the thread-local shared pointer (thread_shared_qb)
if the primary is still using it; before appending
server.repl_stream_decode_buf, check if c->querybuf == thread_shared_qb and if
so call sdsnewlen()/sdsdup to make a private copy (or otherwise allocate a fresh
SDS) and assign it to c->querybuf, then perform the sdscatlen() append and
update c->querybuf_peak; refer to c->querybuf, server.repl_stream_decode_buf,
thread_shared_qb and the decompression path to implement this guard.
In `@src/rdb.c`:
- Around line 3695-3698: The conditional is checking the wrong streaming flag:
replace the RIO_FLAG_STREAMING_COMPRESSION test with
RIO_FLAG_STREAMING_DECOMPRESSION so load-side streaming-decompression hits the
special notice; update the branch that currently reads "(rdb->flags &
RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)" to
use RIO_FLAG_STREAMING_DECOMPRESSION, keeping the RIO_FLAG_SKIP_RDB_CHECKSUM
check and the serverLog(LL_NOTICE, ...) calls unchanged (refer to rdb->flags,
RIO_FLAG_STREAMING_DECOMPRESSION, RIO_FLAG_SKIP_RDB_CHECKSUM, and serverLog).
In `@src/replication.c`:
- Around line 4042-4049: The code only advertises compression when
use_diskless_load is true, making server.repl_compression a no-op for disk-based
syncs; change the branch to advertise REPLICA_CAPA_COMPRESSION_STR whenever
server.repl_compression is enabled (i.e., check server.repl_compression instead
of use_diskless_load) so the capability is sent for all full-sync/PSYNC flows;
update the block that sets argv/ lens/ argc (the one referencing
use_diskless_load, REPLICA_CAPA_COMPRESSION_STR and server.repl_compression) to
add the capability whenever server.repl_compression is true and leave any
diskless-specific logic unchanged.
- Around line 216-219: replRefreshDecompression currently drops failures from
replInitDecompression; change replRefreshDecompression to propagate errors by
returning an int (return C_ERR on failure, C_OK on success), call
replDestroyDecompression() then check the return value of
replInitDecompression() and return C_ERR if it fails, and update all new call
sites (those that run handshake/full-sync immediately after calling
replRefreshDecompression) to check its return value and abort the sync session
on C_ERR; keep references to replInitDecompression, replDestroyDecompression,
replRefreshDecompression and use the C_ERR/C_OK constants so callers can act
accordingly.
In `@src/rio.c`:
- Around line 506-509: In rioReadPartial, before calling r->read_some, ensure
the computed bytes_to_read (the min of len and r->max_processing_chunk) is not
greater than SSIZE_MAX; if it is, reject the request and return an appropriate
error instead of invoking r->read_some to avoid backend-dependent truncation/
sign issues. Update the logic around bytes_to_read (used in the existing
computation) to perform this SSIZE_MAX bound check and return an error (and set
errno consistently, e.g. EINVAL/ERANGE) when exceeded; also include the required
header for SSIZE_MAX if not present.
In `@src/server.c`:
- Around line 6654-6665: The compression_ratio calculation is inverted: change
the expression that now computes (uncompressed / compressed) to compute
(compressed / uncompressed) instead, guarding against division by zero. In the
printf-style block that builds the stats string (reference symbols:
"compression_ratio=%.2f", compressionAlgoName(REPL_COMPRESSION_ALGO),
replica->repl_data->repl_compressed_bytes_total,
replica->repl_data->repl_uncompressed_bytes_total), replace the ternary that
returns (double)repl_uncompressed_bytes_total /
(double)repl_compressed_bytes_total with one that returns
repl_uncompressed_bytes_total == 0 ? 0.0 :
(double)replica->repl_data->repl_compressed_bytes_total /
(double)replica->repl_data->repl_uncompressed_bytes_total so compression_ratio
reflects compressed/uncompressed safely.
- Around line 6650-6669: The INFO path reads replication compression counters
(repl_compressed_bytes_total, repl_uncompressed_bytes_total,
repl_compression_errors, repl_compression_cpu_usec,
repl_compression_pending_drains, repl_compression_thread_switches) while IO
threads update them in writeToReplicaCompressed/postWriteToReplica, creating a
data race; fix by making these counters atomic (e.g., _Atomic size_t or
atomic_ullong) or by protecting updates/reads with the replica lock and then use
atomic_load (or the lock) when building the INFO string in src/server.c; update
all increment/+= sites in writeToReplicaCompressed/postWriteToReplica to use
atomic_fetch_add (or acquire the same lock) and replace direct reads in the INFO
formatting block with atomic loads (or locked reads) to ensure race-free access.
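The two `src/server.c` findings can be illustrated together with C11 atomics. This is a sketch under stated assumptions: the struct `replCompressionStats` and both function names are hypothetical, and the ratio follows the compressed/uncompressed convention implied by the PR description ("0.48 ratio, 52% bandwidth saved").

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-replica counters shared between an IO thread (writer)
 * and the main thread (INFO reader). */
typedef struct {
    _Atomic uint64_t compressed_bytes;
    _Atomic uint64_t uncompressed_bytes;
} replCompressionStats;

/* Writer side: account for one compressed batch. */
static void statsRecordBatch(replCompressionStats *s, uint64_t raw, uint64_t compressed) {
    assert(s != NULL);
    atomic_fetch_add_explicit(&s->uncompressed_bytes, raw, memory_order_relaxed);
    atomic_fetch_add_explicit(&s->compressed_bytes, compressed, memory_order_relaxed);
}

/* Reader side: compressed / uncompressed, or 0.0 before any data was sent. */
static double statsCompressionRatio(replCompressionStats *s) {
    assert(s != NULL);
    uint64_t raw = atomic_load_explicit(&s->uncompressed_bytes, memory_order_relaxed);
    uint64_t comp = atomic_load_explicit(&s->compressed_bytes, memory_order_relaxed);
    if (raw == 0) return 0.0; /* guard the division */
    return (double)comp / (double)raw;
}
```

Relaxed ordering removes the data race on each counter, but the two loads are not a single consistent snapshot, so a ratio read mid-update can be slightly off. That is usually acceptable for statistics; a lock would be needed if an exact pair were required.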
In `@src/server.h`:
- Around line 1281-1288: The lifetime replication counters use size_t/long long
and will wrap on 32-bit builds; change repl_compressed_bytes_total,
repl_uncompressed_bytes_total, repl_compression_errors,
repl_compression_pending_drains and repl_compression_thread_switches from size_t
to uint64_t and change repl_compression_cpu_usec from long long to uint64_t
(leave last_processed_tid and affinity_tid as int), ensure the header includes
<stdint.h> and update any INFO/printf format specifiers or casts to use
PRIu64/uint64_t where these fields are printed; apply the same replacements to
the other identical field group mentioned in the comment.
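As an illustration of the fixed-width counters and matching format specifiers, here is a small sketch. The struct `replCounters` and the function `formatReplCounters` are hypothetical; only the field labels `compressed_bytes` and `uncompressed_bytes` come from the INFO fields listed in the PR description.

```c
#include <assert.h>
#include <inttypes.h> /* PRIu64 */
#include <stdint.h>
#include <stdio.h>

/* Hypothetical lifetime counters: uint64_t has the same width on 32-bit and
 * 64-bit builds, unlike size_t. */
typedef struct {
    uint64_t compressed_bytes_total;
    uint64_t uncompressed_bytes_total;
} replCounters;

/* Format the counters with PRIu64 so the specifier always matches the type.
 * Returns the snprintf() result (length that was or would have been written). */
static int formatReplCounters(char *buf, size_t buflen, const replCounters *c) {
    assert(buf != NULL && c != NULL);
    return snprintf(buf, buflen,
                    "compressed_bytes=%" PRIu64 ",uncompressed_bytes=%" PRIu64,
                    c->compressed_bytes_total, c->uncompressed_bytes_total);
}
```

Values above 4 GiB, which a 32-bit `size_t` cannot hold, format correctly with this layout.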
---
Nitpick comments:
In @.github/workflows/ci.yml:
- Around line 212-220: Update the two actions/checkout steps (the "Install
libbacktrace" checkout using uses: actions/checkout@de0fac2e4500d... and the
"Checkout Valkey" checkout with the same uses) to include persist-credentials:
false under their with: blocks to prevent Git credentials from being persisted;
ensure you add the persist-credentials: false key alongside the existing
repository/ref/path keys and keep YAML indentation consistent.
In `@src/compression.h`:
- Around line 47-55: Add brief function-level comments for
compressionAlgoSupportsStreaming, compressionAlgoName, streamCompressorInit,
streamCompressorDestroy, streamDecompressorInit, and streamDecompressorDestroy
that describe each function's contract: the meaning of return values (e.g.,
true/false or 0/error codes), who owns any returned pointers or resources,
lifecycle expectations (caller allocates/initializes struct before/after call,
who must call Destroy), and any error conditions; place these comments
immediately above each declaration in the header so the public API is
self-describing and documents why the functions exist as well as how callers
must use them.
In `@src/rdb.c`:
- Around line 3151-3192: Add short contract comments for rdbInputStreamInit,
rdbInputStreamPrepare, rdbInputStreamValidateEnd, and rdbInputStreamDestroy
describing the required call order (Init() to set up the struct, optional
Prepare() to wrap raw_rio into a decompressor which may replace rdb_rio and set
RIO_FLAG_SKIP_RDB_CHECKSUM, ValidateEnd() to be called after reads to verify the
decompressor stream, and Destroy() to free the decompressor), the ownership
rules (input does not take ownership of the provided raw_rio; Prepare()
initializes internal decompressor state that must be cleaned up by Destroy()),
and return/behavior expectations (Prepare() returns DECOMPRESS_RIO_INIT_* codes,
Destroy() is idempotent, ValidateEnd() returns C_OK/C_ERR only when
initialized). Include these comments immediately above the definitions of
rdbInputStreamInit, rdbInputStreamPrepare, rdbInputStreamValidateEnd, and
rdbInputStreamDestroy and reference the struct fields input->raw_rio,
input->rdb_rio, input->decompressor, and input->initialized.
In `@src/rdb.h`:
- Around line 190-197: Add a clear declaration-level lifecycle contract comment
for the rdbInputStream type describing ownership and required call order: state
that callers own the struct, which fields are initialized by the creator vs by
Init/Prepare, and that the API must be used in order Init -> Prepare ->
ValidateEnd -> Destroy (with Destroy cleaning up raw_rio/rdb_rio/decompressor if
initialized). Reference the rdbInputStream type and the lifecycle functions
(Init, Prepare, ValidateEnd, Destroy) and document when initialized boolean is
set/cleared, which function takes/returns ownership of rio pointers, and that
calling functions out of order is undefined to prevent misuse.
In `@src/replication.c`:
- Around line 90-219: This change touches replication.c (functions like
markCompressedReplicasForDisconnect, replBalanceAffinity,
replicaInitCompressionOnPsync, replInitDecompression, replDestroyDecompression)
which per project policy requires an architecture-level review; please add
`@core-team` to the PR/reviewers and include a short justification referencing the
altered replication protocol negotiation, state transitions, and IO-thread
interaction so the core team can perform the requested architectural review
before merging.
In `@src/server.h`:
- Around line 3048-3055: Add brief header documentation above the new
replication-compression API declaring which functions operate on per-replica
state vs global/singleton state, the expected call order/lifecycle (e.g.
replInitCompression/replDestroyCompression per-client,
replInitDecompression/replDestroyDecompression once for the process, when to
call replRefreshDecompression and markCompressedReplicasForDisconnect), and the
return semantics (which functions return 0 on success / non-zero on error and
whether callers own/must free any returned resources). Reference the exact
symbols in the docs: markCompressedReplicasForDisconnect,
replInitCompression(client *c, compressionAlgo algo, int level),
replDestroyCompression(client *c), replBalanceAffinity,
replDecompressQueryBuf(client *c, size_t new_data_start), replInitDecompression,
replDestroyDecompression, and replRefreshDecompression so callers can quickly
see ownership and ordering.
In `@tests/integration/rdb-compression.tcl`:
- Around line 273-275: The test uses broad assertions like the catch/err +
assert_match "*Error*" pattern (e.g., the catch {r debug reload nosave} err
followed by assert_match "*Error*" $err) which is too generic; replace those
with precise checks that validate the expected failure reason—either call the
existing verify_log_message helper with the expected error/log substring or
change assert_match to match a specific error pattern (include the expected
text) for each scenario (also update the similar occurrences at the other
reported spots). Ensure you update the specific statements (the catch/err blocks
and their assert_match lines) so they assert the exact error or log message that
the test intends to trigger rather than the generic "*Error*".
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 4465ea09-be4a-45a9-9f78-c6764e703003
📒 Files selected for processing (47)
.github/workflows/ci.yml
cmake/Modules/SourceFiles.cmake
cmake/Modules/ValkeySetup.cmake
deps/CMakeLists.txt
deps/Makefile
deps/lz4/CMakeLists.txt
deps/lz4/LICENSE
deps/lz4/Makefile
deps/lz4/lz4.c
deps/lz4/lz4.h
deps/lz4/lz4frame.c
deps/lz4/lz4frame.h
deps/lz4/lz4hc.c
deps/lz4/lz4hc.h
deps/lz4/xxhash.c
deps/lz4/xxhash.h
src/CMakeLists.txt
src/Makefile
src/aof.c
src/compression.c
src/compression.h
src/compression_lz4.c
src/compression_lz4.h
src/compression_rio.c
src/compression_rio.h
src/compression_stream.c
src/compression_stream.h
src/config.c
src/io_threads.c
src/networking.c
src/rdb.c
src/rdb.h
src/replication.c
src/rio.c
src/rio.h
src/server.c
src/server.h
src/unit/CMakeLists.txt
src/unit/Makefile
src/unit/test_compression.cpp
src/unit/test_repl_compression.cpp
src/valkey-check-rdb.c
tests/integration/rdb-compression.tcl
tests/integration/repl-compression.tcl
tests/integration/replication-aof-sync.tcl
tests/integration/valkey-check-rdb.tcl
valkey.conf
| const BYTE* const srcStart = (const BYTE*)srcBuffer; | ||
| const BYTE* const srcEnd = srcStart + *srcSizePtr; |
There was a problem hiding this comment.
❓ Verification inconclusive
🏁 Script executed:
#!/bin/bash
# Verify the reachable NULL/0 probe path and the null-pointer arithmetic.
rg -n -C2 'LZ4F_decompress\(dctx, NULL, &o, NULL, &i, NULL\)|srcEnd = srcStart \+ \*srcSizePtr' deps/lz4/lz4frame.c
Repository: valkey-io/valkey
Length of output: 590
Prevent UB from srcEnd = srcStart + *srcSizePtr when srcBuffer is NULL in deps/lz4/lz4frame.c
LZ4F_decompress(dctx, NULL, &o, NULL, &i, NULL) is called at ~line 1496, and the file later computes srcStart = (const BYTE*)srcBuffer and srcEnd = srcStart + *srcSizePtr at lines 1619–1620. If that block can run with srcBuffer == NULL, pointer arithmetic on a NULL-derived pointer is undefined in C; confirm the probe path’s i/*srcSizePtr value (and that the code guards/skips this arithmetic for the NULL/0 case) and fix via re-vendoring a corrected upstream LZ4 release rather than patching the vendor file in place.
🧰 Tools
🪛 Cppcheck (2.20.0)
[error] 1620-1620: Pointer addition with NULL pointer.
(nullPointerArithmetic)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@deps/lz4/lz4frame.c` around lines 1619 - 1620, The code computes srcStart =
(const BYTE*)srcBuffer and srcEnd = srcStart + *srcSizePtr even when srcBuffer
can be NULL (seen when calling LZ4F_decompress(dctx, NULL, &o, NULL, &i, NULL)),
which causes undefined behavior; instead of patching the vendored file inline,
update/re-vendor to an upstream LZ4 release that contains the fix, and while
doing so verify that the logic around LZ4F_decompress, srcBuffer, srcSizePtr,
srcStart and srcEnd guards/skips the pointer arithmetic when srcBuffer == NULL
or *srcSizePtr == 0 (i.e., ensure the code checks for NULL/zero before computing
srcEnd or uses a safe branch), then run the decompression unit tests that
exercise the NULL input path to confirm the upstream fix resolves the UB.
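For illustration only: a minimal sketch of the kind of guard this finding asks for, assuming a NULL buffer is only ever passed together with a zero length. This is not the upstream LZ4 fix, and `byteSpan`, `makeSpan`, and `spanLen` are hypothetical names that do not exist in `lz4frame.c`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical half-open byte range [start, end). */
typedef struct {
    const unsigned char *start;
    const unsigned char *end;
} byteSpan;

/* Build a span without ever doing arithmetic on a NULL pointer.
 * In C, `NULL + n` is undefined even when n is 0, so the NULL case
 * is handled before any addition happens. */
static byteSpan makeSpan(const void *buf, size_t len) {
    byteSpan s = {NULL, NULL};
    if (buf == NULL) {
        assert(len == 0); /* assumption: NULL is only valid as an empty probe */
        return s;
    }
    s.start = (const unsigned char *)buf;
    s.end = s.start + len;
    return s;
}

/* Length of the span; avoids subtracting two NULL pointers. */
static size_t spanLen(byteSpan s) {
    return s.start ? (size_t)(s.end - s.start) : 0;
}
```

With this shape, a probe call such as `makeSpan(NULL, 0)` yields an empty span and never reaches the pointer addition that Cppcheck flags.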
| ssize_t nread = read(fd, header, sizeof(header)); | ||
| int read_errno = errno; | ||
| close(fd); | ||
|
|
||
| if (nread == -1) { | ||
| errno = read_errno; | ||
| return -1; | ||
| } | ||
| if (nread < (ssize_t)sizeof(header)) return 0; |
There was a problem hiding this comment.
Don't treat a short probe as “not compressed.”
0 < nread < sizeof(header) currently returns 0, so a truncated sync RDB is treated as safe to reuse as an AOF base. That bypasses the new fallback path and can persist a broken base file instead of forcing BGREWRITEAOF.
🔎 Suggested fix
- ssize_t nread = read(fd, header, sizeof(header));
+ ssize_t nread;
+ do {
+ nread = read(fd, header, sizeof(header));
+ } while (nread == -1 && errno == EINTR);
int read_errno = errno;
close(fd);
if (nread == -1) {
errno = read_errno;
return -1;
}
- if (nread < (ssize_t)sizeof(header)) return 0;
+ if (nread < (ssize_t)sizeof(header)) {
+ errno = EINVAL;
+ return -1;
+ }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/aof.c` around lines 1014 - 1022, The code currently treats a short read
(0 < nread < sizeof(header)) as success by returning 0, which lets a truncated
RDB header be considered uncompressed; change this so any partial read is
treated as an error: after reading into header from fd (variables nread, header,
sizeof(header), fd), if nread == -1 restore read_errno and return -1 (as already
done), and if nread < (ssize_t)sizeof(header) also set errno to EIO (or restore
read_errno if appropriate) and return -1 so the caller will treat the probe as a
failure and trigger the fallback path instead of accepting a truncated header.
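A standalone sketch of the stricter probe, assuming the caller wants "truncated header" reported as an error rather than as "not compressed". `probeReadExact` is a hypothetical helper, not a function in `src/aof.c`. Unlike the single `read()` in the suggested fix, it also loops on short reads that are not EOF, which is an extra design choice of this sketch.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: read exactly `len` bytes from `fd` into `buf`.
 * Returns 0 on success, -1 on failure with errno set. Hitting EOF before
 * `len` bytes arrive is reported as EINVAL (truncated header). */
static int probeReadExact(int fd, unsigned char *buf, size_t len) {
    assert(buf != NULL || len == 0);
    size_t off = 0;
    while (off < len) {
        ssize_t n = read(fd, buf + off, len - off);
        if (n == -1) {
            if (errno == EINTR) continue; /* interrupted: retry */
            return -1;                    /* real I/O error, errno preserved */
        }
        if (n == 0) {                     /* EOF before the header completed */
            errno = EINVAL;
            return -1;
        }
        off += (size_t)n;
    }
    return 0;
}
```

A caller that gets -1 here would take the fallback path (for example, forcing a rewrite) instead of reusing the file as an AOF base.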
| size_t bytes_to_read = | ||
| (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; | ||
| ssize_t got = r->read_some(r, buf, bytes_to_read); | ||
|
|
There was a problem hiding this comment.
Add SSIZE_MAX bound check before read_some.
rioReadPartial() should reject requests above SSIZE_MAX (Line 506 onward) before calling backend read_some, to avoid backend-dependent truncation/sign behavior on large reads.
Proposed fix
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
+ if (bytes_to_read > (size_t)SSIZE_MAX) {
+ r->flags |= RIO_FLAG_READ_ERROR;
+ return -1;
+ }
ssize_t got = r->read_some(r, buf, bytes_to_read);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/rio.c` around lines 506 - 509, In rioReadPartial, before calling
r->read_some, ensure the computed bytes_to_read (the min of len and
r->max_processing_chunk) is not greater than SSIZE_MAX; if it is, reject the
request and return an appropriate error instead of invoking r->read_some to
avoid backend-dependent truncation/sign issues. Update the logic around
bytes_to_read (used in the existing computation) to perform this SSIZE_MAX bound
check and return an error (and set errno consistently, e.g. EINVAL/ERANGE) when
exceeded; also include the required header for SSIZE_MAX if not present.
|
@CodeRabbit review the replication compression implementation only |
|
Tip For best results, initiate chat on the files or code changes.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
cat src/replication.c | grep -n "compress\|Compress\|COMPRESS\|lz4\|LZ4\|decompres\|Decompres\|CAPA_COMPRESS\|replBalance\|markCompressed\|replInitDecomp\|replDestroyDecomp\|replRefreshDecomp\|replInitComp\|replDestroyComp" | head -200
Length of output: 3685
🏁 Script executed:
#!/bin/bash
cat src/networking.c | grep -n "compress\|Compress\|COMPRESS\|lz4\|LZ4\|decompres\|Decompres\|replDecomp\|replInitComp\|replDestroyComp\|compressed_buf\|writeToReplicaComp\|postWriteToReplica" | head -200
Length of output: 9715
🏁 Script executed:
#!/bin/bash
# Get broader context of replication.c compression functions
rg -n "markCompressedReplicasForDisconnect|replBalanceAffinity|replInitDecompression|replDestroyDecompression|replRefreshDecompression|REPLICA_CAPA_COMPRESSION|repl_compression|repl_stream_decoder" src/replication.c | head -150
Length of output: 1831
🏁 Script executed:
#!/bin/bash
# Read the compression-related sections from replication.c
awk '/markCompressedReplicasForDisconnect|replBalanceAffinity|replInitDecompression|replDestroyDecompression|replRefreshDecompression/{found=1; start=NR} found && NR>=start && NR<=start+60{print NR": "$0} NR>start+60{found=0}' src/replication.c | head -400
Length of output: 19200
🏁 Script executed:
#!/bin/bash
# Read networking.c compression functions
awk '/replInitCompression|replDestroyCompression|writeToReplicaCompressed|replDecompressQueryBuf|postWriteToReplica/{found=1; start=NR} found && NR>=start && NR<=start+80{print NR": "$0} NR>start+80{found=0}' src/networking.c | head -500
Length of output: 22176
🏁 Script executed:
#!/bin/bash
cat src/compression_stream.c | head -150
Length of output: 4581
🏁 Script executed:
#!/bin/bash
cat src/compression_lz4.c
Length of output: 4981
🏁 Script executed:
#!/bin/bash
cat src/io_threads.c | grep -n "compress\|Compress\|replBalance\|affinity\|repl_stream_decoder\|repl_compression\|COMPRESSION" | head -100
Length of output: 747
🏁 Script executed:
#!/bin/bash
# Read server.h for the ClientReplicationData and valkeyServer compression fields
rg -n "repl_compressor|compressed_buf|repl_stream_decoder|repl_compression|REPLICA_CAPA_COMPRESS|compression_error|ClientReplicationData|repl_decompression|repl_apply" src/server.h | head -100
Length of output: 2393
Replication Compression Review
Architecture Summary
The design is sound: one long-lived LZ4 frame per replica (
Issues Found
1.
|
# Conflicts: # src/io_threads.c # src/networking.c # src/replication.c
|
Actionable comments posted: 0 |
sarthakaggarwal97
left a comment
There was a problem hiding this comment.
Thanks @roshkhatri for this PR. I took an initial look at this change. The idea looks sound to me, and I have posted some comments related to the API. I do have another suggestion.
At this point, based on the numbers, thread affinity doesn't look too helpful. If we later conclude that it is helpful, it would make a good follow-up PR to this change. I think you also mentioned this offline.
--------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
|
@CodeRabbit That C_ERR branch for replRefreshDecompression is unreachable (replDecompressorCreate only fails on allocation, which zmalloc aborts on; codec init is lazy in replDecompressorDecode and already returns REPL_DECODE_ERR -> disconnect), so I removed the dead C_ERR plumbing and made replRefreshDecompression void |
|
Tip For best results, initiate chat on the files or code changes.
Issue |
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
|
Test failures are not related to this PR change |
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
…ion-streaming-compression-pr
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
sarthakaggarwal97
left a comment
There was a problem hiding this comment.
Sharing some more comments on the low level implementation.
| const uint8_t *in = src; | ||
| size_t off = 0; | ||
|
|
||
| /* Probe phase: classify the stream from its leading bytes. The magic may |
There was a problem hiding this comment.
why do we need the probe phase? Through CAPA we would know if the stream is compressed or not right?
There was a problem hiding this comment.
No, the replica will not know from CAPA alone whether the stream is compressed, because the primary only compresses if compression is enabled on the primary.
The key distinction is that CAPA tells the primary that the replica is capable of decompression, but it doesn't guarantee that the primary will actually use it. That's why the probe phase is needed: to detect whether the incoming stream is actually compressed.
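To make the probe idea concrete, here is a minimal sketch of classifying a stream from its leading bytes when the magic may arrive split across reads. It assumes the compressed stream starts directly with the standard LZ4 frame magic (0x184D2204, little-endian); the actual PR may wrap the frame in its own envelope, and `probeResult`, `probeStream`, and `kLz4FrameMagic` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef enum { PROBE_NEED_MORE, PROBE_COMPRESSED, PROBE_PLAIN } probeResult;

/* LZ4 frame magic number 0x184D2204 as it appears on the wire. */
static const uint8_t kLz4FrameMagic[4] = {0x04, 0x22, 0x4D, 0x18};

/* Classify the stream from the bytes seen so far. A prefix that still
 * matches the magic but is shorter than 4 bytes is "undecided". */
static probeResult probeStream(const uint8_t *in, size_t len) {
    assert(in != NULL || len == 0);
    size_t n = len < sizeof(kLz4FrameMagic) ? len : sizeof(kLz4FrameMagic);
    if (n > 0 && memcmp(in, kLz4FrameMagic, n) != 0) return PROBE_PLAIN;
    return n == sizeof(kLz4FrameMagic) ? PROBE_COMPRESSED : PROBE_NEED_MORE;
}
```

A plain replication stream starting with a RESP array marker such as `*` is classified as `PROBE_PLAIN` after the first byte, while a two-byte prefix of the magic stays in `PROBE_NEED_MORE` until the next read.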
| /* ===== Primary-side per-replica compressor ===== */ | ||
|
|
||
| /* Emit callback for the streamWriter: append compressed bytes to out_buf. */ | ||
| static int replCompressorEmit(void *ctx, const uint8_t *data, size_t len) { |
There was a problem hiding this comment.
This adds a second compressed-output buffer on top of streamWriter's internal out_buf: compression writes into the writer scratch, then this callback copies into replCompressor.out_buf for socket staging.
We do need staging for partial socket writes, but I don't think we need both buffers. Could the stream writer emit directly into the per-replica staging buffer, or let the replication adapter provide the output buffer used by the codec? That would remove one memcpy per emitted compressed chunk.
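One possible shape for the single-buffer approach is a reserve/commit pair on the staging buffer, so the codec writes straight into socket staging. This is only a sketch under assumptions: `stagingBuf`, `stagingReserve`, `stagingCommit`, and `stagingCompressInto` are hypothetical, and a toy pass-through codec stands in for the real LZ4 calls.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-replica staging buffer for bytes awaiting a socket write. */
typedef struct {
    unsigned char *data;
    size_t len, cap;
} stagingBuf;

/* Make room for `n` more bytes and return where the codec may write them. */
static unsigned char *stagingReserve(stagingBuf *b, size_t n) {
    if (n > SIZE_MAX - b->len) return NULL;
    size_t need = b->len + n;
    if (b->data == NULL || need > b->cap) {
        size_t cap = b->cap ? b->cap : 64;
        while (cap < need) {
            if (cap > SIZE_MAX / 2) { cap = need; break; }
            cap *= 2;
        }
        unsigned char *p = realloc(b->data, cap);
        if (p == NULL) return NULL;
        b->data = p;
        b->cap = cap;
    }
    return b->data + b->len;
}

/* Publish `n` bytes that were written into the reserved region. */
static void stagingCommit(stagingBuf *b, size_t n) {
    assert(n <= b->cap - b->len);
    b->len += n;
}

/* Toy codec standing in for the real compressor: copies input verbatim. */
static size_t toyCodecBound(size_t in_len) { return in_len; }
static size_t toyCodecEncode(unsigned char *out, size_t cap, const void *in, size_t len) {
    assert(len <= cap);
    memcpy(out, in, len);
    return len;
}

/* Encode directly into the staging buffer: no intermediate scratch copy. */
static int stagingCompressInto(stagingBuf *b, const void *in, size_t len) {
    if (len == 0) return 0;
    size_t bound = toyCodecBound(len);
    unsigned char *dst = stagingReserve(b, bound);
    if (dst == NULL) return -1;
    stagingCommit(b, toyCodecEncode(dst, bound, in, len));
    return 0;
}
```

The trade-off is that the staging buffer must be sized for the codec's worst-case output bound up front, in exchange for dropping one memcpy per emitted chunk.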
| if (c->querybuf == thread_shared_qb && sdsavail(c->querybuf) < decompressed_len) { | ||
| initSharedQueryBuf(); | ||
| } | ||
| c->querybuf = sdscatlen(c->querybuf, decode_buf, decompressed_len); |
There was a problem hiding this comment.
This path does two copies for every decompressed byte: codec output -> rd->decode_buf, then rd->decode_buf -> c->querybuf.
Since replica-side decompression runs on the main thread, I think we should be stricter about copies here. Can the adapter decode directly into the query buffer after preserving the pre-read prefix? Something like replDecompressorDecodeIntoSds(..., &c->querybuf, ...) would keep the ownership clear and remove a full memcpy from the hot path.
There was a problem hiding this comment.
We would still need that one copy: the codec outputs to rd->decode_buf, then rd->decode_buf -> c->querybuf. We cannot replace the compressed bytes with the decompressed bytes in place, because the decompressed bytes will be larger and would overwrite compressed bytes that have not been decoded yet. We need to preserve the compressed bytes until we finish decompressing the batch, and only then replace the compressed set of bytes with the decompressed ones.
| "slave_read_only:%d\r\n", server.repl_replica_ro, | ||
| "replica_announced:%d\r\n", server.replica_announced)); | ||
| if (server.repl_decompression_errors > 0) { | ||
| info = sdscatfmt(info, "repl_decompression_errors:%U\r\n", |
There was a problem hiding this comment.
The new replication decompression counters are emitted in INFO, but I don't see them reset by CONFIG RESETSTAT.
If these are intended to behave like the adjacent replication/network counters, we should reset repl_decompression_errors, repl_decompression_cpu_usec, and repl_decompressed_bytes_total as well.
|
@coderabbitai do full review for replication compression |
|
✅ Action performed
Full review finished.
There was a problem hiding this comment.
Actionable comments posted: 9
♻️ Duplicate comments (1)
src/compression_lz4.c (1)
91-129: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Make undersized-output failures atomic.
Once LZ4F_compressBegin() succeeds, the frame header may already be in output and cctx has advanced. The later offset >= output_capacity branches therefore are not safely retriable: a caller with room only for the header gets -1, and a retry on the same compressor can continue without re-emitting the header.
#!/bin/bash
set -euo pipefail
printf '\n--- compression feed implementation ---\n'
sed -n '80,140p' src/compression_lz4.c
printf '\n--- public feed contract ---\n'
sed -n '65,90p' src/compression.h
printf '\n--- output-bound helper and codec dispatch ---\n'
sed -n '1,90p' src/compression.c
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@src/compression_lz4.c` around lines 91 - 129, The undersized-output handling in the LZ4 feed path is not atomic because `LZ4F_compressBegin()` can partially emit the frame header before later `offset >= output_capacity` checks return `-1`. Update `compression_lz4.c`’s feed/compress routine so any output-capacity failure after `LZ4F_compressBegin()` is treated as a committed partial write, and make retries use a fresh compressor state instead of continuing from `cctx`/`stream_started`; align the retry contract with the public feed behavior in `compression.h` and the dispatch path in `compression.c`.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.github/workflows/ci.yml:
- Around line 221-229: Both `actions/checkout` steps in the workflow currently
persist the GitHub token in local git config, which is unnecessary for this job.
Update the `Install libbacktrace` checkout and the subsequent `Checkout Valkey`
step to set `persist-credentials: false` so the token is not left available to
the PR-controlled `make` and `./runtest` commands.
In `@src/compression_lz4.c`:
- Around line 49-52: The LZ4 init path in compressionLz4CompressorInit and
compressionLz4DecompressorInit should not assert on a NULL context and then
still report success; update these init functions to detect allocation failure
from the
LZ4F_createCompressionContext_advanced/LZ4F_createDecompressionContext_advanced
calls, return -1 on failure, and leave cleanup to
streamCompressorInit/streamDecompressorInit so the wrapper can handle the error
path correctly.
In `@src/compression_repl.c`:
- Around line 108-129: The no-progress path in replDecodeFeed() drops unconsumed
compressed input, so replDecompressQueryBuf() may discard a partial frame and
corrupt replication. Update replDecompressor/replDecodeFeed() to retain pending
bytes across calls (or otherwise report how many bytes were actually consumed)
when streamDecompressorFeed() returns consumed == 0 and produced == 0, and only
allow the caller to trim the query buffer by the true consumed amount. Keep the
behavior aligned with the stream reader contract used by compression_stream.c so
split headers/blocks are preserved until they can be decoded.
In `@src/compression_stream.c`:
- Around line 259-301: The stream writer does not stop accepting operations
after a failure, so later calls can proceed on partially advanced state. Update
streamWriterWrite, streamWriterFlush, and streamWriterFinish to check
writer->errored up front and return failure/no-op immediately, and ensure
streamWriterFinish only marks writer->finished after streamWriterFeedAndEmit
succeeds so a failed final emit stays sticky. Use the existing
streamWriterWrite, streamWriterFlush, streamWriterFinish, and
writer->errored/finished fields to keep all public writer operations rejected
once an error has occurred.
In `@src/unit/test_compression.cpp`:
- Around line 304-345: The current regression test in
streamCompressorFeedErrorRecovery only covers the fully pre-frame failure path,
so add a second subcase that forces streamCompressorFeed() to emit the LZ4 frame
header and then fail before completing the flush/write. Use the existing
streamCompressorInit, streamCompressorOutputBound, and streamCompressorFeed flow
to trigger the partial-write error, then retry on the same stream and verify the
output still round-trips correctly instead of continuing from a corrupted state.
Keep the test in the same GoogleTest fixture and extend the assertions around
stream_started and the retry behavior to cover this header-written recovery
path.
In `@src/valkey-check-rdb.c`:
- Around line 851-854: The checksum-logging branch in the RDB checker is using
the wrong transport flag and misclassifies decompressed inputs. Update the
condition in the RDB check logic around the RIO flag handling to key off the
decompression wrapper state instead of RIO_FLAG_STREAMING_COMPRESSION, so
compressed inputs are recognized correctly before falling through to the generic
skip message. Use the existing checksum decision block in the checker function
that logs via rdbCheckInfo as the place to make this change.
In `@tests/integration/repl-compression.tcl`:
- Line 1: The repl-compression integration suite is still registered only under
the generic repl/external:skip tags, so the new repl-compression tag in
server.tcl never controls these tests. Update the tag declaration in
repl-compression.tcl to use repl-compression as the top-level tag and remove the
custom repl-compression-suite deny-tag gating so --tags repl-compression and
--tags -repl-compression correctly include or exclude the suite.
- Around line 79-114: The test temporarily changes the primary’s
repl-compression setting in the “Replica with repl-compression lz4-stream and
disk-backed load also negotiates compression” case, but it only restores it on
the success path. Update the test around the $primary config set and $primary
config set repl-compression no calls to use a try/finally pattern so the
original setting is always restored even if any wait_for_condition,
assert_equal, or replicaof step fails.
In `@tests/integration/replication-aof-sync.tcl`:
- Around line 167-214: The new replication restart test is asserting RDB reuse
for a primary using lz4-stream compression, but the current AOF sync path still
falls back to BGREWRITEAOF for that case. Either update the AOF sync/reuse logic
in the runtime code path that handles `src/aof.c` so `wait_for_sync` can reuse
the synced RDB as the AOF base, or adjust this test in
`replication-aof-sync.tcl` to expect the fallback log and non-reuse behavior;
keep the checks around `wait_for_sync`, `log_file_matches`, and
`get_cur_base_aof_name` consistent with whichever behavior is intended.
---
Duplicate comments:
In `@src/compression_lz4.c`:
- Around line 91-129: The undersized-output handling in the LZ4 feed path is not
atomic because `LZ4F_compressBegin()` can partially emit the frame header before
later `offset >= output_capacity` checks return `-1`. Update
`compression_lz4.c`’s feed/compress routine so any output-capacity failure after
`LZ4F_compressBegin()` is treated as a committed partial write, and make retries
use a fresh compressor state instead of continuing from `cctx`/`stream_started`;
align the retry contract with the public feed behavior in `compression.h` and
the dispatch path in `compression.c`.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 7d6c12b0-40b0-4924-b389-7d7f1c8923dc
📒 Files selected for processing (58)
.github/workflows/ci.yml
cmake/Modules/SourceFiles.cmake
cmake/Modules/ValkeySetup.cmake
deps/CMakeLists.txt
deps/Makefile
deps/lz4/CMakeLists.txt
deps/lz4/LICENSE
deps/lz4/Makefile
deps/lz4/lz4.c
deps/lz4/lz4.h
deps/lz4/lz4frame.c
deps/lz4/lz4frame.h
deps/lz4/lz4hc.c
deps/lz4/lz4hc.h
deps/lz4/xxhash.c
deps/lz4/xxhash.h
src/CMakeLists.txt
src/Makefile
src/aof.c
src/compression.c
src/compression.h
src/compression_lz4.c
src/compression_lz4.h
src/compression_repl.c
src/compression_repl.h
src/compression_rio.c
src/compression_rio.h
src/compression_stream.c
src/compression_stream.h
src/config.c
src/io_threads.c
src/networking.c
src/rdb.c
src/rdb.h
src/replication.c
src/rio.c
src/rio.h
src/server.c
src/server.h
src/unit/CMakeLists.txt
src/unit/Makefile
src/unit/test_compression.cpp
src/unit/test_repl_compression.cpp
src/valkey-check-rdb.c
tests/integration/block-repl.tcl
tests/integration/dual-channel-replication.tcl
tests/integration/rdb-compression.tcl
tests/integration/repl-compression.tcl
tests/integration/replication-2.tcl
tests/integration/replication-3.tcl
tests/integration/replication-4.tcl
tests/integration/replication-aof-sync.tcl
tests/integration/replication.tcl
tests/integration/skip-rdb-checksum.tcl
tests/integration/valkey-check-rdb.tcl
tests/support/server.tcl
tests/test_helper.tcl
valkey.conf
| - name: Install libbacktrace | ||
| uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 | ||
| with: | ||
| repository: ianlancetaylor/libbacktrace | ||
| ref: b9e40069c0b47a722286b94eb5231f7f05c08713 | ||
| path: libbacktrace | ||
| - run: cd libbacktrace && ./configure && make && sudo make install | ||
| - name: Checkout Valkey | ||
| uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 |
There was a problem hiding this comment.
🔒 Security & Privacy | 🟠 Major | ⚡ Quick win
Disable credential persistence on both checkout steps.
Both new actions/checkout steps leave the workflow token in local git config, and this job then runs PR-controlled make/./runtest. That gives the new job an unnecessary token-exfil path. Set persist-credentials: false on both checkouts.
Suggested change
- name: Install libbacktrace
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
+ persist-credentials: false
repository: ianlancetaylor/libbacktrace
ref: b9e40069c0b47a722286b94eb5231f7f05c08713
path: libbacktrace
@@
- name: Checkout Valkey
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ with:
+ persist-credentials: false
🧰 Tools
🪛 zizmor (1.26.1)
[warning] 221-226: credential persistence through GitHub Actions artifacts (artipacked): does not set persist-credentials: false
(artipacked)
[warning] 228-229: credential persistence through GitHub Actions artifacts (artipacked): does not set persist-credentials: false
(artipacked)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @.github/workflows/ci.yml around lines 221 - 229, Both `actions/checkout`
steps in the workflow currently persist the GitHub token in local git config,
which is unnecessary for this job. Update the `Install libbacktrace` checkout
and the subsequent `Checkout Valkey` step to set `persist-credentials: false` so
the token is not left available to the PR-controlled `make` and `./runtest`
commands.
Source: Linters/SAST tools
| static replDecodeResult replDecodeFeed(replDecompressor *rd, const uint8_t *in, size_t len, size_t output_max) { | ||
| size_t off = 0; | ||
| while (off < len) { | ||
| char out_buf[REPL_DECODE_CHUNK]; | ||
| size_t consumed = 0; | ||
| ssize_t produced = streamDecompressorFeed(&rd->decompressor, (uint8_t *)out_buf, sizeof(out_buf), | ||
| in + off, len - off, &consumed); | ||
| if (produced < 0 || consumed > len - off) return REPL_DECODE_ERR; | ||
| if (produced > 0) { | ||
| if (sdslen(rd->decode_buf) + (size_t)produced > output_max) return REPL_DECODE_OVERFLOW; | ||
| rd->decode_buf = sdscatlen(rd->decode_buf, out_buf, (size_t)produced); | ||
| } | ||
| off += consumed; | ||
| /* A long-lived replication stream must never reach a compressed frame | ||
| * end. If it does, the stream is corrupt or the primary sent an | ||
| * unexpected terminator: the caller should disconnect. */ | ||
| if (rd->decompressor.frame_done) return REPL_DECODE_FRAME_DONE; | ||
| /* No progress: the codec has buffered a partial frame and needs more | ||
| * bytes. Resume on the next read tick. */ | ||
| if (consumed == 0 && produced == 0) break; | ||
| } | ||
| return REPL_DECODE_OK; |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🔴 Critical | 🏗️ Heavy lift
Preserve unconsumed compressed input before returning REPL_DECODE_OK.
When streamDecompressorFeed() returns consumed == 0 && produced == 0, this adapter breaks without retaining in + off. replDecompressQueryBuf() then treats REPL_DECODE_OK as permission to remove the whole raw range from c->querybuf, so a split compressed block/header can be dropped and corrupt the replication stream. Keep a pending compressed buffer in replDecompressor (or return consumed bytes so the caller can retain the tail) before accepting partial input. The stream reader contract in compression_stream.c preserves the unconsumed buffer in this exact no-progress case.
Suggested direction
-static replDecodeResult replDecodeFeed(replDecompressor *rd, const uint8_t *in, size_t len, size_t output_max) {
+static replDecodeResult replDecodeFeed(replDecompressor *rd, const uint8_t *in, size_t len,
+ size_t output_max, size_t *consumed_total) {
size_t off = 0;
+ *consumed_total = 0;
while (off < len) {
...
off += consumed;
+ *consumed_total = off;
...
if (consumed == 0 && produced == 0) break;
}
return REPL_DECODE_OK;
}
Then append new compressed bytes to a replDecompressor pending SDS, drain from that SDS, and sdsrange() away only consumed_total bytes after a successful feed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/compression_repl.c` around lines 108 - 129, The no-progress path in
replDecodeFeed() drops unconsumed compressed input, so replDecompressQueryBuf()
may discard a partial frame and corrupt replication. Update
replDecompressor/replDecodeFeed() to retain pending bytes across calls (or
otherwise report how many bytes were actually consumed) when
streamDecompressorFeed() returns consumed == 0 and produced == 0, and only allow
the caller to trim the query buffer by the true consumed amount. Keep the
behavior aligned with the stream reader contract used by compression_stream.c so
split headers/blocks are preserved until they can be decoded.
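The "retain the unconsumed tail" direction can be sketched with a toy codec, assuming a decoder that only consumes whole blocks. Everything here is hypothetical (`toyFeed`, `toyDecoder`, `toyDecodeAppend`, fixed-size arrays instead of SDS); it shows the bookkeeping only, not the `streamDecompressorFeed()` contract.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_BLOCK 4

/* Toy stand-in for a codec: consumes only whole 4-byte blocks and copies
 * them through. Reports no progress when fewer than 4 bytes are available. */
static size_t toyFeed(const unsigned char *in, size_t len, unsigned char *out, size_t *consumed) {
    if (len < TOY_BLOCK) {
        *consumed = 0;
        return 0;
    }
    memcpy(out, in, TOY_BLOCK);
    *consumed = TOY_BLOCK;
    return TOY_BLOCK;
}

typedef struct {
    unsigned char pending[64]; /* compressed bytes not yet consumed */
    size_t pending_len;
    unsigned char decoded[64]; /* decoded output */
    size_t decoded_len;
} toyDecoder;

/* Append new input, decode as much as possible, and keep the tail.
 * Returns 0 on success, -1 on overflow (treated as fatal by the caller). */
static int toyDecodeAppend(toyDecoder *d, const unsigned char *in, size_t len) {
    assert(in != NULL || len == 0);
    if (len > sizeof(d->pending) - d->pending_len) return -1;
    if (len > 0) memcpy(d->pending + d->pending_len, in, len);
    d->pending_len += len;

    size_t off = 0;
    while (off < d->pending_len) {
        size_t consumed = 0;
        if (sizeof(d->decoded) - d->decoded_len < TOY_BLOCK) return -1;
        size_t produced = toyFeed(d->pending + off, d->pending_len - off,
                                  d->decoded + d->decoded_len, &consumed);
        d->decoded_len += produced;
        off += consumed;
        if (consumed == 0 && produced == 0) break; /* partial block: wait for more */
    }
    /* Trim only what the codec actually consumed; the tail survives. */
    memmove(d->pending, d->pending + off, d->pending_len - off);
    d->pending_len -= off;
    return 0;
}
```

Feeding six bytes decodes one block and leaves two bytes pending; the next two bytes complete the second block, so nothing is dropped at the read boundary.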
ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
    /* Writes after finish are a caller bug; silently dropping them would
     * corrupt the consumer's view of the stream. */
    if (writer->finished) return -1;
    if (len == 0) return 0;
    if (len > (size_t)SSIZE_MAX) return -1;

    const uint8_t *src = (const uint8_t *)buf;
    size_t remaining = len;
    uint64_t emitted_before = writer->bytes_emitted;
    if (streamWriterEnsureEnvelope(writer) != 0) return -1;
    while (remaining > 0) {
        size_t chunk_len = remaining < STREAM_WRITER_INPUT_CHUNK_SIZE
                               ? remaining
                               : STREAM_WRITER_INPUT_CHUNK_SIZE;
        if (streamWriterFeedAndEmit(writer, src, chunk_len, FLUSH_CONTINUE) != 0) return -1;
        src += chunk_len;
        remaining -= chunk_len;
    }
    uint64_t emitted_delta = writer->bytes_emitted - emitted_before;
    if (emitted_delta > (uint64_t)SSIZE_MAX) {
        writer->errored = true;
        return -1;
    }
    return (ssize_t)emitted_delta;
}

int streamWriterFlush(streamWriter *writer) {
    /* Flush after finish is a no-op: frame is already closed. */
    if (writer->finished) return 0;

    if (!writer->envelope_written || !writer->compressor.stream_started) return 0;
    return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_SYNC);
}

int streamWriterFinish(streamWriter *writer) {
    if (writer->finished) return 0;
    writer->finished = true;

    /* Even an empty stream produces a valid envelope + empty frame so the
     * loader sees a well-formed file. */
    if (streamWriterEnsureEnvelope(writer) != 0) return -1;
    return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_END);
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Make writer errors sticky before accepting more stream operations.
After an emit failure, compressor state may already have advanced, but streamWriterWrite() can still accept later input, and streamWriterFinish() can report success on a repeated call because finished was set before the failed final-frame emit. Reject all public writer operations once writer->errored is set.
Proposed fix
 ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
+    if (writer->errored) return -1;
     /* Writes after finish are a caller bug; silently dropping them would
      * corrupt the consumer's view of the stream. */
     if (writer->finished) return -1;
@@
 int streamWriterFlush(streamWriter *writer) {
+    if (writer->errored) return -1;
     /* Flush after finish is a no-op: frame is already closed. */
     if (writer->finished) return 0;
@@
 int streamWriterFinish(streamWriter *writer) {
+    if (writer->errored) return -1;
     if (writer->finished) return 0;
     writer->finished = true;
📝 Committable suggestion
ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
    if (writer->errored) return -1;
    /* Writes after finish are a caller bug; silently dropping them would
     * corrupt the consumer's view of the stream. */
    if (writer->finished) return -1;
    if (len == 0) return 0;
    if (len > (size_t)SSIZE_MAX) return -1;
    const uint8_t *src = (const uint8_t *)buf;
    size_t remaining = len;
    uint64_t emitted_before = writer->bytes_emitted;
    if (streamWriterEnsureEnvelope(writer) != 0) return -1;
    while (remaining > 0) {
        size_t chunk_len = remaining < STREAM_WRITER_INPUT_CHUNK_SIZE
                               ? remaining
                               : STREAM_WRITER_INPUT_CHUNK_SIZE;
        if (streamWriterFeedAndEmit(writer, src, chunk_len, FLUSH_CONTINUE) != 0) return -1;
        src += chunk_len;
        remaining -= chunk_len;
    }
    uint64_t emitted_delta = writer->bytes_emitted - emitted_before;
    if (emitted_delta > (uint64_t)SSIZE_MAX) {
        writer->errored = true;
        return -1;
    }
    return (ssize_t)emitted_delta;
}
int streamWriterFlush(streamWriter *writer) {
    if (writer->errored) return -1;
    /* Flush after finish is a no-op: frame is already closed. */
    if (writer->finished) return 0;
    if (!writer->envelope_written || !writer->compressor.stream_started) return 0;
    return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_SYNC);
}
int streamWriterFinish(streamWriter *writer) {
    if (writer->errored) return -1;
    if (writer->finished) return 0;
    writer->finished = true;
    /* Even an empty stream produces a valid envelope + empty frame so the
     * loader sees a well-formed file. */
    if (streamWriterEnsureEnvelope(writer) != 0) return -1;
    return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_END);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/compression_stream.c` around lines 259 - 301, The stream writer does not
stop accepting operations after a failure, so later calls can proceed on
partially advanced state. Update streamWriterWrite, streamWriterFlush, and
streamWriterFinish to check writer->errored up front and return failure/no-op
immediately, and ensure streamWriterFinish only marks writer->finished after
streamWriterFeedAndEmit succeeds so a failed final emit stays sticky. Use the
existing streamWriterWrite, streamWriterFlush, streamWriterFinish, and
writer->errored/finished fields to keep all public writer operations rejected
once an error has occurred.
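The two requirements in this prompt (reject every public operation once `errored` is set, and mark `finished` only after the final emit succeeds) can be sketched with a toy writer. This is an illustration under stated assumptions, not the PR's implementation: `toyWriter`, `toyEmit`, `toyWrite`, `toyFinish`, and the `fail_next_emit` hook are all hypothetical, and `toyEmit` is assumed to latch `errored` on failure the way the real emit step would need to.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical miniature of streamWriter: only the two state flags, a byte
 * counter, and an injectable failure standing in for a failed emit. */
typedef struct {
    bool finished;
    bool errored;
    bool fail_next_emit; /* test hook, not part of the real writer */
    size_t emitted;
} toyWriter;

/* Stand-in for the emit step. Assumption: a failed emit latches errored. */
static int toyEmit(toyWriter *w, size_t len) {
    if (w->fail_next_emit) {
        w->fail_next_emit = false;
        w->errored = true;
        return -1;
    }
    w->emitted += len;
    return 0;
}

static int toyWrite(toyWriter *w, size_t len) {
    if (w->errored) return -1;  /* sticky: reject after any failure */
    if (w->finished) return -1; /* write after finish is a caller bug */
    return toyEmit(w, len);
}

static int toyFinish(toyWriter *w) {
    if (w->errored) return -1;
    if (w->finished) return 0; /* idempotent only after a clean finish */
    if (toyEmit(w, 0) != 0) return -1;
    w->finished = true; /* set only once the final emit succeeded */
    return 0;
}
```

With this ordering, a failed `toyFinish` keeps returning -1 on every later call instead of reporting success the second time.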
/* --- Test: pre-frame errors are recoverable, mid-frame errors are permanent --- */
TEST_F(CompressionTest, streamCompressorFeedErrorRecovery) {
    streamCompressor sc;
    ASSERT_EQ(streamCompressorInit(&sc, ALGO_LZ4, 0), 0);

    /* Pre-frame error: compressBegin fails with tiny buffer, but no frame
     * bytes have been emitted yet — this is recoverable. */
    uint8_t tiny[1];
    ssize_t ret = streamCompressorFeed(&sc, tiny, 1,
                                       (const uint8_t *)"test data", 9, FLUSH_END);
    ASSERT_EQ(ret, -1) << "should fail with tiny buffer";
    ASSERT_EQ(sc.stream_started, false) << "stream_started should still be false";

    /* Retry with a proper buffer — should succeed */
    size_t bound = streamCompressorOutputBound(&sc, 9);
    uint8_t *buf = (uint8_t *)zmalloc(bound);
    ssize_t ret2 = streamCompressorFeed(&sc, buf, bound,
                                        (const uint8_t *)"test data", 9, FLUSH_END);
    ASSERT_GT(ret2, 0) << "retry after pre-frame error should succeed";
    zfree(buf);
    streamCompressorFree(&sc);

    /* Mid-frame error: start a frame, then force an error with a tiny buffer. */
    streamCompressor sc2;
    ASSERT_EQ(streamCompressorInit(&sc2, ALGO_LZ4, 0), 0);

    size_t bound2 = streamCompressorOutputBound(&sc2, 5);
    uint8_t *buf2 = (uint8_t *)zmalloc(bound2);
    ssize_t ret3 = streamCompressorFeed(&sc2, buf2, bound2,
                                        (const uint8_t *)"hello", 5, FLUSH_CONTINUE);
    ASSERT_GE(ret3, 0) << "first write should succeed";
    ASSERT_EQ(sc2.stream_started, true) << "stream should be started";

    uint8_t tiny2[1];
    ssize_t ret4 = streamCompressorFeed(&sc2, tiny2, 1,
                                        (const uint8_t *)"more data to compress", 21,
                                        FLUSH_END);
    ASSERT_EQ(ret4, -1) << "mid-frame error should fail";

    zfree(buf2);
    streamCompressorFree(&sc2);
}
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Add a regression for the header-written retry path.
This test only covers the fully pre-frame failure (tiny[1]). The still-open corruption case is when streamCompressorFeed() has enough output space to emit the LZ4 frame header but not enough to finish the write/flush; that can return -1 after mutating the stream, and the retry may continue without a fresh header. Please add a subcase that fails after the header is emitted and then verifies the retried stream is still a valid round-trip.
As per coding guidelines, src/unit/**/*.{c,cpp}: Unit tests live under src/unit/ and use GoogleTest (gtest/gmock). Data-structure and low-level logic changes should be covered by unit tests.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/unit/test_compression.cpp` around lines 304 - 345, The current regression
test in streamCompressorFeedErrorRecovery only covers the fully pre-frame
failure path, so add a second subcase that forces streamCompressorFeed() to emit
the LZ4 frame header and then fail before completing the flush/write. Use the
existing streamCompressorInit, streamCompressorOutputBound, and
streamCompressorFeed flow to trigger the partial-write error, then retry on the
same stream and verify the output still round-trips correctly instead of
continuing from a corrupted state. Keep the test in the same GoogleTest fixture
and extend the assertions around stream_started and the retry behavior to cover
this header-written recovery path.
Source: Coding guidelines
if ((rdb->flags & RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
    rdbCheckInfo("Skipping logical RDB checksum for streaming-compressed input.");
} else if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
    rdbCheckInfo("RDB file was saved with checksum disabled: skipped checksum for this transfer.");
🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win
Check the decompression flag in the checker.
Line 851 uses RIO_FLAG_STREAMING_COMPRESSION, but this path is reading through a decompression wrapper, so compressed inputs fall through to the generic skip message.
Proposed fix
-    if ((rdb->flags & RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
+    if ((rdb->flags & RIO_FLAG_STREAMING_DECOMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
📝 Committable suggestion
if ((rdb->flags & RIO_FLAG_STREAMING_DECOMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
    rdbCheckInfo("Skipping logical RDB checksum for streaming-compressed input.");
} else if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
    rdbCheckInfo("RDB file was saved with checksum disabled: skipped checksum for this transfer.");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/valkey-check-rdb.c` around lines 851 - 854, The checksum-logging branch
in the RDB checker is using the wrong transport flag and misclassifies
decompressed inputs. Update the condition in the RDB check logic around the RIO
flag handling to key off the decompression wrapper state instead of
RIO_FLAG_STREAMING_COMPRESSION, so compressed inputs are recognized correctly
before falling through to the generic skip message. Use the existing checksum
decision block in the checker function that logs via rdbCheckInfo as the place
to make this change.
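To make the misclassification concrete, here is a small standalone sketch of the intended decision. The `TOY_FLAG_*` bit values and `checksumSkipMessage` are hypothetical stand-ins for the real `RIO_FLAG_*` definitions and the checker's logging branch; only the two message strings come from the excerpt above.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical bit values; the real RIO_FLAG_* definitions are not shown
 * in this review and may differ. */
#define TOY_FLAG_SKIP_RDB_CHECKSUM       (1u << 0)
#define TOY_FLAG_STREAMING_COMPRESSION   (1u << 1) /* set on the save side */
#define TOY_FLAG_STREAMING_DECOMPRESSION (1u << 2) /* set on the load side */

/* Returns the message the checker would log, or NULL when the checksum is
 * not skipped. The checker reads through a decompression wrapper, so it
 * keys off the decompression flag rather than the compression flag. */
static const char *checksumSkipMessage(uint32_t flags) {
    if (!(flags & TOY_FLAG_SKIP_RDB_CHECKSUM)) return NULL;
    if (flags & TOY_FLAG_STREAMING_DECOMPRESSION)
        return "Skipping logical RDB checksum for streaming-compressed input.";
    return "RDB file was saved with checksum disabled: skipped checksum for this transfer.";
}
```

A reader-side rio carrying only the decompression flag now gets the streaming-specific message, while a flag set that never occurs on the read path (the compression flag) falls through to the generic one.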
test "Replica restart reuses disk-based sync RDB when primary rdbcompression is lz4-stream" {
    start_server {overrides {appendonly yes aof-use-rdb-preamble yes repl-diskless-sync no save "" rdbcompression lz4-stream}} {
        set primary [srv 0 client]
        set primary_host [srv 0 host]
        set primary_port [srv 0 port]

        for {set i 0} {$i < 40} {incr i} {
            $primary set "lz4-key:$i" "value:$i"
        }
        waitForBgrewriteaof $primary

        start_server {overrides {appendonly yes aof-use-rdb-preamble yes repl-diskless-sync no save ""}} {
            set replica [srv 0 client]
            set replica_log [srv 0 stdout]

            $replica replicaof $primary_host $primary_port
            wait_for_sync $replica

            wait_for_condition 50 100 {
                [log_file_matches $replica_log "*Reused RDB file from primary sync as AOF base file*"]
            } else {
                fail "Expected log message about reusing RDB file not found"
            }

            assert {![log_file_matches $replica_log "*uses streaming compression, falling back to BGREWRITEAOF*"]}
            set manifest_path [get_aof_manifest_path $replica]
            set base_name [get_cur_base_aof_name $manifest_path]
            assert {$base_name ne ""}
            assert {[string match "*.rdb" $base_name]}

            for {set i 40} {$i < 60} {incr i} {
                $primary set "lz4-key:$i" "value:$i"
            }
            wait_for_ofs_sync $primary $replica

            $replica replicaof no one

            restart_server 0 true false
            set replica [srv 0 client]
            wait_done_loading $replica

            assert_equal 60 [$replica dbsize]
            for {set i 0} {$i < 60} {incr i} {
                assert_equal "value:$i" [$replica get "lz4-key:$i"]
            }
        }
    }
}
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
This test expects the opposite of the current AOF fallback behavior.
With rdbcompression lz4-stream on Line 168, the synced dump.rdb is streaming-compressed. src/aof.c still treats that file as non-reusable, logs the falling back to BGREWRITEAOF message, and returns C_ERR instead of reusing it as the AOF base. Unless that runtime-layer guard is removed in the same stack, this test will fail deterministically. Either update the runtime behavior with this PR or flip the assertions here to validate the fallback path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration/replication-aof-sync.tcl` around lines 167 - 214, The new
replication restart test is asserting RDB reuse for a primary using lz4-stream
compression, but the current AOF sync path still falls back to BGREWRITEAOF for
that case. Either update the AOF sync/reuse logic in the runtime code path that
handles `src/aof.c` so `wait_for_sync` can reuse the synced RDB as the AOF base,
or adjust this test in `replication-aof-sync.tcl` to expect the fallback log and
non-reuse behavior; keep the checks around `wait_for_sync`, `log_file_matches`,
and `get_cur_base_aof_name` consistent with whichever behavior is intended.
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
…ion-streaming-compression-pr
Summary
Adds per-replica streaming compression for the incremental replication stream on top of #3531, with `lz4` as the first supported codec. Negotiated per-replica via the existing REPLCONF `capa` handshake (`REPLICA_CAPA_COMPRESSION`); default off (`repl-compression no`); replicas that don't advertise the capability stay uncompressed. Works for diskless full sync, disk-based full sync, and partial (PSYNC) resyncs.
Compression runs off the main thread on the existing IO-thread write path. All per-replica compression state and the replica-side decoder live behind a small `compression_repl.c` adapter (mirroring `compression_rio.c`).
Headline results (BlockMesh tweets, 3M keys x ~315 B):
Config added:
`repl-compression` (enum, default `no`): `no` or `lz4-stream`. Enables compression on a primary or replica.
Related to #3531. ZSTD support follows in #3798.
See the fork review PR: roshkhatri#18
Full description: design, threading model, config, observability, benchmarks
The replication stream from primary to replica is wrapped in a `VCS` envelope (`STREAM_KIND_REPL`) and compressed as a single long-lived frame at the per-replica buffer layer. Default behavior is unchanged with `repl-compression no`. The negotiation is per-replica via the REPLCONF `capa` handshake; the new `REPLICA_CAPA_COMPRESSION` capability lets each side opt in independently.
Architecture
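As a rough illustration of the per-replica opt-in, the sketch below folds a `capa` argument into a bitmask and decides whether the primary compresses for that replica. `capaRecord` and `shouldCompressForReplica` are hypothetical helper names; the `1 << 4` value is the one listed for `REPLICA_CAPA_COMPRESSION` in this description, and the handling of other capability names is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define REPLICA_CAPA_COMPRESSION (1 << 4) /* value as listed in this description */

/* Hypothetical: fold one "REPLCONF capa <name>" argument into the
 * per-replica bitmask. Names this sketch does not know are ignored. */
static uint32_t capaRecord(uint32_t capa, const char *name) {
    if (strcmp(name, "compression") == 0) capa |= REPLICA_CAPA_COMPRESSION;
    return capa;
}

/* Primary-side decision: compress only when the replica advertised the
 * capability and the primary's own repl-compression is enabled. */
static bool shouldCompressForReplica(uint32_t replica_capa, bool primary_enabled) {
    return primary_enabled && (replica_capa & REPLICA_CAPA_COMPRESSION) != 0;
}
```

A replica that never sends `capa compression` keeps a zero bit and stays on the plaintext path regardless of the primary's setting.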
Threading model
Primary side: compression runs on whichever IO thread picks up the replica's write job from the shared inbox. There is no dedicated compression thread and no per-replica thread pinning; the per-replica `replCompressor` state lives in `client->repl_data` and is touched only by the IO thread currently processing that client (or the main thread before any IO dispatch).
Replica side: decompression runs inline on the main thread inside the existing `readQueryFromClient` path. No worker threads; decompressed bytes feed directly into `processInputBuffer`.
Key design decisions
Capability negotiation reuses the existing REPLCONF `capa` handshake. Replicas advertise `capa compression` when `repl-compression` is enabled; the primary records `REPLICA_CAPA_COMPRESSION` on the per-replica bitmask and compresses only for replicas that opted in and when its own `repl-compression` is enabled. The replica stores the negotiated decision per link and gates decoder setup on it (not the live config), so a mid-handshake config change cannot desync the stream.
`compression_repl.c` adapter on top of `compression_stream`. `replCompressor` (primary) owns the `streamWriter` plus the staged compressed-output buffer; `replDecompressor` (replica) owns a `streamDecompressor` plus a small envelope accumulator and decode scratch. This is the replication counterpart to `compression_rio.c` (the RDB adapter), keeping codec state out of `networking.c`.
Direct-feed decoder on the replica (no rio, no pull/push callback). The replica reads a non-blocking socket inside the event loop, so it cannot use the blocking pull callback the RDB path uses. The adapter instead classifies the stream from its leading bytes and feeds the codec directly: on a `VCS` envelope it parses the header, initializes the decoder, and calls `streamDecompressorFeed` (the codec retains partial-frame state internally, so no feed queue is needed); a non-`VCS` stream is forwarded untouched. Passthrough matters because the primary only compresses when its own config is enabled, so a capability-advertising replica may still receive plaintext.
One long-lived LZ4 frame per replica connection. Frame-done mid-stream is treated as protocol corruption and triggers disconnect.
`STREAM_KIND_REPL` distinguishes replication frames from RDB frames. RDB `STREAM_KIND_RDB` payloads from Streaming Compression support for RDB #3531 are unaffected.
Replication-offset accounting stays in logical (uncompressed) bytes. `replDecompressQueryBuf` adjusts `read_reploff` by the raw-to-decoded delta so PSYNC offset invariants hold across compressed and uncompressed connections.
LZ4 fast mode (level 0). LZ4 fast compresses at ~5 GB/s vs ~500 MB/s for HC modes; for replication the network is typically the bottleneck before compression CPU. Trade-off: ~10-15% worse ratio than HC modes.
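The replication-offset accounting described above can be sketched as simple arithmetic. This is an illustration only: `toyLink`, `creditRawRead`, and `adjustForDecoded` are hypothetical names, and the assumption that the read path first credits the raw byte count (as it would on a plaintext link) before the decoder corrects it is mine, not something the description states.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical slice of replica link state: the logical replication offset. */
typedef struct {
    long long read_reploff;
} toyLink;

/* Assumed to mirror the plaintext read path: credit the bytes just read. */
static void creditRawRead(toyLink *l, size_t raw_len) {
    l->read_reploff += (long long)raw_len;
}

/* After decoding, replace the raw (compressed) credit with the decoded
 * length so the offset counts logical bytes. The delta can be negative
 * when a read holds only framing overhead that decodes to fewer bytes. */
static void adjustForDecoded(toyLink *l, size_t raw_len, size_t decoded_len) {
    l->read_reploff += (long long)decoded_len - (long long)raw_len;
}
```

For example, a plaintext link that reads 1000 bytes and a compressed link that reads 400 bytes decoding to the same 1000 bytes end at the same `read_reploff`, which is the invariant PSYNC relies on.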
Runtime config behavior
`repl-compression` is runtime-modifiable. A live link can't switch between a plaintext and a compressed (VCS/LZ4) frame mid-stream, so a toggle reconnects the affected links so they renegotiate on reconnect (`reconcileReplicaCompression`):
`compression` capability but are still on a plaintext link are reconnected and come back compressed; replicas currently on a compressed link are reconnected and come back plaintext.
`capa` with the new value.
Only links that negotiated the compression capability are affected. Replicas that never advertised it stay connected and untouched. The reconnect is deferred until after the `CONFIG SET` fully commits, so a rolled-back multi-option `CONFIG SET` reconnects nothing.
Config
| Config | Default | Description |
| --- | --- | --- |
| `repl-compression` | `no` | `no` or `lz4-stream`. Enables replication compression on a primary or replica. |
Internal constants (not user-configurable in this PR; surfaced for reviewer context):
| Constant | Value | Description |
| --- | --- | --- |
| `REPLICA_CAPA_COMPRESSION` | `1 << 4` | `capa` bit advertised as `capa compression`. |
| `REPL_COMPRESSION_BATCH_LIMIT` | 1 MB | |
| `REPL_STREAM_DECODER_OUTPUT_MAX` | 256 MB | |
The codec and level are derived from the configured mode (`lz4-stream` -> LZ4, codec default level), mirroring how `rdbcompression` selects its algorithm; there is no separate algo/level constant.
Observability
`INFO replication` per-replica fields (primary side): `compression=lz4`, `compressed_bytes`, `uncompressed_bytes`, `compression_ratio` (compressed/uncompressed), `compression_errors`
`compression_cpu_usec`: primary-side CPU spent compressing for this replica
`INFO replication` server-level (replica side): `repl_decompression_errors`, `repl_decompression_cpu_usec`, `repl_decompressed_bytes_total`
The per-replica IO-thread-written counters (`compression_errors`, `compression_cpu_usec`) are atomic (relaxed); the byte totals are main-thread-only.
Performance
Tested on `r7g.4xlarge` (16 vCPU, Graviton3) primary in `us-east-1`, 8 IO threads, 2 cross-region replicas in `us-west-2`, 1 same-region uncompressed replica. Client: `c7g.2xlarge`, 30 connections, pipeline 50, COB limit 10.6 GB.
Test 1: BlockMesh tweets: 3M keys x ~315 B JSON (1,073 MB uncompressed/replica).
Test 2: CNN DailyMail: 3M keys x ~4 KB articles (~12,260 MB uncompressed/replica).
Compression effectiveness and primary-side cost
BlockMesh tweets (~315 B values, 1,073 MB):
CNN DailyMail (~4 KB values, ~12,260 MB):
At sustained primary write rates ZSTD-3+ (12-30s CPU/replica) becomes a CPU bottleneck while LZ4-0 (2.5s) does not, which is the main reason LZ4 is the default.
Primary write throughput (SET keys/sec, BlockMesh)
Compression overhead vs uncompressed baseline is <1% at pipeline=50 and <=3% at pipeline=10.
Replica-side decompression cost (BlockMesh, 1,073 MB decompressed)
Enabling compression on the replica adds ~20% CPU to the replication apply path for LZ4; worth noting for replica capacity planning. LZ4 decompresses at ~1.25 GB/s; ZSTD-9 at ~617 MB/s.
Notes
`lz4-stream` enum value leaves room for a future `zstd-stream` value).
Related to #3531.