Skip to content

Streaming Compression support for Replication#3853

Open
roshkhatri wants to merge 63 commits into
valkey-io:unstablefrom
roshkhatri:replication-streaming-compression-pr
Open

Streaming Compression support for Replication#3853
roshkhatri wants to merge 63 commits into
valkey-io:unstablefrom
roshkhatri:replication-streaming-compression-pr

Conversation

@roshkhatri

@roshkhatri roshkhatri commented May 28, 2026

Copy link
Copy Markdown
Member

Summary

Adds per-replica streaming compression for the incremental replication stream on top of #3531, with lz4 as the first supported codec. Negotiated per-replica via the existing REPLCONF capa handshake (REPLICA_CAPA_COMPRESSION); default off (repl-compression no); replicas that don't advertise the capability stay uncompressed. Works for diskless full sync, disk-based full sync, and partial (PSYNC) resyncs.

Compression runs off the main thread on the existing IO-thread write path. All per-replica compression state and the replica-side decoder live behind a small compression_repl.c adapter (mirroring compression_rio.c).

Headline results (BlockMesh tweets, 3M keys x ~315 B):

Bandwidth savings Compression CPU Throughput overhead
LZ4 level 0 (default) 52% 2.5s <1%
ZSTD level 9 (future, #3798) 75% 29.7s <3%

Config added:

  • repl-compression (enum, default no): no or lz4-stream. Enables compression on a primary or replica.

Related to #3531. ZSTD support follows in #3798.

See the fork review PR: roshkhatri#18

Full description: design, threading model, config, observability, benchmarks

The replication stream from primary to replica is wrapped in a VCS envelope (STREAM_KIND_REPL) and compressed as a single long-lived frame at the per-replica buffer layer. Default behavior is unchanged with repl-compression no. The negotiation is per-replica via the REPLCONF capa handshake; the new REPLICA_CAPA_COMPRESSION capability lets each side opt in independently.

Architecture

+-----------------------------+       +------------------------------+       +----------------------+
|      Primary side           |       |   compression_repl adapter   |       |       transport      |
|                             |       |                              |       |                      |
| addReplyToReplicaBuffer     |------>| replCompressor               |------>| TCP socket -> replica|
| writeToReplicaCompressed    |       |  (streamWriter + out_buf)    |       |                      |
+-----------------------------+       +------------------------------+       +----------------------+

+-----------------------------+       +------------------------------+       +----------------------+
|      Replica side           |       |   compression_repl adapter   |       |       transport      |
|                             |       |                              |       |                      |
| readQueryFromClient         |<------| replDecompressor             |<------| TCP socket <- primary|
| replDecompressQueryBuf      |       |  (streamDecompressor, direct)|       |                      |
+-----------------------------+       +------------------------------+       +----------------------+

Threading model

Primary side: compression runs on whichever IO thread picks up the replica's write job from the shared inbox. There is no dedicated compression thread and no per-replica thread pinning; the per-replica replCompressor state lives in client->repl_data and is touched only by the IO thread currently processing that client (or the main thread before any IO dispatch).

Primary
+--------------------------------------------------------------------------+
|                            MAIN PROCESS                                  |
|  Thread 0 (main)            IO Thread t1 .. tN                           |
|    | command execution       |                                          |
|    v                         |                                          |
|  shared repl backlog         |                                          |
|  (uncompressed bytes)        |                                          |
|    v                         |                                          |
|  beforeSleep: route write ->  shared IO inbox  -> writeToReplicaCompressed
|                              |    step1: drain leftover                 |
|                              |    step2: compress (LZ4 frame)           |
|                              |    step3: flush                          |
|                              |    step4: connWrite --- socket --> replica
|    postWriteToReplica:        <-- completion                            |
|      if out_buf fully sent: advance backlog cursor by raw bytes          |
+--------------------------------------------------------------------------+

Replica side: decompression runs inline on the main thread inside the existing readQueryFromClient path. No worker threads; decompressed bytes feed directly into processInputBuffer.

Replica
+--------------------------------------------------------------------------+
|  Thread 0 (main)                                                         |
|    readQueryFromClient (primary connection)                              |
|      v                                                                   |
|    replDecompressQueryBuf                                                |
|      feed compressed bytes -> replDecompressor (direct streamDecompressorFeed)
|      pull decompressed RESP <-                                           |
|      adjust read_reploff by raw->decoded delta                           |
|      v                                                                   |
|    processInputBuffer (apply replicated commands)                        |
+--------------------------------------------------------------------------+

Key design decisions

  1. Capability negotiation reuses the existing REPLCONF capa handshake. Replicas advertise capa compression when repl-compression is enabled; the primary records REPLICA_CAPA_COMPRESSION on the per-replica bitmask and compresses only for replicas that opted in and when its own repl-compression is enabled. The replica stores the negotiated decision per link and gates decoder setup on it (not the live config), so a mid-handshake config change cannot desync the stream.

  2. compression_repl.c adapter on top of compression_stream. replCompressor (primary) owns the streamWriter plus the staged compressed-output buffer; replDecompressor (replica) owns a streamDecompressor plus a small envelope accumulator and decode scratch. This is the replication counterpart to compression_rio.c (the RDB adapter), keeping codec state out of networking.c.

  3. Direct-feed decoder on the replica (no rio, no pull/push callback). The replica reads a non-blocking socket inside the event loop, so it cannot use the blocking pull callback the RDB path uses. The adapter instead classifies the stream from its leading bytes and feeds the codec directly: on a VCS envelope it parses the header, initializes the decoder, and calls streamDecompressorFeed (the codec retains partial-frame state internally, so no feed queue is needed); a non-VCS stream is forwarded untouched. Passthrough matters because the primary only compresses when its own config is enabled, so a capability-advertising replica may still receive plaintext.

  4. One long-lived LZ4 frame per replica connection. Frame-done mid-stream is treated as protocol corruption and triggers disconnect.

  5. STREAM_KIND_REPL distinguishes replication frames from RDB frames. RDB STREAM_KIND_RDB payloads from Streaming Compression support for RDB #3531 are unaffected.

  6. Replication-offset accounting stays in logical (uncompressed) bytes. replDecompressQueryBuf adjusts read_reploff by the raw-to-decoded delta so PSYNC offset invariants hold across compressed and uncompressed connections.

  7. LZ4 fast mode (level 0). LZ4 fast compresses at ~5 GB/s vs ~500 MB/s for HC modes; for replication the network is typically the bottleneck before compression CPU. Trade-off: ~10-15% worse ratio than HC modes.

Runtime config behavior

repl-compression is runtime-modifiable. A live link can't switch between a plaintext and a compressed (VCS/LZ4) frame mid-stream, so a toggle reconnects the affected links so they renegotiate on reconnect (reconcileReplicaCompression):

  • On a primary: online replicas that already advertised the compression capability but are still on a plaintext link are reconnected and come back compressed; replicas currently on a compressed link are reconnected and come back plaintext.
  • On a replica: the change drops and reconnects the upstream primary link so the replica re-advertises capa with the new value.

Only links that negotiated the compression capability are affected. Replicas that never advertised it stay connected and untouched. The reconnect is deferred until after the CONFIG SET fully commits, so a rolled-back multi-option CONFIG SET reconnects nothing.

Config

Config Default Description
repl-compression no no or lz4-stream. Enables replication compression on a primary or replica.

Internal constants (not user-configurable in this PR; surfaced for reviewer context):

Constant Value Notes
REPLICA_CAPA_COMPRESSION 1 << 4 REPLCONF capa bit advertised as capa compression.
REPL_COMPRESSION_BATCH_LIMIT 1 MB Per-dispatch raw-bytes cap; bounds worst-case batch latency.
REPL_STREAM_DECODER_OUTPUT_MAX 256 MB Replica-side decompressed-output cap; guards against a malformed stream.

The codec and level are derived from the configured mode (lz4-stream -> LZ4, codec default level), mirroring how rdbcompression selects its algorithm; there is no separate algo/level constant.

Observability

INFO replication per-replica fields (primary side):

  • compression=lz4, compressed_bytes, uncompressed_bytes, compression_ratio (compressed/uncompressed), compression_errors
  • compression_cpu_usec: primary-side CPU spent compressing for this replica

INFO replication server-level (replica side):

  • repl_decompression_errors, repl_decompression_cpu_usec, repl_decompressed_bytes_total

The per-replica IO-thread-written counters (compression_errors, compression_cpu_usec) are atomic (relaxed); the byte totals are main-thread-only.

Performance

Tested on r7g.4xlarge (16 vCPU, Graviton3) primary in us-east-1, 8 IO threads, 2 cross-region replicas in us-west-2, 1 same-region uncompressed replica. Client: c7g.2xlarge, 30 connections, pipeline 50, COB limit 10.6 GB.

Test 1: BlockMesh tweets: 3M keys x ~315 B JSON (1,073 MB uncompressed/replica).
Test 2: CNN DailyMail: 3M keys x ~4 KB articles (~12,260 MB uncompressed/replica).

Compression effectiveness and primary-side cost

BlockMesh tweets (~315 B values, 1,073 MB):

Algorithm Ratio Wire bytes/replica Bandwidth savings Compression CPU/replica
LZ4 level 0 (default) 0.48 515 MB 52% 2.5s
ZSTD level 0 0.34 365 MB 66% n/a
ZSTD level 3 0.34 362 MB 67% 12.1s
ZSTD level 6 0.30 322 MB 70% 19.2s
ZSTD level 9 0.25 268 MB 75% 29.7s

CNN DailyMail (~4 KB values, ~12,260 MB):

Algorithm Ratio Wire bytes/replica Bandwidth savings
LZ4 level 0 (default) 0.61 ~7,500 MB 39%
ZSTD level 0 0.37 ~4,500 MB 63%

At sustained primary write rates ZSTD-3+ (12-30s CPU/replica) becomes a CPU bottleneck while LZ4-0 (2.5s) does not, which is the main reason LZ4 is the default.

Primary write throughput (SET keys/sec, BlockMesh)

Config Pipeline 10 Pipeline 20 Pipeline 50
No compression 105,002 111,493 117,315
LZ4 level 0 104,627 111,389 118,015
ZSTD level 3 103,403 112,433 118,163
ZSTD level 9 102,422 111,641 113,948

Compression overhead vs uncompressed baseline is <1% at pipeline=50 and <=3% at pipeline=10.

Replica-side decompression cost (BlockMesh, 1,073 MB decompressed)

Config Decompression CPU Apply CPU Total Overhead vs uncompressed (3.64s)
No compression 0.12s 3.52s 3.64s baseline
LZ4 level 0 0.86s 3.46s 4.33s +19%
ZSTD level 9 1.74s 3.36s 5.09s +40%

Enabling compression on the replica adds ~20% CPU to the replication apply path for LZ4; worth noting for replica capacity planning. LZ4 decompresses at ~1.25 GB/s; ZSTD-9 at ~617 MB/s.

Notes

  1. ZSTD rows in the benchmarks are for comparison only; this PR ships LZ4. ZSTD support lands in #3798 (the lz4-stream enum value leaves room for a future zstd-stream value).

Related to #3531.

sarthakaggarwal97 and others added 15 commits May 12, 2026 12:42
Add streaming LZ4-backed RDB compression with rio decorators, stream envelope handling, integration changes, and the follow-up fixes and config cleanup needed on top of unstable.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- Remove dead code: rdbIsValidMagic() and unused #include <string.h> in rdb.h
- Remove redundant first RIO_FLAG_SKIP_RDB_CHECKSUM set in rdbSaveInternal
- Remove unrelated changes: config_parse_depth, USE_FAST_FLOAT, write-make-settings
- Validate full 8-byte VKCS envelope in aof.c rdbFileUsesStreamingCompression
- Add SAFETY comment for rdbRioHasCorruptCompressedInput cast invariant
- Rename all snake_case identifiers to camelCase per Valkey conventions:
  types (compression_algo_t -> compressionAlgo, stream_compressor_t ->
  streamCompressor, compress_rio_t -> compressRio, etc.), functions
  (stream_writer_create -> streamWriterCreate, compress_rio_finish ->
  compressRioFinish, write_vkcs_envelope -> writeVkcsEnvelope, etc.),
  and static variables (compression_lz4_codec_impl -> compressionLz4CodecImpl)
- Drop _t suffix from all types to match Valkey convention
- Fix typo: streamWriterIsErrord -> streamWriterIsErrored
- Replace silent dummy buffer allocation with assert(needed > 0) in
  streamWriterEnsureOutBuf

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
* Address streaming RDB compression review

* Skip RDB CRC for streaming compression

* Remove brittle 32-bit compression unit test

---------

Co-authored-by: Sarthak Aggarwal <sarthagg@amazon.com>

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- rdbSaveInternal: the old comment claimed per-string LZF was disabled
  'when algo != LZF', but the actual gate is RIO_FLAG_STREAMING_COMPRESSION
  on the wrapper rio. Standalone rios (DUMP, AOF rewrite, diskless) keep
  using LZF regardless of algo.
- rdbInputStreamPrepare: flag the synchronous probe IO so a future
  non-blocking caller (replication) doesn't accidentally block the loop.
- rdbRioHasCorruptCompressedInput: the cast is sound today only because
  one producer sets RIO_FLAG_STREAMING_DECOMPRESSION. Replace the
  'SAFETY' assertion with a note telling the next person to add a type
  discriminator before adding a second producer.
- rdbSaveRawString: minor wording cleanup on the per-string LZF gate.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Match the surrounding Valkey style: drop comments that restate the
code, drop the Ownership/Threading/Returns boilerplate from headers,
collapse repeated 'capacity-shortage is retriable' notes into one
explanation per function. Behavior is unchanged.

Net -285 lines; integration tests (21/21) and build are clean.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Adds replication wire compression on top of valkey-io#3531 with lz4 as the first
supported codec for the incremental replication stream. The replication
stream from primary to replica is wrapped in a VKCS envelope (using
STREAM_KIND_REPL) and compressed as a single long-lived frame at the
per-replica buffer layer. Default behavior is unchanged with
'replcompression no'; existing replicas without the new capability stay
uncompressed.

Negotiation is per-replica via the existing PSYNC handshake; a new
REPLICA_CAPA_COMPRESSION capability lets each side opt in independently.
Compression runs inline on the IO thread that owns the replica's write
job; no dedicated compression thread, no IPC, no reordering. Optional
sticky thread affinity (lazy ownership + event-driven rebalance) keeps
the long-lived LZ4 frame state on a single IO thread for cache locality.

Configs:
  replcompression                   bool, default no
  repl-compression-thread-affinity  bool, default yes

Internal constants:
  REPLICA_CAPA_COMPRESSION         (1 << 4)
  REPL_COMPRESSION_ALGO            ALGO_LZ4
  REPL_COMPRESSION_LEVEL           0  (LZ4 fast mode)
  REPL_COMPRESSION_BATCH_LIMIT     1 MB raw input per dispatch
  REPL_STREAM_DECODER_OUTPUT_MAX   256 MB

INFO replication per-replica fields:
  compression=lz4, compressed_bytes, uncompressed_bytes,
  compression_ratio, compression_errors, compression_cpu_usec,
  debug_compression_pending_drains, debug_thread_switches

INFO replication server-level (replica side):
  repl_decompression_errors, repl_decompression_cpu_usec,
  repl_decompressed_bytes_total, repl_apply_cpu_usec,
  repl_apply_batches

CI adds a test-replication-compression job that runs the
replication-tagged integration tests with replcompression=yes to
exercise compression across the broader replication test surface.

Tests: 18 streamReader push-mode unit tests + 3 replCompression unit
tests + 27 integration tests.

Performance (BlockMesh tweets, 3M keys x ~315 byte JSON values, 1,073
MB uncompressed per replica, 30 clients, pipeline 50, 2 cross-region
replicas):
  LZ4 level 0 (default): 0.48 ratio, 52% bandwidth saved, 2.5s
                         compression CPU per replica, <1% throughput
                         overhead vs uncompressed baseline.
Affinity ON vs OFF: throughput unchanged (118.6K vs 118.1K keys/s) but
thread switches drop from ~800K to ~30 per replica.

ZSTD support follows in valkey-io#3798.

Related to valkey-io#3531.

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds LZ4-based streaming compression support, wires it into RDB and replication transport paths, updates config and build integration, and adds tests plus a CI job for the new compression scenarios.

Changes

Streaming compression and replication transport

Layer / File(s) Summary
Dependency import and build wiring
deps/*, cmake/Modules/*, src/*CMakeLists.txt, src/Makefile, src/unit/*, .github/workflows/ci.yml
Vendored LZ4/xxHash sources are added, build targets and link flags include the new library, and CI runs a replication-compression job.
Compression contracts and stream codecs
src/compression*, src/compression_lz4*, src/compression_repl*, deps/lz4/*
Defines the generic streaming compression API, LZ4 frame/HC/xxHash implementations, and the replication-stream compressor/decompressor adapters.
Rio partial reads and RDB tooling
src/rio*, src/compression_rio*, src/aof.c, src/valkey-check-rdb.c
Adds partial-read support to rio, wraps compressed streams for RDB I/O, and updates AOF/RDB-check code to detect and consume streaming-compressed input.
Replication and server runtime wiring
src/config.c, src/server.h, src/server.c, src/networking.c, src/replication.c, src/io_threads.c, src/rdb.c, src/rdb.h
Threads compression state through config, server state, replication send/receive paths, IO-thread dispatch, and RDB save/load handling.
Integration coverage, harness tags, and docs
tests/integration/*, src/unit/*, tests/support/server.tcl, tests/test_helper.tcl, valkey.conf
Adds compression-focused unit and integration tests, updates harness tag allowlists, and documents the new RDB/replication compression configuration.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested reviewers

  • madolson
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 37.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title is concise and accurately summarizes the main change: adding streaming compression for replication.
Description check ✅ Passed The description is clearly about the same replication streaming compression work and matches the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🧹 Nitpick comments (7)
.github/workflows/ci.yml (1)

212-220: ⚡ Quick win

Consider setting persist-credentials: false for supply-chain hardening.

The checkout actions at lines 213 and 220 do not explicitly set persist-credentials: false. While not a functional issue, setting this option prevents the action from configuring Git credentials that could leak in logs or be misused by malicious code.

🔒 Proposed fix to add persist-credentials: false
       - name: Install libbacktrace
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
         with:
           repository: ianlancetaylor/libbacktrace
           ref: b9e40069c0b47a722286b94eb5231f7f05c08713
           path: libbacktrace
+          persist-credentials: false
       - run: cd libbacktrace && ./configure && make && sudo make install
       - name: Checkout Valkey
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+        with:
+          persist-credentials: false
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/ci.yml around lines 212 - 220, Update the two
actions/checkout steps (the "Install libbacktrace" checkout using uses:
actions/checkout@de0fac2e4500d... and the "Checkout Valkey" checkout with the
same uses) to include persist-credentials: false under their with: blocks to
prevent Git credentials from being persisted; ensure you add the
persist-credentials: false key alongside the existing repository/ref/path keys
and keep YAML indentation consistent.
src/compression.h (1)

47-55: ⚡ Quick win

Add function-level docs for the remaining public declarations.

compressionAlgoSupportsStreaming, compressionAlgoName, and the init/destroy APIs are currently undocumented. Please add brief contract comments (return semantics, ownership/lifecycle expectations) for these declarations to keep this public header fully self-describing.

As per coding guidelines: "Document why code exists, not just what it does; document all functions in C code".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/compression.h` around lines 47 - 55, Add brief function-level comments
for compressionAlgoSupportsStreaming, compressionAlgoName, streamCompressorInit,
streamCompressorDestroy, streamDecompressorInit, and streamDecompressorDestroy
that describe each function's contract: the meaning of return values (e.g.,
true/false or 0/error codes), who owns any returned pointers or resources,
lifecycle expectations (caller allocates/initializes struct before/after call,
who must call Destroy), and any error conditions; place these comments
immediately above each declaration in the header so the public API is
self-describing and documents why the functions exist as well as how callers
must use them.
src/rdb.c (1)

3151-3192: ⚡ Quick win

Document the new input-stream helper lifecycle.

rdbInputStreamInit(), rdbInputStreamDestroy(), and rdbInputStreamValidateEnd() add a non-trivial wrapper lifecycle, but only rdbInputStreamPrepare() is documented. Please add short contract comments so callers know the required call order and ownership rules.

As per coding guidelines, "Document all functions in C code".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rdb.c` around lines 3151 - 3192, Add short contract comments for
rdbInputStreamInit, rdbInputStreamPrepare, rdbInputStreamValidateEnd, and
rdbInputStreamDestroy describing the required call order (Init() to set up the
struct, optional Prepare() to wrap raw_rio into a decompressor which may replace
rdb_rio and set RIO_FLAG_SKIP_RDB_CHECKSUM, ValidateEnd() to be called after
reads to verify the decompressor stream, and Destroy() to free the
decompressor), the ownership rules (input does not take ownership of the
provided raw_rio; Prepare() initializes internal decompressor state that must be
cleaned up by Destroy()), and return/behavior expectations (Prepare() returns
DECOMPRESS_RIO_INIT_* codes, Destroy() is idempotent, ValidateEnd() returns
C_OK/C_ERR only when initialized). Include these comments immediately above the
definitions of rdbInputStreamInit, rdbInputStreamPrepare,
rdbInputStreamValidateEnd, and rdbInputStreamDestroy and reference the struct
fields input->raw_rio, input->rdb_rio, input->decompressor, and
input->initialized.
src/rdb.h (1)

190-197: ⚡ Quick win

Add declaration-level contract docs for rdbInputStream lifecycle API.

Please document ownership and required call order (Init -> Prepare -> ValidateEnd -> Destroy) in the header so callers can’t misuse the stream wrapper.

♻️ Suggested header comment shape
+/* Wraps a raw input rio and exposes a logical RDB byte stream.
+ * Lifecycle: Init -> Prepare -> (use input.rdb_rio) -> ValidateEnd -> Destroy.
+ * `raw_rio` ownership remains with caller.
+ */
 typedef struct {
     rio *raw_rio;
     rio *rdb_rio;
     decompressRio decompressor;
     streamReaderInfo stream_info;
     bool initialized;
 } rdbInputStream;

+/* Initialize wrapper state around `raw_rio` (no stream inspection yet). */
 void rdbInputStreamInit(rdbInputStream *input, rio *raw_rio);
+/* Prepare input stream; may attach decompression rio depending on envelope. */
 decompressRioInitResult rdbInputStreamPrepare(rdbInputStream *input);
+/* Validate stream terminates cleanly at logical end. */
 int rdbInputStreamValidateEnd(rdbInputStream *input);
+/* Release wrapper/decompression resources. */
 void rdbInputStreamDestroy(rdbInputStream *input);

As per coding guidelines "Document why code exists, not just what it does; document all functions in C code".

Also applies to: 234-238

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rdb.h` around lines 190 - 197, Add a clear declaration-level lifecycle
contract comment for the rdbInputStream type describing ownership and required
call order: state that callers own the struct, which fields are initialized by
the creator vs by Init/Prepare, and that the API must be used in order Init ->
Prepare -> ValidateEnd -> Destroy (with Destroy cleaning up
raw_rio/rdb_rio/decompressor if initialized). Reference the rdbInputStream type
and the lifecycle functions (Init, Prepare, ValidateEnd, Destroy) and document
when initialized boolean is set/cleared, which function takes/returns ownership
of rio pointers, and that calling functions out of order is undefined to prevent
misuse.
tests/integration/rdb-compression.tcl (1)

273-275: ⚡ Quick win

Strengthen corruption test assertions to validate the failure reason.

These checks only match *Error*, which is too broad. Prefer asserting specific error/log patterns per scenario (like the existing verify_log_message usage) so false positives don’t mask regressions.

As per coding guidelines "Use clear assertions with meaningful error messages in tests".

Also applies to: 294-296, 350-352, 368-369

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/rdb-compression.tcl` around lines 273 - 275, The test uses
broad assertions like the catch/err + assert_match "*Error*" pattern (e.g., the
catch {r debug reload nosave} err followed by assert_match "*Error*" $err) which
is too generic; replace those with precise checks that validate the expected
failure reason—either call the existing verify_log_message helper with the
expected error/log substring or change assert_match to match a specific error
pattern (include the expected text) for each scenario (also update the similar
occurrences at the other reported spots). Ensure you update the specific
statements (the catch/err blocks and their assert_match lines) so they assert
the exact error or log message that the test intends to trigger rather than the
generic "*Error*".
src/replication.c (1)

90-219: Please pull in @core-team for this replication.c change set.

This patch changes replication protocol negotiation, state transitions, and IO-thread interaction in a file the repo treats as architecture-review-required.

As per coding guidelines, "Request @core-team architectural review for changes to cluster*.c, replication.c, rdb.c, or aof.c".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/replication.c` around lines 90 - 219, This change touches replication.c
(functions like markCompressedReplicasForDisconnect, replBalanceAffinity,
replicaInitCompressionOnPsync, replInitDecompression, replDestroyDecompression)
which per project policy requires an architecture-level review; please add
`@core-team` to the PR/reviewers and include a short justification referencing the
altered replication protocol negotiation, state transitions, and IO-thread
interaction so the core team can perform the requested architectural review
before merging.
src/server.h (1)

3048-3055: ⚡ Quick win

Add brief header docs for the new replication-compression API.

These are new exported entry points, but the lifecycle/ownership split is not obvious from the names alone. A short comment block here describing per-replica vs singleton state, expected call order, and return semantics would make this interface much safer to consume.

📝 Example
+/* Primary-side per-replica compression lifecycle. Returns C_OK/C_ERR. */
 void markCompressedReplicasForDisconnect(void);
 int replInitCompression(client *c, compressionAlgo algo, int level);
 void replDestroyCompression(client *c);
 void replBalanceAffinity(void);
+
+/* Replica-side compressed stream decode path. `new_data_start` is the first
+ * newly appended byte in `c->querybuf`. Returns C_OK/C_ERR. */
 int replDecompressQueryBuf(client *c, size_t new_data_start);
+
+/* Replica-side singleton decompressor lifecycle. */
 int replInitDecompression(void);
 void replDestroyDecompression(void);
 void replRefreshDecompression(void);
As per coding guidelines, "Document all functions in C code" and "Use comments for non-obvious behavior and rationale, not for restating code".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server.h` around lines 3048 - 3055, Add brief header documentation above
the new replication-compression API declaring which functions operate on
per-replica state vs global/singleton state, the expected call order/lifecycle
(e.g. replInitCompression/replDestroyCompression per-client,
replInitDecompression/replDestroyDecompression once for the process, when to
call replRefreshDecompression and markCompressedReplicasForDisconnect), and the
return semantics (which functions return 0 on success / non-zero on error and
whether callers own/must free any returned resources). Reference the exact
symbols in the docs: markCompressedReplicasForDisconnect,
replInitCompression(client *c, compressionAlgo algo, int level),
replDestroyCompression(client *c), replBalanceAffinity,
replDecompressQueryBuf(client *c, size_t new_data_start), replInitDecompression,
replDestroyDecompression, and replRefreshDecompression so callers can quickly
see ownership and ordering.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@deps/lz4/lz4frame.c`:
- Around line 1619-1620: The code computes srcStart = (const BYTE*)srcBuffer and
srcEnd = srcStart + *srcSizePtr even when srcBuffer can be NULL (seen when
calling LZ4F_decompress(dctx, NULL, &o, NULL, &i, NULL)), which causes undefined
behavior; instead of patching the vendored file inline, update/re-vendor to an
upstream LZ4 release that contains the fix, and while doing so verify that the
logic around LZ4F_decompress, srcBuffer, srcSizePtr, srcStart and srcEnd
guards/skips the pointer arithmetic when srcBuffer == NULL or *srcSizePtr == 0
(i.e., ensure the code checks for NULL/zero before computing srcEnd or uses a
safe branch), then run the decompression unit tests that exercise the NULL input
path to confirm the upstream fix resolves the UB.

In `@src/aof.c`:
- Around line 1014-1022: The code currently treats a short read (0 < nread <
sizeof(header)) as success by returning 0, which lets a truncated RDB header be
considered uncompressed; change this so any partial read is treated as an error:
after reading into header from fd (variables nread, header, sizeof(header), fd),
if nread == -1 restore read_errno and return -1 (as already done), and if nread
< (ssize_t)sizeof(header) also set errno to EIO (or restore read_errno if
appropriate) and return -1 so the caller will treat the probe as a failure and
trigger the fallback path instead of accepting a truncated header.

In `@src/compression_stream.c`:
- Around line 818-820: The cap check can overflow because effective_len + len
may wrap; update the check in the push/feed logic (where effective_len is
computed from sdslen(t->feed_queue) - t->feed_head) to first guard against
addition overflow and oversized single inputs by verifying either len >
t->feed_cap or that effective_len is greater than t->feed_cap - len (or
equivalently SIZE_MAX - effective_len < len) before calling
streamReaderSetError(t, STREAM_READER_ERROR_IO); this ensures you never perform
wrapping addition and correctly detect when the new data would exceed
t->feed_cap.

In `@src/config.c`:
- Around line 2590-2596: The updateReplCompression apply-hook is performing an
irreversible action (markCompressedReplicasForDisconnect) during CONFIG SET,
which can be rolled back by configSetCommand; change updateReplCompression to
avoid side effects: instead of calling markCompressedReplicasForDisconnect
directly, record the intent (e.g., set a new flag or push affected replica IDs
into a pending list) and return success; then invoke
markCompressedReplicasForDisconnect only from the post-commit path in
configSetCommand (or a dedicated commit callback) after all option apply-hooks
succeed, using the new flag/pending list; keep the function signature of
updateReplCompression and use symbols updateReplCompression,
markCompressedReplicasForDisconnect, and configSetCommand to locate and wire the
deferred action.

In `@src/networking.c`:
- Around line 4598-4606: c->querybuf is being mutated with sdscatlen() which may
realloc the SDS and corrupt the thread-local shared pointer (thread_shared_qb)
if the primary is still using it; before appending
server.repl_stream_decode_buf, check if c->querybuf == thread_shared_qb and if
so call sdsnewlen()/sdsdup to make a private copy (or otherwise allocate a fresh
SDS) and assign it to c->querybuf, then perform the sdscatlen() append and
update c->querybuf_peak; refer to c->querybuf, server.repl_stream_decode_buf,
thread_shared_qb and the decompression path to implement this guard.

In `@src/rdb.c`:
- Around line 3695-3698: The conditional is checking the wrong streaming flag:
replace the RIO_FLAG_STREAMING_COMPRESSION test with
RIO_FLAG_STREAMING_DECOMPRESSION so load-side streaming-decompression hits the
special notice; update the branch that currently reads "(rdb->flags &
RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)" to
use RIO_FLAG_STREAMING_DECOMPRESSION, keeping the RIO_FLAG_SKIP_RDB_CHECKSUM
check and the serverLog(LL_NOTICE, ...) calls unchanged (refer to rdb->flags,
RIO_FLAG_STREAMING_DECOMPRESSION, RIO_FLAG_SKIP_RDB_CHECKSUM, and serverLog).

In `@src/replication.c`:
- Around line 4042-4049: The code only advertises compression when
use_diskless_load is true, making server.repl_compression a no-op for disk-based
syncs; change the branch to advertise REPLICA_CAPA_COMPRESSION_STR whenever
server.repl_compression is enabled (i.e., check server.repl_compression instead
of use_diskless_load) so the capability is sent for all full-sync/PSYNC flows;
update the block that sets argv/ lens/ argc (the one referencing
use_diskless_load, REPLICA_CAPA_COMPRESSION_STR and server.repl_compression) to
add the capability whenever server.repl_compression is true and leave any
diskless-specific logic unchanged.
- Around line 216-219: replRefreshDecompression currently drops failures from
replInitDecompression; change replRefreshDecompression to propagate errors by
returning an int (return C_ERR on failure, C_OK on success), call
replDestroyDecompression() then check the return value of
replInitDecompression() and return C_ERR if it fails, and update all new call
sites (those that run handshake/full-sync immediately after calling
replRefreshDecompression) to check its return value and abort the sync session
on C_ERR; keep references to replInitDecompression, replDestroyDecompression,
replRefreshDecompression and use the C_ERR/C_OK constants so callers can act
accordingly.

In `@src/rio.c`:
- Around line 506-509: In rioReadPartial, before calling r->read_some, ensure
the computed bytes_to_read (the min of len and r->max_processing_chunk) is not
greater than SSIZE_MAX; if it is, reject the request and return an appropriate
error instead of invoking r->read_some to avoid backend-dependent truncation/
sign issues. Update the logic around bytes_to_read (used in the existing
computation) to perform this SSIZE_MAX bound check and return an error (and set
errno consistently, e.g. EINVAL/ERANGE) when exceeded; also include the required
header for SSIZE_MAX if not present.

In `@src/server.c`:
- Around line 6654-6665: The compression_ratio calculation is inverted: change
the expression that now computes (uncompressed / compressed) to compute
(compressed / uncompressed) instead, guarding against division by zero. In the
printf-style block that builds the stats string (reference symbols:
"compression_ratio=%.2f", compressionAlgoName(REPL_COMPRESSION_ALGO),
replica->repl_data->repl_compressed_bytes_total,
replica->repl_data->repl_uncompressed_bytes_total), replace the ternary that
returns (double)repl_uncompressed_bytes_total /
(double)repl_compressed_bytes_total with one that returns
repl_uncompressed_bytes_total == 0 ? 0.0 :
(double)replica->repl_data->repl_compressed_bytes_total /
(double)replica->repl_data->repl_uncompressed_bytes_total so compression_ratio
reflects compressed/uncompressed safely.
- Around line 6650-6669: The INFO path reads replication compression counters
(repl_compressed_bytes_total, repl_uncompressed_bytes_total,
repl_compression_errors, repl_compression_cpu_usec,
repl_compression_pending_drains, repl_compression_thread_switches) while IO
threads update them in writeToReplicaCompressed/postWriteToReplica, creating a
data race; fix by making these counters atomic (e.g., _Atomic size_t or
atomic_ullong) or by protecting updates/reads with the replica lock and then use
atomic_load (or the lock) when building the INFO string in src/server.c; update
all increment/+= sites in writeToReplicaCompressed/postWriteToReplica to use
atomic_fetch_add (or acquire the same lock) and replace direct reads in the INFO
formatting block with atomic loads (or locked reads) to ensure race-free access.

In `@src/server.h`:
- Around line 1281-1288: The lifetime replication counters use size_t/long long
and will wrap on 32-bit builds; change repl_compressed_bytes_total,
repl_uncompressed_bytes_total, repl_compression_errors,
repl_compression_pending_drains and repl_compression_thread_switches from size_t
to uint64_t and change repl_compression_cpu_usec from long long to uint64_t
(leave last_processed_tid and affinity_tid as int), ensure the header includes
<stdint.h> and update any INFO/printf format specifiers or casts to use
PRIu64/uint64_t where these fields are printed; apply the same replacements to
the other identical field group mentioned in the comment.

---

Nitpick comments:
In @.github/workflows/ci.yml:
- Around line 212-220: Update the two actions/checkout steps (the "Install
libbacktrace" checkout using uses: actions/checkout@de0fac2e4500d... and the
"Checkout Valkey" checkout with the same uses) to include persist-credentials:
false under their with: blocks to prevent Git credentials from being persisted;
ensure you add the persist-credentials: false key alongside the existing
repository/ref/path keys and keep YAML indentation consistent.

In `@src/compression.h`:
- Around line 47-55: Add brief function-level comments for
compressionAlgoSupportsStreaming, compressionAlgoName, streamCompressorInit,
streamCompressorDestroy, streamDecompressorInit, and streamDecompressorDestroy
that describe each function's contract: the meaning of return values (e.g.,
true/false or 0/error codes), who owns any returned pointers or resources,
lifecycle expectations (caller allocates/initializes struct before/after call,
who must call Destroy), and any error conditions; place these comments
immediately above each declaration in the header so the public API is
self-describing and documents why the functions exist as well as how callers
must use them.

In `@src/rdb.c`:
- Around line 3151-3192: Add short contract comments for rdbInputStreamInit,
rdbInputStreamPrepare, rdbInputStreamValidateEnd, and rdbInputStreamDestroy
describing the required call order (Init() to set up the struct, optional
Prepare() to wrap raw_rio into a decompressor which may replace rdb_rio and set
RIO_FLAG_SKIP_RDB_CHECKSUM, ValidateEnd() to be called after reads to verify the
decompressor stream, and Destroy() to free the decompressor), the ownership
rules (input does not take ownership of the provided raw_rio; Prepare()
initializes internal decompressor state that must be cleaned up by Destroy()),
and return/behavior expectations (Prepare() returns DECOMPRESS_RIO_INIT_* codes,
Destroy() is idempotent, ValidateEnd() returns C_OK/C_ERR only when
initialized). Include these comments immediately above the definitions of
rdbInputStreamInit, rdbInputStreamPrepare, rdbInputStreamValidateEnd, and
rdbInputStreamDestroy and reference the struct fields input->raw_rio,
input->rdb_rio, input->decompressor, and input->initialized.

In `@src/rdb.h`:
- Around line 190-197: Add a clear declaration-level lifecycle contract comment
for the rdbInputStream type describing ownership and required call order: state
that callers own the struct, which fields are initialized by the creator vs by
Init/Prepare, and that the API must be used in order Init -> Prepare ->
ValidateEnd -> Destroy (with Destroy cleaning up raw_rio/rdb_rio/decompressor if
initialized). Reference the rdbInputStream type and the lifecycle functions
(Init, Prepare, ValidateEnd, Destroy) and document when initialized boolean is
set/cleared, which function takes/returns ownership of rio pointers, and that
calling functions out of order is undefined to prevent misuse.

In `@src/replication.c`:
- Around line 90-219: This change touches replication.c (functions like
markCompressedReplicasForDisconnect, replBalanceAffinity,
replicaInitCompressionOnPsync, replInitDecompression, replDestroyDecompression)
which per project policy requires an architecture-level review; please add
`@core-team` to the PR/reviewers and include a short justification referencing the
altered replication protocol negotiation, state transitions, and IO-thread
interaction so the core team can perform the requested architectural review
before merging.

In `@src/server.h`:
- Around line 3048-3055: Add brief header documentation above the new
replication-compression API declaring which functions operate on per-replica
state vs global/singleton state, the expected call order/lifecycle (e.g.
replInitCompression/replDestroyCompression per-client,
replInitDecompression/replDestroyDecompression once for the process, when to
call replRefreshDecompression and markCompressedReplicasForDisconnect), and the
return semantics (which functions return 0 on success / non-zero on error and
whether callers own/must free any returned resources). Reference the exact
symbols in the docs: markCompressedReplicasForDisconnect,
replInitCompression(client *c, compressionAlgo algo, int level),
replDestroyCompression(client *c), replBalanceAffinity,
replDecompressQueryBuf(client *c, size_t new_data_start), replInitDecompression,
replDestroyDecompression, and replRefreshDecompression so callers can quickly
see ownership and ordering.

In `@tests/integration/rdb-compression.tcl`:
- Around line 273-275: The test uses broad assertions like the catch/err +
assert_match "*Error*" pattern (e.g., the catch {r debug reload nosave} err
followed by assert_match "*Error*" $err) which is too generic; replace those
with precise checks that validate the expected failure reason—either call the
existing verify_log_message helper with the expected error/log substring or
change assert_match to match a specific error pattern (include the expected
text) for each scenario (also update the similar occurrences at the other
reported spots). Ensure you update the specific statements (the catch/err blocks
and their assert_match lines) so they assert the exact error or log message that
the test intends to trigger rather than the generic "*Error*".
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 4465ea09-be4a-45a9-9f78-c6764e703003

📥 Commits

Reviewing files that changed from the base of the PR and between e4fdae4 and f13038b.

📒 Files selected for processing (47)
  • .github/workflows/ci.yml
  • cmake/Modules/SourceFiles.cmake
  • cmake/Modules/ValkeySetup.cmake
  • deps/CMakeLists.txt
  • deps/Makefile
  • deps/lz4/CMakeLists.txt
  • deps/lz4/LICENSE
  • deps/lz4/Makefile
  • deps/lz4/lz4.c
  • deps/lz4/lz4.h
  • deps/lz4/lz4frame.c
  • deps/lz4/lz4frame.h
  • deps/lz4/lz4hc.c
  • deps/lz4/lz4hc.h
  • deps/lz4/xxhash.c
  • deps/lz4/xxhash.h
  • src/CMakeLists.txt
  • src/Makefile
  • src/aof.c
  • src/compression.c
  • src/compression.h
  • src/compression_lz4.c
  • src/compression_lz4.h
  • src/compression_rio.c
  • src/compression_rio.h
  • src/compression_stream.c
  • src/compression_stream.h
  • src/config.c
  • src/io_threads.c
  • src/networking.c
  • src/rdb.c
  • src/rdb.h
  • src/replication.c
  • src/rio.c
  • src/rio.h
  • src/server.c
  • src/server.h
  • src/unit/CMakeLists.txt
  • src/unit/Makefile
  • src/unit/test_compression.cpp
  • src/unit/test_repl_compression.cpp
  • src/valkey-check-rdb.c
  • tests/integration/rdb-compression.tcl
  • tests/integration/repl-compression.tcl
  • tests/integration/replication-aof-sync.tcl
  • tests/integration/valkey-check-rdb.tcl
  • valkey.conf

Comment thread deps/lz4/lz4frame.c
Comment on lines +1619 to +1620
const BYTE* const srcStart = (const BYTE*)srcBuffer;
const BYTE* const srcEnd = srcStart + *srcSizePtr;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

❓ Verification inconclusive

🏁 Script executed:

#!/bin/bash
# Verify the reachable NULL/0 probe path and the null-pointer arithmetic.
rg -n -C2 'LZ4F_decompress\(dctx, NULL, &o, NULL, &i, NULL\)|srcEnd = srcStart \+ \*srcSizePtr' deps/lz4/lz4frame.c

Repository: valkey-io/valkey

Length of output: 590


Prevent UB from srcEnd = srcStart + *srcSizePtr when srcBuffer is NULL in deps/lz4/lz4frame.c

LZ4F_decompress(dctx, NULL, &o, NULL, &i, NULL) is called at ~line 1496, and the file later computes srcStart = (const BYTE*)srcBuffer and srcEnd = srcStart + *srcSizePtr at lines 1619–1620. If that block can run with srcBuffer == NULL, pointer arithmetic on a NULL-derived pointer is undefined in C; confirm the probe path’s i/*srcSizePtr value (and that the code guards/skips this arithmetic for the NULL/0 case) and fix via re-vendoring a corrected upstream LZ4 release rather than patching the vendor file in place.

🧰 Tools
🪛 Cppcheck (2.20.0)

[error] 1620-1620: Pointer addition with NULL pointer.

(nullPointerArithmetic)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@deps/lz4/lz4frame.c` around lines 1619 - 1620, The code computes srcStart =
(const BYTE*)srcBuffer and srcEnd = srcStart + *srcSizePtr even when srcBuffer
can be NULL (seen when calling LZ4F_decompress(dctx, NULL, &o, NULL, &i, NULL)),
which causes undefined behavior; instead of patching the vendored file inline,
update/re-vendor to an upstream LZ4 release that contains the fix, and while
doing so verify that the logic around LZ4F_decompress, srcBuffer, srcSizePtr,
srcStart and srcEnd guards/skips the pointer arithmetic when srcBuffer == NULL
or *srcSizePtr == 0 (i.e., ensure the code checks for NULL/zero before computing
srcEnd or uses a safe branch), then run the decompression unit tests that
exercise the NULL input path to confirm the upstream fix resolves the UB.

Comment thread src/aof.c
Comment on lines +1014 to +1022
ssize_t nread = read(fd, header, sizeof(header));
int read_errno = errno;
close(fd);

if (nread == -1) {
errno = read_errno;
return -1;
}
if (nread < (ssize_t)sizeof(header)) return 0;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't treat a short probe as “not compressed.”

0 < nread < sizeof(header) currently returns 0, so a truncated sync RDB is treated as safe to reuse as an AOF base. That bypasses the new fallback path and can persist a broken base file instead of forcing BGREWRITEAOF.

🔎 Suggested fix
-    ssize_t nread = read(fd, header, sizeof(header));
+    ssize_t nread;
+    do {
+        nread = read(fd, header, sizeof(header));
+    } while (nread == -1 && errno == EINTR);
     int read_errno = errno;
     close(fd);

     if (nread == -1) {
         errno = read_errno;
         return -1;
     }
-    if (nread < (ssize_t)sizeof(header)) return 0;
+    if (nread < (ssize_t)sizeof(header)) {
+        errno = EINVAL;
+        return -1;
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ssize_t nread = read(fd, header, sizeof(header));
int read_errno = errno;
close(fd);
if (nread == -1) {
errno = read_errno;
return -1;
}
if (nread < (ssize_t)sizeof(header)) return 0;
ssize_t nread;
do {
nread = read(fd, header, sizeof(header));
} while (nread == -1 && errno == EINTR);
int read_errno = errno;
close(fd);
if (nread == -1) {
errno = read_errno;
return -1;
}
if (nread < (ssize_t)sizeof(header)) {
errno = EINVAL;
return -1;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aof.c` around lines 1014 - 1022, The code currently treats a short read
(0 < nread < sizeof(header)) as success by returning 0, which lets a truncated
RDB header be considered uncompressed; change this so any partial read is
treated as an error: after reading into header from fd (variables nread, header,
sizeof(header), fd), if nread == -1 restore read_errno and return -1 (as already
done), and if nread < (ssize_t)sizeof(header) also set errno to EIO (or restore
read_errno if appropriate) and return -1 so the caller will treat the probe as a
failure and trigger the fallback path instead of accepting a truncated header.

Comment thread src/compression_stream.c Outdated
Comment thread src/config.c
Comment thread src/networking.c
Comment thread src/replication.c Outdated
Comment thread src/rio.c
Comment on lines +506 to +509
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
ssize_t got = r->read_some(r, buf, bytes_to_read);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add SSIZE_MAX bound check before read_some.

rioReadPartial() should reject requests above SSIZE_MAX (Line 506 onward) before calling backend read_some, to avoid backend-dependent truncation/sign behavior on large reads.

Proposed fix
     size_t bytes_to_read =
         (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
+    if (bytes_to_read > (size_t)SSIZE_MAX) {
+        r->flags |= RIO_FLAG_READ_ERROR;
+        return -1;
+    }
     ssize_t got = r->read_some(r, buf, bytes_to_read);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
ssize_t got = r->read_some(r, buf, bytes_to_read);
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (bytes_to_read > (size_t)SSIZE_MAX) {
r->flags |= RIO_FLAG_READ_ERROR;
return -1;
}
ssize_t got = r->read_some(r, buf, bytes_to_read);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rio.c` around lines 506 - 509, In rioReadPartial, before calling
r->read_some, ensure the computed bytes_to_read (the min of len and
r->max_processing_chunk) is not greater than SSIZE_MAX; if it is, reject the
request and return an appropriate error instead of invoking r->read_some to
avoid backend-dependent truncation/ sign issues. Update the logic around
bytes_to_read (used in the existing computation) to perform this SSIZE_MAX bound
check and return an error (and set errno consistently, e.g. EINVAL/ERANGE) when
exceeded; also include the required header for SSIZE_MAX if not present.

Comment thread src/server.c Outdated
Comment thread src/server.c
Comment thread src/server.h Outdated
@sarthakaggarwal97

Copy link
Copy Markdown
Contributor

@CodeRabbit review the replication compression implementation only

@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
cat src/replication.c | grep -n "compress\|Compress\|COMPRESS\|lz4\|LZ4\|decompres\|Decompres\|CAPA_COMPRESS\|replBalance\|markCompressed\|replInitDecomp\|replDestroyDecomp\|replRefreshDecomp\|replInitComp\|replDestroyComp" | head -200

Length of output: 3685


🏁 Script executed:

#!/bin/bash
cat src/networking.c | grep -n "compress\|Compress\|COMPRESS\|lz4\|LZ4\|decompres\|Decompres\|replDecomp\|replInitComp\|replDestroyComp\|compressed_buf\|writeToReplicaComp\|postWriteToReplica" | head -200

Length of output: 9715


🏁 Script executed:

#!/bin/bash
# Get broader context of replication.c compression functions
rg -n "markCompressedReplicasForDisconnect|replBalanceAffinity|replInitDecompression|replDestroyDecompression|replRefreshDecompression|REPLICA_CAPA_COMPRESSION|repl_compression|repl_stream_decoder" src/replication.c | head -150

Length of output: 1831


🏁 Script executed:

#!/bin/bash
# Read the compression-related sections from replication.c
awk '/markCompressedReplicasForDisconnect|replBalanceAffinity|replInitDecompression|replDestroyDecompression|replRefreshDecompression/{found=1; start=NR} found && NR>=start && NR<=start+60{print NR": "$0} NR>start+60{found=0}' src/replication.c | head -400

Length of output: 19200


🏁 Script executed:

#!/bin/bash
# Read networking.c compression functions
awk '/replInitCompression|replDestroyCompression|writeToReplicaCompressed|replDecompressQueryBuf|postWriteToReplica/{found=1; start=NR} found && NR>=start && NR<=start+80{print NR": "$0} NR>start+80{found=0}' src/networking.c | head -500

Length of output: 22176


🏁 Script executed:

#!/bin/bash
cat src/compression_stream.c | head -150

Length of output: 4581


🏁 Script executed:

#!/bin/bash
cat src/compression_lz4.c

Length of output: 4981


🏁 Script executed:

#!/bin/bash
cat src/io_threads.c | grep -n "compress\|Compress\|replBalance\|affinity\|repl_stream_decoder\|repl_compression\|COMPRESSION" | head -100

Length of output: 747


🏁 Script executed:

#!/bin/bash
# Read server.h for the ClientReplicationData and valkeyServer compression fields
rg -n "repl_compressor|compressed_buf|repl_stream_decoder|repl_compression|REPLICA_CAPA_COMPRESS|compression_error|ClientReplicationData|repl_decompression|repl_apply" src/server.h | head -100

Length of output: 2393



@sarthakaggarwal97 Here is a focused review of the replication compression implementation.


Replication Compression Review

Architecture Summary

The design is sound: one long-lived LZ4 frame per replica (streamWriter / compressed_buf), push-mode streamReader singleton on the replica side, capability negotiation via REPLICA_CAPA_COMPRESSION, and optional IO-thread affinity to preserve LZ4's internal dictionary locality. The backlog-cursor/offset split (logical uncompressed bytes vs. wire bytes) is handled correctly.


Issues Found

1. replRefreshDecompression() silently swallows init failures

// src/replication.c line 216
void replRefreshDecompression(void) {
    replDestroyDecompression();
    replInitDecompression();   // ← return value ignored
}

replInitDecompression() returns C_OK/C_ERR (it allocates a streamReaderCreatePush and an SDS). If it fails under memory pressure, server.repl_stream_decoder remains NULL and the replica silently stops decompressing subsequent bytes — leading to garbled data being fed to the command parser. The replRefreshDecompression callsites (lines 2518, 3892, 4419) also discard the result.

Suggestion: Propagate the error and either disconnect the replication link or panic:

int replRefreshDecompression(void) {
    replDestroyDecompression();
    if (replInitDecompression() != C_OK) {
        serverLog(LL_WARNING, "Failed to (re)initialize replication decompressor — disconnecting");
        replicationHandlePrimaryDisconnection();
        return C_ERR;
    }
    return C_OK;
}

2. postWriteToReplica hardcodes the algorithm in the error log

// src/networking.c line 2412
compressionAlgoName(REPL_COMPRESSION_ALGO),   // compile-time constant

REPL_COMPRESSION_ALGO is a compile-time macro. The error path logs it after the compressor may already be in an errored state, rather than reading the algorithm from the client's active streamWriter. Once ZSTD or other codecs are added, this line will always report lz4 regardless of what was actually used.

Suggestion: Read the algo from c->repl_data->repl_compressor (via a getter), or at minimum store the negotiated compressionAlgo in ClientReplicationData.


3. Affinity ownership is never established when the main thread runs first

// src/networking.c line 2572
int my_tid = getCurTid();   // returns 0 for main thread

// line 2580
if (server.repl_compression_thread_affinity && c->repl_data->affinity_tid <= 0) {
    c->repl_data->affinity_tid = my_tid;   // sets affinity_tid = 0
}

affinity_tid is initialised to -1 and the ownership guard is <= 0. If writeToReplicaCompressed is first called from the main thread (getCurTid() == 0), it sets affinity_tid = 0. The fast-path enqueue in io_threads.c checks tid > 0 (implicitly, since it uses the tid as an index), so affinity_tid = 0 never triggers the private-inbox route — effectively disabling affinity without any log or counter. The condition then keeps resetting the "owner" to the main thread on every call.

Suggestion: Only record ownership when my_tid > 0 (an actual IO thread):

if (server.repl_compression_thread_affinity && c->repl_data->affinity_tid <= 0 && my_tid > 0) {
    c->repl_data->affinity_tid = my_tid;
}

4. compressionLz4CompressFeed: partial header written on capacity-short first call

// src/compression_lz4.c
if (!sc->stream_started) {
    size_t r = LZ4F_compressBegin(cctx, output, output_capacity, &prefs);
    if (LZ4F_isError(r)) return -1;
    offset = r;                    // header bytes written to caller's buffer
    sc->stream_started = true;
}

if (input_len > 0) {
    if (offset >= output_capacity) return -1;   // ← returns -1 but header already in 'output'

If output_capacity is exactly the LZ4 frame header size, LZ4F_compressBegin succeeds and writes the header, sc->stream_started = true, but the subsequent offset >= output_capacity guard returns -1. The caller sees an error, but the output buffer has already been partially written (the frame header). On a retry the caller would allocate a larger buffer and call again — but stream_started is now true so LZ4F_compressBegin is skipped, producing a frame without a header for the compressed block.

The comment "capacity-shortage early returns are retriable" is only accurate when no bytes have been emitted yet. Once LZ4F_compressBegin writes to output, the buffer is no longer clean. This is a corner case (exact capacity match) but worth guarding:

Suggestion: After LZ4F_compressBegin succeeds but before returning -1 for capacity reasons, set sc->errored = true (or reset stream_started) to prevent silent frame corruption on retry.


5. Minor: FLUSH_SYNC on every streamWriterFlush call produces a synchronisation block mid-frame

writeToReplicaCompressed calls streamWriterFlush() after each 1 MB batch. Inside streamWriterFlush, the VKCS layer calls streamCompressFeed with FLUSH_SYNC. For LZ4, FLUSH_SYNC emits a flush block, which adds a 4-byte end-of-block marker per batch. At high throughput with many small flushes this adds measurable overhead. Consider whether FLUSH_SYNC is necessary here versus simply relying on the emit callback's natural block boundaries (FLUSH_CONTINUE), given that framing is already handled by the VKCS envelope.


6. Capability gating on use_diskless_load is not enforced server-side

// src/replication.c line 4042
if (server.repl_compression && use_diskless_load) {
    // ... advertise REPLICA_CAPA_COMPRESSION
}

The primary trusts the advertised capability from the replica's REPLCONF capa compression (line 1593-1594) without cross-checking that the replica is actually in diskless-load mode. If a buggy or misconfigured replica advertises the capability without diskless load, the primary will compress the incremental stream while the replica may not have the decompressor active. The replInitDecompression is gated on server.repl_compression (not on the load mode). This is likely fine in practice since the primary controls the full-sync path, but a server-side assertion or capability-reconciliation step would add resilience.


Positive Observations

  • Thread safety on teardown (waitForClientIO before streamWriterDestroy) is correct and critical.
  • Cursor pinning until the full compressed batch drains (compressed_buf_pos tracking) correctly prevents logical-offset drift.
  • Frame-done detection in replDecompressQueryBuf (logging and disconnecting on unexpected frame_done) is handled well.
  • Main-thread-only reads for primary clients with an active decoder (io_threads.c line 517) is an essential correctness guard.
  • The 1 MB batch cap (REPL_COMPRESSION_BATCH_LIMIT) and 256 MB decompression cap (REPL_DECOMPRESSION_BATCH_LIMIT) prevent unbounded latency spikes.
  • Observability (per-replica and global INFO fields, thread-switch counters, CPU metrics) is comprehensive.

# Conflicts:
#	src/io_threads.c
#	src/networking.c
#	src/replication.c
@coderabbitai

coderabbitai Bot commented May 28, 2026

Copy link
Copy Markdown

Actionable comments posted: 0

@sarthakaggarwal97 sarthakaggarwal97 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @roshkhatri for this PR. I took an initial look at this change. The idea looks sound to me, and I have posted some comments related to API. I do have another suggestion.

At this point, based on the numbers, thread-affinity doesn't look too helpful. If we conclude that this is helpful. It would be a good follow up PR to this change. I think you also mentioned this offline.

Comment thread .github/workflows/ci.yml
Comment thread valkey.conf Outdated
Comment thread valkey.conf Outdated
Comment thread src/compression_stream.c Outdated
Comment thread src/server.h Outdated
Comment thread src/replication.c Outdated
Comment thread src/compression_stream.c Outdated
Comment thread src/networking.c Outdated
---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@roshkhatri

Copy link
Copy Markdown
Member Author

@CodeRabbit That C_ERR branch for replRefreshDecompression is unreachable (replDecompressorCreate only fails on allocation, which zmalloc aborts on; codec init is lazy in replDecompressorDecode and already returns REPL_DECODE_ERR -> disconnect), so I removed the dead C_ERR plumbing and made replRefreshDecompression void

@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Copy link
Copy Markdown

Tip

For best results, initiate chat on the files or code changes.

@roshkhatri That reasoning is sound. Since zmalloc aborts on OOM in Valkey, replDecompressorCreate has no failure mode that returns — so the C_ERR path was indeed unreachable dead code. And the lazy codec initialization in replDecompressorDecode returning REPL_DECODE_ERR → disconnect is the correct path for any runtime decode-time failures. Making replRefreshDecompression void is the right cleanup.

Issue #2 (replRefreshDecompression failure not acted on) is withdrawn — the concern was based on an incorrect assumption about OOM behaviour in Valkey.

@sarthakaggarwal97 sarthakaggarwal97 added the run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP) label Jun 18, 2026
@roshkhatri

Copy link
Copy Markdown
Member Author

Test failures are not related to this PR change

sarthakaggarwal97 and others added 8 commits June 22, 2026 09:02
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

@sarthakaggarwal97 sarthakaggarwal97 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sharing some more comments on the low level implementation.

Comment thread src/compression_repl.c
const uint8_t *in = src;
size_t off = 0;

/* Probe phase: classify the stream from its leading bytes. The magic may

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need the probe phase? Through CAPA we would know if the stream is compressed or not right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will not know if the stream is compresses as the primary would only compress if compression is enabled on primary.

The key distinction of CAPA is that it tells the primary that the replica can decode compression and has the capabilty of decompression, but doesn't guarantee the primary will actually use it, so thats the reason probe phase is needed to detect whether the incoming stream is actually compressed.

Comment thread src/compression_repl.c Outdated
/* ===== Primary-side per-replica compressor ===== */

/* Emit callback for the streamWriter: append compressed bytes to out_buf. */
static int replCompressorEmit(void *ctx, const uint8_t *data, size_t len) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a second compressed-output buffer on top of streamWriter's internal out_buf: compression writes into the writer scratch, then this callback copies into replCompressor.out_buf for socket staging.

We do need staging for partial socket writes, but I don't think we need both buffers. Could the stream writer emit directly into the per-replica staging buffer, or let the replication adapter provide the output buffer used by the codec? That would remove one memcpy per emitted compressed chunk.

Comment thread src/networking.c
if (c->querybuf == thread_shared_qb && sdsavail(c->querybuf) < decompressed_len) {
initSharedQueryBuf();
}
c->querybuf = sdscatlen(c->querybuf, decode_buf, decompressed_len);

@sarthakaggarwal97 sarthakaggarwal97 Jun 25, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path does two copies for every decompressed byte: codec output -> rd->decode_buf, then rd->decode_buf -> c->querybuf.

Since replica-side decompression runs on the main thread, I think we should be stricter about copies here. Can the adapter decode directly into the query buffer after preserving the pre-read prefix? Something like replDecompressorDecodeIntoSds(..., &c->querybuf, ...) would keep the ownership clear and remove a full memcpy from the hot path.

@roshkhatri roshkhatri Jun 27, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need one copy, as like codec outputs to rd->decode_buf, then rd->decode_buf -> c->querybuf. as we cannot replace the compressed bytes with the decompressed bytes in place as decompressed bytes will be larger and will replace the compressed ones. We need preserving the compressed bytes until we finish decompressing the batch. and then replace the compressed set of bytes with the decompressed ones

Comment thread src/server.c
"slave_read_only:%d\r\n", server.repl_replica_ro,
"replica_announced:%d\r\n", server.replica_announced));
if (server.repl_decompression_errors > 0) {
info = sdscatfmt(info, "repl_decompression_errors:%U\r\n",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new replication decompression counters are emitted in INFO, but I don't see them reset by CONFIG RESETSTAT.

If these are intended to behave like the adjacent replication/network counters, we should reset repl_decompression_errors, repl_decompression_cpu_usec, and repl_decompressed_bytes_total as well.

@sarthakaggarwal97

Copy link
Copy Markdown
Contributor

@coderabbitai do full review for replication compression

@coderabbitai

coderabbitai Bot commented Jun 25, 2026

Copy link
Copy Markdown

@sarthakaggarwal97 I'll do a full review focused on the replication compression changes.

✅ Action performed

Full review finished.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

♻️ Duplicate comments (1)
src/compression_lz4.c (1)

91-129: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Make undersized-output failures atomic.

Once LZ4F_compressBegin() succeeds, the frame header may already be in output and cctx has advanced. The later offset >= output_capacity branches therefore are not safely retriable: a caller with room only for the header gets -1, and a retry on the same compressor can continue without re-emitting the header.

#!/bin/bash
set -euo pipefail

printf '\n--- compression feed implementation ---\n'
sed -n '80,140p' src/compression_lz4.c
printf '\n--- public feed contract ---\n'
sed -n '65,90p' src/compression.h
printf '\n--- output-bound helper and codec dispatch ---\n'
sed -n '1,90p' src/compression.c
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/compression_lz4.c` around lines 91 - 129, The undersized-output handling
in the LZ4 feed path is not atomic because `LZ4F_compressBegin()` can partially
emit the frame header before later `offset >= output_capacity` checks return
`-1`. Update `compression_lz4.c`’s feed/compress routine so any output-capacity
failure after `LZ4F_compressBegin()` is treated as a committed partial write,
and make retries use a fresh compressor state instead of continuing from
`cctx`/`stream_started`; align the retry contract with the public feed behavior
in `compression.h` and the dispatch path in `compression.c`.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.github/workflows/ci.yml:
- Around line 221-229: Both `actions/checkout` steps in the workflow currently
persist the GitHub token in local git config, which is unnecessary for this job.
Update the `Install libbacktrace` checkout and the subsequent `Checkout Valkey`
step to set `persist-credentials: false` so the token is not left available to
the PR-controlled `make` and `./runtest` commands.

In `@src/compression_lz4.c`:
- Around line 49-52: The LZ4 init path in compressionLz4CompressorInit and
compressionLz4DecompressorInit should not assert on a NULL context and then
still report success; update these init functions to detect allocation failure
from the
LZ4F_createCompressionContext_advanced/LZ4F_createDecompressionContext_advanced
calls, return -1 on failure, and leave cleanup to
streamCompressorInit/streamDecompressorInit so the wrapper can handle the error
path correctly.

In `@src/compression_repl.c`:
- Around line 108-129: The no-progress path in replDecodeFeed() drops unconsumed
compressed input, so replDecompressQueryBuf() may discard a partial frame and
corrupt replication. Update replDecompressor/replDecodeFeed() to retain pending
bytes across calls (or otherwise report how many bytes were actually consumed)
when streamDecompressorFeed() returns consumed == 0 and produced == 0, and only
allow the caller to trim the query buffer by the true consumed amount. Keep the
behavior aligned with the stream reader contract used by compression_stream.c so
split headers/blocks are preserved until they can be decoded.

In `@src/compression_stream.c`:
- Around line 259-301: The stream writer does not stop accepting operations
after a failure, so later calls can proceed on partially advanced state. Update
streamWriterWrite, streamWriterFlush, and streamWriterFinish to check
writer->errored up front and return failure/no-op immediately, and ensure
streamWriterFinish only marks writer->finished after streamWriterFeedAndEmit
succeeds so a failed final emit stays sticky. Use the existing
streamWriterWrite, streamWriterFlush, streamWriterFinish, and
writer->errored/finished fields to keep all public writer operations rejected
once an error has occurred.

In `@src/unit/test_compression.cpp`:
- Around line 304-345: The current regression test in
streamCompressorFeedErrorRecovery only covers the fully pre-frame failure path,
so add a second subcase that forces streamCompressorFeed() to emit the LZ4 frame
header and then fail before completing the flush/write. Use the existing
streamCompressorInit, streamCompressorOutputBound, and streamCompressorFeed flow
to trigger the partial-write error, then retry on the same stream and verify the
output still round-trips correctly instead of continuing from a corrupted state.
Keep the test in the same GoogleTest fixture and extend the assertions around
stream_started and the retry behavior to cover this header-written recovery
path.

In `@src/valkey-check-rdb.c`:
- Around line 851-854: The checksum-logging branch in the RDB checker is using
the wrong transport flag and misclassifies decompressed inputs. Update the
condition in the RDB check logic around the RIO flag handling to key off the
decompression wrapper state instead of RIO_FLAG_STREAMING_COMPRESSION, so
compressed inputs are recognized correctly before falling through to the generic
skip message. Use the existing checksum decision block in the checker function
that logs via rdbCheckInfo as the place to make this change.

In `@tests/integration/repl-compression.tcl`:
- Line 1: The repl-compression integration suite is still registered only under
the generic repl/external:skip tags, so the new repl-compression tag in
server.tcl never controls these tests. Update the tag declaration in
repl-compression.tcl to use repl-compression as the top-level tag and remove the
custom repl-compression-suite deny-tag gating so --tags repl-compression and
--tags -repl-compression correctly include or exclude the suite.
- Around line 79-114: The test temporarily changes the primary’s
repl-compression setting in the “Replica with repl-compression lz4-stream and
disk-backed load also negotiates compression” case, but it only restores it on
the success path. Update the test around the $primary config set and $primary
config set repl-compression no calls to use a try/finally pattern so the
original setting is always restored even if any wait_for_condition,
assert_equal, or replicaof step fails.

In `@tests/integration/replication-aof-sync.tcl`:
- Around line 167-214: The new replication restart test is asserting RDB reuse
for a primary using lz4-stream compression, but the current AOF sync path still
falls back to BGREWRITEAOF for that case. Either update the AOF sync/reuse logic
in the runtime code path that handles `src/aof.c` so `wait_for_sync` can reuse
the synced RDB as the AOF base, or adjust this test in
`replication-aof-sync.tcl` to expect the fallback log and non-reuse behavior;
keep the checks around `wait_for_sync`, `log_file_matches`, and
`get_cur_base_aof_name` consistent with whichever behavior is intended.

---

Duplicate comments:
In `@src/compression_lz4.c`:
- Around line 91-129: The undersized-output handling in the LZ4 feed path is not
atomic because `LZ4F_compressBegin()` can partially emit the frame header before
later `offset >= output_capacity` checks return `-1`. Update
`compression_lz4.c`’s feed/compress routine so any output-capacity failure after
`LZ4F_compressBegin()` is treated as a committed partial write, and make retries
use a fresh compressor state instead of continuing from `cctx`/`stream_started`;
align the retry contract with the public feed behavior in `compression.h` and
the dispatch path in `compression.c`.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 7d6c12b0-40b0-4924-b389-7d7f1c8923dc

📥 Commits

Reviewing files that changed from the base of the PR and between bee30d4 and 58fe0b3.

📒 Files selected for processing (58)
  • .github/workflows/ci.yml
  • cmake/Modules/SourceFiles.cmake
  • cmake/Modules/ValkeySetup.cmake
  • deps/CMakeLists.txt
  • deps/Makefile
  • deps/lz4/CMakeLists.txt
  • deps/lz4/LICENSE
  • deps/lz4/Makefile
  • deps/lz4/lz4.c
  • deps/lz4/lz4.h
  • deps/lz4/lz4frame.c
  • deps/lz4/lz4frame.h
  • deps/lz4/lz4hc.c
  • deps/lz4/lz4hc.h
  • deps/lz4/xxhash.c
  • deps/lz4/xxhash.h
  • src/CMakeLists.txt
  • src/Makefile
  • src/aof.c
  • src/compression.c
  • src/compression.h
  • src/compression_lz4.c
  • src/compression_lz4.h
  • src/compression_repl.c
  • src/compression_repl.h
  • src/compression_rio.c
  • src/compression_rio.h
  • src/compression_stream.c
  • src/compression_stream.h
  • src/config.c
  • src/io_threads.c
  • src/networking.c
  • src/rdb.c
  • src/rdb.h
  • src/replication.c
  • src/rio.c
  • src/rio.h
  • src/server.c
  • src/server.h
  • src/unit/CMakeLists.txt
  • src/unit/Makefile
  • src/unit/test_compression.cpp
  • src/unit/test_repl_compression.cpp
  • src/valkey-check-rdb.c
  • tests/integration/block-repl.tcl
  • tests/integration/dual-channel-replication.tcl
  • tests/integration/rdb-compression.tcl
  • tests/integration/repl-compression.tcl
  • tests/integration/replication-2.tcl
  • tests/integration/replication-3.tcl
  • tests/integration/replication-4.tcl
  • tests/integration/replication-aof-sync.tcl
  • tests/integration/replication.tcl
  • tests/integration/skip-rdb-checksum.tcl
  • tests/integration/valkey-check-rdb.tcl
  • tests/support/server.tcl
  • tests/test_helper.tcl
  • valkey.conf

Comment thread .github/workflows/ci.yml
Comment on lines +221 to +229
- name: Install libbacktrace
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: ianlancetaylor/libbacktrace
ref: b9e40069c0b47a722286b94eb5231f7f05c08713
path: libbacktrace
- run: cd libbacktrace && ./configure && make && sudo make install
- name: Checkout Valkey
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔒 Security & Privacy | 🟠 Major | ⚡ Quick win

Disable credential persistence on both checkout steps.

Both new actions/checkout steps leave the workflow token in local git config, and this job then runs PR-controlled make/./runtest. That gives the new job an unnecessary token-exfil path. Set persist-credentials: false on both checkouts.

Suggested change
       - name: Install libbacktrace
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
         with:
+          persist-credentials: false
           repository: ianlancetaylor/libbacktrace
           ref: b9e40069c0b47a722286b94eb5231f7f05c08713
           path: libbacktrace
@@
       - name: Checkout Valkey
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+        with:
+          persist-credentials: false
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- name: Install libbacktrace
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: ianlancetaylor/libbacktrace
ref: b9e40069c0b47a722286b94eb5231f7f05c08713
path: libbacktrace
- run: cd libbacktrace && ./configure && make && sudo make install
- name: Checkout Valkey
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install libbacktrace
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
repository: ianlancetaylor/libbacktrace
ref: b9e40069c0b47a722286b94eb5231f7f05c08713
path: libbacktrace
- run: cd libbacktrace && ./configure && make && sudo make install
- name: Checkout Valkey
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
🧰 Tools
🪛 zizmor (1.26.1)

[warning] 221-226: credential persistence through GitHub Actions artifacts (artipacked): does not set persist-credentials: false

(artipacked)


[warning] 228-229: credential persistence through GitHub Actions artifacts (artipacked): does not set persist-credentials: false

(artipacked)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/ci.yml around lines 221 - 229, Both `actions/checkout`
steps in the workflow currently persist the GitHub token in local git config,
which is unnecessary for this job. Update the `Install libbacktrace` checkout
and the subsequent `Checkout Valkey` step to set `persist-credentials: false` so
the token is not left available to the PR-controlled `make` and `./runtest`
commands.

Source: Linters/SAST tools

Comment thread src/compression_lz4.c
Comment thread src/compression_repl.c
Comment on lines +108 to +129
static replDecodeResult replDecodeFeed(replDecompressor *rd, const uint8_t *in, size_t len, size_t output_max) {
size_t off = 0;
while (off < len) {
char out_buf[REPL_DECODE_CHUNK];
size_t consumed = 0;
ssize_t produced = streamDecompressorFeed(&rd->decompressor, (uint8_t *)out_buf, sizeof(out_buf),
in + off, len - off, &consumed);
if (produced < 0 || consumed > len - off) return REPL_DECODE_ERR;
if (produced > 0) {
if (sdslen(rd->decode_buf) + (size_t)produced > output_max) return REPL_DECODE_OVERFLOW;
rd->decode_buf = sdscatlen(rd->decode_buf, out_buf, (size_t)produced);
}
off += consumed;
/* A long-lived replication stream must never reach a compressed frame
* end. If it does, the stream is corrupt or the primary sent an
* unexpected terminator: the caller should disconnect. */
if (rd->decompressor.frame_done) return REPL_DECODE_FRAME_DONE;
/* No progress: the codec has buffered a partial frame and needs more
* bytes. Resume on the next read tick. */
if (consumed == 0 && produced == 0) break;
}
return REPL_DECODE_OK;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🔴 Critical | 🏗️ Heavy lift

Preserve unconsumed compressed input before returning REPL_DECODE_OK.

When streamDecompressorFeed() returns consumed == 0 && produced == 0, this adapter breaks without retaining in + off. replDecompressQueryBuf() then treats REPL_DECODE_OK as permission to remove the whole raw range from c->querybuf, so a split compressed block/header can be dropped and corrupt the replication stream. Keep a pending compressed buffer in replDecompressor (or return consumed bytes so the caller can retain the tail) before accepting partial input. The stream reader contract in compression_stream.c preserves the unconsumed buffer in this exact no-progress case.

Suggested direction
-static replDecodeResult replDecodeFeed(replDecompressor *rd, const uint8_t *in, size_t len, size_t output_max) {
+static replDecodeResult replDecodeFeed(replDecompressor *rd, const uint8_t *in, size_t len,
+                                       size_t output_max, size_t *consumed_total) {
     size_t off = 0;
+    *consumed_total = 0;
     while (off < len) {
         ...
         off += consumed;
+        *consumed_total = off;
         ...
         if (consumed == 0 && produced == 0) break;
     }
     return REPL_DECODE_OK;
 }

Then append new compressed bytes to a replDecompressor pending SDS, drain from that SDS, and sdsrange() away only consumed_total bytes after a successful feed.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/compression_repl.c` around lines 108 - 129, The no-progress path in
replDecodeFeed() drops unconsumed compressed input, so replDecompressQueryBuf()
may discard a partial frame and corrupt replication. Update
replDecompressor/replDecodeFeed() to retain pending bytes across calls (or
otherwise report how many bytes were actually consumed) when
streamDecompressorFeed() returns consumed == 0 and produced == 0, and only allow
the caller to trim the query buffer by the true consumed amount. Keep the
behavior aligned with the stream reader contract used by compression_stream.c so
split headers/blocks are preserved until they can be decoded.

Comment thread src/compression_stream.c Outdated
Comment on lines +259 to +301
ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
/* Writes after finish are a caller bug; silently dropping them would
* corrupt the consumer's view of the stream. */
if (writer->finished) return -1;
if (len == 0) return 0;
if (len > (size_t)SSIZE_MAX) return -1;

const uint8_t *src = (const uint8_t *)buf;
size_t remaining = len;
uint64_t emitted_before = writer->bytes_emitted;
if (streamWriterEnsureEnvelope(writer) != 0) return -1;
while (remaining > 0) {
size_t chunk_len = remaining < STREAM_WRITER_INPUT_CHUNK_SIZE
? remaining
: STREAM_WRITER_INPUT_CHUNK_SIZE;
if (streamWriterFeedAndEmit(writer, src, chunk_len, FLUSH_CONTINUE) != 0) return -1;
src += chunk_len;
remaining -= chunk_len;
}
uint64_t emitted_delta = writer->bytes_emitted - emitted_before;
if (emitted_delta > (uint64_t)SSIZE_MAX) {
writer->errored = true;
return -1;
}
return (ssize_t)emitted_delta;
}

int streamWriterFlush(streamWriter *writer) {
/* Flush after finish is a no-op: frame is already closed. */
if (writer->finished) return 0;

if (!writer->envelope_written || !writer->compressor.stream_started) return 0;
return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_SYNC);
}

int streamWriterFinish(streamWriter *writer) {
if (writer->finished) return 0;
writer->finished = true;

/* Even an empty stream produces a valid envelope + empty frame so the
* loader sees a well-formed file. */
if (streamWriterEnsureEnvelope(writer) != 0) return -1;
return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_END);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Make writer errors sticky before accepting more stream operations.

After an emit failure, compressor state may already have advanced, but streamWriterWrite() can still accept later input, and streamWriterFinish() can report success on a repeated call because finished was set before the failed final-frame emit. Reject all public writer operations once writer->errored is set.

Proposed fix
 ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
+    if (writer->errored) return -1;
     /* Writes after finish are a caller bug; silently dropping them would
      * corrupt the consumer's view of the stream. */
     if (writer->finished) return -1;
@@
 int streamWriterFlush(streamWriter *writer) {
+    if (writer->errored) return -1;
     /* Flush after finish is a no-op: frame is already closed. */
     if (writer->finished) return 0;
@@
 int streamWriterFinish(streamWriter *writer) {
+    if (writer->errored) return -1;
     if (writer->finished) return 0;
     writer->finished = true;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
/* Writes after finish are a caller bug; silently dropping them would
* corrupt the consumer's view of the stream. */
if (writer->finished) return -1;
if (len == 0) return 0;
if (len > (size_t)SSIZE_MAX) return -1;
const uint8_t *src = (const uint8_t *)buf;
size_t remaining = len;
uint64_t emitted_before = writer->bytes_emitted;
if (streamWriterEnsureEnvelope(writer) != 0) return -1;
while (remaining > 0) {
size_t chunk_len = remaining < STREAM_WRITER_INPUT_CHUNK_SIZE
? remaining
: STREAM_WRITER_INPUT_CHUNK_SIZE;
if (streamWriterFeedAndEmit(writer, src, chunk_len, FLUSH_CONTINUE) != 0) return -1;
src += chunk_len;
remaining -= chunk_len;
}
uint64_t emitted_delta = writer->bytes_emitted - emitted_before;
if (emitted_delta > (uint64_t)SSIZE_MAX) {
writer->errored = true;
return -1;
}
return (ssize_t)emitted_delta;
}
int streamWriterFlush(streamWriter *writer) {
/* Flush after finish is a no-op: frame is already closed. */
if (writer->finished) return 0;
if (!writer->envelope_written || !writer->compressor.stream_started) return 0;
return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_SYNC);
}
int streamWriterFinish(streamWriter *writer) {
if (writer->finished) return 0;
writer->finished = true;
/* Even an empty stream produces a valid envelope + empty frame so the
* loader sees a well-formed file. */
if (streamWriterEnsureEnvelope(writer) != 0) return -1;
return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_END);
ssize_t streamWriterWrite(streamWriter *writer, const void *buf, size_t len) {
if (writer->errored) return -1;
/* Writes after finish are a caller bug; silently dropping them would
* corrupt the consumer's view of the stream. */
if (writer->finished) return -1;
if (len == 0) return 0;
if (len > (size_t)SSIZE_MAX) return -1;
const uint8_t *src = (const uint8_t *)buf;
size_t remaining = len;
uint64_t emitted_before = writer->bytes_emitted;
if (streamWriterEnsureEnvelope(writer) != 0) return -1;
while (remaining > 0) {
size_t chunk_len = remaining < STREAM_WRITER_INPUT_CHUNK_SIZE
? remaining
: STREAM_WRITER_INPUT_CHUNK_SIZE;
if (streamWriterFeedAndEmit(writer, src, chunk_len, FLUSH_CONTINUE) != 0) return -1;
src += chunk_len;
remaining -= chunk_len;
}
uint64_t emitted_delta = writer->bytes_emitted - emitted_before;
if (emitted_delta > (uint64_t)SSIZE_MAX) {
writer->errored = true;
return -1;
}
return (ssize_t)emitted_delta;
}
int streamWriterFlush(streamWriter *writer) {
if (writer->errored) return -1;
/* Flush after finish is a no-op: frame is already closed. */
if (writer->finished) return 0;
if (!writer->envelope_written || !writer->compressor.stream_started) return 0;
return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_SYNC);
}
int streamWriterFinish(streamWriter *writer) {
if (writer->errored) return -1;
if (writer->finished) return 0;
writer->finished = true;
/* Even an empty stream produces a valid envelope + empty frame so the
* loader sees a well-formed file. */
if (streamWriterEnsureEnvelope(writer) != 0) return -1;
return streamWriterFeedAndEmit(writer, NULL, 0, FLUSH_END);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/compression_stream.c` around lines 259 - 301, The stream writer does not
stop accepting operations after a failure, so later calls can proceed on
partially advanced state. Update streamWriterWrite, streamWriterFlush, and
streamWriterFinish to check writer->errored up front and return failure/no-op
immediately, and ensure streamWriterFinish only marks writer->finished after
streamWriterFeedAndEmit succeeds so a failed final emit stays sticky. Use the
existing streamWriterWrite, streamWriterFlush, streamWriterFinish, and
writer->errored/finished fields to keep all public writer operations rejected
once an error has occurred.

Comment on lines +304 to +345
/* --- Test: pre-frame errors are recoverable, mid-frame errors are permanent --- */
TEST_F(CompressionTest, streamCompressorFeedErrorRecovery) {
streamCompressor sc;
ASSERT_EQ(streamCompressorInit(&sc, ALGO_LZ4, 0), 0);

/* Pre-frame error: compressBegin fails with tiny buffer, but no frame
* bytes have been emitted yet — this is recoverable. */
uint8_t tiny[1];
ssize_t ret = streamCompressorFeed(&sc, tiny, 1,
(const uint8_t *)"test data", 9, FLUSH_END);
ASSERT_EQ(ret, -1) << "should fail with tiny buffer";
ASSERT_EQ(sc.stream_started, false) << "stream_started should still be false";

/* Retry with a proper buffer — should succeed */
size_t bound = streamCompressorOutputBound(&sc, 9);
uint8_t *buf = (uint8_t *)zmalloc(bound);
ssize_t ret2 = streamCompressorFeed(&sc, buf, bound,
(const uint8_t *)"test data", 9, FLUSH_END);
ASSERT_GT(ret2, 0) << "retry after pre-frame error should succeed";
zfree(buf);
streamCompressorFree(&sc);

/* Mid-frame error: start a frame, then force an error with a tiny buffer. */
streamCompressor sc2;
ASSERT_EQ(streamCompressorInit(&sc2, ALGO_LZ4, 0), 0);

size_t bound2 = streamCompressorOutputBound(&sc2, 5);
uint8_t *buf2 = (uint8_t *)zmalloc(bound2);
ssize_t ret3 = streamCompressorFeed(&sc2, buf2, bound2,
(const uint8_t *)"hello", 5, FLUSH_CONTINUE);
ASSERT_GE(ret3, 0) << "first write should succeed";
ASSERT_EQ(sc2.stream_started, true) << "stream should be started";

uint8_t tiny2[1];
ssize_t ret4 = streamCompressorFeed(&sc2, tiny2, 1,
(const uint8_t *)"more data to compress", 21,
FLUSH_END);
ASSERT_EQ(ret4, -1) << "mid-frame error should fail";

zfree(buf2);
streamCompressorFree(&sc2);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Add a regression for the header-written retry path.

This test only covers the fully pre-frame failure (tiny[1]). The still-open corruption case is when streamCompressorFeed() has enough output space to emit the LZ4 frame header but not enough to finish the write/flush; that can return -1 after mutating the stream, and the retry may continue without a fresh header. Please add a subcase that fails after the header is emitted and then verifies the retried stream is still a valid round-trip.

As per coding guidelines, src/unit/**/*.{c,cpp}: Unit tests live under src/unit/ and use GoogleTest (gtest/gmock). Data-structure and low-level logic changes should be covered by unit tests.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/unit/test_compression.cpp` around lines 304 - 345, The current regression
test in streamCompressorFeedErrorRecovery only covers the fully pre-frame
failure path, so add a second subcase that forces streamCompressorFeed() to emit
the LZ4 frame header and then fail before completing the flush/write. Use the
existing streamCompressorInit, streamCompressorOutputBound, and
streamCompressorFeed flow to trigger the partial-write error, then retry on the
same stream and verify the output still round-trips correctly instead of
continuing from a corrupted state. Keep the test in the same GoogleTest fixture
and extend the assertions around stream_started and the retry behavior to cover
this header-written recovery path.

Source: Coding guidelines

Comment thread src/valkey-check-rdb.c
Comment on lines +851 to +854
if ((rdb->flags & RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
rdbCheckInfo("Skipping logical RDB checksum for streaming-compressed input.");
} else if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
rdbCheckInfo("RDB file was saved with checksum disabled: skipped checksum for this transfer.");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Check the decompression flag in the checker.

Line 851 uses RIO_FLAG_STREAMING_COMPRESSION, but this path is reading through a decompression wrapper, so compressed inputs fall through to the generic skip message.

Proposed fix
-        if ((rdb->flags & RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
+        if ((rdb->flags & RIO_FLAG_STREAMING_DECOMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ((rdb->flags & RIO_FLAG_STREAMING_COMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
rdbCheckInfo("Skipping logical RDB checksum for streaming-compressed input.");
} else if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
rdbCheckInfo("RDB file was saved with checksum disabled: skipped checksum for this transfer.");
if ((rdb->flags & RIO_FLAG_STREAMING_DECOMPRESSION) && (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
rdbCheckInfo("Skipping logical RDB checksum for streaming-compressed input.");
} else if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
rdbCheckInfo("RDB file was saved with checksum disabled: skipped checksum for this transfer.");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/valkey-check-rdb.c` around lines 851 - 854, The checksum-logging branch
in the RDB checker is using the wrong transport flag and misclassifies
decompressed inputs. Update the condition in the RDB check logic around the RIO
flag handling to key off the decompression wrapper state instead of
RIO_FLAG_STREAMING_COMPRESSION, so compressed inputs are recognized correctly
before falling through to the generic skip message. Use the existing checksum
decision block in the checker function that logs via rdbCheckInfo as the place
to make this change.

Comment thread tests/integration/repl-compression.tcl Outdated
Comment thread tests/integration/repl-compression.tcl
Comment on lines +167 to +214
test "Replica restart reuses disk-based sync RDB when primary rdbcompression is lz4-stream" {
start_server {overrides {appendonly yes aof-use-rdb-preamble yes repl-diskless-sync no save "" rdbcompression lz4-stream}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]

for {set i 0} {$i < 40} {incr i} {
$primary set "lz4-key:$i" "value:$i"
}
waitForBgrewriteaof $primary

start_server {overrides {appendonly yes aof-use-rdb-preamble yes repl-diskless-sync no save ""}} {
set replica [srv 0 client]
set replica_log [srv 0 stdout]

$replica replicaof $primary_host $primary_port
wait_for_sync $replica

wait_for_condition 50 100 {
[log_file_matches $replica_log "*Reused RDB file from primary sync as AOF base file*"]
} else {
fail "Expected log message about reusing RDB file not found"
}

assert {![log_file_matches $replica_log "*uses streaming compression, falling back to BGREWRITEAOF*"]}
set manifest_path [get_aof_manifest_path $replica]
set base_name [get_cur_base_aof_name $manifest_path]
assert {$base_name ne ""}
assert {[string match "*.rdb" $base_name]}

for {set i 40} {$i < 60} {incr i} {
$primary set "lz4-key:$i" "value:$i"
}
wait_for_ofs_sync $primary $replica

$replica replicaof no one

restart_server 0 true false
set replica [srv 0 client]
wait_done_loading $replica

assert_equal 60 [$replica dbsize]
for {set i 0} {$i < 60} {incr i} {
assert_equal "value:$i" [$replica get "lz4-key:$i"]
}
}
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

This test expects the opposite of the current AOF fallback behavior.

With rdbcompression lz4-stream on Line 168, the synced dump.rdb is streaming-compressed. src/aof.c still treats that file as non-reusable, logs the falling back to BGREWRITEAOF message, and returns C_ERR instead of reusing it as the AOF base. Unless that runtime-layer guard is removed in the same stack, this test will fail deterministically. Either update the runtime behavior with this PR or flip the assertions here to validate the fallback path.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/replication-aof-sync.tcl` around lines 167 - 214, The new
replication restart test is asserting RDB reuse for a primary using lz4-stream
compression, but the current AOF sync path still falls back to BGREWRITEAOF for
that case. Either update the AOF sync/reuse logic in the runtime code path that
handles `src/aof.c` so `wait_for_sync` can reuse the synced RDB as the AOF base,
or adjust this test in `replication-aof-sync.tcl` to expect the fallback log and
non-reuse behavior; keep the checks around `wait_for_sync`, `log_file_matches`,
and `get_cur_base_aof_name` consistent with whichever behavior is intended.

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants