Skip to content

Streaming Compression support for RDB#3531

Open
sarthakaggarwal97 wants to merge 41 commits into
valkey-io:unstablefrom
sarthakaggarwal97:streaming-compression-rio-pr
Open

Streaming Compression support for RDB#3531
sarthakaggarwal97 wants to merge 41 commits into
valkey-io:unstablefrom
sarthakaggarwal97:streaming-compression-rio-pr

Conversation

@sarthakaggarwal97

@sarthakaggarwal97 sarthakaggarwal97 commented Apr 17, 2026

Copy link
Copy Markdown
Contributor

Related to #3195

Overview

Today, rdbcompression only affects individual string payloads inside an otherwise normal RDB stream. This PR adds Valkey Compressed Stream (VCS) support for RDB persistence, with lz4 as the first supported whole-stream codec.

The RDB can now be wrapped in a VCS envelope and compressed as a single stream at the rio layer. The default behavior remains unchanged with lzf. On load, the RDB path probes for the envelope first. If present and valid, the input is transparently decompressed before normal RDB parsing continues. If absent, loading falls back to the existing plain RDB path. If the envelope is malformed or incompatible, the load fails early.

Architecture

+------------------------+       +------------------------------+       +----------------------+
|    RDB save/load       |       |         rio wrappers         |       |      transport       |
|                        |       |                              |       |                      |
| rdbSaveRio             |------>| compress_rio_t               |------>| dump.rdb / sync RDB  |
| rdbLoadRio             |<------| decompress_rio_t             |<------| file rio             |
| rdbSaveRawString       |       | stream_writer / reader       |       |                      |
| valkey-check-rdb       |       | VCS envelope + codec state  |       |                      |
+------------------------+       +------------------------------+       +----------------------+

Key Design Decisions

  1. Whole-stream compression is implemented as a rio decorator rather than as part of object serialization.
  2. VCS adds a small envelope ahead of the compressed payload so loaders can classify the stream before normal RDB parsing begins.
  3. lzf remains the default and plain RDB compatibility is preserved.
  4. Per-string LZF is skipped when whole-stream compression is active, so the two compression modes are not stacked.
  5. stream_kind validation rejects incompatible VCS streams early.
  6. rdbLoad and valkey-check-rdb both handle plain and VCS-wrapped inputs transparently.
  7. Diskless replication stays on the existing non-VCS path in this PR.

Data Flow: Save Path

rdbSave()
   |
   | if rdbcompression=yes and algo=lz4
   v
rioInitWithCompress(...)
   |
   | write VCS envelope
   | feed normal RDB bytes into LZ4 frame encoder
   v
dump.rdb

Important notes:

  • The VCS header stores magic, version, codec, flags, and stream_kind.
  • rdbchecksum yes enables codec-native integrity signaling for the compressed stream.
  • When whole-stream compression is active, the existing per-string LZF path is bypassed.

Data Flow: Load Path

rdbLoad() / valkey-check-rdb
   |
   | rdbInputStreamPrepare()
   | probe first bytes
   v
[plain RDB] ------------------------> passthrough rio -> normal RDB parser
[VCS + kind=RDB + valid header] ---> decompress_rio -> normal RDB parser
[malformed/incompatible VCS] ------> fail early

This PR is intentionally scoped to RDB streaming compression and the persistence/load paths that need to understand it. It does not add diskless sync and replication compression APIs.

Benchmarks

Benchmarked on r7g.2xlarge (Graviton, 8 vCPUs, 61GB RAM, NVMe). All results averaged over 3 repeats.

Datasets

  • Improved Realistic JSON: Synthetic JSON with natural language text and varied field types. This data is more compressible due to natural language. Tested at 100B–10KB value sizes.
final_improved_combined
  • BlockMesh Tweets — 1M/5M unique real tweets from BlockMesh/tweets. Multilingual, avg ~270B, zero cross-key repetition.
final_blockmesh_combined

Notes:

  1. LZ4 streaming beats LZF on every metric at every size. 30-77% faster saves, 24-73% faster loads, 45-73% smaller RDBs.
  2. LZ4 library is currently vendored from https://github.com/lz4/lz4. The decision was taken here [NEW] RDB Compression via LZ4, Batching, and Batch-Level Dictionary #1962 (comment)

@codecov

codecov Bot commented Apr 18, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 92.04545% with 154 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.13%. Comparing base (79bca53) to head (40e00fc).

Files with missing lines Patch % Lines
src/compression_stream.c 90.25% 35 Missing ⚠️
src/compression_rio.c 74.78% 29 Missing ⚠️
src/valkey-check-rdb.c 59.37% 26 Missing ⚠️
src/rio.c 67.18% 21 Missing ⚠️
src/compression.c 72.72% 18 Missing ⚠️
src/rdb.c 80.82% 14 Missing ⚠️
src/aof.c 70.00% 6 Missing ⚠️
src/unit/test_compression.cpp 99.72% 3 Missing ⚠️
src/compression_lz4.c 97.40% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #3531      +/-   ##
============================================
+ Coverage     76.68%   77.13%   +0.45%     
============================================
  Files           162      167       +5     
  Lines         81021    82890    +1869     
============================================
+ Hits          62129    63938    +1809     
- Misses        18892    18952      +60     
Files with missing lines Coverage Δ
src/config.c 78.93% <ø> (ø)
src/rdb.h 100.00% <ø> (ø)
src/rio.h 100.00% <ø> (ø)
src/server.h 100.00% <ø> (ø)
src/compression_lz4.c 97.40% <97.40%> (ø)
src/unit/test_compression.cpp 99.72% <99.72%> (ø)
src/aof.c 80.17% <70.00%> (-0.14%) ⬇️
src/rdb.c 76.94% <80.82%> (-0.13%) ⬇️
src/compression.c 72.72% <72.72%> (ø)
src/rio.c 82.79% <67.18%> (-1.46%) ⬇️
... and 3 more

... and 21 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@sarthakaggarwal97 sarthakaggarwal97 changed the title Compression: Add Streaming RIO-based RDB compression Add Streaming Compression support for RDB Apr 20, 2026
@sarthakaggarwal97 sarthakaggarwal97 changed the title Add Streaming Compression support for RDB Streaming Compression support for RDB Apr 20, 2026
@sarthakaggarwal97 sarthakaggarwal97 marked this pull request as ready for review April 21, 2026 16:13
@sarthakaggarwal97 sarthakaggarwal97 force-pushed the streaming-compression-rio-pr branch 3 times, most recently from a241fd7 to 6af049a Compare April 27, 2026 15:47
@github-actions

Copy link
Copy Markdown

❌ Provenance Check Alert

Potential code similarities detected with upstream repository.

  • 2026-04-27 15:48:12 [INFO] - matches redis/redis PR #14433 (similarity: 0.992, method: file_simhash+deep)
  • 2026-04-27 15:48:12 [INFO] - matches redis/redis PR #14609 (similarity: 0.992, method: file_simhash+deep)
  • 2026-04-27 15:48:12 [INFO] - matches redis/redis PR #14435 (similarity: 0.992, method: file_simhash+deep)

This check was performed automatically by the Provenance Guard Action.

@xdk-amz xdk-amz left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good but recommend separating RDB checksum concerns from compression checksum concerns and letting operators decide what combination they want instead of mixing the handling between the two modules. I would like to see some exploration on the lz4 configs I commented on as well as some defensive fixes to potentially unsafe code + other nits.

Comment thread src/aof.c Outdated
Comment thread src/aof.c Outdated
Comment thread src/rio.c Outdated
Comment thread src/rdb.c Outdated
Comment thread src/compression_rio.c Outdated
Comment thread src/compression_lz4.c Outdated
Comment thread src/compression_stream.c Outdated
Comment thread src/compression_lz4.c
Comment thread src/compression_lz4.c Outdated
Comment thread src/compression_stream.c Outdated
@sarthakaggarwal97

sarthakaggarwal97 commented May 5, 2026

Copy link
Copy Markdown
Contributor Author

Benchmarks with using RDB checksum as is over logical RDB Stream:

LZ4@0 Save (ms): CRC over logical stream vs LZ4 Block Checksum only over 1M Keys:

Value Size LZ4 Block Checksum CRC + Block Checksum Overhead
100B 489 628 +28%
256B 890 1062 +19%
512B 1464 1773 +21%
1024B 2228 2798 +26%
10KB 13778 17705 +28%

LZ4@0 Load (ms):

Size LZ4 Block Checksum CRC + Block Checksum Overhead
100B 641 764 +19%
256B 840 1050 +25%
512B 1260 1582 +26%
1024B 1910 2447 +28%
10KB 13061 15852 +21%

@sarthakaggarwal97

Copy link
Copy Markdown
Contributor Author
image image image image

@coderabbitai

coderabbitai Bot commented May 12, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds public streaming-compression API (src/compression.h) and LZ4-specific declarations (src/compression_lz4.h) exposing lifecycle, feed, and sizing helpers used by the LZ4 codec implementation and rio/streaming integrations.

Changes

Streaming compression public headers

Layer / File(s) Summary
Streaming compression public interface
src/compression.h
Adds exported enums, streamCompressor/streamDecompressor structs, codec query helpers, lifecycle init/destroy, accessors, conservative output bound helper, and streaming feed APIs for compression/decompression.
LZ4 codec public declarations
src/compression_lz4.h
Adds LZ4-specific header declaring compressor/decompressor Init/Destroy, compressionLz4OutputBound, and streaming feed functions including input_consumed semantics.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 34.34% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Streaming Compression support for RDB' directly matches the main purpose of the changeset: adding whole-stream LZ4 compression support for RDB persistence.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request description clearly relates to the changeset by explaining the addition of Valkey Compressed Stream (VKCS) support for RDB persistence with LZ4 compression, matching the file changes shown.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/aof.c (1)

1009-1054: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Please request @core-team architectural review for this aof.c behavior change.

This changes AOF/RDB sync-restart behavior (including fallback policy), which should get explicit core-team architectural sign-off before merge.

As per coding guidelines, "Request @core-team architectural review for changes to cluster*.c, replication.c, rdb.c, or aof.c".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aof.c` around lines 1009 - 1054, The change in aof.c modifies AOF/RDB
sync-restart behavior (in functions restartAOFWithSyncRdb and
rdbFileUsesStreamingCompression) and the fallback policy when a
streamed-compressed RDB is detected; request an explicit architectural review
from `@core-team` before merging by adding a review request or TODO in the PR and
code comments referencing restartAOFWithSyncRdb and
rdbFileUsesStreamingCompression so core-team can approve the behavioral change,
and ensure the PR description summarizes the new fallback logic (fall back to
BGREWRITEAOF when streaming compression is detected) for their sign-off.
src/rdb.c (1)

3689-3695: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Differentiate VKCS loads from checksum-disabled RDBs in this notice.

RIO_FLAG_SKIP_RDB_CHECKSUM now also covers the streaming-compressed path, so this branch will log that checksums were disabled even when integrity is coming from the codec layer. That makes load diagnostics misleading.

Suggested fix
-            if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
-                serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: skipped checksum for this transfer");
+            if ((rdb->flags & RIO_FLAG_STREAMING_COMPRESSION) &&
+                (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM)) {
+                serverLog(LL_NOTICE,
+                          "Skipping logical RDB checksum for streaming-compressed input");
+            } else if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
+                serverLog(LL_NOTICE,
+                          "RDB file was saved with checksum disabled: "
+                          "skipped checksum for this transfer");
             } else if (cksum == 0) {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rdb.c` around lines 3689 - 3695, The notice incorrectly attributes
checksum skipping to RIO_FLAG_SKIP_RDB_CHECKSUM even for streaming-compressed
(VKCS) loads; update the branch in rdb.c (where server.rdb_checksum,
server.skip_checksum_validation, memrev64ifbe, rdb->flags and
RIO_FLAG_SKIP_RDB_CHECKSUM are handled) to detect when the input is coming from
a streaming-compressed/codec-backed path (e.g., check the RIO or codec metadata
on rdb to see if a VKCS/streaming codec provided integrity) and emit a distinct
serverLog message for that case versus a true checksum-disabled RDB; keep the
existing behavior for cksum==0 and cksum!=expected, but split the
RIO_FLAG_SKIP_RDB_CHECKSUM branch into two paths (codec-backed integrity vs.
checksum-disabled file) with appropriate, different log text.
🧹 Nitpick comments (3)
src/compression_lz4.h (1)

12-28: ⚡ Quick win

Add API contract comments for exported LZ4 entry points.

These declarations are public-facing but currently undocumented at function level (ownership, error model, buffer/input-consumed guarantees). Please add concise contract comments to prevent misuse.

As per coding guidelines "Document why code exists, not just what it does; document all functions in C code".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/compression_lz4.h` around lines 12 - 28, Add concise API contract
comments above each exported LZ4 function declaration
(compressionLz4CompressorInit, compressionLz4CompressorDestroy,
compressionLz4DecompressorInit, compressionLz4DecompressorDestroy,
compressionLz4OutputBound, compressionLz4CompressFeed,
compressionLz4DecompressFeed) that describe ownership semantics (who
allocates/frees streamCompressor/streamDecompressor and buffer pointers), the
error model and possible return values (success codes vs negative error codes),
guarantees about how much output will be produced or bounded (for
compressionLz4OutputBound and CompressFeed), and for decompression functions
explicitly state how input_consumed is set and when callers must retry vs drop
input; keep each comment short, consistent, and focused on why the function
contract exists (caller responsibilities, lifetime, and failure behavior).
deps/lz4/Makefile (1)

26-28: ⚡ Quick win

Declare clean as phony.

If a file named clean appears in this directory, make clean can silently no-op. Add .PHONY: clean for reliable cleanup.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@deps/lz4/Makefile` around lines 26 - 28, The Makefile target "clean" should
be declared phony to avoid being skipped if a file named "clean" exists; update
the Makefile by adding a .PHONY declaration for the clean target (e.g., add
".PHONY: clean") so that the clean rule (which removes *.o and *.a) always runs.
src/rdb.c (1)

3150-3197: ⚡ Quick win

Make the new rdbInputStream* helpers file-local.

These helpers are only referenced inside src/rdb.c, so exporting them widens the symbol surface for no gain.

Suggested fix
-void rdbInputStreamInit(rdbInputStream *input, rio *raw_rio) {
+static void rdbInputStreamInit(rdbInputStream *input, rio *raw_rio) {
@@
-decompressRioInitResult rdbInputStreamPrepare(rdbInputStream *input) {
+static decompressRioInitResult rdbInputStreamPrepare(rdbInputStream *input) {
@@
-void rdbInputStreamDestroy(rdbInputStream *input) {
+static void rdbInputStreamDestroy(rdbInputStream *input) {
@@
-int rdbInputStreamValidateEnd(rdbInputStream *input) {
+static int rdbInputStreamValidateEnd(rdbInputStream *input) {
@@
-bool rdbRioHasCorruptCompressedInput(const rio *rdb) {
+static bool rdbRioHasCorruptCompressedInput(const rio *rdb) {

As per coding guidelines, "Use static keyword for file-local functions in C code"

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rdb.c` around lines 3150 - 3197, Mark the helper functions as file-local
by adding the static storage specifier to their definitions: rdbInputStreamInit,
rdbInputStreamPrepare, rdbInputStreamDestroy, rdbInputStreamValidateEnd, and
rdbRioHasCorruptCompressedInput. Leave the rdbInputStream type and any external
declarations untouched; only change the function declarations/definitions to
static so these helpers are not exported from the object file.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@deps/lz4/lz4frame.c`:
- Around line 1619-1621: LZ4F_decompress performs pointer arithmetic on
srcBuffer without checking for NULL; add a guard at the start of LZ4F_decompress
to handle srcBuffer == NULL (or *srcSizePtr == 0) before computing
srcStart/srcEnd/srcPtr so you never do NULL + offset arithmetic; if srcBuffer is
NULL or *srcSizePtr == 0, initialize srcStart/srcEnd/srcPtr to a safe
non-dereferenced value (or skip computing srcEnd) and ensure subsequent logic
(e.g., the frame parsing path exercised by LZ4F_getFrameInfo calling
LZ4F_decompress with NULL/0) treats this as an empty input rather than
performing pointer arithmetic or dereferencing.

In `@src/compression_rio.c`:
- Around line 89-100: compressRioFlush and compressRioFinish currently mark the
stream writer errored but do not set the rio-level write error flag, so update
both functions (compressRioFlush and compressRioFinish) to set
RIO_FLAG_WRITE_ERROR on the rio when any flush/finalize failure occurs;
specifically, when streamWriterIsErrored(cr->writer) is detected, when
streamWriterFlush(cr->writer) != 0, and when
cr->inner->flush/cr->inner->finalize returns 0, set the rio flag (e.g., r->flags
|= RIO_FLAG_WRITE_ERROR or call the existing helper to mark rio write error) in
addition to calling streamWriterSetError(cr->writer) so callers relying on the
rio flag see the error.

---

Outside diff comments:
In `@src/aof.c`:
- Around line 1009-1054: The change in aof.c modifies AOF/RDB sync-restart
behavior (in functions restartAOFWithSyncRdb and
rdbFileUsesStreamingCompression) and the fallback policy when a
streamed-compressed RDB is detected; request an explicit architectural review
from `@core-team` before merging by adding a review request or TODO in the PR and
code comments referencing restartAOFWithSyncRdb and
rdbFileUsesStreamingCompression so core-team can approve the behavioral change,
and ensure the PR description summarizes the new fallback logic (fall back to
BGREWRITEAOF when streaming compression is detected) for their sign-off.

In `@src/rdb.c`:
- Around line 3689-3695: The notice incorrectly attributes checksum skipping to
RIO_FLAG_SKIP_RDB_CHECKSUM even for streaming-compressed (VKCS) loads; update
the branch in rdb.c (where server.rdb_checksum, server.skip_checksum_validation,
memrev64ifbe, rdb->flags and RIO_FLAG_SKIP_RDB_CHECKSUM are handled) to detect
when the input is coming from a streaming-compressed/codec-backed path (e.g.,
check the RIO or codec metadata on rdb to see if a VKCS/streaming codec provided
integrity) and emit a distinct serverLog message for that case versus a true
checksum-disabled RDB; keep the existing behavior for cksum==0 and
cksum!=expected, but split the RIO_FLAG_SKIP_RDB_CHECKSUM branch into two paths
(codec-backed integrity vs. checksum-disabled file) with appropriate, different
log text.

---

Nitpick comments:
In `@deps/lz4/Makefile`:
- Around line 26-28: The Makefile target "clean" should be declared phony to
avoid being skipped if a file named "clean" exists; update the Makefile by
adding a .PHONY declaration for the clean target (e.g., add ".PHONY: clean") so
that the clean rule (which removes *.o and *.a) always runs.

In `@src/compression_lz4.h`:
- Around line 12-28: Add concise API contract comments above each exported LZ4
function declaration (compressionLz4CompressorInit,
compressionLz4CompressorDestroy, compressionLz4DecompressorInit,
compressionLz4DecompressorDestroy, compressionLz4OutputBound,
compressionLz4CompressFeed, compressionLz4DecompressFeed) that describe
ownership semantics (who allocates/frees streamCompressor/streamDecompressor and
buffer pointers), the error model and possible return values (success codes vs
negative error codes), guarantees about how much output will be produced or
bounded (for compressionLz4OutputBound and CompressFeed), and for decompression
functions explicitly state how input_consumed is set and when callers must retry
vs drop input; keep each comment short, consistent, and focused on why the
function contract exists (caller responsibilities, lifetime, and failure
behavior).

In `@src/rdb.c`:
- Around line 3150-3197: Mark the helper functions as file-local by adding the
static storage specifier to their definitions: rdbInputStreamInit,
rdbInputStreamPrepare, rdbInputStreamDestroy, rdbInputStreamValidateEnd, and
rdbRioHasCorruptCompressedInput. Leave the rdbInputStream type and any external
declarations untouched; only change the function declarations/definitions to
static so these helpers are not exported from the object file.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: a47a8b43-59a8-4016-a872-af1fde59642c

📥 Commits

Reviewing files that changed from the base of the PR and between 6c9d7fc and ae21623.

📒 Files selected for processing (40)
  • cmake/Modules/SourceFiles.cmake
  • cmake/Modules/ValkeySetup.cmake
  • deps/CMakeLists.txt
  • deps/Makefile
  • deps/lz4/CMakeLists.txt
  • deps/lz4/LICENSE
  • deps/lz4/Makefile
  • deps/lz4/lz4.c
  • deps/lz4/lz4.h
  • deps/lz4/lz4frame.c
  • deps/lz4/lz4frame.h
  • deps/lz4/lz4hc.c
  • deps/lz4/lz4hc.h
  • deps/lz4/xxhash.c
  • deps/lz4/xxhash.h
  • src/CMakeLists.txt
  • src/Makefile
  • src/aof.c
  • src/compression.c
  • src/compression.h
  • src/compression_lz4.c
  • src/compression_lz4.h
  • src/compression_rio.c
  • src/compression_rio.h
  • src/compression_stream.c
  • src/compression_stream.h
  • src/config.c
  • src/rdb.c
  • src/rdb.h
  • src/rio.c
  • src/rio.h
  • src/server.h
  • src/unit/CMakeLists.txt
  • src/unit/Makefile
  • src/unit/test_compression.cpp
  • src/valkey-check-rdb.c
  • tests/integration/rdb-compression.tcl
  • tests/integration/replication-aof-sync.tcl
  • tests/integration/valkey-check-rdb.tcl
  • valkey.conf

Comment thread deps/lz4/lz4frame.c
Comment thread src/compression_rio.c
Add streaming LZ4-backed RDB compression with rio decorators, stream envelope handling, integration changes, and the follow-up fixes and config cleanup needed on top of unstable.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
- Remove dead code: rdbIsValidMagic() and unused #include <string.h> in rdb.h
- Remove redundant first RIO_FLAG_SKIP_RDB_CHECKSUM set in rdbSaveInternal
- Remove unrelated changes: config_parse_depth, USE_FAST_FLOAT, write-make-settings
- Validate full 8-byte VKCS envelope in aof.c rdbFileUsesStreamingCompression
- Add SAFETY comment for rdbRioHasCorruptCompressedInput cast invariant
- Rename all snake_case identifiers to camelCase per Valkey conventions:
  types (compression_algo_t -> compressionAlgo, stream_compressor_t ->
  streamCompressor, compress_rio_t -> compressRio, etc.), functions
  (stream_writer_create -> streamWriterCreate, compress_rio_finish ->
  compressRioFinish, write_vkcs_envelope -> writeVkcsEnvelope, etc.),
  and static variables (compression_lz4_codec_impl -> compressionLz4CodecImpl)
- Drop _t suffix from all types to match Valkey convention
- Fix typo: streamWriterIsErrord -> streamWriterIsErrored
- Replace silent dummy buffer allocation with assert(needed > 0) in
  streamWriterEnsureOutBuf

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
* Address streaming RDB compression review

* Skip RDB CRC for streaming compression

* Remove brittle 32-bit compression unit test

---------

Co-authored-by: Sarthak Aggarwal <sarthagg@amazon.com>

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Comment thread valkey.conf Outdated
@sarthakaggarwal97 sarthakaggarwal97 force-pushed the streaming-compression-rio-pr branch from ae21623 to 2d6bfae Compare May 12, 2026 23:59
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@sarthakaggarwal97 sarthakaggarwal97 force-pushed the streaming-compression-rio-pr branch from 2d6bfae to b494e19 Compare May 12, 2026 23:59

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@deps/lz4/Makefile`:
- Around line 26-28: The Makefile's clean target (target name "clean") should be
declared phony to avoid being skipped if a file named "clean" exists; add a
.PHONY declaration for the clean target (e.g., add a line declaring .PHONY:
clean) near the top or before the clean target so make always runs the clean
recipe (rm -f *.o and rm -f *.a).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro Plus

Run ID: ebf9c0e3-08ee-4c87-a906-6a8e31eb4a87

📥 Commits

Reviewing files that changed from the base of the PR and between ae21623 and b494e19.

📒 Files selected for processing (40)
  • cmake/Modules/SourceFiles.cmake
  • cmake/Modules/ValkeySetup.cmake
  • deps/CMakeLists.txt
  • deps/Makefile
  • deps/lz4/CMakeLists.txt
  • deps/lz4/LICENSE
  • deps/lz4/Makefile
  • deps/lz4/lz4.c
  • deps/lz4/lz4.h
  • deps/lz4/lz4frame.c
  • deps/lz4/lz4frame.h
  • deps/lz4/lz4hc.c
  • deps/lz4/lz4hc.h
  • deps/lz4/xxhash.c
  • deps/lz4/xxhash.h
  • src/CMakeLists.txt
  • src/Makefile
  • src/aof.c
  • src/compression.c
  • src/compression.h
  • src/compression_lz4.c
  • src/compression_lz4.h
  • src/compression_rio.c
  • src/compression_rio.h
  • src/compression_stream.c
  • src/compression_stream.h
  • src/config.c
  • src/rdb.c
  • src/rdb.h
  • src/rio.c
  • src/rio.h
  • src/server.h
  • src/unit/CMakeLists.txt
  • src/unit/Makefile
  • src/unit/test_compression.cpp
  • src/valkey-check-rdb.c
  • tests/integration/rdb-compression.tcl
  • tests/integration/replication-aof-sync.tcl
  • tests/integration/valkey-check-rdb.tcl
  • valkey.conf
✅ Files skipped from review due to trivial changes (2)
  • cmake/Modules/ValkeySetup.cmake
  • deps/lz4/LICENSE
🚧 Files skipped from review as they are similar to previous changes (33)
  • src/CMakeLists.txt
  • deps/CMakeLists.txt
  • src/unit/CMakeLists.txt
  • tests/integration/valkey-check-rdb.tcl
  • valkey.conf
  • cmake/Modules/SourceFiles.cmake
  • src/config.c
  • src/compression_rio.h
  • src/rio.h
  • tests/integration/replication-aof-sync.tcl
  • deps/lz4/CMakeLists.txt
  • src/aof.c
  • src/server.h
  • src/rdb.h
  • deps/Makefile
  • src/compression_lz4.c
  • src/Makefile
  • src/compression_lz4.h
  • src/compression.h
  • src/unit/Makefile
  • src/compression_rio.c
  • src/compression_stream.h
  • deps/lz4/xxhash.c
  • deps/lz4/xxhash.h
  • tests/integration/rdb-compression.tcl
  • src/rdb.c
  • src/valkey-check-rdb.c
  • deps/lz4/lz4frame.h
  • src/compression.c
  • src/compression_stream.c
  • deps/lz4/lz4hc.c
  • deps/lz4/lz4.h
  • deps/lz4/lz4hc.h

Comment thread deps/lz4/Makefile
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@sarthakaggarwal97 sarthakaggarwal97 added the run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP) label Jun 5, 2026
valkey-check-rdb reported offsets via rioTell() on every read, and
rdbLoadProgressCallback computed the transport offset via rioTell() on
every read even when no progress event was due. Read processed_bytes
directly for the offset, and only compute the transport position when a
progress event is actually reported.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Replace the separate rdb-compression-algo config with a single
rdbcompression no|yes|lz4-stream. 'yes' keeps the legacy per-string LZF
path; 'lz4-stream' selects whole-file streaming LZ4 RDB saves.

Update valkey.conf and the RDB compression, check-rdb, and replication
AOF sync tests to use the new config value.

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@sarthakaggarwal97

Copy link
Copy Markdown
Contributor Author

@hpatro @madolson Thank you so much for the review! I think I have addressed all the feedback on the PR. Please take a look again!

@roshkhatri roshkhatri left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor concern about using streaming compression for disk based full sync replication

Comment thread src/rdb.c
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@sarthakaggarwal97 sarthakaggarwal97 force-pushed the streaming-compression-rio-pr branch from 57c5bfe to 52d4a47 Compare June 22, 2026 16:02
@hpatro

hpatro commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

A minor concern of mine is it's a bit difficult for me to follow multiple layer of interfaces and might not be easy to get started one (might not be a bad thing if things stay pretty much untouched for years to come).

I chatted a bunch with Claude to understand if we could simplify further. Just adding the current structure and what could be removed. Would love to hear author/reviewers thoughts as well @xdk-amz @madolson @sarthakaggarwal97 @roshkhatri @nmvk

The PR adds six layered interfaces (per direction). Here's the call graph end-to-end, then the layering picture,
  then where layers do and don't earn their keep.

  Save path call chain

  rdbSave (rdb.c)
    └─ rdbSaveInternal
         ├─ rioInitWithFile(&rdb, fp)                              ──► rio (file)
         ├─ rioInitWithCompression(&cr, &rdb, &cfg)                ──► compressRio (rio decorator)
         │    └─ streamWriterInit(&cr.writer, cfg, emit_cb, &cr)   ──► streamWriter
         │         └─ streamCompressorInit(&w.compressor, algo, …) ──► streamCompressor (vtable)
         │              └─ compressionLz4CompressorInit            ──► compressionLz4*
         │                   └─ LZ4F_createCompressionContext      ──► LZ4F (vendored)
         │
         ├─ rdbSaveRio(…, save_rio = (rio*)&cr, …)
         │    └─ rioWrite (existing)         ◄─── existing rio API
         │         └─ compressRioWrite       ◄─── rio decorator override
         │              └─ streamWriterWrite
         │                   ├─ streamWriterEnsureEnvelope ─► emit_cb ─► rioWrite(inner)
         │                   └─ streamWriterFeedAndEmit
         │                        ├─ streamCompressorFeed ─► compressionLz4CompressFeed ─► LZ4F_compressUpdate
         │                        └─ emit_cb ─► rioWrite(inner) ─► rioFileWrite
         │
         └─ compressRioFinish
              └─ streamWriterFinish ─► streamCompressorFeed(FLUSH_END) ─► emit_cb ─► rioWrite(inner)

  Load path call chain

  rdbLoad (rdb.c) / redis_check_rdb (valkey-check-rdb.c)
    ├─ rioInitWithFile(&rdb, fp)                                   ──► rio (file)
    ├─ rdbInputStreamInit(&input, &rdb)                            ──► rdbInputStream
    │   (input.rdb_rio = &rdb)
    ├─ rdbInputStreamPrepare(&input)
    │   └─ rioInitWithDecompression(&input.decompressor, &rdb, …)  ──► decompressRio
    │        └─ streamReaderInit(&dr.reader, cfg, read_cb, &dr)    ──► streamReader
    │             └─ streamReaderGetInfo
    │                  └─ streamReaderProbe                        (consumes first ≤7 bytes)
    │                       └─ read_cb ─► rioReadPartial(inner)    ──► rio.read_some (NEW)
    │   (input.rdb_rio = (rio*)&input.decompressor)
    │
    ├─ rdbLoadRio(input.rdb_rio, …)
    │    └─ rioRead (existing)
    │         └─ decompressRioRead          ◄─── rio decorator override
    │              └─ streamReaderRead
    │                   ├─ [passthrough]  streamReaderReadPassthrough ─► read_cb ─► rioReadPartial(inner)
    │                   └─ [compressed]   streamReaderReadCompressed
    │                        ├─ streamReaderRefillCompressedBuf ─► read_cb ─► rioReadPartial(inner)
    │                        └─ streamReaderDrainCompressedBuf
    │                             └─ streamDecompressorFeed ─► compressionLz4DecompressFeed ─► LZ4F_decompress
    │
    ├─ rdbInputStreamValidateEnd ─► decompressRioValidateEnd ─► streamReaderValidateEnd
    └─ rdbInputStreamFree         ─► decompressRioFree         ─► streamReaderFree

  Layer stack (read it bottom-up)

  ┌──────────────────────────────────────────────────────────────────────────┐
  │ Callers: rdb.c rdbSave/rdbLoad, valkey-check-rdb.c, aof.c                │
  └──────────────────────────────────────────────────────────────────────────┘
              │ (read side only)              │ (both)
              ▼                                ▼
  ┌─────────────────────────────┐      ┌──────────────────────────────────┐
  │ rdbInputStream  (rdb.h)     │      │ aof.c reads first 7 bytes        │
  │   raw_rio, rdb_rio,         │      │ + streamReadEnvelopeInfo()       │
  │   decompressRio, info       │      │  (no rio at all)                 │
  └─────────────────────────────┘      └──────────────────────────────────┘
              │                                │
              ▼                                │
  ┌─────────────────────────────┐  ┌────────────────────────────┐
  │ decompressRio (rio dec.)    │  │ compressRio (rio dec.)     │
  │   compression_rio.c         │  │                            │
  └─────────────────────────────┘  └────────────────────────────┘
              │                                │
              ▼                                ▼
  ┌─────────────────────────────┐  ┌────────────────────────────┐
  │ streamReader                │  │ streamWriter               │
  │   probe + 2 ring buffers    │  │   envelope + scratch buf   │
  │   compression_stream.c      │  │   + chunking (1 MB)        │
  └─────────────────────────────┘  └────────────────────────────┘
              │                                │
              ▼                                ▼
  ┌─────────────────────────────────────────────────────────────┐
  │ streamDecompressor / streamCompressor   (vtable: 1 codec)   │
  │   compression.c   compressionCodec{}                        │
  └─────────────────────────────────────────────────────────────┘
                                │
                                ▼
  ┌─────────────────────────────────────────────────────────────┐
  │ compressionLz4* (thin LZ4F wrapper)                         │
  │   compression_lz4.c                                         │
  └─────────────────────────────────────────────────────────────┘
                                │
                                ▼
  ┌─────────────────────────────────────────────────────────────┐
  │ deps/lz4  LZ4F_compressBegin/Update/End, LZ4F_decompress    │
  └─────────────────────────────────────────────────────────────┘

  Where the layers earn their keep, and where they don't

  ┌──────────────────────────────────────┬─────────────┬───────────────────────────────────────────────────────────────┐
  │                Layer                 │   Pulls     │                              Why                              │
  │                                      │   weight?   │                                                               │
  ├──────────────────────────────────────┼─────────────┼───────────────────────────────────────────────────────────────┤
  │ LZ4F (vendored)                      │ ✅          │ Codec implementation.                                         │
  ├──────────────────────────────────────┼─────────────┼───────────────────────────────────────────────────────────────┤
  │ compressionLz4*                      │ ✅          │ Translates LZ4F's "magic-error-code" return convention into   │
  │                                      │             │ the project's signed-ssize_t. Necessary glue.                 │
  ├──────────────────────────────────────┼─────────────┼───────────────────────────────────────────────────────────────┤
  │                                      │             │ One codec exists. The vtable is an extension point for future │
  │ streamCompressor/streamDecompressor  │ ⚠️          │  codecs (zstd was mentioned in the PR description). Today     │
  │ (vtable)                             │ Speculative │ it's pure indirection: a switch per call + a struct of 7 fn   │
  │                                      │             │ pointers.                                                     │
  ├──────────────────────────────────────┼─────────────┼───────────────────────────────────────────────────────────────┤
  │                                      │             │ Real logic — VCS envelope, probe + replay, two-buffer         │
  │ streamWriter/streamReader            │ ✅          │ pipeline, chunking, end-of-frame validation. Reusable without │
  │                                      │             │  a rio (the AOF check imports streamReadEnvelopeInfo          │
  │                                      │             │ directly).                                                    │
  ├──────────────────────────────────────┼─────────────┼───────────────────────────────────────────────────────────────┤
  │                                      │             │ Adapter to rio. Translates conventions (1/0 vs 0/-1), tracks  │
  │ compressRio/decompressRio            │ ✅          │ RIO flags, propagates RIO_FLAG_SKIP_RDB_CHECKSUM. Has only    │
  │                                      │             │ one consumer of stream{Writer,Reader} though — see below.     │
  ├──────────────────────────────────────┼─────────────┼───────────────────────────────────────────────────────────────┤
  │ rdbInputStream                       │ ❌          │ Barely a layer. 5 fields, 4 functions, all delegation. Adds   │
  │                                      │             │ ~6 lines per call site over inlining.                         │
  └──────────────────────────────────────┴─────────────┴───────────────────────────────────────────────────────────────┘```

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@sarthakaggarwal97 sarthakaggarwal97 removed the run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP) label Jun 23, 2026
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
@sarthakaggarwal97

Copy link
Copy Markdown
Contributor Author

Thanks @hpatro for taking a look. Post your review, I did some more refactoring. I removed rdbInputStream as a layer, removed the codec vtable (we can probably rely on just switch cases), reduced some API surface. For rest of the abstraction, I feel they serve meaningful purpose since we want to introduce zstd (#3798) as an alternate algorithm, and replication compression (#3853). I feel once the interfaces are set in, it would be easier to support these upcoming changes.

@hpatro

hpatro commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

@valkey-review-bot Please review this change.

@valkey-review-bot

valkey-review-bot Bot commented Jun 25, 2026

Copy link
Copy Markdown

Review complete — nothing met the evidence bar for a comment. (After tracing the save/load/check-rdb paths, the config enum migration, the rio refactor (rioConnRead/rioFileRead/rioReadPartial), checksum policy, AOF-base rejection and replication gating, plus a clean -Werror build, I found no novel high/medium-confidence defect; the candidate concerns (checksum coupling, vendored lz4, signed/unsigned in rioBufferReadSome, the validateEnd dead-check) are already covered by existing review threads or are accepted design decisions with maintainer sign-off.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants